diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index 821698c53..4810bb67a 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -162,7 +162,9 @@ func setupAppGateServerDependencies( config.NewSupplyQueryClientContextFn(queryNodeURL.String()), // leaf config.NewSupplyAccountQuerierFn(), // leaf config.NewSupplyApplicationQuerierFn(), // leaf + config.NewSupplySessionQuerierFn(), // leaf config.NewSupplyRingCacheFn(), + config.NewSupplyPOKTRollSDKFn(queryNodeURL, appGateConfig.SigningKey), } return config.SupplyConfig(ctx, cmd, supplierFuncs) diff --git a/pkg/appgateserver/endpoint_selector.go b/pkg/appgateserver/endpoint_selector.go index 55be33460..821b18018 100644 --- a/pkg/appgateserver/endpoint_selector.go +++ b/pkg/appgateserver/endpoint_selector.go @@ -2,10 +2,8 @@ package appgateserver import ( "context" - "net/url" - "github.com/pokt-network/poktroll/pkg/polylog" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" + "github.com/pokt-network/poktroll/pkg/sdk" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) @@ -18,34 +16,20 @@ func (app *appGateServer) getRelayerUrl( ctx context.Context, serviceId string, rpcType sharedtypes.RPCType, - session *sessiontypes.Session, -) (supplierUrl *url.URL, supplierAddress string, err error) { - logger := polylog.Ctx(ctx) - - for _, supplier := range session.Suppliers { - for _, service := range supplier.Services { - // Skip services that don't match the requested serviceId. - if service.Service.Id != serviceId { - continue - } + supplierEndpoints []*sdk.SingleSupplierEndpoint, +) (supplierEndpoint *sdk.SingleSupplierEndpoint, err error) { + for _, supplierEndpoint := range supplierEndpoints { + // Skip services that don't match the requested serviceId. + if supplierEndpoint.Header.Service.Id != serviceId { + continue + } - for _, endpoint := range service.Endpoints { - // Return the first endpoint url that matches the request's RpcType. - if endpoint.RpcType == rpcType { - supplierUrl, err := url.Parse(endpoint.Url) - if err != nil { - logger.Error(). - Str("url", endpoint.Url). - Err(err). - Msg("failed to parse url") - continue - } - return supplierUrl, supplier.Address, nil - } - } + // Return the first endpoint url that matches the request's RpcType. + if supplierEndpoint.RpcType == rpcType { + return supplierEndpoint, nil } } // Return an error if no relayer endpoints were found. - return nil, "", ErrAppGateNoRelayEndpoints + return nil, ErrAppGateNoRelayEndpoints } diff --git a/pkg/appgateserver/errors.go b/pkg/appgateserver/errors.go index c07a7843f..17f5456d6 100644 --- a/pkg/appgateserver/errors.go +++ b/pkg/appgateserver/errors.go @@ -3,14 +3,10 @@ package appgateserver import sdkerrors "cosmossdk.io/errors" var ( - codespace = "appgateserver" - ErrAppGateInvalidRelayResponseSignature = sdkerrors.Register(codespace, 1, "invalid relay response signature") - ErrAppGateNoRelayEndpoints = sdkerrors.Register(codespace, 2, "no relay endpoints found") - ErrAppGateInvalidRequestURL = sdkerrors.Register(codespace, 3, "invalid request URL") - ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 4, "missing application address") - ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 5, "missing app client signing information") - ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 6, "missing app client listening endpoint") - ErrAppGateEmptyRelayResponseMeta = sdkerrors.Register(codespace, 7, "empty relay response metadata") - ErrAppGateEmptyRelayResponseSignature = sdkerrors.Register(codespace, 8, "empty relay response signature") - ErrAppGateHandleRelay = sdkerrors.Register(codespace, 9, "internal error handling relay request") + codespace = "appgateserver" + ErrAppGateNoRelayEndpoints = sdkerrors.Register(codespace, 1, "no relay endpoints found") + ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 2, "missing application address") + ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 3, "missing app client signing information") + ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 4, "missing app client listening endpoint") + ErrAppGateHandleRelay = sdkerrors.Register(codespace, 5, "internal error handling relay request") ) diff --git a/pkg/appgateserver/server.go b/pkg/appgateserver/server.go index 22e4419be..f6c84a76a 100644 --- a/pkg/appgateserver/server.go +++ b/pkg/appgateserver/server.go @@ -1,27 +1,19 @@ package appgateserver import ( + "bytes" "context" "fmt" "io" "net/http" "net/url" "strings" - "sync" "cosmossdk.io/depinject" - ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" - ringtypes "github.com/athanorlabs/go-dleq/types" - cosmosclient "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/crypto/keyring" - "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" - cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" - - "github.com/pokt-network/poktroll/pkg/client" + querytypes "github.com/pokt-network/poktroll/pkg/client/query/types" - "github.com/pokt-network/poktroll/pkg/crypto" "github.com/pokt-network/poktroll/pkg/polylog" - sessiontypes "github.com/pokt-network/poktroll/x/session/types" + "github.com/pokt-network/poktroll/pkg/sdk" ) // SigningInformation is a struct that holds information related to the signing @@ -35,10 +27,6 @@ type SigningInformation struct { // private key used to sign relay requests. SigningKeyName string - // SigningKey is the scalar point on the appropriate curve corresponding to the - // signer's private key, and is used to sign relay requests via a ring signature - SigningKey ringtypes.Scalar - // AppAddress is the address of the application that the server is serving if // If it is nil, then the application address must be included in each request via a query parameter. AppAddress string @@ -56,32 +44,14 @@ type appGateServer struct { // signing information holds the signing key and application address for the server signingInformation *SigningInformation - // ringCache is used to obtain and store the ring for the application. - ringCache crypto.RingCache - // clientCtx is the client context for the application. // It is used to query for the application's account to unmarshal the supplier's account // and get the public key to verify the relay response signature. clientCtx querytypes.Context - // sessionQuerier is the querier for the session module. - // It used to get the current session for the application given a requested service. - sessionQuerier sessiontypes.QueryClient - - // sessionMu is a mutex to protect currentSession map reads and and updates. - sessionMu sync.RWMutex - - // currentSessions is the current session for the application given a block height. - // It is updated by the goListenForNewSessions goroutine. - currentSessions map[string]*sessiontypes.Session - - // accountQuerier is the querier for the account module. - // It is used to get the the supplier's public key to verify the relay response signature. - accountQuerier client.AccountQueryClient - - // blockClient is the client for the block module. - // It is used to get the current block height to query for the current session. - blockClient client.BlockClient + // sdk is the POKTRollSDK that the appGateServer uses to query for the current session + // and send relay requests to the supplier. + sdk sdk.POKTRollSDK // listeningEndpoint is the endpoint that the appGateServer will listen on. listeningEndpoint *url.URL @@ -89,10 +59,6 @@ type appGateServer struct { // server is the HTTP server that will be used capture application requests // so that they can be signed and relayed to the supplier. server *http.Server - - // accountCache is a cache of the supplier accounts that has been queried - // TODO_TECHDEBT: Add a size limit to the cache. - supplierAccountCache map[string]cryptotypes.PubKey } // NewAppGateServer creates a new appGateServer instance with the given dependencies. @@ -107,18 +73,13 @@ func NewAppGateServer( deps depinject.Config, opts ...appGateServerOption, ) (*appGateServer, error) { - app := &appGateServer{ - currentSessions: make(map[string]*sessiontypes.Session), - supplierAccountCache: make(map[string]cryptotypes.PubKey), - } + app := &appGateServer{} if err := depinject.Inject( deps, &app.logger, &app.clientCtx, - &app.blockClient, - &app.accountQuerier, - &app.ringCache, + &app.sdk, ); err != nil { return nil, err } @@ -144,18 +105,6 @@ func NewAppGateServer( app.signingInformation.AppAddress = appAddress.String() } - // Convert the key record to a private key and return the scalar - // point on the secp256k1 curve that it corresponds to. - // If the key is not a secp256k1 key, this will return an error. - signingKey, err := recordLocalToScalar(keyRecord.GetLocal()) - if err != nil { - return nil, fmt.Errorf("failed to convert private key to scalar: %w", err) - } - app.signingInformation.SigningKey = signingKey - - clientCtx := cosmosclient.Context(app.clientCtx) - - app.sessionQuerier = sessiontypes.NewQueryClient(clientCtx) app.server = &http.Server{Addr: app.listeningEndpoint.Host} return app, nil @@ -202,11 +151,11 @@ func (app *appGateServer) ServeHTTP(writer http.ResponseWriter, request *http.Re serviceId := strings.Split(path, "/")[1] // Read the request body bytes. - payloadBz, err := io.ReadAll(request.Body) + requestPayloadBz, err := io.ReadAll(request.Body) if err != nil { app.replyWithError( ctx, - payloadBz, + requestPayloadBz, writer, ErrAppGateHandleRelay.Wrapf("reading relay request body: %s", err), ) @@ -216,7 +165,7 @@ func (app *appGateServer) ServeHTTP(writer http.ResponseWriter, request *http.Re } app.logger.Debug(). Str("service_id", serviceId). - Str("payload", string(payloadBz)). + Str("payload", string(requestPayloadBz)). Msg("handling relay") // Determine the application address. @@ -225,18 +174,22 @@ func (app *appGateServer) ServeHTTP(writer http.ResponseWriter, request *http.Re appAddress = request.URL.Query().Get("senderAddr") } if appAddress == "" { - app.replyWithError(ctx, payloadBz, writer, ErrAppGateMissingAppAddress) + app.replyWithError(ctx, requestPayloadBz, writer, ErrAppGateMissingAppAddress) // TODO_TECHDEBT: log additional info? app.logger.Error().Msg("no application address provided") + return } + // Put the request body bytes back into the request body. + request.Body = io.NopCloser(bytes.NewBuffer(requestPayloadBz)) + // TODO(@h5law, @red0ne): Add support for asynchronous relays, and switch on // the request type here. // TODO_RESEARCH: Should this be started in a goroutine, to allow for // concurrent requests from numerous applications? - if err := app.handleSynchronousRelay(ctx, appAddress, serviceId, payloadBz, request, writer); err != nil { + if err := app.handleSynchronousRelay(ctx, appAddress, serviceId, requestPayloadBz, request, writer); err != nil { // Reply with an error response if there was an error handling the relay. - app.replyWithError(ctx, payloadBz, writer, err) + app.replyWithError(ctx, requestPayloadBz, writer, err) // TODO_TECHDEBT: log additional info? app.logger.Error().Err(err).Msg("failed handling relay") return @@ -257,25 +210,4 @@ func (app *appGateServer) validateConfig() error { return nil } -// recordLocalToScalar converts the private key obtained from a -// key record to a scalar point on the secp256k1 curve -func recordLocalToScalar(local *keyring.Record_Local) (ringtypes.Scalar, error) { - if local == nil { - return nil, fmt.Errorf("cannot extract private key from key record: nil") - } - priv, ok := local.PrivKey.GetCachedValue().(cryptotypes.PrivKey) - if !ok { - return nil, fmt.Errorf("cannot extract private key from key record: %T", local.PrivKey.GetCachedValue()) - } - if _, ok := priv.(*secp256k1.PrivKey); !ok { - return nil, fmt.Errorf("unexpected private key type: %T, want %T", priv, &secp256k1.PrivKey{}) - } - crv := ring_secp256k1.NewCurve() - privKey, err := crv.DecodeToScalar(priv.Bytes()) - if err != nil { - return nil, fmt.Errorf("failed to decode private key: %w", err) - } - return privKey, nil -} - type appGateServerOption func(*appGateServer) diff --git a/pkg/appgateserver/session.go b/pkg/appgateserver/session.go deleted file mode 100644 index 0d886d8cb..000000000 --- a/pkg/appgateserver/session.go +++ /dev/null @@ -1,47 +0,0 @@ -package appgateserver - -import ( - "context" - - sessiontypes "github.com/pokt-network/poktroll/x/session/types" - sharedtypes "github.com/pokt-network/poktroll/x/shared/types" -) - -// getCurrentSession gets the current session for the given service -// It returns the current session if it exists and is still valid, otherwise it -// queries for the latest session, caches and returns it. -func (app *appGateServer) getCurrentSession( - ctx context.Context, - appAddress, serviceId string, -) (*sessiontypes.Session, error) { - app.sessionMu.RLock() - defer app.sessionMu.RUnlock() - - latestBlock := app.blockClient.LastNBlocks(ctx, 1)[0] - if currentSession, ok := app.currentSessions[serviceId]; ok { - sessionEndBlockHeight := currentSession.Header.SessionStartBlockHeight + currentSession.NumBlocksPerSession - - // Return the current session if it is still valid. - if latestBlock.Height() < sessionEndBlockHeight { - return currentSession, nil - } - } - - // Query for the current session. - sessionQueryReq := sessiontypes.QueryGetSessionRequest{ - ApplicationAddress: appAddress, - Service: &sharedtypes.Service{Id: serviceId}, - BlockHeight: latestBlock.Height(), - } - sessionQueryRes, err := app.sessionQuerier.GetSession(ctx, &sessionQueryReq) - if err != nil { - return nil, err - } - - session := sessionQueryRes.Session - - // Cache the current session. - app.currentSessions[serviceId] = session - - return session, nil -} diff --git a/pkg/appgateserver/synchronous.go b/pkg/appgateserver/synchronous.go index ddc99f845..5d0b46a5a 100644 --- a/pkg/appgateserver/synchronous.go +++ b/pkg/appgateserver/synchronous.go @@ -1,14 +1,10 @@ package appgateserver import ( - "bytes" "context" - "io" "net/http" "github.com/pokt-network/poktroll/pkg/partials" - "github.com/pokt-network/poktroll/pkg/signer" - "github.com/pokt-network/poktroll/x/service/types" ) // handleSynchronousRelay handles relay requests for synchronous protocols, where @@ -30,100 +26,31 @@ func (app *appGateServer) handleSynchronousRelay( if err != nil { return ErrAppGateHandleRelay.Wrapf("getting request type: %s", err) } - session, err := app.getCurrentSession(ctx, appAddress, serviceId) - if err != nil { - return ErrAppGateHandleRelay.Wrapf("getting current session: %s", err) - } // TODO_TECHDEBT: log additional info? app.logger.Debug(). Str("request_type", requestType.String()). Msg("got request type") - // Get a supplier URL and address for the given service and session. - supplierUrl, supplierAddress, err := app.getRelayerUrl(ctx, serviceId, requestType, session) - if err != nil { - return ErrAppGateHandleRelay.Wrapf("getting supplier URL: %s", err) - } - - // Create the relay request. - relayRequest := &types.RelayRequest{ - Meta: &types.RelayRequestMetadata{ - SessionHeader: session.Header, - Signature: nil, // signature added below - }, - Payload: payloadBz, - } - - // Get the application's signer. - appRing, err := app.ringCache.GetRingForAddress(ctx, appAddress) - if err != nil { - return ErrAppGateHandleRelay.Wrapf("getting app ring: %s", err) - } - signer := signer.NewRingSigner(appRing, app.signingInformation.SigningKey) - - // Hash and sign the request's signable bytes. - signableBz, err := relayRequest.GetSignableBytesHash() - if err != nil { - return ErrAppGateHandleRelay.Wrapf("getting signable bytes: %s", err) - } - - requestSig, err := signer.Sign(signableBz) + sessionSuppliers, err := app.sdk.GetSessionSupplierEndpoints(ctx, appAddress, serviceId) if err != nil { - return ErrAppGateHandleRelay.Wrapf("signing relay: %s", err) - } - relayRequest.Meta.Signature = requestSig - - // Marshal the relay request to bytes and create a reader to be used as an HTTP request body. - cdc := types.ModuleCdc - relayRequestBz, err := cdc.Marshal(relayRequest) - if err != nil { - return ErrAppGateHandleRelay.Wrapf("marshaling relay request: %s", err) - } - relayRequestReader := io.NopCloser(bytes.NewReader(relayRequestBz)) - var relayReq types.RelayRequest - if err := relayReq.Unmarshal(relayRequestBz); err != nil { - return ErrAppGateHandleRelay.Wrapf("unmarshaling relay response: %s", err) - } - - // Create the HTTP request to send the request to the relayer. - relayHTTPRequest := &http.Request{ - Method: request.Method, - Header: request.Header, - URL: supplierUrl, - Body: relayRequestReader, + return ErrAppGateHandleRelay.Wrapf("getting current session: %s", err) } - app.logger.Debug(). - Str("supplier_url", supplierUrl.String()). - Msg("sending relay request") - - relayHTTPResponse, err := http.DefaultClient.Do(relayHTTPRequest) + // Get a supplier URL and address for the given service and session. + supplierEndpoint, err := app.getRelayerUrl( + ctx, + serviceId, + requestType, + sessionSuppliers.SuppliersEndpoints, + ) if err != nil { - return ErrAppGateHandleRelay.Wrapf("sending relay request: %s", err) + return ErrAppGateHandleRelay.Wrapf("getting supplier URL: %s", err) } - // Read the response body bytes. - relayResponseBz, err := io.ReadAll(relayHTTPResponse.Body) + relayResponse, err := app.sdk.SendRelay(ctx, supplierEndpoint, request) if err != nil { - return ErrAppGateHandleRelay.Wrapf("reading relay response body: %s", err) - } - - // Unmarshal the response bytes into a RelayResponse. - relayResponse := &types.RelayResponse{} - if err := relayResponse.Unmarshal(relayResponseBz); err != nil { - return ErrAppGateHandleRelay.Wrapf("unmarshaling relay response: %s", err) - } - - // Verify the response signature. We use the supplier address that we got from - // the getRelayerUrl function since this is the address we are expecting to sign the response. - // TODO_TECHDEBT: if the RelayResponse is an internal error response, we should not verify the signature - // as in some relayer early failures, it may not be signed by the supplier. - // TODO_IMPROVE: Add more logging & telemetry so we can get visibility and signal into - // failed responses. - if err := app.verifyResponse(ctx, supplierAddress, relayResponse); err != nil { - // TODO_DISCUSS: should this be its own error type and asserted against in tests? - return ErrAppGateHandleRelay.Wrapf("verifying relay response signature: %s", err) + return err } app.logger.Debug(). diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 08d39c801..320914246 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -254,7 +254,7 @@ type SessionQueryClient interface { // GetSession queries the chain for the details of the session provided GetSession( ctx context.Context, - address string, + appAddress string, serviceId string, blockHeight int64, ) (*sessiontypes.Session, error) diff --git a/pkg/client/query/accquerier.go b/pkg/client/query/accquerier.go index 890550b16..73ed42b51 100644 --- a/pkg/client/query/accquerier.go +++ b/pkg/client/query/accquerier.go @@ -4,18 +4,19 @@ import ( "context" "cosmossdk.io/depinject" - cosmosclient "github.com/cosmos/cosmos-sdk/client" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + grpc "github.com/cosmos/gogoproto/grpc" "github.com/pokt-network/poktroll/pkg/client" - "github.com/pokt-network/poktroll/pkg/client/query/types" ) +var _ client.AccountQueryClient = (*accQuerier)(nil) + // accQuerier is a wrapper around the accounttypes.QueryClient that enables the // querying of on-chain account information through a single exposed method // which returns an accounttypes.AccountI interface type accQuerier struct { - clientCtx types.Context + clientConn grpc.ClientConn accountQuerier accounttypes.QueryClient } @@ -29,12 +30,12 @@ func NewAccountQuerier(deps depinject.Config) (client.AccountQueryClient, error) if err := depinject.Inject( deps, - &aq.clientCtx, + &aq.clientConn, ); err != nil { return nil, err } - aq.accountQuerier = accounttypes.NewQueryClient(cosmosclient.Context(aq.clientCtx)) + aq.accountQuerier = accounttypes.NewQueryClient(aq.clientConn) return aq, nil } diff --git a/pkg/client/query/appquerier.go b/pkg/client/query/appquerier.go index 91162ba68..310a25dfe 100644 --- a/pkg/client/query/appquerier.go +++ b/pkg/client/query/appquerier.go @@ -4,18 +4,19 @@ import ( "context" "cosmossdk.io/depinject" - cosmosclient "github.com/cosmos/cosmos-sdk/client" + grpc "github.com/cosmos/gogoproto/grpc" "github.com/pokt-network/poktroll/pkg/client" - "github.com/pokt-network/poktroll/pkg/client/query/types" apptypes "github.com/pokt-network/poktroll/x/application/types" ) +var _ client.ApplicationQueryClient = (*appQuerier)(nil) + // appQuerier is a wrapper around the apptypes.QueryClient that enables the // querying of on-chain application information through a single exposed method // which returns an apptypes.Application interface type appQuerier struct { - clientCtx types.Context + clientConn grpc.ClientConn applicationQuerier apptypes.QueryClient } @@ -29,12 +30,12 @@ func NewApplicationQuerier(deps depinject.Config) (client.ApplicationQueryClient if err := depinject.Inject( deps, - &aq.clientCtx, + &aq.clientConn, ); err != nil { return nil, err } - aq.applicationQuerier = apptypes.NewQueryClient(cosmosclient.Context(aq.clientCtx)) + aq.applicationQuerier = apptypes.NewQueryClient(aq.clientConn) return aq, nil } diff --git a/pkg/client/query/sessionquerier.go b/pkg/client/query/sessionquerier.go index d0ab36bd3..6a86205fe 100644 --- a/pkg/client/query/sessionquerier.go +++ b/pkg/client/query/sessionquerier.go @@ -4,19 +4,20 @@ import ( "context" "cosmossdk.io/depinject" - cosmosclient "github.com/cosmos/cosmos-sdk/client" + grpc "github.com/cosmos/gogoproto/grpc" "github.com/pokt-network/poktroll/pkg/client" - "github.com/pokt-network/poktroll/pkg/client/query/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) +var _ client.SessionQueryClient = (*sessionQuerier)(nil) + // sessionQuerier is a wrapper around the sessiontypes.QueryClient that enables the // querying of on-chain session information through a single exposed method // which returns an sessiontypes.Session struct type sessionQuerier struct { - clientCtx types.Context + clientConn grpc.ClientConn sessionQuerier sessiontypes.QueryClient } @@ -30,18 +31,18 @@ func NewSessionQuerier(deps depinject.Config) (client.SessionQueryClient, error) if err := depinject.Inject( deps, - &sessq.clientCtx, + &sessq.clientConn, ); err != nil { return nil, err } - sessq.sessionQuerier = sessiontypes.NewQueryClient(cosmosclient.Context(sessq.clientCtx)) + sessq.sessionQuerier = sessiontypes.NewQueryClient(sessq.clientConn) return sessq, nil } // GetSession returns an sessiontypes.Session struct for a given appAddress, -// serviceId and blockHeight +// serviceId and blockHeight. It implements the SessionQueryClient#GetSession function. func (sessq *sessionQuerier) GetSession( ctx context.Context, appAddress string, @@ -57,7 +58,7 @@ func (sessq *sessionQuerier) GetSession( res, err := sessq.sessionQuerier.GetSession(ctx, req) if err != nil { return nil, ErrQueryRetrieveSession.Wrapf( - "address: %s;serviceId %s; block height %d; error: [%v]", + "address: %s; serviceId: %s; block height: %d; error: [%v]", appAddress, serviceId, blockHeight, err, ) } diff --git a/pkg/client/services.go b/pkg/client/services.go index 1e2667cf9..08fbaee03 100644 --- a/pkg/client/services.go +++ b/pkg/client/services.go @@ -9,7 +9,7 @@ import ( // NewTestApplicationServiceConfig returns a slice of application service configs for testing. func NewTestApplicationServiceConfig(prefix string, count int) []*sharedtypes.ApplicationServiceConfig { appSvcCfg := make([]*sharedtypes.ApplicationServiceConfig, count) - for i, _ := range appSvcCfg { + for i := range appSvcCfg { serviceId := fmt.Sprintf("%s%d", prefix, i) appSvcCfg[i] = &sharedtypes.ApplicationServiceConfig{ Service: &sharedtypes.Service{Id: serviceId}, diff --git a/pkg/deps/config/suppliers.go b/pkg/deps/config/suppliers.go index e7fcf66f2..48283f849 100644 --- a/pkg/deps/config/suppliers.go +++ b/pkg/deps/config/suppliers.go @@ -3,10 +3,13 @@ package config import ( "context" "fmt" + "net/url" "cosmossdk.io/depinject" cosmosclient "github.com/cosmos/cosmos-sdk/client" cosmosflags "github.com/cosmos/cosmos-sdk/client/flags" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + grpc "github.com/cosmos/gogoproto/grpc" "github.com/spf13/cobra" "github.com/pokt-network/poktroll/pkg/client/block" @@ -16,6 +19,7 @@ import ( txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types" "github.com/pokt-network/poktroll/pkg/crypto/rings" "github.com/pokt-network/poktroll/pkg/polylog" + "github.com/pokt-network/poktroll/pkg/sdk" ) // SupplyConfig supplies a depinject config by calling each of the supplied @@ -117,6 +121,7 @@ func NewSupplyQueryClientContextFn(pocketQueryNodeURL string) SupplierFn { } deps = depinject.Configs(deps, depinject.Supply( querytypes.Context(queryClientCtx), + grpc.ClientConn(queryClientCtx), queryClientCtx.Keyring, )) @@ -214,8 +219,7 @@ func NewSupplyApplicationQuerierFn() SupplierFn { // NewSupplySessionQuerierFn returns a function which constructs a // SessionQuerier instance with the required dependencies and returns a new -// instance with the required dependencies and returns a new depinject.Config -// which is supplied with the given deps and the new SessionQuerier. +// depinject.Config which is supplied with the given deps and the new SessionQuerier. func NewSupplySessionQuerierFn() SupplierFn { return func( _ context.Context, @@ -274,6 +278,52 @@ func NewSupplyRingCacheFn() SupplierFn { } } +// NewSupplyPOKTRollSDKFn returns a function which constructs a +// POKTRollSDK instance with the required dependencies and returns a new +// depinject.Config which is supplied with the given deps and the new POKTRollSDK. +func NewSupplyPOKTRollSDKFn( + queryNodeURL *url.URL, + signingKeyName string, +) SupplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + var clientCtx cosmosclient.Context + + // On a Cosmos environment we get the private key from the keyring + // Inject the client context, get the keyring from it then get the private key + if err := depinject.Inject(deps, &clientCtx); err != nil { + return nil, err + } + + keyRecord, err := clientCtx.Keyring.Key(signingKeyName) + if err != nil { + return nil, err + } + + privateKey, ok := keyRecord.GetLocal().PrivKey.GetCachedValue().(cryptotypes.PrivKey) + if !ok { + return nil, err + } + + config := &sdk.POKTRollSDKConfig{ + PrivateKey: privateKey, + PocketNodeUrl: queryNodeURL, + Deps: deps, + } + + poktrollSDK, err := sdk.NewPOKTRollSDK(ctx, config) + if err != nil { + return nil, err + } + + // Supply the session querier to the provided deps + return depinject.Configs(deps, depinject.Supply(poktrollSDK)), nil + } +} + // hostToWebsocketURL converts the provided host into a websocket URL that can // be used to subscribe to onchain events and query the chain via a client // context or send transactions via a tx client context. diff --git a/pkg/sdk/deps_builder.go b/pkg/sdk/deps_builder.go new file mode 100644 index 000000000..73464d821 --- /dev/null +++ b/pkg/sdk/deps_builder.go @@ -0,0 +1,86 @@ +package sdk + +import ( + "context" + "fmt" + + "cosmossdk.io/depinject" + grpctypes "github.com/cosmos/gogoproto/grpc" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + block "github.com/pokt-network/poktroll/pkg/client/block" + eventsquery "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/client/query" + "github.com/pokt-network/poktroll/pkg/crypto/rings" + "github.com/pokt-network/poktroll/pkg/polylog" +) + +// buildDeps builds the dependencies for the POKTRollSDK if they are not provided +// in the config. This is useful for the SDK consumers that do not want or +// cannot provide the dependencies through depinject. +func (sdk *poktrollSDK) buildDeps( + ctx context.Context, + config *POKTRollSDKConfig, +) (depinject.Config, error) { + pocketNodeWebsocketURL := fmt.Sprintf("ws://%s/websocket", config.PocketNodeUrl.Host) + + // Have a new depinject config + deps := depinject.Configs() + + // Supply the logger + deps = depinject.Configs(deps, depinject.Supply(polylog.Ctx(ctx))) + + // Create and supply the events query client + eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketURL) + deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) + + // Create and supply the block client that depends on the events query client + blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketURL) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply(blockClient)) + + // Create and supply the grpc client used by the queriers + // TODO_TECHDEBT: Configure the grpc client options from the config + var grpcClient grpctypes.ClientConn + grpcClient, err = grpc.Dial( + config.PocketNodeUrl.Host, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply(grpcClient)) + + // Create and supply the account querier + accountQuerier, err := query.NewAccountQuerier(deps) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply(accountQuerier)) + + // Create and supply the application querier + applicationQuerier, err := query.NewApplicationQuerier(deps) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply(applicationQuerier)) + + // Create and supply the session querier + sessionQuerier, err := query.NewSessionQuerier(deps) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply(sessionQuerier)) + + // Create and supply the ring cache that depends on application and account queriers + ringCache, err := rings.NewRingCache(deps) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply(ringCache)) + + return deps, nil +} diff --git a/pkg/sdk/errors.go b/pkg/sdk/errors.go new file mode 100644 index 000000000..c7d1c4cc7 --- /dev/null +++ b/pkg/sdk/errors.go @@ -0,0 +1,11 @@ +package sdk + +import sdkerrors "cosmossdk.io/errors" + +var ( + codespace = "poktrollsdk" + ErrSDKHandleRelay = sdkerrors.Register(codespace, 1, "internal error handling relay request") + ErrSDKInvalidRelayResponseSignature = sdkerrors.Register(codespace, 2, "invalid relay response signature") + ErrSDKEmptyRelayResponseSignature = sdkerrors.Register(codespace, 3, "empty relay response signature") + ErrSDKVerifyResponseSignature = sdkerrors.Register(codespace, 4, "error verifying relay response signature") +) diff --git a/pkg/sdk/interface.go b/pkg/sdk/interface.go new file mode 100644 index 000000000..8fcb22586 --- /dev/null +++ b/pkg/sdk/interface.go @@ -0,0 +1,27 @@ +package sdk + +import ( + "context" + "net/http" + + servicetypes "github.com/pokt-network/poktroll/x/service/types" +) + +// POKTRollSDK is the interface for the POKTRoll SDK. It is used by gateways +// and/or applications to interact with the Pocket protocol. +type POKTRollSDK interface { + // GetSession returns the suppliers endpoints of the current session for + // the given application and service. + GetSessionSupplierEndpoints( + ctx context.Context, + appAddress string, + serviceId string, + ) (session *SessionSuppliers, err error) + + // SendRelay sends a relay request to the given supplier's endpoint. + SendRelay( + ctx context.Context, + sessionSupplierEndpoint *SingleSupplierEndpoint, + request *http.Request, + ) (response *servicetypes.RelayResponse, err error) +} diff --git a/pkg/appgateserver/relay_verifier.go b/pkg/sdk/relay_verifier.go similarity index 74% rename from pkg/appgateserver/relay_verifier.go rename to pkg/sdk/relay_verifier.go index 52c01d316..92de8583c 100644 --- a/pkg/appgateserver/relay_verifier.go +++ b/pkg/sdk/relay_verifier.go @@ -1,4 +1,4 @@ -package appgateserver +package sdk import ( "context" @@ -9,20 +9,20 @@ import ( ) // verifyResponse verifies the relay response signature. -func (app *appGateServer) verifyResponse( +func (sdk *poktrollSDK) verifyResponse( ctx context.Context, supplierAddress string, relayResponse *types.RelayResponse, ) error { // Get the supplier's public key. - supplierPubKey, err := app.getSupplierPubKeyFromAddress(ctx, supplierAddress) + supplierPubKey, err := sdk.getSupplierPubKeyFromAddress(ctx, supplierAddress) if err != nil { return err } // Extract the supplier's signature if relayResponse.Meta == nil { - return ErrAppGateEmptyRelayResponseSignature.Wrapf( + return ErrSDKEmptyRelayResponseSignature.Wrapf( "response payload: %s", relayResponse.Payload, ) } @@ -36,7 +36,7 @@ func (app *appGateServer) verifyResponse( // Verify the relay response signature. if !supplierPubKey.VerifySignature(responseSignableBz[:], supplierSignature) { - return ErrAppGateInvalidRelayResponseSignature + return ErrSDKInvalidRelayResponseSignature } return nil @@ -44,25 +44,25 @@ func (app *appGateServer) verifyResponse( // getSupplierPubKeyFromAddress gets the supplier's public key from the cache or // queries if it is not found. The public key is then cached before being returned. -func (app *appGateServer) getSupplierPubKeyFromAddress( +func (sdk *poktrollSDK) getSupplierPubKeyFromAddress( ctx context.Context, supplierAddress string, ) (cryptotypes.PubKey, error) { - supplierPubKey, ok := app.supplierAccountCache[supplierAddress] + supplierPubKey, ok := sdk.supplierAccountCache[supplierAddress] if ok { return supplierPubKey, nil } // Query for the supplier account to get the application's public key // to verify the relay request signature. - acc, err := app.accountQuerier.GetAccount(ctx, supplierAddress) + acc, err := sdk.accountQuerier.GetAccount(ctx, supplierAddress) if err != nil { return nil, err } fetchedPubKey := acc.GetPubKey() // Cache the retrieved public key. - app.supplierAccountCache[supplierAddress] = fetchedPubKey + sdk.supplierAccountCache[supplierAddress] = fetchedPubKey return fetchedPubKey, nil } diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go new file mode 100644 index 000000000..233807cec --- /dev/null +++ b/pkg/sdk/sdk.go @@ -0,0 +1,104 @@ +package sdk + +import ( + "context" + "fmt" + "net/url" + "sync" + + "cosmossdk.io/depinject" + ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" + ringtypes "github.com/athanorlabs/go-dleq/types" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/crypto" + "github.com/pokt-network/poktroll/pkg/polylog" +) + +var _ POKTRollSDK = (*poktrollSDK)(nil) + +// POKTRollSDKConfig is the configuration for the POKTRollSDK. +// It contains the Pocket Node URL to be used by the queriers and the private key +// to be used for signing relay requests. +// Deps is an optional field that can be used to provide the needed dependencies +// for the SDK. If it is not provided, the SDK will build the dependencies itself. +type POKTRollSDKConfig struct { + PocketNodeUrl *url.URL + PrivateKey cryptotypes.PrivKey + Deps depinject.Config +} + +// poktrollSDK is the implementation of the POKTRollSDK. +type poktrollSDK struct { + logger polylog.Logger + config *POKTRollSDKConfig + + // signingKey is the scalar representation of the private key to be used + // for signing relay requests. + signingKey ringtypes.Scalar + + // ringCache is used to obtain and store the ring for the application. + ringCache crypto.RingCache + + // sessionQuerier is the querier for the session module. + // It used to get the current session for the application given a requested service. + sessionQuerier client.SessionQueryClient + + // serviceSessionSuppliersMu is a mutex to protect latestSessions map reads and updates. + serviceSessionSuppliersMu sync.RWMutex + + // serviceSessionSuppliers is a map of serviceId -> {appAddress -> SessionSuppliers} + // for a specific session + serviceSessionSuppliers map[string]map[string]*SessionSuppliers + + // accountQuerier is the querier for the account module. + // It is used to get the the supplier's public key to verify the relay response signature. + accountQuerier client.AccountQueryClient + + // blockClient is the client for the block module. + // It is used to get the current block height to query for the current session. + blockClient client.BlockClient + + // accountCache is a cache of the supplier accounts that has been queried + // TODO_TECHDEBT: Add a size limit to the cache. + supplierAccountCache map[string]cryptotypes.PubKey +} + +func NewPOKTRollSDK(ctx context.Context, config *POKTRollSDKConfig) (POKTRollSDK, error) { + sdk := &poktrollSDK{ + config: config, + serviceSessionSuppliers: make(map[string]map[string]*SessionSuppliers), + supplierAccountCache: make(map[string]cryptotypes.PubKey), + } + + var err error + var deps depinject.Config + + // Build the dependencies if they are not provided in the config. + if config.Deps != nil { + deps = config.Deps + } else if deps, err = sdk.buildDeps(ctx, config); err != nil { + return nil, err + } + + if err := depinject.Inject( + deps, + &sdk.logger, + &sdk.ringCache, + &sdk.sessionQuerier, + &sdk.accountQuerier, + &sdk.blockClient, + ); err != nil { + return nil, err + } + + // Store the private key as a ring scalar to be used for ring signatures. + crv := ring_secp256k1.NewCurve() + sdk.signingKey, err = crv.DecodeToScalar(config.PrivateKey.Bytes()) + if err != nil { + return nil, fmt.Errorf("failed to decode private key: %w", err) + } + + return sdk, nil +} diff --git a/pkg/sdk/send_relay.go b/pkg/sdk/send_relay.go new file mode 100644 index 000000000..45e93a728 --- /dev/null +++ b/pkg/sdk/send_relay.go @@ -0,0 +1,109 @@ +package sdk + +import ( + "bytes" + "context" + "io" + "net/http" + + "github.com/pokt-network/poktroll/pkg/signer" + "github.com/pokt-network/poktroll/x/service/types" +) + +// SendRelay sends a relay request to the given supplier's endpoint. +// It signs the request, relays it to the supplier and verifies the response signature. +// It takes an http.Request as an argument and uses its method and headers to create +// the relay request. +func (sdk *poktrollSDK) SendRelay( + ctx context.Context, + supplierEndpoint *SingleSupplierEndpoint, + request *http.Request, +) (response *types.RelayResponse, err error) { + payloadBz, err := io.ReadAll(request.Body) + if err != nil { + return nil, ErrSDKHandleRelay.Wrapf("reading request body: %s", err) + } + + // Create the relay request. + relayRequest := &types.RelayRequest{ + Meta: &types.RelayRequestMetadata{ + SessionHeader: supplierEndpoint.Header, + Signature: nil, // signature added below + }, + Payload: payloadBz, + } + + // Get the application's signer. + appAddress := supplierEndpoint.Header.ApplicationAddress + appRing, err := sdk.ringCache.GetRingForAddress(ctx, appAddress) + if err != nil { + return nil, ErrSDKHandleRelay.Wrapf("getting app ring: %s", err) + } + signer := signer.NewRingSigner(appRing, sdk.signingKey) + + // Hash and sign the request's signable bytes. + signableBz, err := relayRequest.GetSignableBytesHash() + if err != nil { + return nil, ErrSDKHandleRelay.Wrapf("error getting signable bytes: %s", err) + } + + requestSig, err := signer.Sign(signableBz) + if err != nil { + return nil, ErrSDKHandleRelay.Wrapf("error signing relay: %s", err) + } + relayRequest.Meta.Signature = requestSig + + // Marshal the relay request to bytes and create a reader to be used as an HTTP request body. + cdc := types.ModuleCdc + relayRequestBz, err := cdc.Marshal(relayRequest) + if err != nil { + return nil, ErrSDKHandleRelay.Wrapf("error marshaling relay request: %s", err) + } + relayRequestReader := io.NopCloser(bytes.NewReader(relayRequestBz)) + var relayReq types.RelayRequest + if err := relayReq.Unmarshal(relayRequestBz); err != nil { + return nil, ErrSDKHandleRelay.Wrapf("error unmarshaling relay request: %s", err) + } + + // Create the HTTP request to send the request to the relayer. + // All the RPC protocols to be supported (JSONRPC, Rest, Websockets, gRPC, etc) + // use HTTP under the hood. + relayHTTPRequest := &http.Request{ + Method: request.Method, + Header: request.Header, + URL: supplierEndpoint.Url, + Body: relayRequestReader, + } + + sdk.logger.Debug(). + Str("supplier_url", supplierEndpoint.Url.String()). + Msg("sending relay request") + relayHTTPResponse, err := http.DefaultClient.Do(relayHTTPRequest) + if err != nil { + return nil, ErrSDKHandleRelay.Wrapf("error sending relay request: %s", err) + } + + // Read the response body bytes. + relayResponseBz, err := io.ReadAll(relayHTTPResponse.Body) + if err != nil { + return nil, ErrSDKHandleRelay.Wrapf("error reading relay response body: %s", err) + } + + // Unmarshal the response bytes into a RelayResponse. + relayResponse := &types.RelayResponse{} + if err := relayResponse.Unmarshal(relayResponseBz); err != nil { + return nil, ErrSDKHandleRelay.Wrapf("error unmarshaling relay response: %s", err) + } + + // Verify the response signature. We use the supplier address that we got from + // the getRelayerUrl function since this is the address we are expecting to sign the response. + // TODO_TECHDEBT: if the RelayResponse is an internal error response, we should not verify the signature + // as in some relayer early failures, it may not be signed by the supplier. + // TODO_IMPROVE: Add more logging & telemetry so we can get visibility and signal into + // failed responses. + if err := sdk.verifyResponse(ctx, supplierEndpoint.SupplierAddress, relayResponse); err != nil { + return nil, ErrSDKVerifyResponseSignature.Wrapf("%s", err) + } + + return relayResponse, nil +} diff --git a/pkg/sdk/session.go b/pkg/sdk/session.go new file mode 100644 index 000000000..634f7a7d1 --- /dev/null +++ b/pkg/sdk/session.go @@ -0,0 +1,114 @@ +package sdk + +import ( + "context" + "net/url" + + sessiontypes "github.com/pokt-network/poktroll/x/session/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +// SessionSuppliers is the structure that represents a session's end block height +// and its matching suppliers. +type SessionSuppliers struct { + // Session is the fully hydrated session object returned by the query. + Session *sessiontypes.Session + + // SuppliersEndpoints is a slice of the session's suppliers endpoints each + // item representing a single supplier endpoint augmented with the session + // header and the supplier's address. + // An item from this slice is what needs to be passed to the `SendRelay` + // function so it has all the information needed to send the relay request. + SuppliersEndpoints []*SingleSupplierEndpoint +} + +// SingleSupplierEndpoint is the structure that represents a supplier's endpoint +// augmented with the session's header and the supplier's address for easy +// access to the needed information when sending a relay request. +type SingleSupplierEndpoint struct { + Url *url.URL + RpcType sharedtypes.RPCType + SupplierAddress string + Header *sessiontypes.SessionHeader +} + +// GetSessionSupplierEndpoints returns a flattened structure of the endpoints +// from all suppliers in the session and returns them as a SupplierEndpoint slice. +// It queries for the latest session and caches it if the cached one is outdated. +func (sdk *poktrollSDK) GetSessionSupplierEndpoints( + ctx context.Context, + appAddress, serviceId string, +) (*SessionSuppliers, error) { + sdk.serviceSessionSuppliersMu.RLock() + defer sdk.serviceSessionSuppliersMu.RUnlock() + + latestBlockHeight := sdk.blockClient.LastNBlocks(ctx, 1)[0].Height() + + // Create the latestSessions map entry for the serviceId if it doesn't exist. + if _, ok := sdk.serviceSessionSuppliers[serviceId]; !ok { + sdk.serviceSessionSuppliers[serviceId] = map[string]*SessionSuppliers{} + } + + // Create the latestSessions[serviceId] map entry for the appAddress if it doesn't exist. + if _, ok := sdk.serviceSessionSuppliers[serviceId][appAddress]; !ok { + sdk.serviceSessionSuppliers[serviceId][appAddress] = &SessionSuppliers{} + } + + // currentSession is guaranteed to exist after the checks above. + currentSession := sdk.serviceSessionSuppliers[serviceId][appAddress] + + // Return the current session's SuppliersEndpoints if the session is still valid. + if currentSession.Session != nil && + latestBlockHeight < currentSession.Session.Header.SessionEndBlockHeight { + return currentSession, nil + } + + // Query for the current session. + session, err := sdk.sessionQuerier.GetSession( + ctx, + appAddress, + serviceId, + latestBlockHeight, + ) + if err != nil { + return nil, err + } + + // Override the old Session and SessionSuppliers and construct the new one. + currentSession.Session = session + currentSession.SuppliersEndpoints = []*SingleSupplierEndpoint{} + + for _, supplier := range session.Suppliers { + for _, service := range supplier.Services { + // Skip the session's services that don't match the requested serviceId. + if service.Service.Id != serviceId { + continue + } + + // Loop through the services' endpoints and add them to the + // SessionSuppliers.SuppliersEndpoints slice. + for _, endpoint := range service.Endpoints { + url, err := url.Parse(endpoint.Url) + if err != nil { + sdk.logger.Error(). + Str("url", endpoint.Url). + Err(err). + Msg("failed to parse url") + continue + } + + currentSession.SuppliersEndpoints = append( + currentSession.SuppliersEndpoints, + &SingleSupplierEndpoint{ + Url: url, + RpcType: endpoint.RpcType, + SupplierAddress: supplier.Address, + Header: session.Header, + }, + ) + } + } + } + + return currentSession, nil +}