From fac4d6ff73971bb6ad49848a0538778bf9d5fbe7 Mon Sep 17 00:00:00 2001
From: Redouane Lakrache
Date: Wed, 13 Nov 2024 22:17:30 +0100
Subject: [PATCH] [RelayMiner] Supplier rate limiting (#895)

## Summary

This PR adds an optimistic `RelayMeter` to the `RelayMiner` to monitor the consumed stake and stop servicing when `maxStake` is reached.

It works by:
* Intercepting `RelayRequest`s before they are served.
* Assuming the relay will be mined and deducting the corresponding amount from the available stake (i.e. adding it to the consumed stake).
* Sending a `rate-limited` error if the app's stake share cannot cover the cost of a minable `RelayRequest`.
* After a relay has been served, checking whether the `Relay` is volume applicable and reverting the initial deduction if it is not.
* Updating a known `Application`'s stake if an `EventApplicationStaked` is observed.
* Resetting the `Application` stakes at the beginning of every session.

## Issue

`RelayMiner`s need to know how much of the `Application`'s stake they are allowed to consume without over-servicing.

![image](https://github.com/user-attachments/assets/4354c747-23a6-4b44-a63a-62a13617d796)

## Type of change

Select one or more from the following:

- [x] New feature, functionality or library

## Testing

- [x] **Unit Tests**: `make go_develop_and_test`
- [x] **LocalNet E2E Tests**: `make test_e2e`
- [ ] **DevNet E2E Tests**: Add the `devnet-test-e2e` label to the PR.

## Sanity Checklist

- [x] I have tested my changes using the available tooling
- [x] I have commented my code
- [x] I have performed a self-review of my own code; both comments & source code

---------

Co-authored-by: Daniel Olshansky
---
 config.yml                                   |   6 +-
 pkg/client/interface.go                      |   3 +
 pkg/client/query/sessionquerier.go           |  10 +
 pkg/relayer/cmd/cmd.go                       |  20 +-
 pkg/relayer/interface.go                     |  27 +
 pkg/relayer/miner/miner.go                   |  24 +-
 pkg/relayer/miner/miner_test.go              |  19 +-
 pkg/relayer/proxy/errors.go                  |   3 +
 pkg/relayer/proxy/proxy.go                   |  21 +-
 pkg/relayer/proxy/relay_meter.go             | 460 ++++++++++++++++++
 pkg/relayer/proxy/server_builder.go          |   1 +
 pkg/relayer/proxy/synchronous.go             |  14 +
 pkg/relayer/session/sessiontree.go           |  16 -
 x/proof/keeper/proof_validation.go           |   4 -
 x/shared/types/session.go                    |  13 +
 x/tokenomics/keeper/settle_pending_claims.go |   8 +
 x/tokenomics/keeper/token_logic_modules.go   |  49 +-
 .../keeper/token_logic_modules_test.go       |  18 +-
 18 files changed, 657 insertions(+), 59 deletions(-)
 create mode 100644 pkg/relayer/proxy/relay_meter.go

diff --git a/config.yml b/config.yml
index 5f8046a5a..d31907e7f 100644
--- a/config.yml
+++ b/config.yml
@@ -141,13 +141,15 @@ genesis:
         denom: upokt
   bank:
     supply:
-      - amount: "1102000204"
+      - amount: "1202000272"
        denom: upokt
    balances:
      # Application module
      - address: pokt1rl3gjgzexmplmds3tq3r3yk84zlwdl6djzgsvm
        coins:
-          - amount: "100000068" # MUST BE equal to the total of all app stakes below
+          # TODO_MAINNET(@olshansk): Pass config.yml into ChatGPT to build a script
+          # that ensures the amounts line up
+          - amount: "200000136" # MUST BE equal to the total of all app stakes below
            denom: upokt
      # Supplier module
      - address: pokt1j40dzzmn6cn9kxku7a5tjnud6hv37vesr5ccaa
diff --git a/pkg/client/interface.go b/pkg/client/interface.go
index 0f8c8575a..365c24b74 100644
--- a/pkg/client/interface.go
+++ b/pkg/client/interface.go
@@ -291,6 +291,9 @@ type SessionQueryClient interface {
 		serviceId string,
 		blockHeight int64,
 	) (*sessiontypes.Session, error)
+
+	// GetParams queries the chain for the session module parameters.
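+	// The returned params include NumSuppliersPerSession, which the RelayMiner's
+	// relay meter uses to compute each supplier's share of an application's stake.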
+ GetParams(ctx context.Context) (*sessiontypes.Params, error) } // SharedQueryClient defines an interface that enables the querying of the diff --git a/pkg/client/query/sessionquerier.go b/pkg/client/query/sessionquerier.go index 7dba6e87c..8553e3313 100644 --- a/pkg/client/query/sessionquerier.go +++ b/pkg/client/query/sessionquerier.go @@ -62,3 +62,13 @@ func (sessq *sessionQuerier) GetSession( } return res.Session, nil } + +// GetParams queries & returns the session module on-chain parameters. +func (sessq *sessionQuerier) GetParams(ctx context.Context) (*sessiontypes.Params, error) { + req := &sessiontypes.QueryParamsRequest{} + res, err := sessq.sessionQuerier.Params(ctx, req) + if err != nil { + return nil, ErrQuerySessionParams.Wrapf("[%v]", err) + } + return &res.Params, nil +} diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 98df7e375..18cd07d42 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -198,12 +198,13 @@ func setupRelayerDependencies( config.NewSupplyDelegationClientFn(), // leaf config.NewSupplySharedQueryClientFn(), // leaf config.NewSupplyServiceQueryClientFn(), + config.NewSupplyApplicationQuerierFn(), + config.NewSupplySessionQuerierFn(), + supplyRelayMeter, supplyMiner, config.NewSupplyAccountQuerierFn(), config.NewSupplyBankQuerierFn(), - config.NewSupplyApplicationQuerierFn(), config.NewSupplySupplierQuerierFn(), - config.NewSupplySessionQuerierFn(), config.NewSupplyProofQueryClientFn(), config.NewSupplyRingCacheFn(), supplyTxFactory, @@ -231,6 +232,21 @@ func supplyMiner( return depinject.Configs(deps, depinject.Supply(mnr)), nil } +// supplyRelayMeter constructs a RelayMeter instance and returns a new +// depinject.Config which is supplied with the given deps and the new RelayMeter. +func supplyRelayMeter( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + rm, err := proxy.NewRelayMeter(deps) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(rm)), nil +} + // supplyTxFactory constructs a cosmostx.Factory instance and returns a new // depinject.Config which is supplied with the given deps and the new // cosmostx.Factory. diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 1f231e7f1..766dcb5ce 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -1,6 +1,7 @@ //go:generate mockgen -destination=../../testutil/mockrelayer/relayer_proxy_mock.go -package=mockrelayer . RelayerProxy //go:generate mockgen -destination=../../testutil/mockrelayer/miner_mock.go -package=mockrelayer . Miner //go:generate mockgen -destination=../../testutil/mockrelayer/relayer_sessions_manager_mock.go -package=mockrelayer . RelayerSessionsManager +//go:generate mockgen -destination=../../testutil/mockrelayer/relay_meter_mock.go -package=mockrelayer . RelayMeter package relayer @@ -162,3 +163,29 @@ type SessionTree interface { // GetTrieSpec returns the trie spec of the SMST. GetTrieSpec() smt.TrieSpec } + +// RelayMeter is an interface that keeps track of the amount of stake consumed between +// a single onchain Application and a single onchain Supplier over the course of a single session. +// It enables the RelayMiner to rate limit the number of requests handled offchain as a function +// of the optimistic onchain rate limits. +type RelayMeter interface { + // Start starts the relay meter. + Start(ctx context.Context) error + + // AccumulateRelayReward adds the relay reward from the incoming request to session's accumulator. 
+ // The relay cost is added optimistically, assuming that the relay WILL be volume / reward applicable. + // + // The reason why optimistic AccumulateRelayReward + SetNonApplicableRelayReward is used instead of + // a simpler AccumulateVolumeApplicableRelayReward is that when the relay is first seen + // we don't know if it will be volume / reward applicable until it is served. + // + // To rate limit or not the current relay, we need to always optimistically account all relays as being + // volume / reward applicable. + AccumulateRelayReward(ctx context.Context, relayRequestMeta servicetypes.RelayRequestMetadata) error + + // SetNonApplicableRelayReward updates the relay meter for the given relay request as + // non-applicable between a single Application and a single Supplier for a single session. + // The volume / reward applicability of the relay is unknown to the relay miner + // until the relay is served and the relay response signed. + SetNonApplicableRelayReward(ctx context.Context, relayRequestMeta servicetypes.RelayRequestMetadata) error +} diff --git a/pkg/relayer/miner/miner.go b/pkg/relayer/miner/miner.go index b6ed55e0e..83cd4a8a8 100644 --- a/pkg/relayer/miner/miner.go +++ b/pkg/relayer/miner/miner.go @@ -27,6 +27,7 @@ type miner struct { // serviceQueryClient is used to query for the relay difficulty target hash of a service. // relay_difficulty is the target hash which a relay hash must be less than to be volume/reward applicable. serviceQueryClient client.ServiceQueryClient + relayMeter relayer.RelayMeter } // NewMiner creates a new miner from the given dependencies and options. It @@ -43,7 +44,7 @@ func NewMiner( ) (*miner, error) { mnr := &miner{} - if err := depinject.Inject(deps, &mnr.serviceQueryClient); err != nil { + if err := depinject.Inject(deps, &mnr.serviceQueryClient, &mnr.relayMeter); err != nil { return nil, err } @@ -88,6 +89,9 @@ func (mnr *miner) mapMineRelay( ) (_ either.Either[*relayer.MinedRelay], skip bool) { relayBz, err := relay.Marshal() if err != nil { + if relayMeteringResult := mnr.unclaimRelayUPOKT(ctx, *relay); relayMeteringResult.IsError() { + return relayMeteringResult, false + } return either.Error[*relayer.MinedRelay](err), false } relayHashArr := protocol.GetRelayHashFromBytes(relayBz) @@ -95,11 +99,17 @@ func (mnr *miner) mapMineRelay( relayDifficultyTargetHash, err := mnr.getServiceRelayDifficultyTargetHash(ctx, relay.Req) if err != nil { - return either.Error[*relayer.MinedRelay](err), false + if relayMeteringResult := mnr.unclaimRelayUPOKT(ctx, *relay); relayMeteringResult.IsError() { + return relayMeteringResult, true + } + return either.Error[*relayer.MinedRelay](err), true } // The relay IS NOT volume / reward applicable if !protocol.IsRelayVolumeApplicable(relayHash, relayDifficultyTargetHash) { + if eitherMeteringResult := mnr.unclaimRelayUPOKT(ctx, *relay); eitherMeteringResult.IsError() { + return eitherMeteringResult, true + } return either.Success[*relayer.MinedRelay](nil), true } @@ -135,3 +145,13 @@ func (mnr *miner) getServiceRelayDifficultyTargetHash(ctx context.Context, req * return serviceRelayDifficulty.GetTargetHash(), nil } + +// unclaimRelayUPOKT unclaims the relay UPOKT reward for the relay. +// It returns an either.Error if the relay UPOKT reward could not be unclaimed. 
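+// It is invoked whenever a relay that was optimistically metered turns out not to be
+// volume / reward applicable (or fails to be processed), so that the previously
+// accumulated reward is reverted.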
+func (mnr *miner) unclaimRelayUPOKT(ctx context.Context, relay servicetypes.Relay) either.Either[*relayer.MinedRelay] { + if err := mnr.relayMeter.SetNonApplicableRelayReward(ctx, relay.GetReq().GetMeta()); err != nil { + return either.Error[*relayer.MinedRelay](err) + } + + return either.Success[*relayer.MinedRelay](nil) +} diff --git a/pkg/relayer/miner/miner_test.go b/pkg/relayer/miner/miner_test.go index 7afbf69d2..1aaf6d318 100644 --- a/pkg/relayer/miner/miner_test.go +++ b/pkg/relayer/miner/miner_test.go @@ -13,12 +13,14 @@ import ( "time" "cosmossdk.io/depinject" + gomock "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/pkg/crypto/protocol" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/pkg/relayer" "github.com/pokt-network/poktroll/pkg/relayer/miner" + "github.com/pokt-network/poktroll/testutil/mockrelayer" "github.com/pokt-network/poktroll/testutil/testclient/testqueryclients" servicetypes "github.com/pokt-network/poktroll/x/service/types" ) @@ -48,8 +50,9 @@ func TestMiner_MinedRelays(t *testing.T) { testqueryclients.SetServiceRelayDifficultyTargetHash(t, testSvcId, testRelayMiningTargetHash) serviceQueryClientMock := testqueryclients.NewTestServiceQueryClient(t) + relayMeterMock := newMockRelayMeter(t) - deps := depinject.Supply(serviceQueryClientMock) + deps := depinject.Supply(serviceQueryClientMock, relayMeterMock) mnr, err := miner.NewMiner(deps) require.NoError(t, err) @@ -154,3 +157,17 @@ func unmarshalHexMinedRelay( Hash: relayHash, } } + +// newMockRelayMeter returns a mock RelayMeter that is used by the relay miner to claim and unclaim relays. +func newMockRelayMeter(t *testing.T) relayer.RelayMeter { + t.Helper() + + ctrl := gomock.NewController(t) + relayMeter := mockrelayer.NewMockRelayMeter(ctrl) + + relayMeter.EXPECT().Start(gomock.Any()).Return(nil).AnyTimes() + relayMeter.EXPECT().AccumulateRelayReward(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + relayMeter.EXPECT().SetNonApplicableRelayReward(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + return relayMeter +} diff --git a/pkg/relayer/proxy/errors.go b/pkg/relayer/proxy/errors.go index be2dbd95c..ff2fc285f 100644 --- a/pkg/relayer/proxy/errors.go +++ b/pkg/relayer/proxy/errors.go @@ -17,4 +17,7 @@ var ( ErrRelayerProxyUnsupportedTransportType = sdkerrors.Register(codespace, 9, "unsupported proxy transport type") ErrRelayerProxyInternalError = sdkerrors.Register(codespace, 10, "internal error") ErrRelayerProxyMissingSupplierOperatorAddress = sdkerrors.Register(codespace, 11, "supplier operator address is missing") + ErrRelayerProxyUnknownSession = sdkerrors.Register(codespace, 12, "relayer proxy encountered unknown session") + ErrRelayerProxyRateLimited = sdkerrors.Register(codespace, 13, "offchain rate limit hit by relayer proxy") + ErrRelayerProxyUnclaimRelayPrice = sdkerrors.Register(codespace, 14, "failed to unclaim relay price") ) diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 0db1016f9..ad69a587b 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -78,6 +78,11 @@ type relayerProxy struct { // 1. Relay verification to check if the incoming relay matches the supplier hosted by the relay miner; // 2. 
Relay signing to resolve which keyring key name to use for signing; OperatorAddressToSigningKeyNameMap map[string]string + + // relayMeter keeps track of the total amount of stake an onchhain Application + // will owe an onchain Supplier (backed by this RelayMiner) once the session settles. + // It also configures application over-servicing allowance. + relayMeter relayer.RelayMeter } // NewRelayerProxy creates a new relayer proxy with the given dependencies or returns @@ -86,9 +91,15 @@ type relayerProxy struct { // Required dependencies: // - cosmosclient.Context // - client.BlockClient +// - crypto.RingCache +// - client.SupplierQueryClient // - client.SessionQueryClient // - client.SharedQueryClient -// - client.SupplierQueryClient +// - keyring.Keyring +// - client.SharedQueryClient +// - client.ApplicationQueryClient +// - client.ServiceQueryClient +// - client.EventsQueryClient // // Available options: // - WithSigningKeyNames @@ -108,6 +119,7 @@ func NewRelayerProxy( &rp.sessionQuerier, &rp.sharedQuerier, &rp.keyring, + &rp.relayMeter, ); err != nil { return nil, err } @@ -143,6 +155,13 @@ func (rp *relayerProxy) Start(ctx context.Context) error { // Start the ring cache. rp.ringCache.Start(ctx) + // Start the relay meter by subscribing to the on-chain events. + // This function is non-blocking and the subscription cancellation is handled + // by the context passed to the Start method. + if err := rp.relayMeter.Start(ctx); err != nil { + return err + } + startGroup, ctx := errgroup.WithContext(ctx) for _, relayServer := range rp.servers { diff --git a/pkg/relayer/proxy/relay_meter.go b/pkg/relayer/proxy/relay_meter.go new file mode 100644 index 000000000..e60f058db --- /dev/null +++ b/pkg/relayer/proxy/relay_meter.go @@ -0,0 +1,460 @@ +package proxy + +import ( + "context" + "math/big" + "strings" + "sync" + + "cosmossdk.io/depinject" + "cosmossdk.io/math" + cosmostypes "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/gogoproto/proto" + + "github.com/pokt-network/poktroll/app/volatile" + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/tx" + "github.com/pokt-network/poktroll/pkg/crypto/protocol" + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/polylog" + "github.com/pokt-network/poktroll/pkg/relayer" + apptypes "github.com/pokt-network/poktroll/x/application/types" + servicetypes "github.com/pokt-network/poktroll/x/service/types" + sessiontypes "github.com/pokt-network/poktroll/x/session/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +const defaultOverServicingAllowanceCoins = 1000000 + +var _ relayer.RelayMeter = (*ProxyRelayMeter)(nil) + +// sessionRelayMeter is the relay meter's internal representation of an onchain +// Application's max and consumed stake. +type sessionRelayMeter struct { + // The onchain application the relay meter is for. + app apptypes.Application + // The maximum uPOKT an application can pay this relayer for a given session. + // This is a fraction of the Application's overall stake in proportion. + maxCoin cosmostypes.Coin + // The amount of uPOKT a specific application has consumed from this relayer in the given session. + consumedCoin cosmostypes.Coin + // The header for the session the Application and Supplier (backed by the relayer) + // are exchanging services in. 
+ sessionHeader *sessiontypes.SessionHeader + // numOverServicedRelays is the number of relays that have been over-serviced + // by the relayer for the application. + numOverServicedRelays uint64 + // numOverservicedComputeUnits is the number of compute units that have been + // over-serviced by the relayer for the application. + numOverServicedComputeUnits uint64 + + // sharedParams, service and serviceRelayDifficulty are used to calculate the relay cost + // that increments the consumedAmount. + // They are cached at each session to avoid querying the blockchain for each relay. + // TODO_TECHDEBT(#543): Remove once the query clients start handling caching and invalidation. + sharedParams *sharedtypes.Params + service *sharedtypes.Service + serviceRelayDifficulty servicetypes.RelayMiningDifficulty + numSuppliersPerSession uint64 +} + +// ProxyRelayMeter is the offchain Supplier's rate limiter. +// It ensures that no Application is over-serviced by the Supplier per session. +// This is done by maintaining the max amount of stake the supplier can consume +// per session and the amount of stake consumed by mined relays. +// TODO_POST_MAINNET(@red-0ne): Consider making the relay meter a light client, +// since it's already receiving all committed blocks and events. +type ProxyRelayMeter struct { + // sessionToRelayMeterMap is a map of session IDs to their corresponding session relay meter. + // Only known applications (i.e. have sent at least one relay) have their stakes metered. + // This map gets reset every new session in order to meter new applications. + sessionToRelayMeterMap map[string]*sessionRelayMeter + // overServicingAllowanceCoins allows Suppliers to overservice applications. + // This entails providing a free service (i.e. mine for relays), that they will not be paid for onchain. + // This is common by some suppliers to build goodwill and receive a higher offchain quality-of-service rating. + // If negative, allow infinite overservicing. + // TODO_MAINNET(@red-0ne): Expose overServicingAllowanceCoins as a configuration parameter. + overServicingAllowanceCoins cosmostypes.Coin + + // relayMeterMu ensures that relay meter operations are thread-safe. + relayMeterMu sync.Mutex + + // Clients to query onchain data. + applicationQuerier client.ApplicationQueryClient + serviceQuerier client.ServiceQueryClient + sharedQuerier client.SharedQueryClient + sessionQuerier client.SessionQueryClient + eventsQueryClient client.EventsQueryClient + blockQuerier client.BlockClient + + logger polylog.Logger +} + +func NewRelayMeter(deps depinject.Config) (relayer.RelayMeter, error) { + overservicingAllowanceCoins := cosmostypes.NewInt64Coin(volatile.DenomuPOKT, defaultOverServicingAllowanceCoins) + rm := &ProxyRelayMeter{ + sessionToRelayMeterMap: make(map[string]*sessionRelayMeter), + overServicingAllowanceCoins: overservicingAllowanceCoins, + } + + if err := depinject.Inject( + deps, + &rm.sharedQuerier, + &rm.applicationQuerier, + &rm.serviceQuerier, + &rm.blockQuerier, + &rm.eventsQueryClient, + &rm.sessionQuerier, + &rm.logger, + ); err != nil { + return nil, err + } + + return rm, nil +} + +// Start starts the relay meter by observing application staked events and new sessions. +func (rmtr *ProxyRelayMeter) Start(ctx context.Context) error { + // Listen to transaction events to filter application staked events. + // TODO_BETA(@red-0ne): refactor this listener to be shared across all query clients + // and remove the need to listen to events in the relay meter. 
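+	// The "tm.event = 'Tx'" query subscribes to all transaction events;
+	// EventApplicationStaked events are filtered out of this stream below.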
+ eventsObs, err := rmtr.eventsQueryClient.EventsBytes(ctx, "tm.event = 'Tx'") + if err != nil { + return err + } + + // Listen for application staked events and update known application stakes. + // + // Since an applications might upstake (never downstake) during a session, this + // stake increase is guaranteed to be available at settlement. + // Stake updates take effect immediately. + // + // This enables applications to adjust their stake mid-session and increase + // their rate limits without needing to wait for the next session to start. + appStakedEvents := filterTypedEvents[*apptypes.EventApplicationStaked](ctx, eventsObs, nil) + channel.ForEach(ctx, appStakedEvents, rmtr.forEachEventApplicationStakedFn) + + // Listen to new blocks and reset the relay meter application stakes every new session. + committedBlocksSequence := rmtr.blockQuerier.CommittedBlocksSequence(ctx) + channel.ForEach(ctx, committedBlocksSequence, rmtr.forEachNewBlockFn) + + return nil +} + +// AccumulateRelayReward accumulates the relay reward for the given relay request. +// The relay reward is added optimistically, assuming that the relay will be volume / reward +// applicable and the relay meter would remain up to date. +func (rmtr *ProxyRelayMeter) AccumulateRelayReward(ctx context.Context, reqMeta servicetypes.RelayRequestMetadata) error { + // TODO_MAINNET(@adshmh): Locking the relay serving flow to ensure that the relay meter is updated + // might be a bottleneck since ensureRequestAppMetrics is performing multiple + // sequential queries to the Pocket Network node. + // Re-evaluate when caching and invalidation is implemented. + rmtr.relayMeterMu.Lock() + defer rmtr.relayMeterMu.Unlock() + + // Ensure that the served application has a relay meter and update the consumed + // stake amount. + appRelayMeter, err := rmtr.ensureRequestSessionRelayMeter(ctx, reqMeta) + if err != nil { + return err + } + + // Get the cost of the relay based on the service and shared parameters. + relayCostCoin, err := getSingleMinedRelayCostCoin( + appRelayMeter.sharedParams, + appRelayMeter.service, + appRelayMeter.serviceRelayDifficulty, + ) + if err != nil { + return err + } + + // Increase the consumed stake amount by relay cost. + newConsumedCoin := appRelayMeter.consumedCoin.Add(relayCostCoin) + + isAppOverServiced := appRelayMeter.maxCoin.IsLT(newConsumedCoin) + if !isAppOverServiced { + appRelayMeter.consumedCoin = newConsumedCoin + return nil + } + + // Check if the supplier is allowing unlimited over-servicing (i.e. negative value) + allowUnlimitedOverServicing := rmtr.overServicingAllowanceCoins.IsNegative() + + // The application is over-servicing, if unlimited over-servicing is not allowed + // and the newConsumedCoin is greater than the maxCoin + overServicingAllowanceCoins, + // then return a rate limit error. + overServicingCoin := newConsumedCoin.Sub(appRelayMeter.maxCoin) + + // In case Allowance is positive, add it to maxCoin to allow no or limited over-servicing. 
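+	// Illustrative example (hypothetical values): with maxCoin = 100upokt and
+	// overServicingAllowanceCoins = 10upokt, a newConsumedCoin of 105upokt is still
+	// served (and logged as over-servicing below), while 111upokt is rejected with
+	// ErrRelayerProxyRateLimited.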
+ if !allowUnlimitedOverServicing { + maxAllowedOverServicing := appRelayMeter.maxCoin.Add(rmtr.overServicingAllowanceCoins) + if maxAllowedOverServicing.IsLT(newConsumedCoin) { + return ErrRelayerProxyRateLimited.Wrapf( + "application has been rate limited, stake needed: %s, has: %s", + newConsumedCoin.String(), + appRelayMeter.maxCoin.String(), + ) + } + } + + appRelayMeter.numOverServicedRelays++ + appRelayMeter.numOverServicedComputeUnits += appRelayMeter.service.ComputeUnitsPerRelay + + // Exponential backoff, only log over-servicing when numOverServicedRelays is a power of 2 + if shouldLogOverServicing(appRelayMeter.numOverServicedRelays) { + rmtr.logger.Warn().Msgf( + "overservicing enabled, application %q over-serviced %s", + appRelayMeter.app.GetAddress(), + overServicingCoin, + ) + } + + return nil +} + +// SetNonApplicableRelayReward updates the relay meter to make the relay reward for +// the given relay request as non-applicable. +// This is used when the relay is not volume / reward applicable but was optimistically +// accounted for in the relay meter. +func (rmtr *ProxyRelayMeter) SetNonApplicableRelayReward(ctx context.Context, reqMeta servicetypes.RelayRequestMetadata) error { + rmtr.relayMeterMu.Lock() + defer rmtr.relayMeterMu.Unlock() + + sessionRelayMeter, ok := rmtr.sessionToRelayMeterMap[reqMeta.GetSessionHeader().GetSessionId()] + if !ok { + return ErrRelayerProxyUnknownSession.Wrap("session relay meter not found") + } + + // Get the cost of the relay based on the service and shared parameters. + relayCost, err := getSingleMinedRelayCostCoin( + sessionRelayMeter.sharedParams, + sessionRelayMeter.service, + sessionRelayMeter.serviceRelayDifficulty, + ) + if err != nil { + return ErrRelayerProxyUnclaimRelayPrice.Wrapf("%s", err) + } + + // Decrease the consumed stake amount by relay cost. + newConsumedAmount := sessionRelayMeter.consumedCoin.Sub(relayCost) + + sessionRelayMeter.consumedCoin = newConsumedAmount + return nil +} + +// forEachNewBlockFn is a callback function that is called every time a new block is committed. +// It resets the relay meter's application stakes every new session so that new +// application stakes can be metered. +func (rmtr *ProxyRelayMeter) forEachNewBlockFn(ctx context.Context, block client.Block) { + rmtr.relayMeterMu.Lock() + defer rmtr.relayMeterMu.Unlock() + + sharedParams, err := rmtr.sharedQuerier.GetParams(ctx) + if err != nil { + return + } + + // Delete the relay meters that correspond to settled sessions. + for _, sessionRelayMeter := range rmtr.sessionToRelayMeterMap { + sessionEndHeight := sessionRelayMeter.sessionHeader.GetSessionEndBlockHeight() + sessionClaimOpenHeight := sessionEndHeight + int64(sharedParams.GetClaimWindowOpenOffsetBlocks()) + + if block.Height() >= sessionClaimOpenHeight { + // The session started its claim phase and the corresponding session relay meter + // is no longer needed. + delete(rmtr.sessionToRelayMeterMap, sessionRelayMeter.sessionHeader.GetSessionId()) + } + } +} + +// forEachEventApplicationStakedFn is a callback function that is called every time +// an application staked event is observed. It updates the relay meter known applications. +func (rmtr *ProxyRelayMeter) forEachEventApplicationStakedFn(ctx context.Context, event *apptypes.EventApplicationStaked) { + rmtr.relayMeterMu.Lock() + defer rmtr.relayMeterMu.Unlock() + + app := event.GetApplication() + + // Since lean clients are supported, multiple suppliers might share the same RelayMiner. 
+ // Loop over all the suppliers that have metered the application and update their + // max amount of stake they can consume. + for _, sessionRelayMeter := range rmtr.sessionToRelayMeterMap { + if sessionRelayMeter.app.Address != app.Address { + continue + } + sessionRelayMeter.app.Stake = app.GetStake() + appStakeShare := getAppStakePortionPayableToSessionSupplier( + app.GetStake(), + sessionRelayMeter.sharedParams, + sessionRelayMeter.numSuppliersPerSession, + ) + sessionRelayMeter.maxCoin = appStakeShare + } +} + +// ensureRequestSessionRelayMeter ensures that the relay miner has a relay meter +// ready for monitoring the requests's application's consumption. +func (rmtr *ProxyRelayMeter) ensureRequestSessionRelayMeter(ctx context.Context, reqMeta servicetypes.RelayRequestMetadata) (*sessionRelayMeter, error) { + appAddress := reqMeta.GetSessionHeader().GetApplicationAddress() + sessionId := reqMeta.GetSessionHeader().GetSessionId() + + relayMeter, ok := rmtr.sessionToRelayMeterMap[sessionId] + // If the application is seen for the first time in this session, calculate the + // max amount of stake the application can consume. + if !ok { + var app apptypes.Application + app, err := rmtr.applicationQuerier.GetApplication(ctx, appAddress) + if err != nil { + return nil, err + } + + // In order to prevent over-servicing, the protocol must split the application's stake + // among all the suppliers that are serving it. + if len(app.ServiceConfigs) != 1 { + return nil, ErrRelayerProxyInvalidSession.Wrapf( + "application %q has %d service configs, expected 1", + appAddress, + len(app.ServiceConfigs), + ) + } + + sharedParams, err := rmtr.sharedQuerier.GetParams(ctx) + if err != nil { + return nil, err + } + + service, err := rmtr.serviceQuerier.GetService(ctx, reqMeta.SessionHeader.ServiceId) + if err != nil { + return nil, err + } + + serviceRelayDifficulty, err := rmtr.serviceQuerier.GetServiceRelayDifficulty(ctx, service.Id) + if err != nil { + return nil, err + } + + sessionParams, err := rmtr.sessionQuerier.GetParams(ctx) + if err != nil { + return nil, err + } + + // calculate the max amount of stake the application can consume in the current session. + supplierAppStake := getAppStakePortionPayableToSessionSupplier( + app.GetStake(), + sharedParams, + sessionParams.GetNumSuppliersPerSession(), + ) + relayMeter = &sessionRelayMeter{ + app: app, + consumedCoin: cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 0), + maxCoin: supplierAppStake, + sessionHeader: reqMeta.SessionHeader, + sharedParams: sharedParams, + service: &service, + serviceRelayDifficulty: serviceRelayDifficulty, + numSuppliersPerSession: sessionParams.GetNumSuppliersPerSession(), + } + + rmtr.sessionToRelayMeterMap[sessionId] = relayMeter + } + + return relayMeter, nil +} + +// filterTypedEvents filters the provided events bytes for the typed event T. +// T is then filtered by the provided filter function. +func filterTypedEvents[T proto.Message]( + ctx context.Context, + eventBzObs client.EventsBytesObservable, + filterFn func(T) bool, +) observable.Observable[T] { + eventObs, eventCh := channel.NewObservable[T]() + channel.ForEach(ctx, eventBzObs, func(ctx context.Context, maybeTxBz either.Bytes) { + if maybeTxBz.IsError() { + return + } + txBz, _ := maybeTxBz.ValueOrError() + + // Try to deserialize the provided bytes into an abci.TxResult. 
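+		// Event bytes that do not contain a transaction result (e.g. block events)
+		// fail to unmarshal and are ignored.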
+ txResult, err := tx.UnmarshalTxResult(txBz) + if err != nil { + return + } + + for _, event := range txResult.Result.Events { + eventApplicationStakedType := cosmostypes.MsgTypeURL(*new(T)) + if strings.Trim(event.GetType(), "/") != strings.Trim(eventApplicationStakedType, "/") { + continue + } + + typedEvent, err := cosmostypes.ParseTypedEvent(event) + if err != nil { + return + } + + castedEvent, ok := typedEvent.(T) + if !ok { + return + } + + // Apply the filter function to the typed event. + if filterFn == nil || filterFn(castedEvent) { + eventCh <- castedEvent + return + } + } + }) + + return eventObs +} + +// getSingleMinedRelayCostCoin returns the cost of a relay based on the shared parameters and the service. +// relayCost = Compute Units Per Relay (CUPR) * Compute Units To Token Multiplier (CUTTM) * relayDifficultyMultiplier +func getSingleMinedRelayCostCoin( + sharedParams *sharedtypes.Params, + service *sharedtypes.Service, + relayMiningDifficulty servicetypes.RelayMiningDifficulty, +) (cosmostypes.Coin, error) { + // Get the difficulty multiplier based on the relay mining difficulty. + difficultyTargetHash := relayMiningDifficulty.GetTargetHash() + difficultyMultiplier := protocol.GetRelayDifficultyMultiplier(difficultyTargetHash) + + // Get the estimated cost of the relay if it gets mined. + relayCostAmt := service.ComputeUnitsPerRelay * sharedParams.GetComputeUnitsToTokensMultiplier() + relayCostRat := big.NewRat(int64(relayCostAmt), 1) + estimatedRelayCostRat := big.NewRat(0, 1).Mul(relayCostRat, difficultyMultiplier) + estimatedRelayCost := big.NewInt(0).Quo(estimatedRelayCostRat.Num(), estimatedRelayCostRat.Denom()) + + estimatedRelayCostCoin := cosmostypes.NewCoin(volatile.DenomuPOKT, math.NewIntFromBigInt(estimatedRelayCost)) + + return estimatedRelayCostCoin, nil +} + +// getAppStakePortionPayableToSessionSupplier returns the portion of the application +// stake that can be consumed per supplier per session. +func getAppStakePortionPayableToSessionSupplier( + stake *cosmostypes.Coin, + sharedParams *sharedtypes.Params, + numSuppliersPerSession uint64, +) cosmostypes.Coin { + appStakePerSupplier := stake.Amount.Quo(math.NewInt(int64(numSuppliersPerSession))) + + // Calculate the number of pending sessions that might consume the application's stake. + numBlocksPerSession := int64(sharedParams.GetNumBlocksPerSession()) + numBlocksUntilProofWindowCloses := sharedtypes.GetSessionEndToProofWindowCloseBlocks(sharedParams) + pendingSessions := (numBlocksUntilProofWindowCloses + numBlocksPerSession - 1) / numBlocksPerSession + + appStakePerSessionSupplier := appStakePerSupplier.Quo(math.NewInt(pendingSessions)) + appStakePerSessionSupplierCoin := cosmostypes.NewCoin(volatile.DenomuPOKT, appStakePerSessionSupplier) + + return appStakePerSessionSupplierCoin +} + +// shouldLogOverServicing returns true if the number of occurrences is a power of 2. +// This is used to log the over-servicing warning with an exponential backoff. 
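+// For example, over-serviced relay counts of 1, 2, 4, 8, 16, ... each emit a single
+// warning, while the counts in between are accounted for silently.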
+func shouldLogOverServicing(occurrence uint64) bool { + return (occurrence & (occurrence - 1)) == 0 +} diff --git a/pkg/relayer/proxy/server_builder.go b/pkg/relayer/proxy/server_builder.go index b59b1d490..e386aa68d 100644 --- a/pkg/relayer/proxy/server_builder.go +++ b/pkg/relayer/proxy/server_builder.go @@ -128,6 +128,7 @@ func (rp *relayerProxy) initializeProxyServers() (proxyServerMap map[string]rela serverConfig, rp.servedRelaysPublishCh, rp, + rp.relayMeter, ) default: return nil, ErrRelayerProxyUnsupportedTransportType diff --git a/pkg/relayer/proxy/synchronous.go b/pkg/relayer/proxy/synchronous.go index df3bb8e35..66cde7e6d 100644 --- a/pkg/relayer/proxy/synchronous.go +++ b/pkg/relayer/proxy/synchronous.go @@ -47,6 +47,10 @@ type synchronousRPCServer struct { // servedRelaysProducer is a channel that emits the relays that have been served, allowing // the servedRelays observable to fan-out notifications to its subscribers. servedRelaysProducer chan<- *types.Relay + + // relayMeter is the relay meter that the RelayServer uses to meter the relays and claim the relay price. + // It is used to ensure that the relays are metered and priced correctly. + relayMeter relayer.RelayMeter } // NewSynchronousServer creates a new HTTP server that listens for incoming @@ -61,6 +65,7 @@ func NewSynchronousServer( serverConfig *config.RelayMinerServerConfig, servedRelaysProducer chan<- *types.Relay, proxy relayer.RelayerProxy, + relayMeter relayer.RelayMeter, ) relayer.RelayServer { return &synchronousRPCServer{ logger: logger, @@ -68,6 +73,7 @@ func NewSynchronousServer( relayerProxy: proxy, servedRelaysProducer: servedRelaysProducer, serverConfig: serverConfig, + relayMeter: relayMeter, } } @@ -240,6 +246,14 @@ func (sync *synchronousRPCServer) serveHTTP( return nil, err } + // Optimistically accumulate the relay reward before actually serving the relay. + // The relay price will be deducted from the application's stake before the relay is served. + // If the relay comes out to be not reward / volume applicable, the miner will refund the + // claimed price back to the application. + if err := sync.relayMeter.AccumulateRelayReward(ctx, relayRequest.Meta); err != nil { + return nil, err + } + // Deserialize the relay request payload to get the upstream HTTP request. poktHTTPRequest, err := sdktypes.DeserializeHTTPRequest(relayRequest.Payload) if err != nil { diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go index 243862218..bf83cf0ad 100644 --- a/pkg/relayer/session/sessiontree.go +++ b/pkg/relayer/session/sessiontree.go @@ -21,13 +21,6 @@ import ( var _ relayer.SessionTree = (*sessionTree)(nil) // sessionTree is an implementation of the SessionTree interface. -// TODO_BETA(@red-0ne): Per the Relay Mining paper, we need to optimistically store -// the number of requests that an application can pay for. This needs to be tracked -// based on the app's stake in the beginning of a session and the number of nodes -// per session. An operator should be able to specify "overservicing_compute_units_limit" -// whereby an upper bound on how much it can overserviced an application is set. The -// default value for this should be -1, implying "unlimited". -// Ref discussion: https://github.com/pokt-network/poktroll/pull/755#discussion_r1737287860 type sessionTree struct { logger polylog.Logger @@ -75,15 +68,6 @@ type sessionTree struct { // NewSessionTree creates a new sessionTree from a Session and a storePrefix. 
It also takes a function // removeFromRelayerSessions that removes the sessionTree from the RelayerSessionsManager. // It returns an error if the KVStore fails to be created. -// -// TODO_BETA(@red-0ne): When starting a new session, check what the MaxClaimableAmount -// (in uPOKT) by the Supplier as a function of -// (app_stake, compute_units_per_relay_for_service, global_compute_units_to_token_multiplier). -// TODO_CONFIG_NOTE: Whether or not the RelayMiner stop handling requests when the max is reached should be -// configurable by the operator. -// TODO_ERROR_NOTE: If overservicing is set to false, create a new error that the relay is rejected -// specifically because the supplier has reached the max claimable amount, so the caller should relay -// the request to another supplier. func NewSessionTree( sessionHeader *sessiontypes.SessionHeader, supplierOperatorAddress *cosmostypes.AccAddress, diff --git a/x/proof/keeper/proof_validation.go b/x/proof/keeper/proof_validation.go index 778d66d8c..850c97f5b 100644 --- a/x/proof/keeper/proof_validation.go +++ b/x/proof/keeper/proof_validation.go @@ -251,10 +251,6 @@ func (k Keeper) validateClosestPath( // be received before proceeding. proofPathSeedBlockHash := k.sessionKeeper.GetBlockHash(ctx, earliestSupplierProofCommitHeight-1) - // TODO_BETA(@red-0ne): Investigate "proof for the path provided does not match one expected by the on-chain protocol" - // error that may occur due to block height differing from the off-chain part. - k.logger.Info("E2E_DEBUG: height for block hash when verifying the proof", earliestSupplierProofCommitHeight, sessionHeader.GetSessionId()) - expectedProofPath := protocol.GetPathForProof(proofPathSeedBlockHash, sessionHeader.GetSessionId()) if !bytes.Equal(proof.Path, expectedProofPath) { return types.ErrProofInvalidProof.Wrapf( diff --git a/x/shared/types/session.go b/x/shared/types/session.go index ea75a0e3b..355d83c1e 100644 --- a/x/shared/types/session.go +++ b/x/shared/types/session.go @@ -176,3 +176,16 @@ func GetSettlementSessionEndHeight(sharedParams *Params, queryHeight int64) int6 return GetSessionEndToProofWindowCloseBlocks(sharedParams) + GetSessionEndHeight(sharedParams, queryHeight) + 1 } + +// GetNumPendingSessions returns the number of pending sessions (i.e. that have not +// yet been settled). +func GetNumPendingSessions(sharedParams *Params) int64 { + // Get the number of blocks between the end of a session and the block height + // at which the session claim is settled. + numPendingSessionsBlocks := GetSessionEndToProofWindowCloseBlocks(sharedParams) + // Use the number of blocks per session to calculate the number of pending sessions. + numBlocksPerSession := int64(sharedParams.GetNumBlocksPerSession()) + // numBlocksPerSession - 1 is added to round up the integer division so that pending + // sessions are all the sessions that have their end height at least `pendingBlocks` old. 
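+	// For example (illustrative values): with 10 blocks per session and 25 blocks
+	// between a session's end and its proof window close, (25 + 10 - 1) / 10 = 3
+	// sessions may be pending settlement at any given time.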
+ return (numPendingSessionsBlocks + numBlocksPerSession - 1) / numBlocksPerSession +} diff --git a/x/tokenomics/keeper/settle_pending_claims.go b/x/tokenomics/keeper/settle_pending_claims.go index 1f028c365..cc660f152 100644 --- a/x/tokenomics/keeper/settle_pending_claims.go +++ b/x/tokenomics/keeper/settle_pending_claims.go @@ -203,6 +203,14 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) ( appAddress := claim.GetSessionHeader().GetApplicationAddress() applicationInitialStake := applicationInitialStakeMap[appAddress] + // TODO_MAINNET(@red-0ne): Add tests to ensure that a zero application stake + // is handled correctly. + if applicationInitialStake.IsZero() { + logger.Error(fmt.Sprintf("application %q has a zero initial stake", appAddress)) + + continue + } + // Manage the mint & burn accounting for the claim. if err = k.ProcessTokenLogicModules(ctx, &claim, applicationInitialStake); err != nil { logger.Error(fmt.Sprintf("error processing token logic modules for claim %q: %v", claim.SessionHeader.SessionId, err)) diff --git a/x/tokenomics/keeper/token_logic_modules.go b/x/tokenomics/keeper/token_logic_modules.go index 7968579c9..25062481a 100644 --- a/x/tokenomics/keeper/token_logic_modules.go +++ b/x/tokenomics/keeper/token_logic_modules.go @@ -279,13 +279,25 @@ func (k Keeper) ProcessTokenLogicModules( // Ensure the claim amount is within the limits set by Relay Mining. // If not, update the settlement amount and emit relevant events. - actualSettlementCoin, err := k.ensureClaimAmountLimits(ctx, logger, &application, &supplier, claimSettlementCoin, applicationInitialStake) + // TODO_MAINNET(@red-0ne): Consider pulling this out of Keeper#ProcessTokenLogicModules + // and ensure claim amount limits are enforced before TLM processing. + actualSettlementCoin, err := k.ensureClaimAmountLimits(ctx, logger, &sharedParams, &application, &supplier, claimSettlementCoin, applicationInitialStake) if err != nil { return err } logger = logger.With("actual_settlement_upokt", actualSettlementCoin) logger.Info(fmt.Sprintf("About to start processing TLMs for (%d) compute units, equal to (%s) claimed", numClaimComputeUnits, actualSettlementCoin)) + // TODO_MAINNET(@red-0ne): Add tests to ensure that a zero settlement coin + // due to integer division rounding is handled correctly. + if actualSettlementCoin.Amount.IsZero() { + logger.Warn(fmt.Sprintf( + "actual settlement coin is zero, skipping TLM processing, application %q stake %s", + application.Address, application.Stake, + )) + return nil + } + // Execute all the token logic modules processors for tlm, tlmProcessor := range tokenLogicModuleProcessorMap { logger.Info(fmt.Sprintf("Starting TLM processing: %q", tlm)) @@ -685,6 +697,7 @@ func (k Keeper) sendRewardsToAccount( func (k Keeper) ensureClaimAmountLimits( ctx context.Context, logger log.Logger, + sharedParams *sharedtypes.Params, application *apptypes.Application, supplier *sharedtypes.Supplier, claimSettlementCoin cosmostypes.Coin, @@ -695,8 +708,6 @@ func (k Keeper) ensureClaimAmountLimits( ) { logger = logger.With("helper", "ensureClaimAmountLimits") - // TODO_BETA(@red-0ne): Make relay miners use the appStake at the beginning - // of a session to determine the maximum amount they can claim. // Note that this also incorporates MintPerClaimGlobalInflation since applications // are being overcharged by that amount and the funds are sent to the DAO/PNF // before being reimbursed to the application in the future. 
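The per-supplier, per-session claim cap enforced below divides the application stake by the number of suppliers per session and by the number of pending sessions. A minimal sketch of that arithmetic, using made-up values for the stake and both parameters:

```go
package main

import (
	"fmt"

	"cosmossdk.io/math"
)

func main() {
	appStake := math.NewInt(100_000_000)      // 100 POKT application stake (illustrative)
	numSuppliersPerSession := math.NewInt(15) // session module param (illustrative)
	numPendingSessions := math.NewInt(3)      // sessions that may still settle against the stake (illustrative)

	// maxClaimableAmt = appStake / numSuppliersPerSession / numPendingSessions
	maxClaimableAmt := appStake.Quo(numSuppliersPerSession).Quo(numPendingSessions)

	fmt.Printf("max claimable per supplier per session: %supokt\n", maxClaimableAmt) // 2222222upokt
}
```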
@@ -709,26 +720,20 @@ func (k Keeper) ensureClaimAmountLimits( minRequiredAppStakeAmt := claimSettlementCoin.Amount.Add(globalInflationAmt) totalClaimedCoin := sdk.NewCoin(volatile.DenomuPOKT, minRequiredAppStakeAmt) - // TODO_BETA(@red-0ne): Introduce a session sliding window to account for potential consumption - // during the current session (i.e. Not the session being settled) such as: - // maxClaimableAmt = (AppStake / (currSessionNum - settlingSessionNum + 1) / NumSuppliersPerSession) - // In conjunction with single service applications, this would make maxClaimableAmt - // effectively addressing the issue of over-servicing. - // Example: - // - Current session num: 3 - // - Settling session num: 2 - // - Application already requested work for session 3 - // Problem: - // - If the application consumes its entire stake in settlement of session 2 - // - Then over-servicing in session 3 (i.e. No stake left to consume) - // Solution: - // - By dividing the claimable stake by 2 (3 - 2 + 1), settling session 2 assumes that - // the application will consume its maxClaimableAmt the current session (3). - // - Off-chain actors could use this formula during the servicing of session num 3 - // and assume maxClaimableAmt will be settled in session 2. - // - Guarantee no over-servicing at the cost of higher application stake requirements. + // get the number of pending sessions that share the application stake at claim time + // This is used to calculate the maximum claimable amount for the supplier within a session. + numPendingSessions := sharedtypes.GetNumPendingSessions(sharedParams) + + // The maximum any single supplier can claim is a fraction of the app's total stake + // divided by the number of suppliers per session. + // Re decentralization - This ensures the app biases towards using all suppliers in a session. + // Re costs - This is an easy way to split the stake evenly. + // TODO_FUTURE: See if there's a way to let the application prefer (the best) + // supplier(s) in a session while maintaining a simple solution to implement this. numSuppliersPerSession := int64(k.sessionKeeper.GetParams(ctx).NumSuppliersPerSession) - maxClaimableAmt := appStake.Amount.Quo(math.NewInt(numSuppliersPerSession)) + maxClaimableAmt := appStake.Amount. + Quo(math.NewInt(numSuppliersPerSession)). + Quo(math.NewInt(numPendingSessions)) maxClaimSettlementAmt := supplierAppStakeToMaxSettlementAmount(maxClaimableAmt) // Check if the claimable amount is capped by the max claimable amount. diff --git a/x/tokenomics/keeper/token_logic_modules_test.go b/x/tokenomics/keeper/token_logic_modules_test.go index a2d6fdf30..d4fedf314 100644 --- a/x/tokenomics/keeper/token_logic_modules_test.go +++ b/x/tokenomics/keeper/token_logic_modules_test.go @@ -65,9 +65,9 @@ func TestProcessTokenLogicModules_TLMBurnEqualsMint_Valid(t *testing.T) { supplierModuleAddress := authtypes.NewModuleAddress(suppliertypes.ModuleName).String() // Set compute_units_to_tokens_multiplier to simplify expectation calculations. - err := keepers.SharedKeeper.SetParams(ctx, sharedtypes.Params{ - ComputeUnitsToTokensMultiplier: globalComputeUnitsToTokensMultiplier, - }) + sharedParams := keepers.SharedKeeper.GetParams(ctx) + sharedParams.ComputeUnitsToTokensMultiplier = globalComputeUnitsToTokensMultiplier + err := keepers.SharedKeeper.SetParams(ctx, sharedParams) require.NoError(t, err) // TODO_TECHDEBT: Setting inflation to zero so we are testing the BurnEqualsMint logic exclusively. 
// Once it is a governance param, update it using the keeper above. @@ -196,9 +196,9 @@ func TestProcessTokenLogicModules_TLMBurnEqualsMint_Valid_SupplierExceedsMaxClai supplierModuleAddress := authtypes.NewModuleAddress(suppliertypes.ModuleName).String() // Set compute_units_to_tokens_multiplier to simplify expectation calculations. - err := keepers.SharedKeeper.SetParams(ctx, sharedtypes.Params{ - ComputeUnitsToTokensMultiplier: globalComputeUnitsToTokensMultiplier, - }) + sharedParams := keepers.SharedKeeper.GetParams(ctx) + sharedParams.ComputeUnitsToTokensMultiplier = globalComputeUnitsToTokensMultiplier + err := keepers.SharedKeeper.SetParams(ctx, sharedParams) require.NoError(t, err) // TODO_TECHDEBT: Setting inflation to zero so we are testing the BurnEqualsMint logic exclusively. // Once it is a governance param, update it using the keeper above. @@ -330,9 +330,9 @@ func TestProcessTokenLogicModules_TLMGlobalMint_Valid_MintDistributionCorrect(t keepers.SetService(ctx, *service) // Set compute_units_to_tokens_multiplier to simplify expectation calculations. - err := keepers.SharedKeeper.SetParams(ctx, sharedtypes.Params{ - ComputeUnitsToTokensMultiplier: globalComputeUnitsToTokensMultiplier, - }) + sharedParams := keepers.SharedKeeper.GetParams(ctx) + sharedParams.ComputeUnitsToTokensMultiplier = globalComputeUnitsToTokensMultiplier + err := keepers.SharedKeeper.SetParams(ctx, sharedParams) require.NoError(t, err) // Add a new application with non-zero app stake end balance to assert against.
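For reviewers, a minimal sketch of the optimistic metering flow this PR wires into the proxy and miner, assuming a `RelayMeter` implementation is already injected; `serveAndCheckApplicability` is a hypothetical stand-in for serving the relay and running the volume-applicability check:

```go
package example

import (
	"context"

	"github.com/pokt-network/poktroll/pkg/relayer"
	servicetypes "github.com/pokt-network/poktroll/x/service/types"
)

// serveMeteredRelay sketches the optimistic metering flow around a single relay.
func serveMeteredRelay(
	ctx context.Context,
	meter relayer.RelayMeter,
	relayRequest *servicetypes.RelayRequest,
	serveAndCheckApplicability func(context.Context, *servicetypes.RelayRequest) bool, // illustrative helper
) error {
	// 1. Optimistically charge the application's stake share, assuming the relay
	//    WILL be volume / reward applicable. Rate-limited requests fail here.
	if err := meter.AccumulateRelayReward(ctx, relayRequest.Meta); err != nil {
		return err // e.g. ErrRelayerProxyRateLimited
	}

	// 2. Serve the relay and determine whether it is volume / reward applicable.
	if isApplicable := serveAndCheckApplicability(ctx, relayRequest); !isApplicable {
		// 3. Revert the optimistic charge so the application is not over-metered.
		return meter.SetNonApplicableRelayReward(ctx, relayRequest.Meta)
	}

	return nil
}
```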