Skip to content

Commit

Permalink
refactor: migrate to ConsensusClient
Browse files Browse the repository at this point in the history
  • Loading branch information
Reecepbcups committed Sep 2, 2024
1 parent 966c6ac commit 1509ba6
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 61 deletions.
8 changes: 4 additions & 4 deletions interchaintest/feegrant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func TestRelayerFeeGrant(t *testing.T) {

hash, err := hex.DecodeString(curr.Response.TxHash)
require.Nil(t, err)
txResp, err := TxWithRetry(ctx, cProv.RPCClient, hash)
txResp, err := TxWithRetry(ctx, cProv.ConsensusClient, hash)
require.Nil(t, err)

require.Nil(t, err)
Expand Down Expand Up @@ -538,11 +538,11 @@ func TestRelayerFeeGrant(t *testing.T) {
}
}

func TxWithRetry(ctx context.Context, client client.RPCClient, hash []byte) (*coretypes.ResultTx, error) {
func TxWithRetry(ctx context.Context, client client.ConsensusClient, hash []byte) (*coretypes.ResultTx, error) {
var err error
var res *coretypes.ResultTx
if err = retry.Do(func() error {
res, err = client.Tx(ctx, hash, true)
res, err = client.GetTx(ctx, hash, true)
return err
}, retry.Context(ctx), relayer.RtyAtt, relayer.RtyDel, relayer.RtyErr); err != nil {
return res, err
Expand Down Expand Up @@ -870,7 +870,7 @@ func TestRelayerFeeGrantExternal(t *testing.T) {

hash, err := hex.DecodeString(curr.Response.TxHash)
require.Nil(t, err)
txResp, err := TxWithRetry(ctx, cProv.RPCClient, hash)
txResp, err := TxWithRetry(ctx, cProv.ConsensusClient, hash)
require.Nil(t, err)

require.Nil(t, err)
Expand Down
12 changes: 6 additions & 6 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"time"

"github.com/avast/retry-go/v4"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
sdk "github.com/cosmos/cosmos-sdk/types"
clienttypes "github.com/cosmos/ibc-go/v8/modules/core/02-client/types"
conntypes "github.com/cosmos/ibc-go/v8/modules/core/03-connection/types"
chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported"
rclient "github.com/cosmos/relayer/v2/client"
"github.com/cosmos/relayer/v2/relayer/chains"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/cosmos/relayer/v2/relayer/provider"
Expand Down Expand Up @@ -149,7 +149,7 @@ func (ccp *CosmosChainProcessor) latestHeightWithRetry(ctx context.Context) (lat

// nodeStatusWithRetry will query for the latest node status, retrying in case of failure.
// It will delay by latestHeightQueryRetryDelay between attempts, up to latestHeightQueryRetries.
func (ccp *CosmosChainProcessor) nodeStatusWithRetry(ctx context.Context) (status *coretypes.ResultStatus, err error) {
func (ccp *CosmosChainProcessor) nodeStatusWithRetry(ctx context.Context) (status *rclient.Status, err error) {
return status, retry.Do(func() error {
latestHeightQueryCtx, cancelLatestHeightQueryCtx := context.WithTimeout(ctx, queryTimeout)
defer cancelLatestHeightQueryCtx()
Expand Down Expand Up @@ -239,7 +239,7 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
}
continue
}
persistence.latestHeight = status.SyncInfo.LatestBlockHeight
persistence.latestHeight = int64(status.LatestBlockHeight)
break
}

Expand Down Expand Up @@ -351,7 +351,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
return nil
}

persistence.latestHeight = status.SyncInfo.LatestBlockHeight
persistence.latestHeight = int64(status.LatestBlockHeight)

// This debug log is very noisy, but is helpful when debugging new chains.
// ccp.log.Debug("Queried latest height",
Expand Down Expand Up @@ -393,7 +393,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ {
var (
eg errgroup.Group
blockRes *coretypes.ResultBlockResults
blockRes *rclient.BlockResults
ibcHeader provider.IBCHeader
)

Expand All @@ -403,7 +403,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout)
defer cancelQueryCtx()

blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &sI)
blockRes, err = ccp.chainProvider.ConsensusClient.GetBlockResults(queryCtx, uint64(i))
if err != nil && ccp.metrics != nil {
ccp.metrics.IncBlockQueryFailure(chainID, "RPC Client")
}
Expand Down
10 changes: 3 additions & 7 deletions relayer/chains/cosmos/fee_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"regexp"
"strings"

sdkmath "cosmossdk.io/math"
"go.uber.org/zap"
Expand Down Expand Up @@ -32,15 +31,12 @@ func (cc *CosmosProvider) DynamicFee(ctx context.Context) string {
// QueryBaseFee attempts to make an ABCI query to retrieve the base fee on chains using the Osmosis EIP-1559 implementation.
// This is currently hardcoded to only work on Osmosis.
func (cc *CosmosProvider) QueryBaseFee(ctx context.Context) (string, error) {
resp, err := cc.RPCClient.ABCIQuery(ctx, queryPath, nil)
if err != nil || resp.Response.Code != 0 {
resp, err := cc.ConsensusClient.GetABCIQuery(ctx, queryPath, nil)
if err != nil || resp.Code != 0 {
return "", err
}

// The response value contains the data link escape control character which must be removed before parsing.
cleanedString := strings.ReplaceAll(strings.TrimSpace(string(resp.Response.Value)), "\u0010", "")

decFee, err := sdkmath.LegacyNewDecFromStr(cleanedString)
decFee, err := sdkmath.LegacyNewDecFromStr(resp.ValueCleaned())
if err != nil {
return "", err
}
Expand Down
3 changes: 2 additions & 1 deletion relayer/chains/cosmos/feegrant.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ func (cc *CosmosProvider) EnsureBasicGrants(ctx context.Context, memo string, ga
}

if len(msgs) > 0 {
cliCtx := client.Context{}.WithClient(cc.RPCClient).
cliCtx := client.Context{}.
// WithClient(cc.RPCClient). // TODO(reece): how does server/v2 handle this?
WithInterfaceRegistry(cc.Cdc.InterfaceRegistry).
WithChainID(cc.PCfg.ChainID).
WithCodec(cc.Cdc.Marshaler).
Expand Down
40 changes: 20 additions & 20 deletions relayer/chains/cosmos/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/cosmos/cosmos-sdk/types/module"
"github.com/cosmos/gogoproto/proto"
commitmenttypes "github.com/cosmos/ibc-go/v8/modules/core/23-commitment/types"
cwrapper "github.com/cosmos/relayer/v2/client"
rclient "github.com/cosmos/relayer/v2/client"
"github.com/cosmos/relayer/v2/relayer/codecs/ethermint"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/cosmos/relayer/v2/relayer/provider"
Expand Down Expand Up @@ -124,14 +124,14 @@ func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, deb
type CosmosProvider struct {
log *zap.Logger

PCfg CosmosProviderConfig
Keybase keyring.Keyring
KeyringOptions []keyring.Option
RPCClient cwrapper.RPCClient
LightProvider provtypes.Provider
Input io.Reader
Output io.Writer
Cdc Codec
PCfg CosmosProviderConfig
Keybase keyring.Keyring
KeyringOptions []keyring.Option
ConsensusClient rclient.ConsensusClient
LightProvider provtypes.Provider
Input io.Reader
Output io.Writer
Cdc Codec
// TODO: GRPC Client type?

//nextAccountSeq uint64
Expand Down Expand Up @@ -350,7 +350,7 @@ func (cc *CosmosProvider) startLivelinessChecks(ctx context.Context, timeout tim
case <-ctx.Done():
return
case <-ticker.C:
_, err := cc.RPCClient.Status(ctx)
_, err := cc.ConsensusClient.GetStatus(ctx)
if err != nil {
cc.log.Error("RPC client disconnected", zap.String("chain", cc.ChainName()), zap.Error(err))

Expand Down Expand Up @@ -401,13 +401,13 @@ func (cc *CosmosProvider) setRpcClient(onStartup bool, rpcAddr string, timeout t
return err
}

cc.RPCClient = cwrapper.NewRPCClient(c)
cc.ConsensusClient = rclient.NewRPCClient(c)

// Only check status if not on startup, to ensure the relayer will not block on startup.
// All subsequent calls will perform the status check to ensure RPC endpoints are rotated
// as necessary.
if !onStartup {
if _, err = cc.RPCClient.Status(context.Background()); err != nil {
if _, err = cc.ConsensusClient.GetStatus(context.Background()); err != nil {
return err
}
}
Expand All @@ -428,21 +428,21 @@ func (cc *CosmosProvider) setLightProvider(rpcAddr string) error {

// WaitForNBlocks blocks until the next block on a given chain
func (cc *CosmosProvider) WaitForNBlocks(ctx context.Context, n int64) error {
var initial int64
h, err := cc.RPCClient.Status(ctx)
var initial uint64
h, err := cc.ConsensusClient.GetStatus(ctx)
if err != nil {
return err
}
if h.SyncInfo.CatchingUp {
if h.CatchingUp {
return errors.New("chain catching up")
}
initial = h.SyncInfo.LatestBlockHeight
initial = h.LatestBlockHeight
for {
h, err = cc.RPCClient.Status(ctx)
h, err = cc.ConsensusClient.GetStatus(ctx)
if err != nil {
return err
}
if h.SyncInfo.LatestBlockHeight > initial+n {
if h.LatestBlockHeight > initial+uint64(n) {
return nil
}
select {
Expand All @@ -455,11 +455,11 @@ func (cc *CosmosProvider) WaitForNBlocks(ctx context.Context, n int64) error {
}

func (cc *CosmosProvider) BlockTime(ctx context.Context, height int64) (time.Time, error) {
resultBlock, err := cc.RPCClient.Block(ctx, &height)
bt, err := cc.ConsensusClient.GetBlockTime(ctx, uint64(height))
if err != nil {
return time.Time{}, err
}
return resultBlock.Block.Time, nil
return bt, nil
}

func (cc *CosmosProvider) SetMetrics(m *processor.PrometheusMetrics) {
Expand Down
27 changes: 14 additions & 13 deletions relayer/chains/cosmos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"cosmossdk.io/x/feegrant"
upgradetypes "cosmossdk.io/x/upgrade/types"
abci "github.com/cometbft/cometbft/abci/types"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
tmtypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -33,6 +32,7 @@ import (
host "github.com/cosmos/ibc-go/v8/modules/core/24-host"
ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported"
tmclient "github.com/cosmos/ibc-go/v8/modules/light-clients/07-tendermint"
rclient "github.com/cosmos/relayer/v2/client"
"github.com/cosmos/relayer/v2/relayer/chains"
"github.com/cosmos/relayer/v2/relayer/provider"
"go.uber.org/zap"
Expand Down Expand Up @@ -67,7 +67,7 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger,
)

eg.Go(func() error {
res, err := cc.RPCClient.BlockSearch(ctx, query, &page, &limit, "")
res, err := cc.ConsensusClient.GetBlockSearch(ctx, query, &page, &limit, "")
if err != nil {
return err
}
Expand All @@ -77,7 +77,7 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger,
for _, b := range res.Blocks {
b := b
nestedEg.Go(func() error {
block, err := cc.RPCClient.BlockResults(ctx, &b.Block.Height)
block, err := cc.ConsensusClient.GetBlockResults(ctx, uint64(b.Block.Height))
if err != nil {
return err
}
Expand All @@ -93,7 +93,7 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger,
})

eg.Go(func() error {
res, err := cc.RPCClient.TxSearch(ctx, query, true, &page, &limit, "")
res, err := cc.ConsensusClient.GetTxSearch(ctx, query, true, &page, &limit, "")
if err != nil {
return err
}
Expand Down Expand Up @@ -121,7 +121,8 @@ func (cc *CosmosProvider) QueryTx(ctx context.Context, hashHex string) (*provide
return nil, err
}

resp, err := cc.RPCClient.Tx(ctx, hash, true)
// TODO(reece): Why is this true when we do not use the proof?
resp, err := cc.ConsensusClient.GetTx(ctx, hash, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -151,7 +152,7 @@ func (cc *CosmosProvider) QueryTxs(ctx context.Context, page, limit int, events
return nil, errors.New("limit must greater than 0")
}

res, err := cc.RPCClient.TxSearch(ctx, strings.Join(events, " AND "), true, &page, &limit, "")
res, err := cc.ConsensusClient.GetTxSearch(ctx, strings.Join(events, " AND "), true, &page, &limit, "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -604,7 +605,7 @@ func (cc *CosmosProvider) QueryUpgradedConsState(ctx context.Context, height int
// QueryConsensusState returns a consensus state for a given chain to be used as a
// client in another chain, fetches latest height when passed 0 as arg
func (cc *CosmosProvider) QueryConsensusState(ctx context.Context, height int64) (ibcexported.ConsensusState, int64, error) {
commit, err := cc.RPCClient.Commit(ctx, &height)
commit, err := cc.ConsensusClient.GetCommit(ctx, uint64(height))
if err != nil {
return &tmclient.ConsensusState{}, 0, err
}
Expand All @@ -613,7 +614,7 @@ func (cc *CosmosProvider) QueryConsensusState(ctx context.Context, height int64)
count := 10_000

nextHeight := height + 1
nextVals, err := cc.RPCClient.Validators(ctx, &nextHeight, &page, &count)
nextVals, err := cc.ConsensusClient.GetValidators(ctx, &nextHeight, &page, &count)
if err != nil {
return &tmclient.ConsensusState{}, 0, err
}
Expand Down Expand Up @@ -1201,18 +1202,18 @@ func (cc *CosmosProvider) QueryPacketReceipt(ctx context.Context, height int64,
}

func (cc *CosmosProvider) QueryLatestHeight(ctx context.Context) (int64, error) {
stat, err := cc.RPCClient.Status(ctx)
stat, err := cc.ConsensusClient.GetStatus(ctx)
if err != nil {
return -1, err
} else if stat.SyncInfo.CatchingUp {
} else if stat.CatchingUp {
return -1, fmt.Errorf("node at %s running chain %s not caught up", cc.PCfg.RPCAddr, cc.PCfg.ChainID)
}
return stat.SyncInfo.LatestBlockHeight, nil
return int64(stat.LatestBlockHeight), nil
}

// Query current node status
func (cc *CosmosProvider) QueryStatus(ctx context.Context) (*coretypes.ResultStatus, error) {
status, err := cc.RPCClient.Status(ctx)
func (cc *CosmosProvider) QueryStatus(ctx context.Context) (*rclient.Status, error) {
status, err := cc.ConsensusClient.GetStatus(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query node status: %w", err)
}
Expand Down
Loading

0 comments on commit 1509ba6

Please sign in to comment.