Skip to content

Commit

Permalink
Merge branch 'main' into docs/hello_poktroll
Browse files Browse the repository at this point in the history
  • Loading branch information
Olshansk committed Dec 9, 2023
2 parents 73c7d36 + c80aed0 commit bd05bb9
Show file tree
Hide file tree
Showing 19 changed files with 584 additions and 286 deletions.
2 changes: 2 additions & 0 deletions pkg/appgateserver/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ func setupAppGateServerDependencies(
config.NewSupplyQueryClientContextFn(queryNodeURL.String()), // leaf
config.NewSupplyAccountQuerierFn(), // leaf
config.NewSupplyApplicationQuerierFn(), // leaf
config.NewSupplySessionQuerierFn(), // leaf
config.NewSupplyRingCacheFn(),
config.NewSupplyPOKTRollSDKFn(queryNodeURL, appGateConfig.SigningKey),
}

return config.SupplyConfig(ctx, cmd, supplierFuncs)
Expand Down
40 changes: 12 additions & 28 deletions pkg/appgateserver/endpoint_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package appgateserver

import (
"context"
"net/url"

"github.com/pokt-network/poktroll/pkg/polylog"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
"github.com/pokt-network/poktroll/pkg/sdk"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

Expand All @@ -18,34 +16,20 @@ func (app *appGateServer) getRelayerUrl(
ctx context.Context,
serviceId string,
rpcType sharedtypes.RPCType,
session *sessiontypes.Session,
) (supplierUrl *url.URL, supplierAddress string, err error) {
logger := polylog.Ctx(ctx)

for _, supplier := range session.Suppliers {
for _, service := range supplier.Services {
// Skip services that don't match the requested serviceId.
if service.Service.Id != serviceId {
continue
}
supplierEndpoints []*sdk.SingleSupplierEndpoint,
) (supplierEndpoint *sdk.SingleSupplierEndpoint, err error) {
for _, supplierEndpoint := range supplierEndpoints {
// Skip services that don't match the requested serviceId.
if supplierEndpoint.Header.Service.Id != serviceId {
continue
}

for _, endpoint := range service.Endpoints {
// Return the first endpoint url that matches the request's RpcType.
if endpoint.RpcType == rpcType {
supplierUrl, err := url.Parse(endpoint.Url)
if err != nil {
logger.Error().
Str("url", endpoint.Url).
Err(err).
Msg("failed to parse url")
continue
}
return supplierUrl, supplier.Address, nil
}
}
// Return the first endpoint url that matches the request's RpcType.
if supplierEndpoint.RpcType == rpcType {
return supplierEndpoint, nil
}
}

// Return an error if no relayer endpoints were found.
return nil, "", ErrAppGateNoRelayEndpoints
return nil, ErrAppGateNoRelayEndpoints
}
16 changes: 6 additions & 10 deletions pkg/appgateserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ package appgateserver
import sdkerrors "cosmossdk.io/errors"

var (
codespace = "appgateserver"
ErrAppGateInvalidRelayResponseSignature = sdkerrors.Register(codespace, 1, "invalid relay response signature")
ErrAppGateNoRelayEndpoints = sdkerrors.Register(codespace, 2, "no relay endpoints found")
ErrAppGateInvalidRequestURL = sdkerrors.Register(codespace, 3, "invalid request URL")
ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 4, "missing application address")
ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 5, "missing app client signing information")
ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 6, "missing app client listening endpoint")
ErrAppGateEmptyRelayResponseMeta = sdkerrors.Register(codespace, 7, "empty relay response metadata")
ErrAppGateEmptyRelayResponseSignature = sdkerrors.Register(codespace, 8, "empty relay response signature")
ErrAppGateHandleRelay = sdkerrors.Register(codespace, 9, "internal error handling relay request")
codespace = "appgateserver"
ErrAppGateNoRelayEndpoints = sdkerrors.Register(codespace, 1, "no relay endpoints found")
ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 2, "missing application address")
ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 3, "missing app client signing information")
ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 4, "missing app client listening endpoint")
ErrAppGateHandleRelay = sdkerrors.Register(codespace, 5, "internal error handling relay request")
)
104 changes: 18 additions & 86 deletions pkg/appgateserver/server.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
package appgateserver

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"

