Skip to content

Commit

Permalink
feat: added ws transactions monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ilkamo committed Mar 27, 2024
1 parent 7c3c7fb commit 83ef938
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 344 deletions.
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21.3

require (
github.com/gagliardetto/binary v0.7.7
github.com/gagliardetto/solana-go v1.8.4
github.com/gagliardetto/solana-go v1.9.3
github.com/oapi-codegen/runtime v1.1.1
github.com/test-go/testify v1.1.4
)
Expand All @@ -15,12 +15,15 @@ require (
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dfuse-io/logging v0.0.0-20201110202154-26697de88c79 // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/gagliardetto/treeout v0.1.4 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/rpc v1.2.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
Expand Down
303 changes: 7 additions & 296 deletions go.sum

Large diffs are not rendered by default.

172 changes: 127 additions & 45 deletions solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,84 +6,87 @@ import (

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/ws"
)

const defaultMaxRetries = uint(20)

type TxID string

type ClientRPC interface {
SendTransactionWithOpts(
ctx context.Context,
transaction *solana.Transaction,
opts rpc.TransactionOpts,
) (signature solana.Signature, err error)
GetLatestBlockhash(
ctx context.Context,
commitment rpc.CommitmentType,
) (out *rpc.GetLatestBlockhashResult, err error)
GetSignatureStatuses(
ctx context.Context,
searchTransactionHistory bool,
transactionSignatures ...solana.Signature,
) (out *rpc.GetSignatureStatusesResult, err error)
}

type Client struct {
type client struct {
maxRetries uint
clientRPC ClientRPC
clientRPC rpcService
clientWS wsService
wallet Wallet
}

func NewClient(
func newClient(
wallet Wallet,
rpcEndpoint string,
opts ...ClientOption,
) (Client, error) {
e := &Client{
) (*client, error) {
c := &client{
maxRetries: defaultMaxRetries,
wallet: wallet,
}

for _, opt := range opts {
if err := opt(e); err != nil {
return Client{}, fmt.Errorf("could not apply option: %w", err)
if err := opt(c); err != nil {
return nil, fmt.Errorf("could not apply option: %w", err)
}
}

if e.clientRPC == nil {
if c.clientRPC == nil {
if rpcEndpoint == "" {
return Client{}, fmt.Errorf("rpcEndpoint is required when no ClientRPC is provided")
return nil, fmt.Errorf("rpcEndpoint is required when no RPC service is provided")
}

rpcClient := rpc.New(rpcEndpoint)
e.clientRPC = rpcClient
c.clientRPC = rpcClient
}

return *e, nil
return c, nil
}

// ClientOption is a function that allows to specify options for the client
type ClientOption func(*Client) error
func newClientWithWS(
wallet Wallet,
rpcEndpoint string,
wsEndpoint string,
opts ...ClientOption,
) (*client, error) {
c, err := newClient(wallet, rpcEndpoint, opts...)
if err != nil {
return nil, err
}

if c.clientWS == nil {
if wsEndpoint == "" {
return nil, fmt.Errorf("wsEndpoint is required when no WS service is provided")
}

// WithMaxRetries sets the maximum number of retries for the engine when sending a transaction on-chain
func WithMaxRetries(maxRetries uint) ClientOption {
return func(e *Client) error {
e.maxRetries = maxRetries
return nil
wsClient, err := ws.Connect(context.Background(), wsEndpoint)
if err != nil {
return nil, fmt.Errorf("could not connect to ws: %w", err)
}

c.clientWS = wsClient
}

return c, nil
}

// WithClientRPC sets the Solana client RPC for the engine
func WithClientRPC(clientRPC ClientRPC) ClientOption {
return func(e *Client) error {
e.clientRPC = clientRPC
return nil
}
// NewClient creates a new Solana client with the given wallet and RPC endpoint.
// If you want to monitor your transactions using a websocket endpoint, use NewClientWithWS.
func NewClient(
wallet Wallet,
rpcEndpoint string,
opts ...ClientOption,
) (DefaultClient, error) {
return newClient(wallet, rpcEndpoint, opts...)
}

// SendTransactionOnChain sends on-chain a transaction
func (e Client) SendTransactionOnChain(ctx context.Context, txBase64 string) (TxID, error) {
// SendTransactionOnChain sends a transaction on-chain.
func (e client) SendTransactionOnChain(ctx context.Context, txBase64 string) (TxID, error) {
latestBlockhash, err := e.clientRPC.GetLatestBlockhash(ctx, "")
if err != nil {
return "", fmt.Errorf("could not get latest blockhash: %w", err)
Expand Down Expand Up @@ -113,8 +116,8 @@ func (e Client) SendTransactionOnChain(ctx context.Context, txBase64 string) (Tx
return TxID(sig.String()), nil
}

// CheckSignature checks if a transaction with the given signature has been confirmed on-chain
func (e Client) CheckSignature(ctx context.Context, tx TxID) (bool, error) {
// CheckSignature checks if a transaction with the given signature has been confirmed on-chain.
func (e client) CheckSignature(ctx context.Context, tx TxID) (bool, error) {
sig, err := solana.SignatureFromBase58(string(tx))
if err != nil {
return false, fmt.Errorf("could not convert signature from base58: %w", err)
Expand All @@ -139,3 +142,82 @@ func (e Client) CheckSignature(ctx context.Context, tx TxID) (bool, error) {

return true, nil
}

// Close closes the client.
func (e client) Close() error {
if e.clientRPC != nil {
return e.clientRPC.Close()
}

return nil
}

type clientWithWS struct {
*client
}

// NewClientWithWS creates a new Solana client with the given wallet, RPC and WebSocket endpoints.
func NewClientWithWS(
wallet Wallet,
rpcEndpoint string,
wsEndpoint string,
opts ...ClientOption,
) (ClientWithWS, error) {
defaultClient, err := newClientWithWS(wallet, rpcEndpoint, wsEndpoint, opts...)
if err != nil {
return nil, err
}

return clientWithWS{defaultClient}, nil
}

// WaitForCommitmentStatus waits for a transaction to reach a specific commitment status.
func (c clientWithWS) WaitForCommitmentStatus(
ctx context.Context,
txID TxID,
status CommitmentStatus,
) (bool, error) {
tx, err := solana.SignatureFromBase58(string(txID))
if err != nil {
return false, fmt.Errorf("invalid txID: %w", err)
}

ct, err := mapToCommitmentType(status)
if err != nil {
return false, err
}

sub, err := c.clientWS.SignatureSubscribe(tx, ct)
if err != nil {
return false, fmt.Errorf("could not subscribe to signature: %w", err)
}

defer sub.Unsubscribe()

for {
select {
case <-ctx.Done():
return false, fmt.Errorf("context cancelled")
case res := <-sub.Response():
if res.Value.Err != nil {
return false, fmt.Errorf("transaction confirmed with error: %s", res.Value.Err)
}
return true, nil
case subErr := <-sub.Err():
return false, fmt.Errorf("subscription error: %w", subErr)
}
}
}

// Close closes the client.
func (c clientWithWS) Close() error {
if err := c.client.Close(); err != nil {
return err
}

if c.client.clientWS != nil {
c.client.clientWS.Close()
}

return nil
}
6 changes: 5 additions & 1 deletion solana/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func (r rpcMock) GetSignatureStatuses(
}, nil
}

func (r rpcMock) Close() error {
return nil
}

func TestNewClient(t *testing.T) {
testPk := "5473ZnvEhn35BdcCcPLKnzsyP6TsgqQrNFpn4i2gFegFiiJLyWginpa9GoFn2cy6Aq2EAuxLt2u2bjFDBPvNY6nw"

Expand All @@ -100,7 +104,7 @@ func TestNewClient(t *testing.T) {
"",
jupSolana.WithMaxRetries(10),
)
require.EqualError(t, err, "rpcEndpoint is required when no ClientRPC is provided")
require.EqualError(t, err, "rpcEndpoint is required when no RPC service is provided")
})

t.Run("solana client with rpc endpoint", func(t *testing.T) {
Expand Down
35 changes: 35 additions & 0 deletions solana/commitment_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package solana

import (
"errors"

"github.com/gagliardetto/solana-go/rpc"
)

type CommitmentStatus struct {
s string
}

func (cs CommitmentStatus) String() string {
return cs.s
}

// For more information, see https://docs.solanalabs.com/consensus/commitments
var (
CommitmentFinalized = CommitmentStatus{"finalized"}
CommitmentConfirmed = CommitmentStatus{"confirmed"}
CommitmentProcessed = CommitmentStatus{"processed"}
)

func mapToCommitmentType(cs CommitmentStatus) (rpc.CommitmentType, error) {
switch cs {
case CommitmentFinalized:
return rpc.CommitmentFinalized, nil
case CommitmentConfirmed:
return rpc.CommitmentConfirmed, nil
case CommitmentProcessed:
return rpc.CommitmentProcessed, nil
}

return "", errors.New("invalid CommitmentStatus")
}
54 changes: 54 additions & 0 deletions solana/commitment_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package solana

import (
"testing"

"github.com/gagliardetto/solana-go/rpc"
"github.com/test-go/testify/require"
)

func Test_mapToCommitmentType(t *testing.T) {
testCases := []struct {
name string
cs CommitmentStatus
want rpc.CommitmentType
wantErr bool
}{
{
name: "processed",
cs: CommitmentProcessed,
want: rpc.CommitmentProcessed,
wantErr: false,
},
{
name: "confirmed",
cs: CommitmentConfirmed,
want: rpc.CommitmentConfirmed,
wantErr: false,
},
{
name: "finalized",
cs: CommitmentFinalized,
want: rpc.CommitmentFinalized,
wantErr: false,
},
{
name: "invalid",
cs: CommitmentStatus{},
want: "",
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got, err := mapToCommitmentType(tc.cs)
if tc.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tc.want, got)
}
})
}
}
Loading

0 comments on commit 83ef938

Please sign in to comment.