Skip to content

Commit

Permalink
[RelayMiner] Supplier rate limiting (#895)
Browse files Browse the repository at this point in the history
## Summary

This PR adds an optimistic `RelayMeter` to the `RelayMiner` to monitor
the consumed stake and stops servicing when `maxStake` is reached.

It works by
* Intercepting `RelayRequest`s before being served.
* Assume the relay will be mined and deduce the corresponding amount
from the consumed stake.
* Send a `rate-limited` error if app stake share cannot cover of a
minable `RelayRequest`.
* After a relay has been served check if the `Relay` is volume
applicable and revert the initial deduction if it is not.
* Updates the known `Application`s stake if an `EventApplicationStaked`
is observed.
* Resets the `Application`s stakes at the beginning of every session.

## Issue

`RelayMiner`s need to know how much of the `Application`'s stake they
are allowed to consume without over-servicing.

![image](https://github.com/user-attachments/assets/4354c747-23a6-4b44-a63a-62a13617d796)

## Type of change

Select one or more from the following:

- [x] New feature, functionality or library

## Testing

- [x] **Unit Tests**: `make go_develop_and_test`
- [x] **LocalNet E2E Tests**: `make test_e2e`
- [ ] **DevNet E2E Tests**: Add the `devnet-test-e2e` label to the PR.

## Sanity Checklist

- [x] I have tested my changes using the available tooling
- [x] I have commented my code
- [x] I have performed a self-review of my own code; both comments &
source code

---------

Co-authored-by: Daniel Olshansky <olshansky.daniel@gmail.com>
  • Loading branch information
red-0ne and Olshansk authored Nov 13, 2024
1 parent 547fbbb commit fac4d6f
Show file tree
Hide file tree
Showing 18 changed files with 657 additions and 59 deletions.
6 changes: 4 additions & 2 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,15 @@ genesis:
denom: upokt
bank:
supply:
- amount: "1102000204"
- amount: "1202000272"
denom: upokt
balances:
# Application module
- address: pokt1rl3gjgzexmplmds3tq3r3yk84zlwdl6djzgsvm
coins:
- amount: "100000068" # MUST BE equal to the total of all app stakes below
# TODO_MAINNET(@olshansk): Pass config.yml into ChatGPT to build a script
# that ensures the amounts line up
- amount: "200000136" # MUST BE equal to the total of all app stakes below
denom: upokt
# Supplier module
- address: pokt1j40dzzmn6cn9kxku7a5tjnud6hv37vesr5ccaa
Expand Down
3 changes: 3 additions & 0 deletions pkg/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ type SessionQueryClient interface {
serviceId string,
blockHeight int64,
) (*sessiontypes.Session, error)

// GetParams queries the chain for the session module parameters.
GetParams(ctx context.Context) (*sessiontypes.Params, error)
}

// SharedQueryClient defines an interface that enables the querying of the
Expand Down
10 changes: 10 additions & 0 deletions pkg/client/query/sessionquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,13 @@ func (sessq *sessionQuerier) GetSession(
}
return res.Session, nil
}