"cosmossdk.io/depinject"
ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1"
ringtypes "github.com/athanorlabs/go-dleq/types"
cosmosclient "github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
"github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"

"github.com/pokt-network/poktroll/pkg/client"

querytypes "github.com/pokt-network/poktroll/pkg/client/query/types"
"github.com/pokt-network/poktroll/pkg/crypto"
"github.com/pokt-network/poktroll/pkg/polylog"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
"github.com/pokt-network/poktroll/pkg/sdk"
)

// SigningInformation is a struct that holds information related to the signing
Expand All @@ -35,10 +27,6 @@ type SigningInformation struct {
// private key used to sign relay requests.
SigningKeyName string

// SigningKey is the scalar point on the appropriate curve corresponding to the
// signer's private key, and is used to sign relay requests via a ring signature
SigningKey ringtypes.Scalar

// AppAddress is the address of the application that the server is serving if
// If it is nil, then the application address must be included in each request via a query parameter.
AppAddress string
Expand All @@ -56,43 +44,21 @@ type appGateServer struct {
// signing information holds the signing key and application address for the server
signingInformation *SigningInformation

// ringCache is used to obtain and store the ring for the application.
ringCache crypto.RingCache

// clientCtx is the client context for the application.
// It is used to query for the application's account to unmarshal the supplier's account
// and get the public key to verify the relay response signature.
clientCtx querytypes.Context

// sessionQuerier is the querier for the session module.
// It used to get the current session for the application given a requested service.
sessionQuerier sessiontypes.QueryClient

// sessionMu is a mutex to protect currentSession map reads and and updates.
sessionMu sync.RWMutex

// currentSessions is the current session for the application given a block height.
// It is updated by the goListenForNewSessions goroutine.
currentSessions map[string]*sessiontypes.Session

// accountQuerier is the querier for the account module.
// It is used to get the the supplier's public key to verify the relay response signature.
accountQuerier client.AccountQueryClient

// blockClient is the client for the block module.
// It is used to get the current block height to query for the current session.
blockClient client.BlockClient
// sdk is the POKTRollSDK that the appGateServer uses to query for the current session
// and send relay requests to the supplier.
sdk sdk.POKTRollSDK

// listeningEndpoint is the endpoint that the appGateServer will listen on.
listeningEndpoint *url.URL

// server is the HTTP server that will be used capture application requests
// so that they can be signed and relayed to the supplier.
server *http.Server

// accountCache is a cache of the supplier accounts that has been queried
// TODO_TECHDEBT: Add a size limit to the cache.
supplierAccountCache map[string]cryptotypes.PubKey
}

// NewAppGateServer creates a new appGateServer instance with the given dependencies.
Expand All @@ -107,18 +73,13 @@ func NewAppGateServer(
deps depinject.Config,
opts ...appGateServerOption,
) (*appGateServer, error) {
app := &appGateServer{
currentSessions: make(map[string]*sessiontypes.Session),
supplierAccountCache: make(map[string]cryptotypes.PubKey),
}
app := &appGateServer{}

if err := depinject.Inject(
deps,
&app.logger,
&app.clientCtx,
&app.blockClient,
&app.accountQuerier,
&app.ringCache,
&app.sdk,
); err != nil {
return nil, err
}
Expand All @@ -144,18 +105,6 @@ func NewAppGateServer(
app.signingInformation.AppAddress = appAddress.String()
}

// Convert the key record to a private key and return the scalar
// point on the secp256k1 curve that it corresponds to.
// If the key is not a secp256k1 key, this will return an error.
signingKey, err := recordLocalToScalar(keyRecord.GetLocal())
if err != nil {
return nil, fmt.Errorf("failed to convert private key to scalar: %w", err)
}
app.signingInformation.SigningKey = signingKey

clientCtx := cosmosclient.Context(app.clientCtx)

app.sessionQuerier = sessiontypes.NewQueryClient(clientCtx)
app.server = &http.Server{Addr: app.listeningEndpoint.Host}

return app, nil
Expand Down Expand Up @@ -202,11 +151,11 @@ func (app *appGateServer) ServeHTTP(writer http.ResponseWriter, request *http.Re
serviceId := strings.Split(path, "/")[1]

// Read the request body bytes.
payloadBz, err := io.ReadAll(request.Body)
requestPayloadBz, err := io.ReadAll(request.Body)
if err != nil {
app.replyWithError(
ctx,
payloadBz,
requestPayloadBz,
writer,
ErrAppGateHandleRelay.Wrapf("reading relay request body: %s", err),
)
Expand All @@ -216,7 +165,7 @@ func (app *appGateServer) ServeHTTP(writer http.ResponseWriter, request *http.Re
}
app.logger.Debug().
Str("service_id", serviceId).
Str("payload", string(payloadBz)).
Str("payload", string(requestPayloadBz)).
Msg("handling relay")

// Determine the application address.
Expand All @@ -225,18 +174,22 @@ func (app *appGateServer) ServeHTTP(writer http.ResponseWriter, request *http.Re
appAddress = request.URL.Query().Get("senderAddr")
}
if appAddress == "" {
app.replyWithError(ctx, payloadBz, writer, ErrAppGateMissingAppAddress)
app.replyWithError(ctx, requestPayloadBz, writer, ErrAppGateMissingAppAddress)
// TODO_TECHDEBT: log additional info?
app.logger.Error().Msg("no application address provided")
return
}

// Put the request body bytes back into the request body.
request.Body = io.NopCloser(bytes.NewBuffer(requestPayloadBz))

// TODO(@h5law, @red0ne): Add support for asynchronous relays, and switch on
// the request type here.
// TODO_RESEARCH: Should this be started in a goroutine, to allow for
// concurrent requests from numerous applications?
if err := app.handleSynchronousRelay(ctx, appAddress, serviceId, payloadBz, request, writer); err != nil {
if err := app.handleSynchronousRelay(ctx, appAddress, serviceId, requestPayloadBz, request, writer); err != nil {
// Reply with an error response if there was an error handling the relay.
app.replyWithError(ctx, payloadBz, writer, err)
app.replyWithError(ctx, requestPayloadBz, writer, err)
// TODO_TECHDEBT: log additional info?
app.logger.Error().Err(err).Msg("failed handling relay")
return
Expand All @@ -257,25 +210,4 @@ func (app *appGateServer) validateConfig() error {
return nil
}

// recordLocalToScalar converts the private key obtained from a
// key record to a scalar point on the secp256k1 curve
func recordLocalToScalar(local *keyring.Record_Local) (ringtypes.Scalar, error) {
if local == nil {
return nil, fmt.Errorf("cannot extract private key from key record: nil")
}
priv, ok := local.PrivKey.GetCachedValue().(cryptotypes.PrivKey)
if !ok {
return nil, fmt.Errorf("cannot extract private key from key record: %T", local.PrivKey.GetCachedValue())
}
if _, ok := priv.(*secp256k1.PrivKey); !ok {
return nil, fmt.Errorf("unexpected private key type: %T, want %T", priv, &secp256k1.PrivKey{})
}
crv := ring_secp256k1.NewCurve()
privKey, err := crv.DecodeToScalar(priv.Bytes())
if err != nil {
return nil, fmt.Errorf("failed to decode private key: %w", err)
}
return privKey, nil
}

type appGateServerOption func(*appGateServer)
47 changes: 0 additions & 47 deletions pkg/appgateserver/session.go

This file was deleted.

Loading

0 comments on commit bd05bb9

Please sign in to comment.