// GetParams queries & returns the session module on-chain parameters.
func (sessq *sessionQuerier) GetParams(ctx context.Context) (*sessiontypes.Params, error) {
req := &sessiontypes.QueryParamsRequest{}
res, err := sessq.sessionQuerier.Params(ctx, req)
if err != nil {
return nil, ErrQuerySessionParams.Wrapf("[%v]", err)
}
return &res.Params, nil
}
20 changes: 18 additions & 2 deletions pkg/relayer/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,13 @@ func setupRelayerDependencies(
config.NewSupplyDelegationClientFn(), // leaf
config.NewSupplySharedQueryClientFn(), // leaf
config.NewSupplyServiceQueryClientFn(),
config.NewSupplyApplicationQuerierFn(),
config.NewSupplySessionQuerierFn(),
supplyRelayMeter,
supplyMiner,
config.NewSupplyAccountQuerierFn(),
config.NewSupplyBankQuerierFn(),
config.NewSupplyApplicationQuerierFn(),
config.NewSupplySupplierQuerierFn(),
config.NewSupplySessionQuerierFn(),
config.NewSupplyProofQueryClientFn(),
config.NewSupplyRingCacheFn(),
supplyTxFactory,
Expand Down Expand Up @@ -231,6 +232,21 @@ func supplyMiner(
return depinject.Configs(deps, depinject.Supply(mnr)), nil
}

// supplyRelayMeter constructs a RelayMeter instance and returns a new
// depinject.Config which is supplied with the given deps and the new RelayMeter.
func supplyRelayMeter(
_ context.Context,
deps depinject.Config,
_ *cobra.Command,
) (depinject.Config, error) {
rm, err := proxy.NewRelayMeter(deps)
if err != nil {
return nil, err
}

return depinject.Configs(deps, depinject.Supply(rm)), nil
}

// supplyTxFactory constructs a cosmostx.Factory instance and returns a new
// depinject.Config which is supplied with the given deps and the new
// cosmostx.Factory.
Expand Down
27 changes: 27 additions & 0 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//go:generate mockgen -destination=../../testutil/mockrelayer/relayer_proxy_mock.go -package=mockrelayer . RelayerProxy
//go:generate mockgen -destination=../../testutil/mockrelayer/miner_mock.go -package=mockrelayer . Miner
//go:generate mockgen -destination=../../testutil/mockrelayer/relayer_sessions_manager_mock.go -package=mockrelayer . RelayerSessionsManager
//go:generate mockgen -destination=../../testutil/mockrelayer/relay_meter_mock.go -package=mockrelayer . RelayMeter

package relayer

Expand Down Expand Up @@ -162,3 +163,29 @@ type SessionTree interface {
// GetTrieSpec returns the trie spec of the SMST.
GetTrieSpec() smt.TrieSpec
}

// RelayMeter is an interface that keeps track of the amount of stake consumed between
// a single onchain Application and a single onchain Supplier over the course of a single session.
// It enables the RelayMiner to rate limit the number of requests handled offchain as a function
// of the optimistic onchain rate limits.
type RelayMeter interface {
// Start starts the relay meter.
Start(ctx context.Context) error

// AccumulateRelayReward adds the relay reward from the incoming request to session's accumulator.
// The relay cost is added optimistically, assuming that the relay WILL be volume / reward applicable.
//
// The reason why optimistic AccumulateRelayReward + SetNonApplicableRelayReward is used instead of
// a simpler AccumulateVolumeApplicableRelayReward is that when the relay is first seen
// we don't know if it will be volume / reward applicable until it is served.
//
// To rate limit or not the current relay, we need to always optimistically account all relays as being
// volume / reward applicable.
AccumulateRelayReward(ctx context.Context, relayRequestMeta servicetypes.RelayRequestMetadata) error

// SetNonApplicableRelayReward updates the relay meter for the given relay request as
// non-applicable between a single Application and a single Supplier for a single session.
// The volume / reward applicability of the relay is unknown to the relay miner
// until the relay is served and the relay response signed.
SetNonApplicableRelayReward(ctx context.Context, relayRequestMeta servicetypes.RelayRequestMetadata) error
}
24 changes: 22 additions & 2 deletions pkg/relayer/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type miner struct {
// serviceQueryClient is used to query for the relay difficulty target hash of a service.
// relay_difficulty is the target hash which a relay hash must be less than to be volume/reward applicable.
serviceQueryClient client.ServiceQueryClient
relayMeter relayer.RelayMeter
}

// NewMiner creates a new miner from the given dependencies and options. It
Expand All @@ -43,7 +44,7 @@ func NewMiner(
) (*miner, error) {
mnr := &miner{}

if err := depinject.Inject(deps, &mnr.serviceQueryClient); err != nil {
if err := depinject.Inject(deps, &mnr.serviceQueryClient, &mnr.relayMeter); err != nil {
return nil, err
}

Expand Down Expand Up @@ -88,18 +89,27 @@ func (mnr *miner) mapMineRelay(
) (_ either.Either[*relayer.MinedRelay], skip bool) {
relayBz, err := relay.Marshal()
if err != nil {
if relayMeteringResult := mnr.unclaimRelayUPOKT(ctx, *relay); relayMeteringResult.IsError() {
return relayMeteringResult, false
}
return either.Error[*relayer.MinedRelay](err), false
}
relayHashArr := protocol.GetRelayHashFromBytes(relayBz)
relayHash := relayHashArr[:]

relayDifficultyTargetHash, err := mnr.getServiceRelayDifficultyTargetHash(ctx, relay.Req)
if err != nil {
return either.Error[*relayer.MinedRelay](err), false
if relayMeteringResult := mnr.unclaimRelayUPOKT(ctx, *relay); relayMeteringResult.IsError() {
return relayMeteringResult, true
}
return either.Error[*relayer.MinedRelay](err), true
}

// The relay IS NOT volume / reward applicable
if !protocol.IsRelayVolumeApplicable(relayHash, relayDifficultyTargetHash) {
if eitherMeteringResult := mnr.unclaimRelayUPOKT(ctx, *relay); eitherMeteringResult.IsError() {
return eitherMeteringResult, true
}
return either.Success[*relayer.MinedRelay](nil), true
}

Expand Down Expand Up @@ -135,3 +145,13 @@ func (mnr *miner) getServiceRelayDifficultyTargetHash(ctx context.Context, req *

return serviceRelayDifficulty.GetTargetHash(), nil
}

// unclaimRelayUPOKT unclaims the relay UPOKT reward for the relay.
// It returns an either.Error if the relay UPOKT reward could not be unclaimed.
func (mnr *miner) unclaimRelayUPOKT(ctx context.Context, relay servicetypes.Relay) either.Either[*relayer.MinedRelay] {
if err := mnr.relayMeter.SetNonApplicableRelayReward(ctx, relay.GetReq().GetMeta()); err != nil {
return either.Error[*relayer.MinedRelay](err)
}

return either.Success[*relayer.MinedRelay](nil)
}
19 changes: 18 additions & 1 deletion pkg/relayer/miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"time"

"cosmossdk.io/depinject"
gomock "github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/pkg/crypto/protocol"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/pkg/relayer/miner"
"github.com/pokt-network/poktroll/testutil/mockrelayer"
"github.com/pokt-network/poktroll/testutil/testclient/testqueryclients"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
)
Expand Down Expand Up @@ -48,8 +50,9 @@ func TestMiner_MinedRelays(t *testing.T) {

testqueryclients.SetServiceRelayDifficultyTargetHash(t, testSvcId, testRelayMiningTargetHash)
serviceQueryClientMock := testqueryclients.NewTestServiceQueryClient(t)
relayMeterMock := newMockRelayMeter(t)

deps := depinject.Supply(serviceQueryClientMock)
deps := depinject.Supply(serviceQueryClientMock, relayMeterMock)
mnr, err := miner.NewMiner(deps)
require.NoError(t, err)

Expand Down Expand Up @@ -154,3 +157,17 @@ func unmarshalHexMinedRelay(
Hash: relayHash,
}
}

// newMockRelayMeter returns a mock RelayMeter that is used by the relay miner to claim and unclaim relays.
func newMockRelayMeter(t *testing.T) relayer.RelayMeter {
t.Helper()

ctrl := gomock.NewController(t)
relayMeter := mockrelayer.NewMockRelayMeter(ctrl)

relayMeter.EXPECT().Start(gomock.Any()).Return(nil).AnyTimes()
relayMeter.EXPECT().AccumulateRelayReward(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
relayMeter.EXPECT().SetNonApplicableRelayReward(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

return relayMeter
}
3 changes: 3 additions & 0 deletions pkg/relayer/proxy/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ var (
ErrRelayerProxyUnsupportedTransportType = sdkerrors.Register(codespace, 9, "unsupported proxy transport type")
ErrRelayerProxyInternalError = sdkerrors.Register(codespace, 10, "internal error")
ErrRelayerProxyMissingSupplierOperatorAddress = sdkerrors.Register(codespace, 11, "supplier operator address is missing")
ErrRelayerProxyUnknownSession = sdkerrors.Register(codespace, 12, "relayer proxy encountered unknown session")
ErrRelayerProxyRateLimited = sdkerrors.Register(codespace, 13, "offchain rate limit hit by relayer proxy")
ErrRelayerProxyUnclaimRelayPrice = sdkerrors.Register(codespace, 14, "failed to unclaim relay price")
)
21 changes: 20 additions & 1 deletion pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ type relayerProxy struct {
// 1. Relay verification to check if the incoming relay matches the supplier hosted by the relay miner;
// 2. Relay signing to resolve which keyring key name to use for signing;
OperatorAddressToSigningKeyNameMap map[string]string

// relayMeter keeps track of the total amount of stake an onchhain Application
// will owe an onchain Supplier (backed by this RelayMiner) once the session settles.
// It also configures application over-servicing allowance.
relayMeter relayer.RelayMeter
}

// NewRelayerProxy creates a new relayer proxy with the given dependencies or returns
Expand All @@ -86,9 +91,15 @@ type relayerProxy struct {
// Required dependencies:
// - cosmosclient.Context
// - client.BlockClient
// - crypto.RingCache
// - client.SupplierQueryClient
// - client.SessionQueryClient
// - client.SharedQueryClient
// - client.SupplierQueryClient
// - keyring.Keyring
// - client.SharedQueryClient
// - client.ApplicationQueryClient
// - client.ServiceQueryClient
// - client.EventsQueryClient
//
// Available options:
// - WithSigningKeyNames
Expand All @@ -108,6 +119,7 @@ func NewRelayerProxy(
&rp.sessionQuerier,
&rp.sharedQuerier,
&rp.keyring,
&rp.relayMeter,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,6 +155,13 @@ func (rp *relayerProxy) Start(ctx context.Context) error {
// Start the ring cache.
rp.ringCache.Start(ctx)

// Start the relay meter by subscribing to the on-chain events.
// This function is non-blocking and the subscription cancellation is handled
// by the context passed to the Start method.
if err := rp.relayMeter.Start(ctx); err != nil {
return err
}

startGroup, ctx := errgroup.WithContext(ctx)

for _, relayServer := range rp.servers {
Expand Down
Loading

0 comments on commit fac4d6f

Please sign in to comment.