From 4da6a93e6c23218db447cef95b4fb68758f7a769 Mon Sep 17 00:00:00 2001
From: Andrew Gouin
Date: Fri, 24 Mar 2023 16:38:40 -0600
Subject: [PATCH 1/6] Add channel close correlation

---
 relayer/processor/path_processor_internal.go | 119 +++++++++++++++++--
 relayer/processor/types_internal.go          |   8 ++
 2 files changed, 119 insertions(+), 8 deletions(-)

diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go
index 9fc835257..58567e138 100644
--- a/relayer/processor/path_processor_internal.go
+++ b/relayer/processor/path_processor_internal.go
@@ -488,6 +488,76 @@ func (pp *PathProcessor) unrelayedChannelHandshakeMessages(
 	return res
 }
 
+func (pp *PathProcessor) unrelayedChannelCloseMessages(
+	pathEndChannelCloseMessages pathEndChannelCloseMessages,
+) pathEndChannelHandshakeResponse {
+	var (
+		res         pathEndChannelHandshakeResponse
+		toDeleteSrc = make(map[string][]ChannelKey)
+		toDeleteDst = make(map[string][]ChannelKey)
+	)
+	processRemovals := func() {
+		pathEndChannelCloseMessages.Src.messageCache.ChannelHandshake.DeleteMessages(toDeleteSrc)
+		pathEndChannelCloseMessages.Dst.messageCache.ChannelHandshake.DeleteMessages(toDeleteDst)
+		pathEndChannelCloseMessages.Src.channelProcessing.deleteMessages(toDeleteSrc)
+		pathEndChannelCloseMessages.Dst.channelProcessing.deleteMessages(toDeleteDst)
+		toDeleteSrc = make(map[string][]ChannelKey)
+		toDeleteDst = make(map[string][]ChannelKey)
+	}
+
+	for chanKey := range pathEndChannelCloseMessages.DstMsgChannelCloseConfirm {
+		// found close confirm, channel close complete. remove all retention
+
+		counterpartyKey := chanKey.Counterparty()
+		toDeleteDst[chantypes.EventTypeChannelCloseConfirm] = append(
+			toDeleteDst[chantypes.EventTypeChannelCloseConfirm],
+			chanKey,
+		)
+		// MsgChannelCloseInit does not have CounterpartyChannelID // TODO: confirm this
+		toDeleteSrc[chantypes.EventTypeChannelCloseInit] = append(
+			toDeleteSrc[chantypes.EventTypeChannelCloseInit],
+			counterpartyKey.MsgInitKey(),
+		)
+		// TODO: confirm this should use PreInitKey
+		toDeleteSrc[preInitKey] = append(toDeleteSrc[preInitKey], counterpartyKey.PreInitKey())
+	}
+
+	processRemovals()
+
+	for chanKey, info := range pathEndChannelCloseMessages.SrcMsgChannelCloseInit {
+		// need to send a close confirm to dst
+		msgCloseConfirm := channelIBCMessage{
+			eventType: chantypes.EventTypeChannelCloseConfirm,
+			info:      info,
+		}
+		if pathEndChannelCloseMessages.Dst.shouldSendChannelMessage(
+			msgCloseConfirm, pathEndChannelCloseMessages.Src,
+		) {
+			res.DstMessages = append(res.DstMessages, msgCloseConfirm)
+		}
+
+		// TODO: confirm this should use PreInitKey
+		toDeleteSrc[preInitKey] = append(toDeleteSrc[preInitKey], chanKey.PreInitKey())
+	}
+
+	processRemovals()
+
+	for _, info := range pathEndChannelCloseMessages.SrcMsgChannelPreInit {
+		// need to send a close init to src
+		msgCloseInit := channelIBCMessage{
+			eventType: chantypes.EventTypeChannelCloseInit,
+			info:      info,
+		}
+		if pathEndChannelCloseMessages.Src.shouldSendChannelMessage(
+			msgCloseInit, pathEndChannelCloseMessages.Dst,
+		) {
+			res.SrcMessages = append(res.SrcMessages, msgCloseInit)
+		}
+	}
+
+	return res
+}
+
 func (pp *PathProcessor) getUnrelayedClientICQMessages(pathEnd *pathEndRuntime, queryMessages, responseMessages ClientICQMessageCache) (res []clientICQMessage) {
 ClientICQLoop:
	for queryID, queryMsg := range queryMessages {
@@ -559,6 +629,9 @@ var observedEventTypeForDesiredMessage = map[string]string{
 	chantypes.EventTypeChannelOpenTry:     chantypes.EventTypeChannelOpenInit,
chantypes.EventTypeChannelOpenInit: preInitKey, + chantypes.EventTypeChannelCloseConfirm: chantypes.EventTypeChannelCloseInit, + chantypes.EventTypeChannelCloseInit: preInitKey, + chantypes.EventTypeAcknowledgePacket: chantypes.EventTypeRecvPacket, chantypes.EventTypeRecvPacket: chantypes.EventTypeSendPacket, chantypes.EventTypeSendPacket: preInitKey, @@ -724,6 +797,23 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context) error { pathEnd1ChannelHandshakeRes := pp.unrelayedChannelHandshakeMessages(pathEnd1ChannelHandshakeMessages) pathEnd2ChannelHandshakeRes := pp.unrelayedChannelHandshakeMessages(pathEnd2ChannelHandshakeMessages) + pathEnd1ChannelCloseMessages := pathEndChannelCloseMessages{ + Src: pp.pathEnd1, + Dst: pp.pathEnd2, + SrcMsgChannelPreInit: pp.pathEnd1.messageCache.ChannelHandshake[preInitKey], + SrcMsgChannelCloseInit: pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit], + DstMsgChannelCloseConfirm: pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm], + } + pathEnd2ChannelCloseMessages := pathEndChannelCloseMessages{ + Src: pp.pathEnd2, + Dst: pp.pathEnd1, + SrcMsgChannelPreInit: pp.pathEnd2.messageCache.ChannelHandshake[preInitKey], + SrcMsgChannelCloseInit: pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit], + DstMsgChannelCloseConfirm: pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm], + } + pathEnd1ChannelCloseRes := pp.unrelayedChannelCloseMessages(pathEnd1ChannelCloseMessages) + pathEnd2ChannelCloseRes := pp.unrelayedChannelCloseMessages(pathEnd2ChannelCloseMessages) + // process the packet flows for both path ends to determine what needs to be relayed pathEnd1ProcessRes := make([]pathEndPacketFlowResponse, len(channelPairs)) pathEnd2ProcessRes := make([]pathEndPacketFlowResponse, len(channelPairs)) @@ -791,7 +881,10 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context) error { // concatenate applicable messages for pathend pathEnd1ConnectionMessages, pathEnd2ConnectionMessages := pp.connectionMessagesToSend(pathEnd1ConnectionHandshakeRes, pathEnd2ConnectionHandshakeRes) - pathEnd1ChannelMessages, pathEnd2ChannelMessages := pp.channelMessagesToSend(pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes) + pathEnd1ChannelMessages, pathEnd2ChannelMessages := pp.channelMessagesToSend( + pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes, + pathEnd1ChannelCloseRes, pathEnd2ChannelCloseRes, + ) pathEnd1PacketMessages, pathEnd2PacketMessages, pathEnd1ChanCloseMessages, pathEnd2ChanCloseMessages := pp.packetMessagesToSend(channelPairs, pathEnd1ProcessRes, pathEnd2ProcessRes) pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd1ChanCloseMessages...) 
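
The close correlation above works backward from the most advanced observed state: a MsgChannelCloseConfirm observed on dst prunes all retained close messages for that flow, an unanswered MsgChannelCloseInit on src produces the close confirm for dst, and a pre-init intent produces the close init itself, with shouldSendChannelMessage gating retries. A minimal, self-contained sketch of that decision order, using hypothetical names rather than the relayer's actual types:

package main

import "fmt"

// closeFlow records which close-handshake events have been observed for one
// channel. The type and field names are hypothetical, for illustration only.
type closeFlow struct {
	preInit      bool // intent to close declared (pre-init key)
	closeInit    bool // MsgChannelCloseInit observed on src
	closeConfirm bool // MsgChannelCloseConfirm observed on dst
}

// nextCloseMessage mirrors the loop order above: the most advanced observed
// state wins, so a flow never emits a message that a later-observed event
// has already made redundant.
func nextCloseMessage(f closeFlow) string {
	switch {
	case f.closeConfirm:
		return "" // close complete: prune all retention for this channel
	case f.closeInit:
		return "MsgChannelCloseConfirm" // relay the confirm to dst
	case f.preInit:
		return "MsgChannelCloseInit" // broadcast the init on src
	default:
		return "" // nothing observed for this channel
	}
}

func main() {
	fmt.Println(nextCloseMessage(closeFlow{preInit: true}))                  // MsgChannelCloseInit
	fmt.Println(nextCloseMessage(closeFlow{preInit: true, closeInit: true})) // MsgChannelCloseConfirm
}
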
@@ -836,21 +929,31 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context) error {
 	return eg.Wait()
 }
 
-func (pp *PathProcessor) channelMessagesToSend(pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes pathEndChannelHandshakeResponse) ([]channelIBCMessage, []channelIBCMessage) {
-	pathEnd1ChannelSrcLen := len(pathEnd1ChannelHandshakeRes.SrcMessages)
-	pathEnd1ChannelDstLen := len(pathEnd1ChannelHandshakeRes.DstMessages)
-	pathEnd2ChannelDstLen := len(pathEnd2ChannelHandshakeRes.DstMessages)
-	pathEnd2ChannelSrcLen := len(pathEnd2ChannelHandshakeRes.SrcMessages)
-	pathEnd1ChannelMessages := make([]channelIBCMessage, 0, pathEnd1ChannelSrcLen+pathEnd2ChannelDstLen)
-	pathEnd2ChannelMessages := make([]channelIBCMessage, 0, pathEnd2ChannelSrcLen+pathEnd1ChannelDstLen)
+func (pp *PathProcessor) channelMessagesToSend(pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes, pathEnd1ChannelCloseRes, pathEnd2ChannelCloseRes pathEndChannelHandshakeResponse) ([]channelIBCMessage, []channelIBCMessage) {
+	pathEnd1ChannelOpenSrcLen := len(pathEnd1ChannelHandshakeRes.SrcMessages)
+	pathEnd1ChannelOpenDstLen := len(pathEnd1ChannelHandshakeRes.DstMessages)
+	pathEnd2ChannelOpenDstLen := len(pathEnd2ChannelHandshakeRes.DstMessages)
+	pathEnd2ChannelOpenSrcLen := len(pathEnd2ChannelHandshakeRes.SrcMessages)
+
+	pathEnd1ChannelCloseSrcLen := len(pathEnd1ChannelCloseRes.SrcMessages)
+	pathEnd1ChannelCloseDstLen := len(pathEnd1ChannelCloseRes.DstMessages)
+	pathEnd2ChannelCloseDstLen := len(pathEnd2ChannelCloseRes.DstMessages)
+	pathEnd2ChannelCloseSrcLen := len(pathEnd2ChannelCloseRes.SrcMessages)
+
+	pathEnd1ChannelMessages := make([]channelIBCMessage, 0, pathEnd1ChannelOpenSrcLen+pathEnd2ChannelOpenDstLen+pathEnd1ChannelCloseSrcLen+pathEnd2ChannelCloseDstLen)
+	pathEnd2ChannelMessages := make([]channelIBCMessage, 0, pathEnd2ChannelOpenSrcLen+pathEnd1ChannelOpenDstLen+pathEnd2ChannelCloseSrcLen+pathEnd1ChannelCloseDstLen)
 
 	// pathEnd1 channel messages come from pathEnd1 src and pathEnd2 dst
 	pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd2ChannelHandshakeRes.DstMessages...)
+	pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd2ChannelCloseRes.DstMessages...)
 	pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd1ChannelHandshakeRes.SrcMessages...)
+	pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd1ChannelCloseRes.SrcMessages...)
 
 	// pathEnd2 channel messages come from pathEnd2 src and pathEnd1 dst
 	pathEnd2ChannelMessages = append(pathEnd2ChannelMessages, pathEnd1ChannelHandshakeRes.DstMessages...)
+	pathEnd2ChannelMessages = append(pathEnd2ChannelMessages, pathEnd1ChannelCloseRes.DstMessages...)
 	pathEnd2ChannelMessages = append(pathEnd2ChannelMessages, pathEnd2ChannelHandshakeRes.SrcMessages...)
+	pathEnd2ChannelMessages = append(pathEnd2ChannelMessages, pathEnd2ChannelCloseRes.SrcMessages...)
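+
+	// open-handshake and close messages are merged above so that each path end
+	// receives a single batch of channel messages to broadcast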
return pathEnd1ChannelMessages, pathEnd2ChannelMessages } diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index 66c56edac..83835cba1 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -408,6 +408,14 @@ type pathEndChannelHandshakeMessages struct { DstMsgChannelOpenConfirm ChannelMessageCache } +type pathEndChannelCloseMessages struct { + Src *pathEndRuntime + Dst *pathEndRuntime + SrcMsgChannelPreInit ChannelMessageCache + SrcMsgChannelCloseInit ChannelMessageCache + DstMsgChannelCloseConfirm ChannelMessageCache +} + type pathEndPacketFlowResponse struct { SrcMessages []packetIBCMessage DstMessages []packetIBCMessage From 15142fbfacea11f5f04ab74aaf9a9d429449ed57 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Fri, 24 Mar 2023 18:28:28 -0600 Subject: [PATCH 2/6] Switch to pre-close key --- relayer/channel.go | 7 +++++++ relayer/processor/path_end_runtime.go | 3 ++- relayer/processor/path_processor_internal.go | 19 +++++++++++-------- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/relayer/channel.go b/relayer/channel.go index f5aa9dc0e..0ee754150 100644 --- a/relayer/channel.go +++ b/relayer/channel.go @@ -117,6 +117,12 @@ func (c *Chain) CloseChannel( ctx, cancel := context.WithTimeout(ctx, processorTimeout) defer cancel() + c.log.Info("Starting event processor for channel close", + zap.String("src_chain_id", c.PathEnd.ChainID), + zap.String("src_port_id", srcPortID), + zap.String("dst_chain_id", dst.PathEnd.ChainID), + ) + return processor.NewEventProcessor(). WithChainProcessors( c.chainProcessor(c.log, nil), @@ -137,6 +143,7 @@ func (c *Chain) CloseChannel( ChainID: c.PathEnd.ChainID, EventType: chantypes.EventTypeChannelCloseInit, Info: provider.ChannelInfo{ + ConnID: c.PathEnd.ConnectionID, PortID: srcPortID, ChannelID: srcChanID, }, diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index a9d26d6aa..a60a0b27c 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -620,8 +620,9 @@ func (pathEnd *pathEndRuntime) shouldSendChannelMessage(message channelIBCMessag toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.MsgInitKey()} toDeleteCounterparty[preInitKey] = []ChannelKey{counterpartyKey.MsgInitKey()} case chantypes.EventTypeChannelCloseConfirm: - toDeleteCounterparty[chantypes.EventTypeChannelCloseInit] = []ChannelKey{counterpartyKey} toDelete[chantypes.EventTypeChannelCloseConfirm] = []ChannelKey{channelKey} + toDeleteCounterparty[chantypes.EventTypeChannelCloseInit] = []ChannelKey{counterpartyKey} + toDeleteCounterparty[preCloseKey] = []ChannelKey{counterpartyKey} // Gather relevant send packet messages, for this channel key, that should be deleted if we // are operating on an ordered channel. diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 58567e138..fd59eb76a 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -17,7 +17,10 @@ import ( // preInitKey is used to declare intent to initialize a connection or channel handshake // i.e. a MsgConnectionOpenInit or a MsgChannelOpenInit should be broadcasted to start // the handshake if this key exists in the relevant cache. 
-const preInitKey = "pre_init" +const ( + preInitKey = "pre_init" + preCloseKey = "pre_close" +) // getMessagesToSend returns only the lowest sequence message (if it should be sent) for ordered channels, // otherwise returns all which should be sent. @@ -518,8 +521,8 @@ func (pp *PathProcessor) unrelayedChannelCloseMessages( toDeleteSrc[chantypes.EventTypeChannelCloseInit], counterpartyKey.MsgInitKey(), ) - // TODO: confirm this should use PreInitKey - toDeleteSrc[preInitKey] = append(toDeleteSrc[preInitKey], counterpartyKey.PreInitKey()) + // TODO: confirm chankey does not need modification + toDeleteSrc[preCloseKey] = append(toDeleteSrc[preCloseKey], counterpartyKey) } processRemovals() @@ -536,8 +539,8 @@ func (pp *PathProcessor) unrelayedChannelCloseMessages( res.DstMessages = append(res.DstMessages, msgCloseConfirm) } - // TODO: confirm this should use PreInitKey - toDeleteSrc[preInitKey] = append(toDeleteSrc[preInitKey], chanKey.PreInitKey()) + // TODO: confirm chankey does not need modification + toDeleteSrc[preCloseKey] = append(toDeleteSrc[preCloseKey], chanKey) } processRemovals() @@ -630,7 +633,7 @@ var observedEventTypeForDesiredMessage = map[string]string{ chantypes.EventTypeChannelOpenInit: preInitKey, chantypes.EventTypeChannelCloseConfirm: chantypes.EventTypeChannelCloseInit, - chantypes.EventTypeChannelCloseInit: preInitKey, + chantypes.EventTypeChannelCloseInit: preCloseKey, chantypes.EventTypeAcknowledgePacket: chantypes.EventTypeRecvPacket, chantypes.EventTypeRecvPacket: chantypes.EventTypeSendPacket, @@ -800,14 +803,14 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context) error { pathEnd1ChannelCloseMessages := pathEndChannelCloseMessages{ Src: pp.pathEnd1, Dst: pp.pathEnd2, - SrcMsgChannelPreInit: pp.pathEnd1.messageCache.ChannelHandshake[preInitKey], + SrcMsgChannelPreInit: pp.pathEnd1.messageCache.ChannelHandshake[preCloseKey], SrcMsgChannelCloseInit: pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit], DstMsgChannelCloseConfirm: pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm], } pathEnd2ChannelCloseMessages := pathEndChannelCloseMessages{ Src: pp.pathEnd2, Dst: pp.pathEnd1, - SrcMsgChannelPreInit: pp.pathEnd2.messageCache.ChannelHandshake[preInitKey], + SrcMsgChannelPreInit: pp.pathEnd2.messageCache.ChannelHandshake[preCloseKey], SrcMsgChannelCloseInit: pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit], DstMsgChannelCloseConfirm: pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm], } From 45f04914ebdc4e6d25d05a019c99ae1a9f971774 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sat, 25 Mar 2023 11:11:28 -0600 Subject: [PATCH 3/6] make tx channel-close cli command work, add test coverage --- interchaintest/ica_channel_close_test.go | 315 +++++++++++++++++++ interchaintest/interchain_accounts_test.go | 6 +- relayer/channel.go | 56 ++-- relayer/processor/path_end_runtime.go | 41 +++ relayer/processor/path_processor.go | 6 +- relayer/processor/path_processor_internal.go | 159 ++++++---- relayer/processor/types.go | 12 + 7 files changed, 514 insertions(+), 81 deletions(-) create mode 100644 interchaintest/ica_channel_close_test.go diff --git a/interchaintest/ica_channel_close_test.go b/interchaintest/ica_channel_close_test.go new file mode 100644 index 000000000..30e4dcee0 --- /dev/null +++ b/interchaintest/ica_channel_close_test.go @@ -0,0 +1,315 @@ +package interchaintest_test + +import ( + "context" + "encoding/json" + "strconv" + 
"testing" + "time" + + "github.com/cosmos/cosmos-sdk/crypto/keyring" + relayerinterchaintest "github.com/cosmos/relayer/v2/interchaintest" + interchaintest "github.com/strangelove-ventures/interchaintest/v7" + "github.com/strangelove-ventures/interchaintest/v7/chain/cosmos" + "github.com/strangelove-ventures/interchaintest/v7/ibc" + "github.com/strangelove-ventures/interchaintest/v7/testreporter" + "github.com/strangelove-ventures/interchaintest/v7/testutil" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +// TestScenarioICAChannelClose is very similar to the TestScenarioInterchainAccounts, +// but instead it tests manually closing the channel using the relayer CLI. +func TestScenarioICAChannelClose(t *testing.T) { + if testing.Short() { + t.Skip("skipping in short mode") + } + + t.Parallel() + + client, network := interchaintest.DockerSetup(t) + + rep := testreporter.NewNopReporter() + eRep := rep.RelayerExecReporter(t) + + ctx := context.Background() + + // Get both chains + nf := 0 + nv := 1 + cf := interchaintest.NewBuiltinChainFactory(zaptest.NewLogger(t), []*interchaintest.ChainSpec{ + { + Name: "icad", + NumValidators: &nv, + NumFullNodes: &nf, + ChainConfig: ibc.ChainConfig{ + Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.5.0"}}, + UsingNewGenesisCommand: true, + }, + }, + { + Name: "icad", + NumValidators: &nv, + NumFullNodes: &nf, + ChainConfig: ibc.ChainConfig{ + Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.5.0"}}, + UsingNewGenesisCommand: true, + }, + }, + }) + + chains, err := cf.Chains(t.Name()) + require.NoError(t, err) + + chain1, chain2 := chains[0], chains[1] + + // Get a relayer instance + r := relayerinterchaintest. + NewRelayerFactory(relayerinterchaintest.RelayerConfig{}). + Build(t, client, network) + + // Build the network; spin up the chains and configure the relayer + const pathName = "test-path" + const relayerName = "relayer" + + ic := interchaintest.NewInterchain(). + AddChain(chain1). + AddChain(chain2). + AddRelayer(r, relayerName). 
+ AddLink(interchaintest.InterchainLink{ + Chain1: chain1, + Chain2: chain2, + Relayer: r, + Path: pathName, + }) + + require.NoError(t, ic.Build(ctx, eRep, interchaintest.InterchainBuildOptions{ + TestName: t.Name(), + Client: client, + NetworkID: network, + SkipPathCreation: true, + BlockDatabaseFile: interchaintest.DefaultBlockDatabaseFilepath(), + })) + + // Fund a user account on chain1 and chain2 + const userFunds = int64(10_000_000_000) + users := interchaintest.GetAndFundTestUsers(t, ctx, t.Name(), userFunds, chain1, chain2) + chain1User := users[0] + chain2User := users[1] + + // Generate a new IBC path + err = r.GeneratePath(ctx, eRep, chain1.Config().ChainID, chain2.Config().ChainID, pathName) + require.NoError(t, err) + + // Create new clients + err = r.CreateClients(ctx, eRep, pathName, ibc.CreateClientOptions{TrustingPeriod: "330h"}) + require.NoError(t, err) + + err = testutil.WaitForBlocks(ctx, 5, chain1, chain2) + require.NoError(t, err) + + // Create a new connection + err = r.CreateConnections(ctx, eRep, pathName) + require.NoError(t, err) + + err = testutil.WaitForBlocks(ctx, 5, chain1, chain2) + require.NoError(t, err) + + // Query for the newly created connection + connections, err := r.GetConnections(ctx, eRep, chain1.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 1, len(connections)) + + // Register a new interchain account on chain2, on behalf of the user acc on chain1 + chain1Addr := chain1User.(*cosmos.CosmosWallet).FormattedAddressWithPrefix(chain1.Config().Bech32Prefix) + + registerICA := []string{ + chain1.Config().Bin, "tx", "intertx", "register", + "--from", chain1Addr, + "--connection-id", connections[0].ID, + "--chain-id", chain1.Config().ChainID, + "--home", chain1.HomeDir(), + "--node", chain1.GetRPCAddress(), + "--keyring-backend", keyring.BackendTest, + "-y", + } + _, _, err = chain1.Exec(ctx, registerICA, nil) + require.NoError(t, err) + + // Start the relayer and set the cleanup function. 
+	err = r.StartRelayer(ctx, eRep, pathName)
+	require.NoError(t, err)
+
+	t.Cleanup(
+		func() {
+			err := r.StopRelayer(ctx, eRep)
+			if err != nil {
+				t.Logf("an error occurred while stopping the relayer: %s", err)
+			}
+		},
+	)
+
+	// Wait for relayer to start up and finish channel handshake
+	err = testutil.WaitForBlocks(ctx, 15, chain1, chain2)
+	require.NoError(t, err)
+
+	// Query for the newly registered interchain account
+	queryICA := []string{
+		chain1.Config().Bin, "query", "intertx", "interchainaccounts", connections[0].ID, chain1Addr,
+		"--chain-id", chain1.Config().ChainID,
+		"--home", chain1.HomeDir(),
+		"--node", chain1.GetRPCAddress(),
+	}
+	stdout, _, err := chain1.Exec(ctx, queryICA, nil)
+	require.NoError(t, err)
+
+	icaAddr := parseInterchainAccountField(stdout)
+	require.NotEmpty(t, icaAddr)
+
+	// Get initial account balances
+	chain2Addr := chain2User.(*cosmos.CosmosWallet).FormattedAddressWithPrefix(chain2.Config().Bech32Prefix)
+
+	chain2OrigBal, err := chain2.GetBalance(ctx, chain2Addr, chain2.Config().Denom)
+	require.NoError(t, err)
+
+	icaOrigBal, err := chain2.GetBalance(ctx, icaAddr, chain2.Config().Denom)
+	require.NoError(t, err)
+
+	// Send funds to ICA from user account on chain2
+	const transferAmount = 10000
+	transfer := ibc.WalletAmount{
+		Address: icaAddr,
+		Denom:   chain2.Config().Denom,
+		Amount:  transferAmount,
+	}
+	err = chain2.SendFunds(ctx, chain2User.KeyName(), transfer)
+	require.NoError(t, err)
+
+	// Wait for transfer to be complete and assert balances
+	err = testutil.WaitForBlocks(ctx, 5, chain2)
+	require.NoError(t, err)
+
+	chain2Bal, err := chain2.GetBalance(ctx, chain2Addr, chain2.Config().Denom)
+	require.NoError(t, err)
+	require.Equal(t, chain2OrigBal-transferAmount, chain2Bal)
+
+	icaBal, err := chain2.GetBalance(ctx, icaAddr, chain2.Config().Denom)
+	require.NoError(t, err)
+	require.Equal(t, icaOrigBal+transferAmount, icaBal)
+
+	// Build bank transfer msg
+	rawMsg, err := json.Marshal(map[string]any{
+		"@type":        "/cosmos.bank.v1beta1.MsgSend",
+		"from_address": icaAddr,
+		"to_address":   chain2Addr,
+		"amount": []map[string]any{
+			{
+				"denom":  chain2.Config().Denom,
+				"amount": strconv.Itoa(transferAmount),
+			},
+		},
+	})
+	require.NoError(t, err)
+
+	// Send bank transfer msg to ICA on chain2 from the user account on chain1
+	sendICATransfer := []string{
+		chain1.Config().Bin, "tx", "intertx", "submit", string(rawMsg),
+		"--connection-id", connections[0].ID,
+		"--from", chain1Addr,
+		"--chain-id", chain1.Config().ChainID,
+		"--home", chain1.HomeDir(),
+		"--node", chain1.GetRPCAddress(),
+		"--keyring-backend", keyring.BackendTest,
+		"-y",
+	}
+	_, _, err = chain1.Exec(ctx, sendICATransfer, nil)
+	require.NoError(t, err)
+
+	// Wait for tx to be relayed
+	err = testutil.WaitForBlocks(ctx, 10, chain2)
+	require.NoError(t, err)
+
+	// Assert that the funds have been received by the user account on chain2
+	chain2Bal, err = chain2.GetBalance(ctx, chain2Addr, chain2.Config().Denom)
+	require.NoError(t, err)
+	require.Equal(t, chain2OrigBal, chain2Bal)
+
+	// Assert that the funds have been removed from the ICA on chain2
+	icaBal, err = chain2.GetBalance(ctx, icaAddr, chain2.Config().Denom)
+	require.NoError(t, err)
+	require.Equal(t, icaOrigBal, icaBal)
+
+	// Stop the relayer and wait for the process to terminate
+	err = r.StopRelayer(ctx, eRep)
+	require.NoError(t, err)
+
+	err = testutil.WaitForBlocks(ctx, 5, chain1, chain2)
+	require.NoError(t, err)
+
+	// Send another bank transfer msg to ICA on chain2 from the user account on
chain1. + // This message should timeout and the channel will be closed when we re-start the relayer. + _, _, err = chain1.Exec(ctx, sendICATransfer, nil) + require.NoError(t, err) + + // Wait for approximately one minute to allow packet timeout threshold to be hit + time.Sleep(70 * time.Second) + + chain1Chans, err := r.GetChannels(ctx, eRep, chain1.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 1, len(chain1Chans)) + + // Close the channel using the channel close CLI method + res := r.Exec(ctx, eRep, []string{"tx", "channel-close", pathName, chain1Chans[0].ChannelID, chain1Chans[0].PortID}, nil) + require.NoError(t, res.Err) + require.Zero(t, res.ExitCode) + + // Assert that the packet timed out and that the acc balances are correct + chain2Bal, err = chain2.GetBalance(ctx, chain2Addr, chain2.Config().Denom) + require.NoError(t, err) + require.Equal(t, chain2OrigBal, chain2Bal) + + icaBal, err = chain2.GetBalance(ctx, icaAddr, chain2.Config().Denom) + require.NoError(t, err) + require.Equal(t, icaOrigBal, icaBal) + + // Assert that the channel ends are both closed + chain1Chans, err = r.GetChannels(ctx, eRep, chain1.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 1, len(chain1Chans)) + require.Subset(t, []string{"STATE_CLOSED", "Closed"}, []string{chain1Chans[0].State}) + + chain2Chans, err := r.GetChannels(ctx, eRep, chain2.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 1, len(chain2Chans)) + require.Subset(t, []string{"STATE_CLOSED", "Closed"}, []string{chain2Chans[0].State}) + + // Restart the relayer for the next channel handshake + err = r.StartRelayer(ctx, eRep, pathName) + require.NoError(t, err) + + // Attempt to open another channel for the same ICA + _, _, err = chain1.Exec(ctx, registerICA, nil) + require.NoError(t, err) + + // Wait for channel handshake to finish + err = testutil.WaitForBlocks(ctx, 15, chain1, chain2) + require.NoError(t, err) + + // Assert that a new channel has been opened and the same ICA is in use + stdout, _, err = chain1.Exec(ctx, queryICA, nil) + require.NoError(t, err) + + newICA := parseInterchainAccountField(stdout) + require.NotEmpty(t, newICA) + require.Equal(t, icaAddr, newICA) + + chain1Chans, err = r.GetChannels(ctx, eRep, chain1.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 2, len(chain1Chans)) + require.Subset(t, []string{"STATE_OPEN", "Open"}, []string{chain1Chans[1].State}) + + chain2Chans, err = r.GetChannels(ctx, eRep, chain2.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 2, len(chain2Chans)) + require.Subset(t, []string{"STATE_OPEN", "Open"}, []string{chain2Chans[1].State}) +} diff --git a/interchaintest/interchain_accounts_test.go b/interchaintest/interchain_accounts_test.go index 37be21e5d..3da235746 100644 --- a/interchaintest/interchain_accounts_test.go +++ b/interchaintest/interchain_accounts_test.go @@ -44,7 +44,8 @@ func TestScenarioInterchainAccounts(t *testing.T) { NumValidators: &nv, NumFullNodes: &nf, ChainConfig: ibc.ChainConfig{ - Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.3.5"}}, + Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.5.0"}}, + UsingNewGenesisCommand: true, }, }, { @@ -52,7 +53,8 @@ func TestScenarioInterchainAccounts(t *testing.T) { NumValidators: &nv, NumFullNodes: &nf, ChainConfig: ibc.ChainConfig{ - Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.3.5"}}, + Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", 
Version: "v0.5.0"}}, + UsingNewGenesisCommand: true, }, }, }) diff --git a/relayer/channel.go b/relayer/channel.go index 0ee754150..5274d405d 100644 --- a/relayer/channel.go +++ b/relayer/channel.go @@ -114,6 +114,38 @@ func (c *Chain) CloseChannel( // Timeout is per message. Two close channel handshake messages, allowing maxRetries for each. processorTimeout := timeout * 2 * time.Duration(maxRetries) + // Perform a flush first so that any timeouts are cleared. + flushCtx, flushCancel := context.WithTimeout(ctx, processorTimeout) + defer flushCancel() + + flushProcessor := processor.NewEventProcessor(). + WithChainProcessors( + c.chainProcessor(c.log, nil), + dst.chainProcessor(c.log, nil), + ). + WithPathProcessors(processor.NewPathProcessor( + c.log, + processor.NewPathEnd(pathName, c.PathEnd.ChainID, c.PathEnd.ClientID, "", []processor.ChainChannelKey{}), + processor.NewPathEnd(pathName, dst.PathEnd.ChainID, dst.PathEnd.ClientID, "", []processor.ChainChannelKey{}), + nil, + memo, + DefaultClientUpdateThreshold, + DefaultFlushInterval, + )). + WithInitialBlockHistory(0). + WithMessageLifecycle(&processor.FlushLifecycle{}). + Build() + + c.log.Info("Starting event processor for flush before channel close", + zap.String("src_chain_id", c.PathEnd.ChainID), + zap.String("src_port_id", srcPortID), + zap.String("dst_chain_id", dst.PathEnd.ChainID), + ) + + if err := flushProcessor.Run(flushCtx); err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, processorTimeout) defer cancel() @@ -138,24 +170,12 @@ func (c *Chain) CloseChannel( DefaultFlushInterval, )). WithInitialBlockHistory(0). - WithMessageLifecycle(&processor.ChannelMessageLifecycle{ - Initial: &processor.ChannelMessage{ - ChainID: c.PathEnd.ChainID, - EventType: chantypes.EventTypeChannelCloseInit, - Info: provider.ChannelInfo{ - ConnID: c.PathEnd.ConnectionID, - PortID: srcPortID, - ChannelID: srcChanID, - }, - }, - Termination: &processor.ChannelMessage{ - ChainID: dst.PathEnd.ChainID, - EventType: chantypes.EventTypeChannelCloseConfirm, - Info: provider.ChannelInfo{ - CounterpartyPortID: srcPortID, - CounterpartyChannelID: srcChanID, - }, - }, + WithMessageLifecycle(&processor.ChannelCloseLifecycle{ + SrcChainID: c.PathEnd.ChainID, + SrcChannelID: srcChanID, + SrcPortID: srcPortID, + SrcConnID: c.PathEnd.ConnectionID, + DstConnID: dst.PathEnd.ConnectionID, }). Build(). 
Run(ctx) diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index a60a0b27c..9796ade1c 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -232,6 +232,12 @@ func (pathEnd *pathEndRuntime) shouldTerminate(ibcMessagesCache IBCMessagesCache foundCounterpartyChannelID := m.Termination.Info.CounterpartyChannelID == "" foundCounterpartyPortID := m.Termination.Info.CounterpartyPortID == "" for _, ci := range cache { + pathEnd.log.Info("Channel handshake termination candidate", + zap.String("termination_port_id", m.Termination.Info.PortID), + zap.String("observed_port_id", ci.PortID), + zap.String("termination_counterparty_port_id", m.Termination.Info.CounterpartyPortID), + zap.String("observed_counterparty_port_id", ci.CounterpartyPortID), + ) if ci.ChannelID == m.Termination.Info.ChannelID { foundChannelID = true } @@ -249,6 +255,41 @@ func (pathEnd *pathEndRuntime) shouldTerminate(ibcMessagesCache IBCMessagesCache pathEnd.log.Info("Found termination condition for channel handshake") return true } + case *ChannelCloseLifecycle: + cache, ok := ibcMessagesCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm] + if !ok { + return false + } + // check against m.Termination.Info + foundChannelID := m.SrcChannelID == "" + foundPortID := m.SrcPortID == "" + for _, ci := range cache { + pathEnd.log.Info("Channel close termination candidate", + zap.String("termination_port_id", m.SrcPortID), + zap.String("observed_port_id", ci.PortID), + zap.String("termination_channel_id", m.SrcChannelID), + zap.String("observed_channel_id", ci.ChannelID), + ) + if pathEnd.info.ChainID == m.SrcChainID { + if ci.ChannelID == m.SrcChannelID { + foundChannelID = true + } + if ci.PortID == m.SrcPortID { + foundPortID = true + } + } else { + if ci.CounterpartyChannelID == m.SrcChannelID { + foundChannelID = true + } + if ci.CounterpartyPortID == m.SrcPortID { + foundPortID = true + } + } + } + if foundChannelID && foundPortID { + pathEnd.log.Info("Found termination condition for channel close") + return true + } case *ConnectionMessageLifecycle: if m.Termination == nil || m.Termination.ChainID != pathEnd.info.ChainID { return false diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index 3fb8f6b95..976037ba1 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -180,7 +180,7 @@ func (pp *PathProcessor) channelPairs() []channelPair { } pairs := make([]channelPair, len(channels)) i := 0 - for k, _ := range channels { + for k := range channels { pairs[i] = channelPair{ pathEnd1ChannelKey: k, pathEnd2ChannelKey: k.Counterparty(), @@ -321,13 +321,13 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { if pp.shouldFlush() && !pp.initialFlushComplete { pp.flush(ctx) pp.initialFlushComplete = true - } else if pp.shouldTerminateForFlushComplete(ctx, cancel) { + } else if pp.shouldTerminateForFlushComplete() { cancel() return } // process latest message cache state from both pathEnds - if err := pp.processLatestMessages(ctx); err != nil { + if err := pp.processLatestMessages(ctx, cancel); err != nil { // in case of IBC message send errors, schedule retry after durationErrorRetry if retryTimer != nil { retryTimer.Stop() diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index fd59eb76a..82d68c693 100644 --- a/relayer/processor/path_processor_internal.go +++ 
b/relayer/processor/path_processor_internal.go @@ -126,39 +126,21 @@ func (pp *PathProcessor) unrelayedPacketFlowMessages( for seq, info := range pathEndPacketFlowMessages.SrcMsgTimeout { deletePreInitIfMatches(info) toDeleteSrc[chantypes.EventTypeSendPacket] = append(toDeleteSrc[chantypes.EventTypeSendPacket], seq) + toDeleteSrc[chantypes.EventTypeTimeoutPacket] = append(toDeleteSrc[chantypes.EventTypeTimeoutPacket], seq) if info.ChannelOrder == chantypes.ORDERED.String() { - // For ordered channel packets, flow is not done until channel-close-confirm is observed. - if pathEndPacketFlowMessages.DstMsgChannelCloseConfirm == nil { - // have not observed a channel-close-confirm yet for this channel, send it if ready. - // will come back through here next block if not yet ready. - closeChan := channelIBCMessage{ - eventType: chantypes.EventTypeChannelCloseConfirm, - info: provider.ChannelInfo{ - Height: info.Height, - PortID: info.SourcePort, - ChannelID: info.SourceChannel, - CounterpartyPortID: info.DestPort, - CounterpartyChannelID: info.DestChannel, - Order: orderFromString(info.ChannelOrder), - }, - } - - if pathEndPacketFlowMessages.Dst.shouldSendChannelMessage(closeChan, pathEndPacketFlowMessages.Src) { - res.DstChannelMessage = append(res.DstChannelMessage, closeChan) - } - } else { - // ordered channel, and we have a channel close confirm, so packet-flow and channel-close-flow is complete. - // remove all retention of this sequence number and this channel-close-confirm. - toDeleteDstChannel[chantypes.EventTypeChannelCloseConfirm] = append( - toDeleteDstChannel[chantypes.EventTypeChannelCloseConfirm], - k.Counterparty(), - ) - toDeleteSrc[chantypes.EventTypeTimeoutPacket] = append(toDeleteSrc[chantypes.EventTypeTimeoutPacket], seq) + // Channel is now closed on src. 
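+			// (per ICS-004, processing MsgTimeout on an ordered channel closes the
+			// channel end on the sending chain, so the observed timeout itself is
+			// evidence of the close)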
+ // enqueue channel close init observation to be handled by channel close correlation + if _, ok := pathEndPacketFlowMessages.Src.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit]; !ok { + pathEndPacketFlowMessages.Src.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit] = make(ChannelMessageCache) + } + pathEndPacketFlowMessages.Src.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit][k] = provider.ChannelInfo{ + Height: info.Height, + PortID: info.SourcePort, + ChannelID: info.SourceChannel, + CounterpartyPortID: info.DestPort, + CounterpartyChannelID: info.DestChannel, + Order: orderFromString(info.ChannelOrder), } - } else { - // unordered channel, and we have a timeout for this packet, so packet flow is complete - // remove all retention of this sequence number - toDeleteSrc[chantypes.EventTypeTimeoutPacket] = append(toDeleteSrc[chantypes.EventTypeTimeoutPacket], seq) } } @@ -632,15 +614,12 @@ var observedEventTypeForDesiredMessage = map[string]string{ chantypes.EventTypeChannelOpenTry: chantypes.EventTypeChannelOpenInit, chantypes.EventTypeChannelOpenInit: preInitKey, - chantypes.EventTypeChannelCloseConfirm: chantypes.EventTypeChannelCloseInit, - chantypes.EventTypeChannelCloseInit: preCloseKey, - chantypes.EventTypeAcknowledgePacket: chantypes.EventTypeRecvPacket, chantypes.EventTypeRecvPacket: chantypes.EventTypeSendPacket, chantypes.EventTypeSendPacket: preInitKey, } -func (pp *PathProcessor) queuePreInitMessages() { +func (pp *PathProcessor) queuePreInitMessages(cancel func()) { if pp.messageLifecycle == nil || pp.sentInitialMsg { return } @@ -658,6 +637,7 @@ func (pp *PathProcessor) queuePreInitMessages() { zap.Inline(channelKey), zap.Error(err), ) + cancel() return } if !pp.IsRelayedChannel(m.Initial.ChainID, channelKey) { @@ -669,6 +649,7 @@ func (pp *PathProcessor) queuePreInitMessages() { "Failed to queue initial connection message, event type not handled", zap.String("event_type", m.Initial.EventType), ) + cancel() return } if m.Initial.ChainID == pp.pathEnd1.info.ChainID { @@ -698,6 +679,7 @@ func (pp *PathProcessor) queuePreInitMessages() { "Failed to queue initial connection message, event type not handled", zap.String("event_type", m.Initial.EventType), ) + cancel() return } connKey := ConnectionInfoConnectionKey(m.Initial.Info) @@ -728,6 +710,7 @@ func (pp *PathProcessor) queuePreInitMessages() { "Failed to queue initial channel message, event type not handled", zap.String("event_type", m.Initial.EventType), ) + cancel() return } chanKey := ChannelInfoChannelKey(m.Initial.Info) @@ -736,7 +719,6 @@ func (pp *PathProcessor) queuePreInitMessages() { if !ok { pp.pathEnd1.messageCache.ChannelHandshake[eventType] = make(ChannelMessageCache) } - pp.pathEnd1.messageCache.ChannelHandshake[eventType][chanKey] = m.Initial.Info } else if m.Initial.ChainID == pp.pathEnd2.info.ChainID { _, ok = pp.pathEnd2.messageCache.ChannelHandshake[eventType] @@ -745,18 +727,81 @@ func (pp *PathProcessor) queuePreInitMessages() { } pp.pathEnd2.messageCache.ChannelHandshake[eventType][chanKey] = m.Initial.Info } + case *ChannelCloseLifecycle: + pp.sentInitialMsg = true + + if !pp.IsRelevantConnection(pp.pathEnd1.info.ChainID, m.SrcConnID) { + return + } + + for k, open := range pp.pathEnd1.channelStateCache { + if k.ChannelID == m.SrcChannelID && k.PortID == m.SrcPortID && k.CounterpartyChannelID != "" && k.CounterpartyPortID != "" { + if open { + // channel is still open on pathEnd1 + break + } + if counterpartyOpen, ok := 
pp.pathEnd2.channelStateCache[k.Counterparty()]; ok && !counterpartyOpen { + pp.log.Info("Channel already closed on both sides") + cancel() + return + } + // queue channel close init on pathEnd1 + if _, ok := pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit]; !ok { + pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit] = make(ChannelMessageCache) + } + pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit][k] = provider.ChannelInfo{ + PortID: k.PortID, + ChannelID: k.ChannelID, + CounterpartyPortID: k.CounterpartyPortID, + CounterpartyChannelID: k.CounterpartyChannelID, + ConnID: m.SrcConnID, + } + return + } + } + + for k, open := range pp.pathEnd2.channelStateCache { + if k.CounterpartyChannelID == m.SrcChannelID && k.CounterpartyPortID == m.SrcPortID && k.ChannelID != "" && k.PortID != "" { + if open { + // channel is still open on pathEnd2 + break + } + if counterpartyChanState, ok := pp.pathEnd1.channelStateCache[k.Counterparty()]; ok && !counterpartyChanState { + pp.log.Info("Channel already closed on both sides") + cancel() + return + } + // queue channel close init on pathEnd2 + if _, ok := pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit]; !ok { + pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit] = make(ChannelMessageCache) + } + pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit][k] = provider.ChannelInfo{ + PortID: k.PortID, + ChannelID: k.ChannelID, + CounterpartyPortID: k.CounterpartyPortID, + CounterpartyChannelID: k.CounterpartyChannelID, + ConnID: m.DstConnID, + } + } + } + + pp.log.Error("This channel is unable to be closed. Channel must already be closed on one chain.", + zap.String("src_channel_id", m.SrcChannelID), + zap.String("src_port_id", m.SrcPortID), + ) + cancel() } } // messages from both pathEnds are needed in order to determine what needs to be relayed for a single pathEnd -func (pp *PathProcessor) processLatestMessages(ctx context.Context) error { +func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func()) error { // Update trusted client state for both pathends pp.updateClientTrustedState(pp.pathEnd1, pp.pathEnd2) pp.updateClientTrustedState(pp.pathEnd2, pp.pathEnd1) channelPairs := pp.channelPairs() - pp.queuePreInitMessages() + pp.queuePreInitMessages(cancel) pathEnd1ConnectionHandshakeMessages := pathEndConnectionHandshakeMessages{ Src: pp.pathEnd1, @@ -800,23 +845,6 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context) error { pathEnd1ChannelHandshakeRes := pp.unrelayedChannelHandshakeMessages(pathEnd1ChannelHandshakeMessages) pathEnd2ChannelHandshakeRes := pp.unrelayedChannelHandshakeMessages(pathEnd2ChannelHandshakeMessages) - pathEnd1ChannelCloseMessages := pathEndChannelCloseMessages{ - Src: pp.pathEnd1, - Dst: pp.pathEnd2, - SrcMsgChannelPreInit: pp.pathEnd1.messageCache.ChannelHandshake[preCloseKey], - SrcMsgChannelCloseInit: pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit], - DstMsgChannelCloseConfirm: pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm], - } - pathEnd2ChannelCloseMessages := pathEndChannelCloseMessages{ - Src: pp.pathEnd2, - Dst: pp.pathEnd1, - SrcMsgChannelPreInit: pp.pathEnd2.messageCache.ChannelHandshake[preCloseKey], - SrcMsgChannelCloseInit: pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit], - DstMsgChannelCloseConfirm: 
pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm], - } - pathEnd1ChannelCloseRes := pp.unrelayedChannelCloseMessages(pathEnd1ChannelCloseMessages) - pathEnd2ChannelCloseRes := pp.unrelayedChannelCloseMessages(pathEnd2ChannelCloseMessages) - // process the packet flows for both path ends to determine what needs to be relayed pathEnd1ProcessRes := make([]pathEndPacketFlowResponse, len(channelPairs)) pathEnd2ProcessRes := make([]pathEndPacketFlowResponse, len(channelPairs)) @@ -882,6 +910,23 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context) error { pathEnd2ProcessRes[i] = pp.unrelayedPacketFlowMessages(ctx, pathEnd2PacketFlowMessages) } + pathEnd1ChannelCloseMessages := pathEndChannelCloseMessages{ + Src: pp.pathEnd1, + Dst: pp.pathEnd2, + SrcMsgChannelPreInit: pp.pathEnd1.messageCache.ChannelHandshake[preCloseKey], + SrcMsgChannelCloseInit: pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit], + DstMsgChannelCloseConfirm: pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm], + } + pathEnd2ChannelCloseMessages := pathEndChannelCloseMessages{ + Src: pp.pathEnd2, + Dst: pp.pathEnd1, + SrcMsgChannelPreInit: pp.pathEnd2.messageCache.ChannelHandshake[preCloseKey], + SrcMsgChannelCloseInit: pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit], + DstMsgChannelCloseConfirm: pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm], + } + pathEnd1ChannelCloseRes := pp.unrelayedChannelCloseMessages(pathEnd1ChannelCloseMessages) + pathEnd2ChannelCloseRes := pp.unrelayedChannelCloseMessages(pathEnd2ChannelCloseMessages) + // concatenate applicable messages for pathend pathEnd1ConnectionMessages, pathEnd2ConnectionMessages := pp.connectionMessagesToSend(pathEnd1ConnectionHandshakeRes, pathEnd2ConnectionHandshakeRes) pathEnd1ChannelMessages, pathEnd2ChannelMessages := pp.channelMessagesToSend( @@ -1186,9 +1231,7 @@ func (pp *PathProcessor) flush(ctx context.Context) { // shouldTerminateForFlushComplete will determine if the relayer should exit // when FlushLifecycle is used. It will exit when all of the message caches are cleared. -func (pp *PathProcessor) shouldTerminateForFlushComplete( - ctx context.Context, cancel func(), -) bool { +func (pp *PathProcessor) shouldTerminateForFlushComplete() bool { if _, ok := pp.messageLifecycle.(*FlushLifecycle); !ok { return false } diff --git a/relayer/processor/types.go b/relayer/processor/types.go index 347974800..3f4059b7b 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -72,6 +72,18 @@ type ChannelMessageLifecycle struct { func (t *ChannelMessageLifecycle) messageLifecycler() {} +// ChannelCloseLifecycle is used as a stop condition for the PathProcessor. +// It will attempt to finish closing the channel and terminate once the channel is closed. +type ChannelCloseLifecycle struct { + SrcChainID string + SrcChannelID string + SrcPortID string + SrcConnID string + DstConnID string +} + +func (t *ChannelCloseLifecycle) messageLifecycler() {} + // IBCMessagesCache holds cached messages for packet flows, connection handshakes, // and channel handshakes. The PathProcessors use this for message correlation to determine // when messages should be sent and are pruned when flows/handshakes are complete. 
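
The ChannelCloseLifecycle above replaces the generic ChannelMessageLifecycle that tx channel-close used before this series: rather than matching a single initial and terminal event, the path processor drives the close to completion and terminates once it observes a close confirm matching the source channel. A condensed sketch of the wiring from the relayer/channel.go hunk in patch 3, assuming the processor package as modified by this series; the chain/path processors and IDs are placeholders, and the pre-close flush and error handling are elided:

	// closeChannelSketch is a hypothetical helper, not part of the patch.
	func closeChannelSketch(
		ctx context.Context,
		src, dst processor.ChainProcessor,
		path *processor.PathProcessor,
	) error {
		return processor.NewEventProcessor().
			WithChainProcessors(src, dst).
			WithPathProcessors(path).
			WithInitialBlockHistory(0).
			WithMessageLifecycle(&processor.ChannelCloseLifecycle{
				SrcChainID:   "chain-1",      // the chain given to tx channel-close
				SrcChannelID: "channel-0",    // placeholder channel/port/connection IDs
				SrcPortID:    "transfer",
				SrcConnID:    "connection-0",
				DstConnID:    "connection-0",
			}).
			Build().
			Run(ctx)
	}

The same lifecycle drives queuePreInitMessages in patch 3: whichever path end already has its channel end closed gets a synthetic ChannelCloseInit observation queued into its message cache, and the close correlation from patch 1 relays the confirm from there.
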
From f5ed01402b29e12ddd5d8806a2ca162b9f75dc3c Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sat, 25 Mar 2023 11:32:54 -0600 Subject: [PATCH 4/6] more sweet code removals --- relayer/processor/path_processor_internal.go | 52 +++++++------------- relayer/processor/types_internal.go | 19 ++++--- 2 files changed, 27 insertions(+), 44 deletions(-) diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 82d68c693..af94cbd36 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -850,20 +850,6 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func( pathEnd2ProcessRes := make([]pathEndPacketFlowResponse, len(channelPairs)) for i, pair := range channelPairs { - var pathEnd1ChannelCloseConfirm, pathEnd2ChannelCloseConfirm *provider.ChannelInfo - - if pathEnd1ChanCloseConfirmMsgs, ok := pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm]; ok { - if pathEnd1ChannelCloseConfirmMsg, ok := pathEnd1ChanCloseConfirmMsgs[pair.pathEnd1ChannelKey]; ok { - pathEnd1ChannelCloseConfirm = &pathEnd1ChannelCloseConfirmMsg - } - } - - if pathEnd2ChanCloseConfirmMsgs, ok := pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm]; ok { - if pathEnd2ChannelCloseConfirmMsg, ok := pathEnd2ChanCloseConfirmMsgs[pair.pathEnd2ChannelKey]; ok { - pathEnd2ChannelCloseConfirm = &pathEnd2ChannelCloseConfirmMsg - } - } - // Append acks into recv packet info if present pathEnd1DstMsgRecvPacket := pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeRecvPacket] for seq, ackInfo := range pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeWriteAck] { @@ -882,28 +868,26 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func( } pathEnd1PacketFlowMessages := pathEndPacketFlowMessages{ - Src: pp.pathEnd1, - Dst: pp.pathEnd2, - ChannelKey: pair.pathEnd1ChannelKey, - SrcPreTransfer: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][preInitKey], - SrcMsgTransfer: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeSendPacket], - DstMsgRecvPacket: pathEnd1DstMsgRecvPacket, - SrcMsgAcknowledgement: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeAcknowledgePacket], - SrcMsgTimeout: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeTimeoutPacket], - SrcMsgTimeoutOnClose: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeTimeoutPacketOnClose], - DstMsgChannelCloseConfirm: pathEnd2ChannelCloseConfirm, + Src: pp.pathEnd1, + Dst: pp.pathEnd2, + ChannelKey: pair.pathEnd1ChannelKey, + SrcPreTransfer: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][preInitKey], + SrcMsgTransfer: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeSendPacket], + DstMsgRecvPacket: pathEnd1DstMsgRecvPacket, + SrcMsgAcknowledgement: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeAcknowledgePacket], + SrcMsgTimeout: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeTimeoutPacket], + SrcMsgTimeoutOnClose: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeTimeoutPacketOnClose], } pathEnd2PacketFlowMessages := pathEndPacketFlowMessages{ - Src: pp.pathEnd2, - Dst: pp.pathEnd1, - ChannelKey: pair.pathEnd2ChannelKey, - SrcPreTransfer: 
pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd1ChannelKey][preInitKey], - SrcMsgTransfer: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeSendPacket], - DstMsgRecvPacket: pathEnd2DstMsgRecvPacket, - SrcMsgAcknowledgement: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeAcknowledgePacket], - SrcMsgTimeout: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeTimeoutPacket], - SrcMsgTimeoutOnClose: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeTimeoutPacketOnClose], - DstMsgChannelCloseConfirm: pathEnd1ChannelCloseConfirm, + Src: pp.pathEnd2, + Dst: pp.pathEnd1, + ChannelKey: pair.pathEnd2ChannelKey, + SrcPreTransfer: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd1ChannelKey][preInitKey], + SrcMsgTransfer: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeSendPacket], + DstMsgRecvPacket: pathEnd2DstMsgRecvPacket, + SrcMsgAcknowledgement: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeAcknowledgePacket], + SrcMsgTimeout: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeTimeoutPacket], + SrcMsgTimeoutOnClose: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeTimeoutPacketOnClose], } pathEnd1ProcessRes[i] = pp.unrelayedPacketFlowMessages(ctx, pathEnd1PacketFlowMessages) diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index 83835cba1..d526ed70a 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -376,16 +376,15 @@ type clientICQProcessingCache map[provider.ClientICQQueryID]processingMessage // contains MsgRecvPacket from counterparty // entire packet flow type pathEndPacketFlowMessages struct { - Src *pathEndRuntime - Dst *pathEndRuntime - ChannelKey ChannelKey - SrcPreTransfer PacketSequenceCache - SrcMsgTransfer PacketSequenceCache - DstMsgRecvPacket PacketSequenceCache - SrcMsgAcknowledgement PacketSequenceCache - SrcMsgTimeout PacketSequenceCache - SrcMsgTimeoutOnClose PacketSequenceCache - DstMsgChannelCloseConfirm *provider.ChannelInfo + Src *pathEndRuntime + Dst *pathEndRuntime + ChannelKey ChannelKey + SrcPreTransfer PacketSequenceCache + SrcMsgTransfer PacketSequenceCache + DstMsgRecvPacket PacketSequenceCache + SrcMsgAcknowledgement PacketSequenceCache + SrcMsgTimeout PacketSequenceCache + SrcMsgTimeoutOnClose PacketSequenceCache } type pathEndConnectionHandshakeMessages struct { From 7366071f0e87ec2af8ee951d8fccfbdf9a247f3a Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sat, 25 Mar 2023 16:28:25 -0600 Subject: [PATCH 5/6] wip flush connection and channel handshakes in tx flush --- relayer/processor/path_processor_internal.go | 415 ++++++++++++++++++- 1 file changed, 404 insertions(+), 11 deletions(-) diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index af94cbd36..40b93918e 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -9,6 +9,7 @@ import ( conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" + commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -1161,18 +1162,40 @@ func queuePendingRecvAndAcks( } } -// flush runs queries 
to relay any pending messages which may have been -// in blocks before the height that the chain processors started querying. func (pp *PathProcessor) flush(ctx context.Context) { var ( - commitments1 = make(map[ChannelKey][]uint64) - commitments2 = make(map[ChannelKey][]uint64) - commitments1Mu, commitments2Mu sync.Mutex - pathEnd1Cache = NewIBCMessagesCache() pathEnd2Cache = NewIBCMessagesCache() pathEnd1CacheMu, pathEnd2CacheMu sync.Mutex ) + var wg sync.WaitGroup + wg.Add(3) + + go pp.flushPackets(ctx, pathEnd1Cache, pathEnd2Cache, &pathEnd1CacheMu, &pathEnd2CacheMu, &wg) + go pp.flushChannels(ctx, pathEnd1Cache, pathEnd2Cache, &pathEnd1CacheMu, &pathEnd2CacheMu, &wg) + go pp.flushConnections(ctx, pathEnd1Cache, pathEnd2Cache, &pathEnd1CacheMu, &pathEnd2CacheMu, &wg) + + wg.Wait() + + pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync) + pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync) + +} + +// flushPackets runs queries to relay any pending messages which may have been +// in blocks before the height that the chain processors started querying. +func (pp *PathProcessor) flushPackets( + ctx context.Context, + pathEnd1Cache, pathEnd2Cache IBCMessagesCache, + pathEnd1CacheMu, pathEnd2CacheMu sync.Locker, + wg *sync.WaitGroup, +) { + defer wg.Done() + var ( + commitments1 = make(map[ChannelKey][]uint64) + commitments2 = make(map[ChannelKey][]uint64) + commitments1Mu, commitments2Mu sync.Mutex + ) // Query remaining packet commitments on both chains var eg errgroup.Group @@ -1198,19 +1221,16 @@ func (pp *PathProcessor) flush(ctx context.Context) { // 2. Packet commitment is on source, and MsgRecvPacket has been relayed to destination, but MsgAcknowledgement has not been written to source to clear the packet commitment. // Based on above conditions, enqueue MsgRecvPacket and MsgAcknowledgement messages for k, seqs := range commitments1 { - eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd1, pp.pathEnd2, k, seqs, pathEnd1Cache.PacketFlow, pathEnd2Cache.PacketFlow, &pathEnd1CacheMu, &pathEnd2CacheMu)) + eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd1, pp.pathEnd2, k, seqs, pathEnd1Cache.PacketFlow, pathEnd2Cache.PacketFlow, pathEnd1CacheMu, pathEnd2CacheMu)) } for k, seqs := range commitments2 { - eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd2, pp.pathEnd1, k, seqs, pathEnd2Cache.PacketFlow, pathEnd1Cache.PacketFlow, &pathEnd2CacheMu, &pathEnd1CacheMu)) + eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd2, pp.pathEnd1, k, seqs, pathEnd2Cache.PacketFlow, pathEnd1Cache.PacketFlow, pathEnd2CacheMu, pathEnd1CacheMu)) } if err := eg.Wait(); err != nil { pp.log.Error("Failed to enqueue pending messages for flush", zap.Error(err)) } - - pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync) - pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync) } // shouldTerminateForFlushComplete will determine if the relayer should exit @@ -1270,3 +1290,376 @@ func (pp *PathProcessor) shouldTerminateForFlushComplete() bool { pp.log.Info("Found termination condition for flush, all caches cleared") return true } + +type ChannelHandshakeState struct { + StateSrc chantypes.State + StateDst chantypes.State + Order chantypes.Order + Version string + SrcConnID string + DstConnID string +} + +// flushChannels runs queries to relay any pending channel handshake messages which may have been +// in blocks before the height that the chain processors started querying. 
+func (pp *PathProcessor) flushChannels( + ctx context.Context, + pathEnd1Cache, pathEnd2Cache IBCMessagesCache, + pathEnd1CacheMu, pathEnd2CacheMu sync.Locker, + wg *sync.WaitGroup, +) { + defer wg.Done() + + var ( + pathEnd1Chans, pathEnd2Chans []*chantypes.IdentifiedChannel + eg errgroup.Group + ) + eg.Go(func() (err error) { + pathEnd1Chans, err = pp.pathEnd1.chainProvider.QueryChannels(ctx) + return err + }) + eg.Go(func() (err error) { + pathEnd2Chans, err = pp.pathEnd2.chainProvider.QueryChannels(ctx) + return err + }) + if err := eg.Wait(); err != nil { + pp.log.Error("Failed to query channels for channel handshake flush", zap.Error(err)) + return + } + + srcChanKeys := make(map[ChannelKey]ChannelHandshakeState) + + for _, channel := range pathEnd1Chans { + k := ChannelKey{ + ChannelID: channel.ChannelId, + PortID: channel.PortId, + CounterpartyChannelID: channel.Counterparty.ChannelId, + CounterpartyPortID: channel.Counterparty.PortId, + } + srcConnID := channel.ConnectionHops[0] + if !pp.pathEnd1.isRelevantConnection(srcConnID) { + continue + } + if !pp.pathEnd1.info.ShouldRelayChannel(ChainChannelKey{ + ChainID: pp.pathEnd1.info.ChainID, + CounterpartyChainID: pp.pathEnd2.info.ChainID, + ChannelKey: k, + }) { + continue + } + var dstConnID string + for k := range pp.pathEnd1.connectionStateCache { + if k.ConnectionID == srcConnID { + dstConnID = k.CounterpartyConnID + } + } + srcChanKeys[k] = ChannelHandshakeState{ + StateSrc: channel.State, + Order: channel.Ordering, + Version: channel.Version, + SrcConnID: srcConnID, + DstConnID: dstConnID, + } + } + +DstLoop: + for _, channel := range pathEnd2Chans { + k := ChannelKey{ + ChannelID: channel.ChannelId, + PortID: channel.PortId, + CounterpartyChannelID: channel.Counterparty.ChannelId, + CounterpartyPortID: channel.Counterparty.PortId, + } + dstConnID := channel.ConnectionHops[0] + if !pp.pathEnd2.isRelevantConnection(dstConnID) { + continue DstLoop + } + if !pp.pathEnd2.info.ShouldRelayChannel(ChainChannelKey{ + ChainID: pp.pathEnd2.info.ChainID, + CounterpartyChainID: pp.pathEnd1.info.ChainID, + ChannelKey: k, + }) { + continue DstLoop + } + ck := k.Counterparty() + var srcConnID string + for k := range pp.pathEnd2.connectionStateCache { + if k.ConnectionID == dstConnID { + srcConnID = k.CounterpartyConnID + } + } + // check if counterparty key already exists observed from src + if s, ok := srcChanKeys[ck]; ok { + s.StateDst = channel.State + srcChanKeys[ck] = s + continue DstLoop + } + // check if counterparty init key already exists observed from src + msgInitKey := ck.MsgInitKey() + if s, ok := srcChanKeys[msgInitKey]; ok { + s.StateDst = channel.State + srcChanKeys[ck] = s + delete(srcChanKeys, msgInitKey) + continue DstLoop + } + // check if counterparty key already exists observed from src (compared to src init key) + for k, s := range srcChanKeys { + if k.MsgInitKey() == ck { + // have an init on dst with try on src + s.StateDst = channel.State + srcChanKeys[k] = s + continue DstLoop + } + } + // populate dst only + srcChanKeys[ck] = ChannelHandshakeState{ + StateDst: channel.State, + Order: channel.Ordering, + Version: channel.Version, + SrcConnID: srcConnID, + DstConnID: dstConnID, + } + } + + for k, s := range srcChanKeys { + if s.StateSrc == chantypes.OPEN && s.StateDst == chantypes.OPEN { + // already open on both sides. nothing needed. + continue + } + + if s.StateSrc == chantypes.CLOSED || s.StateDst == chantypes.CLOSED { + // don't handle channel closure here. 
+			// Channel closure is a manual operation for now with rly tx channel-close.
+			continue
+		}
+
+		pp.log.Info("Found channel that needs to complete handshake",
+			zap.String("src_channel_id", k.ChannelID),
+			zap.String("src_port_id", k.PortID),
+			zap.String("dst_channel_id", k.CounterpartyChannelID),
+			zap.String("dst_port_id", k.CounterpartyPortID),
+			zap.String("src_state", s.StateSrc.String()),
+			zap.String("dst_state", s.StateDst.String()),
+		)
+
+		switch {
+		case s.StateSrc == chantypes.INIT && s.StateDst == chantypes.UNINITIALIZED:
+			pp.log.Info("Chan is init on src but no try yet on dst")
+			populateChannelHandshake(pathEnd1Cache, pathEnd1CacheMu, chantypes.EventTypeChannelOpenInit,
+				k, s.Order, s.Version, s.SrcConnID, s.DstConnID)
+		case s.StateSrc == chantypes.UNINITIALIZED && s.StateDst == chantypes.INIT:
+			pp.log.Info("Chan is init on dst but no try yet on src")
+			populateChannelHandshake(pathEnd2Cache, pathEnd2CacheMu, chantypes.EventTypeChannelOpenInit,
+				k.Counterparty(), s.Order, s.Version, s.DstConnID, s.SrcConnID)
+		case s.StateSrc == chantypes.TRYOPEN && s.StateDst == chantypes.INIT:
+			pp.log.Info("Chan is try on src but no ack yet on dst")
+			populateChannelHandshake(pathEnd1Cache, pathEnd1CacheMu, chantypes.EventTypeChannelOpenTry,
+				k, s.Order, s.Version, s.SrcConnID, s.DstConnID)
+		case s.StateSrc == chantypes.INIT && s.StateDst == chantypes.TRYOPEN:
+			pp.log.Info("Chan is try on dst but no ack yet on src")
+			populateChannelHandshake(pathEnd2Cache, pathEnd2CacheMu, chantypes.EventTypeChannelOpenTry,
+				k.Counterparty(), s.Order, s.Version, s.DstConnID, s.SrcConnID)
+		case s.StateSrc == chantypes.OPEN && s.StateDst == chantypes.TRYOPEN:
+			pp.log.Info("Chan is ack on src but no confirm yet on dst")
+			populateChannelHandshake(pathEnd1Cache, pathEnd1CacheMu, chantypes.EventTypeChannelOpenAck,
+				k, s.Order, s.Version, s.SrcConnID, s.DstConnID)
+		case s.StateSrc == chantypes.TRYOPEN && s.StateDst == chantypes.OPEN:
+			pp.log.Info("Chan is ack on dst but no confirm yet on src")
+			populateChannelHandshake(pathEnd2Cache, pathEnd2CacheMu, chantypes.EventTypeChannelOpenAck,
+				k.Counterparty(), s.Order, s.Version, s.DstConnID, s.SrcConnID)
+		}
+	}
+}
+
+func populateChannelHandshake(
+	c IBCMessagesCache,
+	mu sync.Locker,
+	eventType string,
+	k ChannelKey,
+	order chantypes.Order,
+	version,
+	connID, counterpartyConnID string,
+) {
+	mu.Lock()
+	defer mu.Unlock()
+	if _, ok := c.ChannelHandshake[eventType]; !ok {
+		c.ChannelHandshake[eventType] = make(ChannelMessageCache)
+	}
+	c.ChannelHandshake[eventType][k] = provider.ChannelInfo{
+		PortID:                k.PortID,
+		ChannelID:             k.ChannelID,
+		CounterpartyPortID:    k.CounterpartyPortID,
+		CounterpartyChannelID: k.CounterpartyChannelID,
+		Order:                 order,
+		Version:               version,
+		ConnID:                connID,
+		CounterpartyConnID:    counterpartyConnID,
+	}
+}
+
+type ConnectionHandshakeState struct {
+	StateSrc  conntypes.State
+	StateDst  conntypes.State
+	PrefixSrc commitmenttypes.MerklePrefix
+	PrefixDst commitmenttypes.MerklePrefix
+}
+
+// flushConnections runs queries to relay any pending connection handshake messages which may have been
+// in blocks before the height that the chain processors started querying.
+func (pp *PathProcessor) flushConnections( + ctx context.Context, + pathEnd1Cache, pathEnd2Cache IBCMessagesCache, + pathEnd1CacheMu, pathEnd2CacheMu sync.Locker, + wg *sync.WaitGroup, +) { + defer wg.Done() + + var ( + pathEnd1Conns, pathEnd2Conns []*conntypes.IdentifiedConnection + eg errgroup.Group + ) + eg.Go(func() (err error) { + pathEnd1Conns, err = pp.pathEnd1.chainProvider.QueryConnections(ctx) + return err + }) + eg.Go(func() (err error) { + pathEnd2Conns, err = pp.pathEnd2.chainProvider.QueryConnections(ctx) + return err + }) + if err := eg.Wait(); err != nil { + pp.log.Error("Failed to query connections for connection handshake flush", zap.Error(err)) + return + } + + srcConnKeys := make(map[ConnectionKey]ConnectionHandshakeState) + + for _, conn := range pathEnd1Conns { + if conn.ClientId != pp.pathEnd1.info.ClientID { + continue + } + + k := ConnectionKey{ + ConnectionID: conn.Id, + ClientID: conn.ClientId, + CounterpartyConnID: conn.Counterparty.ConnectionId, + CounterpartyClientID: conn.Counterparty.ClientId, + } + + srcConnKeys[k] = ConnectionHandshakeState{ + StateSrc: conn.State, + PrefixDst: conn.Counterparty.Prefix, + } + } + +DstLoop: + for _, conn := range pathEnd2Conns { + if conn.ClientId != pp.pathEnd2.info.ClientID { + continue + } + + k := ConnectionKey{ + ConnectionID: conn.Id, + ClientID: conn.ClientId, + CounterpartyConnID: conn.Counterparty.ConnectionId, + CounterpartyClientID: conn.Counterparty.ClientId, + } + + ck := k.Counterparty() + + // check if counterparty key already exists observed from src + if s, ok := srcConnKeys[ck]; ok { + s.StateDst = conn.State + s.PrefixSrc = conn.Counterparty.Prefix + srcConnKeys[ck] = s + continue DstLoop + } + // check if counterparty init key already exists observed from src + msgInitKey := ck.MsgInitKey() + if s, ok := srcConnKeys[msgInitKey]; ok { + s.StateDst = conn.State + s.PrefixSrc = conn.Counterparty.Prefix + srcConnKeys[ck] = s + delete(srcConnKeys, msgInitKey) + continue DstLoop + } + // check if counterparty key already exists observed from src (compared to src init key) + for k, s := range srcConnKeys { + if k.MsgInitKey() == ck { + // have an init on dst with try on src + s.StateDst = conn.State + s.PrefixSrc = conn.Counterparty.Prefix + srcConnKeys[k] = s + continue DstLoop + } + } + // populate dst only + srcConnKeys[ck] = ConnectionHandshakeState{ + StateDst: conn.State, + PrefixSrc: conn.Counterparty.Prefix, + } + } + + for k, s := range srcConnKeys { + if s.StateSrc == conntypes.OPEN && s.StateDst == conntypes.OPEN { + // already open on both sides. nothing needed. 
+ continue + } + + pp.log.Info("Found connection that needs to complete handshake", + zap.String("src_connection_id", k.ConnectionID), + zap.String("src_client_id", k.ClientID), + zap.String("dst_connection_id", k.CounterpartyConnID), + zap.String("dst_client_id", k.CounterpartyClientID), + zap.String("src_state", s.StateSrc.String()), + zap.String("dst_state", s.StateDst.String()), + ) + + switch { + case s.StateSrc == conntypes.INIT && s.StateDst == conntypes.UNINITIALIZED: + pp.log.Info("Conn is init on src but no try yet on dst") + populateConnectionHandshake(pathEnd1Cache, pathEnd1CacheMu, conntypes.EventTypeConnectionOpenInit, + k, s.PrefixDst) + case s.StateSrc == conntypes.UNINITIALIZED && s.StateDst == conntypes.INIT: + pp.log.Info("Conn is init on dst but no try yet on src") + populateConnectionHandshake(pathEnd2Cache, pathEnd2CacheMu, conntypes.EventTypeConnectionOpenInit, + k.Counterparty(), s.PrefixSrc) + case s.StateSrc == conntypes.TRYOPEN && s.StateDst == conntypes.INIT: + pp.log.Info("Conn is try on src but no ack yet on dst") + populateConnectionHandshake(pathEnd1Cache, pathEnd1CacheMu, conntypes.EventTypeConnectionOpenTry, + k, s.PrefixDst) + case s.StateSrc == conntypes.INIT && s.StateDst == conntypes.TRYOPEN: + pp.log.Info("Conn is try on dst but no ack yet on src") + populateConnectionHandshake(pathEnd2Cache, pathEnd2CacheMu, conntypes.EventTypeConnectionOpenTry, + k.Counterparty(), s.PrefixSrc) + case s.StateSrc == conntypes.OPEN && s.StateDst == conntypes.TRYOPEN: + pp.log.Info("Conn is ack on src but no confirm yet on dst") + populateConnectionHandshake(pathEnd1Cache, pathEnd1CacheMu, conntypes.EventTypeConnectionOpenAck, + k, s.PrefixDst) + case s.StateSrc == conntypes.TRYOPEN && s.StateDst == conntypes.OPEN: + pp.log.Info("Conn is ack on dst but no confirm yet on src") + populateConnectionHandshake(pathEnd2Cache, pathEnd2CacheMu, conntypes.EventTypeConnectionOpenAck, + k.Counterparty(), s.PrefixSrc) + } + } +} + +func populateConnectionHandshake( + c IBCMessagesCache, + mu sync.Locker, + eventType string, + k ConnectionKey, + counterpartyCommitmentPrefix commitmenttypes.MerklePrefix, +) { + mu.Lock() + defer mu.Unlock() + if _, ok := c.ConnectionHandshake[eventType]; !ok { + c.ConnectionHandshake[eventType] = make(ConnectionMessageCache) + } + c.ConnectionHandshake[eventType][k] = provider.ConnectionInfo{ + ConnID: k.ConnectionID, + ClientID: k.ClientID, + CounterpartyConnID: k.CounterpartyConnID, + CounterpartyClientID: k.CounterpartyClientID, + CounterpartyCommitmentPrefix: counterpartyCommitmentPrefix, + } +} From a6fbf764212f8f591aaed87e4c6ce570fe7d12df Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Mon, 27 Mar 2023 09:27:21 -0600 Subject: [PATCH 6/6] move flush to flush.go --- relayer/processor/flush.go | 515 +++++++++++++++++++ relayer/processor/path_processor_internal.go | 503 ------------------ 2 files changed, 515 insertions(+), 503 deletions(-) create mode 100644 relayer/processor/flush.go diff --git a/relayer/processor/flush.go b/relayer/processor/flush.go new file mode 100644 index 000000000..42d9863ac --- /dev/null +++ b/relayer/processor/flush.go @@ -0,0 +1,515 @@ +package processor + +import ( + "context" + "sync" + + conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" + chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" + commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" + "github.com/cosmos/relayer/v2/relayer/provider" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + 
+func (pp *PathProcessor) flush(ctx context.Context) { + var ( + pathEnd1Cache = NewIBCMessagesCache() + pathEnd2Cache = NewIBCMessagesCache() + pathEnd1CacheMu, pathEnd2CacheMu sync.Mutex + ) + var wg sync.WaitGroup + wg.Add(3) + + go pp.flushPackets(ctx, pathEnd1Cache, pathEnd2Cache, &pathEnd1CacheMu, &pathEnd2CacheMu, &wg) + go pp.flushChannels(ctx, pathEnd1Cache, pathEnd2Cache, &pathEnd1CacheMu, &pathEnd2CacheMu, &wg) + go pp.flushConnections(ctx, pathEnd1Cache, pathEnd2Cache, &pathEnd1CacheMu, &pathEnd2CacheMu, &wg) + + wg.Wait() + + pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync) + pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync) + +} + +// flushPackets runs queries to relay any pending messages which may have been +// in blocks before the height that the chain processors started querying. +func (pp *PathProcessor) flushPackets( + ctx context.Context, + pathEnd1Cache, pathEnd2Cache IBCMessagesCache, + pathEnd1CacheMu, pathEnd2CacheMu sync.Locker, + wg *sync.WaitGroup, +) { + defer wg.Done() + var ( + commitments1 = make(map[ChannelKey][]uint64) + commitments2 = make(map[ChannelKey][]uint64) + commitments1Mu, commitments2Mu sync.Mutex + ) + + // Query remaining packet commitments on both chains + var eg errgroup.Group + for k, open := range pp.pathEnd1.channelStateCache { + if !open { + continue + } + eg.Go(queryPacketCommitments(ctx, pp.pathEnd1, k, commitments1, &commitments1Mu)) + } + for k, open := range pp.pathEnd2.channelStateCache { + if !open { + continue + } + eg.Go(queryPacketCommitments(ctx, pp.pathEnd2, k, commitments2, &commitments2Mu)) + } + + if err := eg.Wait(); err != nil { + pp.log.Error("Failed to query packet commitments", zap.Error(err)) + } + + // From remaining packet commitments, determine if: + // 1. Packet commitment is on source, but MsgRecvPacket has not yet been relayed to destination + // 2. Packet commitment is on source, and MsgRecvPacket has been relayed to destination, but MsgAcknowledgement has not been written to source to clear the packet commitment. + // Based on above conditions, enqueue MsgRecvPacket and MsgAcknowledgement messages + for k, seqs := range commitments1 { + eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd1, pp.pathEnd2, k, seqs, pathEnd1Cache.PacketFlow, pathEnd2Cache.PacketFlow, pathEnd1CacheMu, pathEnd2CacheMu)) + } + + for k, seqs := range commitments2 { + eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd2, pp.pathEnd1, k, seqs, pathEnd2Cache.PacketFlow, pathEnd1Cache.PacketFlow, pathEnd2CacheMu, pathEnd1CacheMu)) + } + + if err := eg.Wait(); err != nil { + pp.log.Error("Failed to enqueue pending messages for flush", zap.Error(err)) + } +} + +// shouldTerminateForFlushComplete will determine if the relayer should exit +// when FlushLifecycle is used. It will exit when all of the message caches are cleared. 
+func (pp *PathProcessor) shouldTerminateForFlushComplete() bool { + if _, ok := pp.messageLifecycle.(*FlushLifecycle); !ok { + return false + } + for k, packetMessagesCache := range pp.pathEnd1.messageCache.PacketFlow { + if open, ok := pp.pathEnd1.channelStateCache[k]; !ok || !open { + continue + } + for _, c := range packetMessagesCache { + if len(c) > 0 { + return false + } + } + } + for _, c := range pp.pathEnd1.messageCache.ChannelHandshake { + for k := range pp.pathEnd1.channelStateCache { + if _, ok := c[k]; ok { + return false + } + } + } + for _, c := range pp.pathEnd1.messageCache.ConnectionHandshake { + for k := range pp.pathEnd1.connectionStateCache { + if _, ok := c[k]; ok { + return false + } + } + } + for k, packetMessagesCache := range pp.pathEnd2.messageCache.PacketFlow { + if open, ok := pp.pathEnd1.channelStateCache[k]; !ok || !open { + continue + } + for _, c := range packetMessagesCache { + if len(c) > 0 { + return false + } + } + } + for _, c := range pp.pathEnd2.messageCache.ChannelHandshake { + for k := range pp.pathEnd1.channelStateCache { + if _, ok := c[k]; ok { + return false + } + } + } + for _, c := range pp.pathEnd2.messageCache.ConnectionHandshake { + for k := range pp.pathEnd1.connectionStateCache { + if _, ok := c[k]; ok { + return false + } + } + } + pp.log.Info("Found termination condition for flush, all caches cleared") + return true +} + +type ChannelHandshakeState struct { + StateSrc chantypes.State + StateDst chantypes.State + Order chantypes.Order + Version string + SrcConnID string + DstConnID string +} + +// flushChannels runs queries to relay any pending channel handshake messages which may have been +// in blocks before the height that the chain processors started querying. +func (pp *PathProcessor) flushChannels( + ctx context.Context, + pathEnd1Cache, pathEnd2Cache IBCMessagesCache, + pathEnd1CacheMu, pathEnd2CacheMu sync.Locker, + wg *sync.WaitGroup, +) { + defer wg.Done() + + var ( + pathEnd1Chans, pathEnd2Chans []*chantypes.IdentifiedChannel + eg errgroup.Group + ) + eg.Go(func() (err error) { + pathEnd1Chans, err = pp.pathEnd1.chainProvider.QueryChannels(ctx) + return err + }) + eg.Go(func() (err error) { + pathEnd2Chans, err = pp.pathEnd2.chainProvider.QueryChannels(ctx) + return err + }) + if err := eg.Wait(); err != nil { + pp.log.Error("Failed to query channels for channel handshake flush", zap.Error(err)) + return + } + + srcChanKeys := make(map[ChannelKey]ChannelHandshakeState) + + for _, channel := range pathEnd1Chans { + k := ChannelKey{ + ChannelID: channel.ChannelId, + PortID: channel.PortId, + CounterpartyChannelID: channel.Counterparty.ChannelId, + CounterpartyPortID: channel.Counterparty.PortId, + } + srcConnID := channel.ConnectionHops[0] + if !pp.pathEnd1.isRelevantConnection(srcConnID) { + continue + } + if !pp.pathEnd1.info.ShouldRelayChannel(ChainChannelKey{ + ChainID: pp.pathEnd1.info.ChainID, + CounterpartyChainID: pp.pathEnd2.info.ChainID, + ChannelKey: k, + }) { + continue + } + var dstConnID string + for k := range pp.pathEnd1.connectionStateCache { + if k.ConnectionID == srcConnID { + dstConnID = k.CounterpartyConnID + } + } + srcChanKeys[k] = ChannelHandshakeState{ + StateSrc: channel.State, + Order: channel.Ordering, + Version: channel.Version, + SrcConnID: srcConnID, + DstConnID: dstConnID, + } + } + +DstLoop: + for _, channel := range pathEnd2Chans { + k := ChannelKey{ + ChannelID: channel.ChannelId, + PortID: channel.PortId, + CounterpartyChannelID: channel.Counterparty.ChannelId, + CounterpartyPortID: 
channel.Counterparty.PortId, + } + dstConnID := channel.ConnectionHops[0] + if !pp.pathEnd2.isRelevantConnection(dstConnID) { + continue DstLoop + } + if !pp.pathEnd2.info.ShouldRelayChannel(ChainChannelKey{ + ChainID: pp.pathEnd2.info.ChainID, + CounterpartyChainID: pp.pathEnd1.info.ChainID, + ChannelKey: k, + }) { + continue DstLoop + } + ck := k.Counterparty() + var srcConnID string + for k := range pp.pathEnd2.connectionStateCache { + if k.ConnectionID == dstConnID { + srcConnID = k.CounterpartyConnID + } + } + // check if counterparty key already exists observed from src + if s, ok := srcChanKeys[ck]; ok { + s.StateDst = channel.State + srcChanKeys[ck] = s + continue DstLoop + } + // check if counterparty init key already exists observed from src + msgInitKey := ck.MsgInitKey() + if s, ok := srcChanKeys[msgInitKey]; ok { + s.StateDst = channel.State + srcChanKeys[ck] = s + delete(srcChanKeys, msgInitKey) + continue DstLoop + } + // check if counterparty key already exists observed from src (compared to src init key) + for k, s := range srcChanKeys { + if k.MsgInitKey() == ck { + // have an init on dst with try on src + s.StateDst = channel.State + srcChanKeys[k] = s + continue DstLoop + } + } + // populate dst only + srcChanKeys[ck] = ChannelHandshakeState{ + StateDst: channel.State, + Order: channel.Ordering, + Version: channel.Version, + SrcConnID: srcConnID, + DstConnID: dstConnID, + } + } + + for k, s := range srcChanKeys { + if s.StateSrc == chantypes.OPEN && s.StateDst == chantypes.OPEN { + // already open on both sides. nothing needed. + continue + } + + if s.StateSrc == chantypes.CLOSED || s.StateDst == chantypes.CLOSED { + // don't handle channel closure here. + // Channel closure is a manual operation for now with rly tx channel-close. 
+			continue
+		}
+
+		pp.log.Info("Found channel that needs to complete handshake",
+			zap.String("src_channel_id", k.ChannelID),
+			zap.String("src_port_id", k.PortID),
+			zap.String("dst_channel_id", k.CounterpartyChannelID),
+			zap.String("dst_port_id", k.CounterpartyPortID),
+			zap.String("src_state", s.StateSrc.String()),
+			zap.String("dst_state", s.StateDst.String()),
+		)
+
+		switch {
+		case s.StateSrc == chantypes.INIT && s.StateDst == chantypes.UNINITIALIZED:
+			pp.log.Info("Chan is init on src but no try yet on dst")
+			populateChannelHandshake(pathEnd1Cache, pathEnd1CacheMu, chantypes.EventTypeChannelOpenInit,
+				k, s.Order, s.Version, s.SrcConnID, s.DstConnID)
+		case s.StateSrc == chantypes.UNINITIALIZED && s.StateDst == chantypes.INIT:
+			pp.log.Info("Chan is init on dst but no try yet on src")
+			populateChannelHandshake(pathEnd2Cache, pathEnd2CacheMu, chantypes.EventTypeChannelOpenInit,
+				k.Counterparty(), s.Order, s.Version, s.DstConnID, s.SrcConnID)
+		case s.StateSrc == chantypes.TRYOPEN && s.StateDst == chantypes.INIT:
+			pp.log.Info("Chan is try on src but no ack yet on dst")
+			populateChannelHandshake(pathEnd1Cache, pathEnd1CacheMu, chantypes.EventTypeChannelOpenTry,
+				k, s.Order, s.Version, s.SrcConnID, s.DstConnID)
+		case s.StateSrc == chantypes.INIT && s.StateDst == chantypes.TRYOPEN:
+			pp.log.Info("Chan is try on dst but no ack yet on src")
+			populateChannelHandshake(pathEnd2Cache, pathEnd2CacheMu, chantypes.EventTypeChannelOpenTry,
+				k.Counterparty(), s.Order, s.Version, s.DstConnID, s.SrcConnID)
+		case s.StateSrc == chantypes.OPEN && s.StateDst == chantypes.TRYOPEN:
+			pp.log.Info("Chan is ack on src but no confirm yet on dst")
+			populateChannelHandshake(pathEnd1Cache, pathEnd1CacheMu, chantypes.EventTypeChannelOpenAck,
+				k, s.Order, s.Version, s.SrcConnID, s.DstConnID)
+		case s.StateSrc == chantypes.TRYOPEN && s.StateDst == chantypes.OPEN:
+			pp.log.Info("Chan is ack on dst but no confirm yet on src")
+			populateChannelHandshake(pathEnd2Cache, pathEnd2CacheMu, chantypes.EventTypeChannelOpenAck,
+				k.Counterparty(), s.Order, s.Version, s.DstConnID, s.SrcConnID)
+		}
+	}
+}
+
+func populateChannelHandshake(
+	c IBCMessagesCache,
+	mu sync.Locker,
+	eventType string,
+	k ChannelKey,
+	order chantypes.Order,
+	version,
+	connID, counterpartyConnID string,
+) {
+	mu.Lock()
+	defer mu.Unlock()
+	if _, ok := c.ChannelHandshake[eventType]; !ok {
+		c.ChannelHandshake[eventType] = make(ChannelMessageCache)
+	}
+	c.ChannelHandshake[eventType][k] = provider.ChannelInfo{
+		PortID:                k.PortID,
+		ChannelID:             k.ChannelID,
+		CounterpartyPortID:    k.CounterpartyPortID,
+		CounterpartyChannelID: k.CounterpartyChannelID,
+		Order:                 order,
+		Version:               version,
+		ConnID:                connID,
+		CounterpartyConnID:    counterpartyConnID,
+	}
+}
+
+type ConnectionHandshakeState struct {
+	StateSrc  conntypes.State
+	StateDst  conntypes.State
+	PrefixSrc commitmenttypes.MerklePrefix
+	PrefixDst commitmenttypes.MerklePrefix
+}
+
+// flushConnections runs queries to relay any pending connection handshake messages which may have been
+// in blocks before the height that the chain processors started querying.
+func (pp *PathProcessor) flushConnections( + ctx context.Context, + pathEnd1Cache, pathEnd2Cache IBCMessagesCache, + pathEnd1CacheMu, pathEnd2CacheMu sync.Locker, + wg *sync.WaitGroup, +) { + defer wg.Done() + + var ( + pathEnd1Conns, pathEnd2Conns []*conntypes.IdentifiedConnection + eg errgroup.Group + ) + eg.Go(func() (err error) { + pathEnd1Conns, err = pp.pathEnd1.chainProvider.QueryConnections(ctx) + return err + }) + eg.Go(func() (err error) { + pathEnd2Conns, err = pp.pathEnd2.chainProvider.QueryConnections(ctx) + return err + }) + if err := eg.Wait(); err != nil { + pp.log.Error("Failed to query connections for connection handshake flush", zap.Error(err)) + return + } + + srcConnKeys := make(map[ConnectionKey]ConnectionHandshakeState) + + for _, conn := range pathEnd1Conns { + if conn.ClientId != pp.pathEnd1.info.ClientID { + continue + } + + k := ConnectionKey{ + ConnectionID: conn.Id, + ClientID: conn.ClientId, + CounterpartyConnID: conn.Counterparty.ConnectionId, + CounterpartyClientID: conn.Counterparty.ClientId, + } + + srcConnKeys[k] = ConnectionHandshakeState{ + StateSrc: conn.State, + PrefixDst: conn.Counterparty.Prefix, + } + } + +DstLoop: + for _, conn := range pathEnd2Conns { + if conn.ClientId != pp.pathEnd2.info.ClientID { + continue + } + + k := ConnectionKey{ + ConnectionID: conn.Id, + ClientID: conn.ClientId, + CounterpartyConnID: conn.Counterparty.ConnectionId, + CounterpartyClientID: conn.Counterparty.ClientId, + } + + ck := k.Counterparty() + + // check if counterparty key already exists observed from src + if s, ok := srcConnKeys[ck]; ok { + s.StateDst = conn.State + s.PrefixSrc = conn.Counterparty.Prefix + srcConnKeys[ck] = s + continue DstLoop + } + // check if counterparty init key already exists observed from src + msgInitKey := ck.MsgInitKey() + if s, ok := srcConnKeys[msgInitKey]; ok { + s.StateDst = conn.State + s.PrefixSrc = conn.Counterparty.Prefix + srcConnKeys[ck] = s + delete(srcConnKeys, msgInitKey) + continue DstLoop + } + // check if counterparty key already exists observed from src (compared to src init key) + for k, s := range srcConnKeys { + if k.MsgInitKey() == ck { + // have an init on dst with try on src + s.StateDst = conn.State + s.PrefixSrc = conn.Counterparty.Prefix + srcConnKeys[k] = s + continue DstLoop + } + } + // populate dst only + srcConnKeys[ck] = ConnectionHandshakeState{ + StateDst: conn.State, + PrefixSrc: conn.Counterparty.Prefix, + } + } + + for k, s := range srcConnKeys { + if s.StateSrc == conntypes.OPEN && s.StateDst == conntypes.OPEN { + // already open on both sides. nothing needed. 
+ continue + } + + pp.log.Info("Found connection that needs to complete handshake", + zap.String("src_connection_id", k.ConnectionID), + zap.String("src_client_id", k.ClientID), + zap.String("dst_connection_id", k.CounterpartyConnID), + zap.String("dst_client_id", k.CounterpartyClientID), + zap.String("src_state", s.StateSrc.String()), + zap.String("dst_state", s.StateDst.String()), + ) + + switch { + case s.StateSrc == conntypes.INIT && s.StateDst == conntypes.UNINITIALIZED: + pp.log.Info("Conn is init on src but no try yet on dst") + populateConnectionHandshake(pathEnd1Cache, pathEnd1CacheMu, conntypes.EventTypeConnectionOpenInit, + k, s.PrefixDst) + case s.StateSrc == conntypes.UNINITIALIZED && s.StateDst == conntypes.INIT: + pp.log.Info("Conn is init on dst but no try yet on src") + populateConnectionHandshake(pathEnd2Cache, pathEnd2CacheMu, conntypes.EventTypeConnectionOpenInit, + k.Counterparty(), s.PrefixSrc) + case s.StateSrc == conntypes.TRYOPEN && s.StateDst == conntypes.INIT: + pp.log.Info("Conn is try on src but no ack yet on dst") + populateConnectionHandshake(pathEnd1Cache, pathEnd1CacheMu, conntypes.EventTypeConnectionOpenTry, + k, s.PrefixDst) + case s.StateSrc == conntypes.INIT && s.StateDst == conntypes.TRYOPEN: + pp.log.Info("Conn is try on dst but no ack yet on src") + populateConnectionHandshake(pathEnd2Cache, pathEnd2CacheMu, conntypes.EventTypeConnectionOpenTry, + k.Counterparty(), s.PrefixSrc) + case s.StateSrc == conntypes.OPEN && s.StateDst == conntypes.TRYOPEN: + pp.log.Info("Conn is ack on src but no confirm yet on dst") + populateConnectionHandshake(pathEnd1Cache, pathEnd1CacheMu, conntypes.EventTypeConnectionOpenAck, + k, s.PrefixDst) + case s.StateSrc == conntypes.TRYOPEN && s.StateDst == conntypes.OPEN: + pp.log.Info("Conn is ack on dst but no confirm yet on src") + populateConnectionHandshake(pathEnd2Cache, pathEnd2CacheMu, conntypes.EventTypeConnectionOpenAck, + k.Counterparty(), s.PrefixSrc) + } + } +} + +func populateConnectionHandshake( + c IBCMessagesCache, + mu sync.Locker, + eventType string, + k ConnectionKey, + counterpartyCommitmentPrefix commitmenttypes.MerklePrefix, +) { + mu.Lock() + defer mu.Unlock() + if _, ok := c.ConnectionHandshake[eventType]; !ok { + c.ConnectionHandshake[eventType] = make(ConnectionMessageCache) + } + c.ConnectionHandshake[eventType][k] = provider.ConnectionInfo{ + ConnID: k.ConnectionID, + ClientID: k.ClientID, + CounterpartyConnID: k.CounterpartyConnID, + CounterpartyClientID: k.CounterpartyClientID, + CounterpartyCommitmentPrefix: counterpartyCommitmentPrefix, + } +} diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 40b93918e..b6f8e133d 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -9,7 +9,6 @@ import ( conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" - commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -1161,505 +1160,3 @@ func queuePendingRecvAndAcks( return nil } } - -func (pp *PathProcessor) flush(ctx context.Context) { - var ( - pathEnd1Cache = NewIBCMessagesCache() - pathEnd2Cache = NewIBCMessagesCache() - pathEnd1CacheMu, pathEnd2CacheMu sync.Mutex - ) - var wg sync.WaitGroup - wg.Add(3) - - go pp.flushPackets(ctx, pathEnd1Cache, pathEnd2Cache, &pathEnd1CacheMu, 
&pathEnd2CacheMu, &wg) - go pp.flushChannels(ctx, pathEnd1Cache, pathEnd2Cache, &pathEnd1CacheMu, &pathEnd2CacheMu, &wg) - go pp.flushConnections(ctx, pathEnd1Cache, pathEnd2Cache, &pathEnd1CacheMu, &pathEnd2CacheMu, &wg) - - wg.Wait() - - pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync) - pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync) - -} - -// flushPackets runs queries to relay any pending messages which may have been -// in blocks before the height that the chain processors started querying. -func (pp *PathProcessor) flushPackets( - ctx context.Context, - pathEnd1Cache, pathEnd2Cache IBCMessagesCache, - pathEnd1CacheMu, pathEnd2CacheMu sync.Locker, - wg *sync.WaitGroup, -) { - defer wg.Done() - var ( - commitments1 = make(map[ChannelKey][]uint64) - commitments2 = make(map[ChannelKey][]uint64) - commitments1Mu, commitments2Mu sync.Mutex - ) - - // Query remaining packet commitments on both chains - var eg errgroup.Group - for k, open := range pp.pathEnd1.channelStateCache { - if !open { - continue - } - eg.Go(queryPacketCommitments(ctx, pp.pathEnd1, k, commitments1, &commitments1Mu)) - } - for k, open := range pp.pathEnd2.channelStateCache { - if !open { - continue - } - eg.Go(queryPacketCommitments(ctx, pp.pathEnd2, k, commitments2, &commitments2Mu)) - } - - if err := eg.Wait(); err != nil { - pp.log.Error("Failed to query packet commitments", zap.Error(err)) - } - - // From remaining packet commitments, determine if: - // 1. Packet commitment is on source, but MsgRecvPacket has not yet been relayed to destination - // 2. Packet commitment is on source, and MsgRecvPacket has been relayed to destination, but MsgAcknowledgement has not been written to source to clear the packet commitment. - // Based on above conditions, enqueue MsgRecvPacket and MsgAcknowledgement messages - for k, seqs := range commitments1 { - eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd1, pp.pathEnd2, k, seqs, pathEnd1Cache.PacketFlow, pathEnd2Cache.PacketFlow, pathEnd1CacheMu, pathEnd2CacheMu)) - } - - for k, seqs := range commitments2 { - eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd2, pp.pathEnd1, k, seqs, pathEnd2Cache.PacketFlow, pathEnd1Cache.PacketFlow, pathEnd2CacheMu, pathEnd1CacheMu)) - } - - if err := eg.Wait(); err != nil { - pp.log.Error("Failed to enqueue pending messages for flush", zap.Error(err)) - } -} - -// shouldTerminateForFlushComplete will determine if the relayer should exit -// when FlushLifecycle is used. It will exit when all of the message caches are cleared. 
-func (pp *PathProcessor) shouldTerminateForFlushComplete() bool { - if _, ok := pp.messageLifecycle.(*FlushLifecycle); !ok { - return false - } - for k, packetMessagesCache := range pp.pathEnd1.messageCache.PacketFlow { - if open, ok := pp.pathEnd1.channelStateCache[k]; !ok || !open { - continue - } - for _, c := range packetMessagesCache { - if len(c) > 0 { - return false - } - } - } - for _, c := range pp.pathEnd1.messageCache.ChannelHandshake { - for k := range pp.pathEnd1.channelStateCache { - if _, ok := c[k]; ok { - return false - } - } - } - for _, c := range pp.pathEnd1.messageCache.ConnectionHandshake { - for k := range pp.pathEnd1.connectionStateCache { - if _, ok := c[k]; ok { - return false - } - } - } - for k, packetMessagesCache := range pp.pathEnd2.messageCache.PacketFlow { - if open, ok := pp.pathEnd1.channelStateCache[k]; !ok || !open { - continue - } - for _, c := range packetMessagesCache { - if len(c) > 0 { - return false - } - } - } - for _, c := range pp.pathEnd2.messageCache.ChannelHandshake { - for k := range pp.pathEnd1.channelStateCache { - if _, ok := c[k]; ok { - return false - } - } - } - for _, c := range pp.pathEnd2.messageCache.ConnectionHandshake { - for k := range pp.pathEnd1.connectionStateCache { - if _, ok := c[k]; ok { - return false - } - } - } - pp.log.Info("Found termination condition for flush, all caches cleared") - return true -} - -type ChannelHandshakeState struct { - StateSrc chantypes.State - StateDst chantypes.State - Order chantypes.Order - Version string - SrcConnID string - DstConnID string -} - -// flushChannels runs queries to relay any pending channel handshake messages which may have been -// in blocks before the height that the chain processors started querying. -func (pp *PathProcessor) flushChannels( - ctx context.Context, - pathEnd1Cache, pathEnd2Cache IBCMessagesCache, - pathEnd1CacheMu, pathEnd2CacheMu sync.Locker, - wg *sync.WaitGroup, -) { - defer wg.Done() - - var ( - pathEnd1Chans, pathEnd2Chans []*chantypes.IdentifiedChannel - eg errgroup.Group - ) - eg.Go(func() (err error) { - pathEnd1Chans, err = pp.pathEnd1.chainProvider.QueryChannels(ctx) - return err - }) - eg.Go(func() (err error) { - pathEnd2Chans, err = pp.pathEnd2.chainProvider.QueryChannels(ctx) - return err - }) - if err := eg.Wait(); err != nil { - pp.log.Error("Failed to query channels for channel handshake flush", zap.Error(err)) - return - } - - srcChanKeys := make(map[ChannelKey]ChannelHandshakeState) - - for _, channel := range pathEnd1Chans { - k := ChannelKey{ - ChannelID: channel.ChannelId, - PortID: channel.PortId, - CounterpartyChannelID: channel.Counterparty.ChannelId, - CounterpartyPortID: channel.Counterparty.PortId, - } - srcConnID := channel.ConnectionHops[0] - if !pp.pathEnd1.isRelevantConnection(srcConnID) { - continue - } - if !pp.pathEnd1.info.ShouldRelayChannel(ChainChannelKey{ - ChainID: pp.pathEnd1.info.ChainID, - CounterpartyChainID: pp.pathEnd2.info.ChainID, - ChannelKey: k, - }) { - continue - } - var dstConnID string - for k := range pp.pathEnd1.connectionStateCache { - if k.ConnectionID == srcConnID { - dstConnID = k.CounterpartyConnID - } - } - srcChanKeys[k] = ChannelHandshakeState{ - StateSrc: channel.State, - Order: channel.Ordering, - Version: channel.Version, - SrcConnID: srcConnID, - DstConnID: dstConnID, - } - } - -DstLoop: - for _, channel := range pathEnd2Chans { - k := ChannelKey{ - ChannelID: channel.ChannelId, - PortID: channel.PortId, - CounterpartyChannelID: channel.Counterparty.ChannelId, - CounterpartyPortID: 
channel.Counterparty.PortId, - } - dstConnID := channel.ConnectionHops[0] - if !pp.pathEnd2.isRelevantConnection(dstConnID) { - continue DstLoop - } - if !pp.pathEnd2.info.ShouldRelayChannel(ChainChannelKey{ - ChainID: pp.pathEnd2.info.ChainID, - CounterpartyChainID: pp.pathEnd1.info.ChainID, - ChannelKey: k, - }) { - continue DstLoop - } - ck := k.Counterparty() - var srcConnID string - for k := range pp.pathEnd2.connectionStateCache { - if k.ConnectionID == dstConnID { - srcConnID = k.CounterpartyConnID - } - } - // check if counterparty key already exists observed from src - if s, ok := srcChanKeys[ck]; ok { - s.StateDst = channel.State - srcChanKeys[ck] = s - continue DstLoop - } - // check if counterparty init key already exists observed from src - msgInitKey := ck.MsgInitKey() - if s, ok := srcChanKeys[msgInitKey]; ok { - s.StateDst = channel.State - srcChanKeys[ck] = s - delete(srcChanKeys, msgInitKey) - continue DstLoop - } - // check if counterparty key already exists observed from src (compared to src init key) - for k, s := range srcChanKeys { - if k.MsgInitKey() == ck { - // have an init on dst with try on src - s.StateDst = channel.State - srcChanKeys[k] = s - continue DstLoop - } - } - // populate dst only - srcChanKeys[ck] = ChannelHandshakeState{ - StateDst: channel.State, - Order: channel.Ordering, - Version: channel.Version, - SrcConnID: srcConnID, - DstConnID: dstConnID, - } - } - - for k, s := range srcChanKeys { - if s.StateSrc == chantypes.OPEN && s.StateDst == chantypes.OPEN { - // already open on both sides. nothing needed. - continue - } - - if s.StateSrc == chantypes.CLOSED || s.StateDst == chantypes.CLOSED { - // don't handle channel closure here. - // Channel closure is a manual operation for now with rly tx channel-close. 
-			continue
-		}
-
-		pp.log.Info("Found channel that needs to complete handshake",
-			zap.String("src_channel_id", k.ChannelID),
-			zap.String("src_port_id", k.PortID),
-			zap.String("dst_channel_id", k.CounterpartyChannelID),
-			zap.String("dst_port_id", k.CounterpartyPortID),
-			zap.String("src_state", s.StateSrc.String()),
-			zap.String("dst_state", s.StateDst.String()),
-		)
-
-		switch {
-		case s.StateSrc == chantypes.INIT && s.StateDst == chantypes.UNINITIALIZED:
-			pp.log.Info("Chan is init on src but no try yet on dst")
-			populateChannelHandshake(pathEnd1Cache, pathEnd1CacheMu, chantypes.EventTypeChannelOpenInit,
-				k, s.Order, s.Version, s.SrcConnID, s.DstConnID)
-		case s.StateSrc == chantypes.UNINITIALIZED && s.StateDst == chantypes.INIT:
-			pp.log.Info("Chan is init on dst but no try yet on src")
-			populateChannelHandshake(pathEnd2Cache, pathEnd2CacheMu, chantypes.EventTypeChannelOpenInit,
-				k.Counterparty(), s.Order, s.Version, s.DstConnID, s.SrcConnID)
-		case s.StateSrc == chantypes.TRYOPEN && s.StateDst == chantypes.INIT:
-			pp.log.Info("Chan is try on src but no ack yet on dst")
-			populateChannelHandshake(pathEnd1Cache, pathEnd1CacheMu, chantypes.EventTypeChannelOpenTry,
-				k, s.Order, s.Version, s.SrcConnID, s.DstConnID)
-		case s.StateSrc == chantypes.INIT && s.StateDst == chantypes.TRYOPEN:
-			pp.log.Info("Chan is try on dst but no ack yet on src")
-			populateChannelHandshake(pathEnd2Cache, pathEnd2CacheMu, chantypes.EventTypeChannelOpenTry,
-				k.Counterparty(), s.Order, s.Version, s.DstConnID, s.SrcConnID)
-		case s.StateSrc == chantypes.OPEN && s.StateDst == chantypes.TRYOPEN:
-			pp.log.Info("Chan is ack on src but no confirm yet on dst")
-			populateChannelHandshake(pathEnd1Cache, pathEnd1CacheMu, chantypes.EventTypeChannelOpenAck,
-				k, s.Order, s.Version, s.SrcConnID, s.DstConnID)
-		case s.StateSrc == chantypes.TRYOPEN && s.StateDst == chantypes.OPEN:
-			pp.log.Info("Chan is ack on dst but no confirm yet on src")
-			populateChannelHandshake(pathEnd2Cache, pathEnd2CacheMu, chantypes.EventTypeChannelOpenAck,
-				k.Counterparty(), s.Order, s.Version, s.DstConnID, s.SrcConnID)
-		}
-	}
-}
-
-func populateChannelHandshake(
-	c IBCMessagesCache,
-	mu sync.Locker,
-	eventType string,
-	k ChannelKey,
-	order chantypes.Order,
-	version,
-	connID, counterpartyConnID string,
-) {
-	mu.Lock()
-	defer mu.Unlock()
-	if _, ok := c.ChannelHandshake[eventType]; !ok {
-		c.ChannelHandshake[eventType] = make(ChannelMessageCache)
-	}
-	c.ChannelHandshake[eventType][k] = provider.ChannelInfo{
-		PortID:                k.PortID,
-		ChannelID:             k.ChannelID,
-		CounterpartyPortID:    k.CounterpartyPortID,
-		CounterpartyChannelID: k.CounterpartyChannelID,
-		Order:                 order,
-		Version:               version,
-		ConnID:                connID,
-		CounterpartyConnID:    counterpartyConnID,
-	}
-}
-
-type ConnectionHandshakeState struct {
-	StateSrc  conntypes.State
-	StateDst  conntypes.State
-	PrefixSrc commitmenttypes.MerklePrefix
-	PrefixDst commitmenttypes.MerklePrefix
-}
-
-// flushConnections runs queries to relay any pending connection handshake messages which may have been
-// in blocks before the height that the chain processors started querying.
-func (pp *PathProcessor) flushConnections( - ctx context.Context, - pathEnd1Cache, pathEnd2Cache IBCMessagesCache, - pathEnd1CacheMu, pathEnd2CacheMu sync.Locker, - wg *sync.WaitGroup, -) { - defer wg.Done() - - var ( - pathEnd1Conns, pathEnd2Conns []*conntypes.IdentifiedConnection - eg errgroup.Group - ) - eg.Go(func() (err error) { - pathEnd1Conns, err = pp.pathEnd1.chainProvider.QueryConnections(ctx) - return err - }) - eg.Go(func() (err error) { - pathEnd2Conns, err = pp.pathEnd2.chainProvider.QueryConnections(ctx) - return err - }) - if err := eg.Wait(); err != nil { - pp.log.Error("Failed to query connections for connection handshake flush", zap.Error(err)) - return - } - - srcConnKeys := make(map[ConnectionKey]ConnectionHandshakeState) - - for _, conn := range pathEnd1Conns { - if conn.ClientId != pp.pathEnd1.info.ClientID { - continue - } - - k := ConnectionKey{ - ConnectionID: conn.Id, - ClientID: conn.ClientId, - CounterpartyConnID: conn.Counterparty.ConnectionId, - CounterpartyClientID: conn.Counterparty.ClientId, - } - - srcConnKeys[k] = ConnectionHandshakeState{ - StateSrc: conn.State, - PrefixDst: conn.Counterparty.Prefix, - } - } - -DstLoop: - for _, conn := range pathEnd2Conns { - if conn.ClientId != pp.pathEnd2.info.ClientID { - continue - } - - k := ConnectionKey{ - ConnectionID: conn.Id, - ClientID: conn.ClientId, - CounterpartyConnID: conn.Counterparty.ConnectionId, - CounterpartyClientID: conn.Counterparty.ClientId, - } - - ck := k.Counterparty() - - // check if counterparty key already exists observed from src - if s, ok := srcConnKeys[ck]; ok { - s.StateDst = conn.State - s.PrefixSrc = conn.Counterparty.Prefix - srcConnKeys[ck] = s - continue DstLoop - } - // check if counterparty init key already exists observed from src - msgInitKey := ck.MsgInitKey() - if s, ok := srcConnKeys[msgInitKey]; ok { - s.StateDst = conn.State - s.PrefixSrc = conn.Counterparty.Prefix - srcConnKeys[ck] = s - delete(srcConnKeys, msgInitKey) - continue DstLoop - } - // check if counterparty key already exists observed from src (compared to src init key) - for k, s := range srcConnKeys { - if k.MsgInitKey() == ck { - // have an init on dst with try on src - s.StateDst = conn.State - s.PrefixSrc = conn.Counterparty.Prefix - srcConnKeys[k] = s - continue DstLoop - } - } - // populate dst only - srcConnKeys[ck] = ConnectionHandshakeState{ - StateDst: conn.State, - PrefixSrc: conn.Counterparty.Prefix, - } - } - - for k, s := range srcConnKeys { - if s.StateSrc == conntypes.OPEN && s.StateDst == conntypes.OPEN { - // already open on both sides. nothing needed. 
- continue - } - - pp.log.Info("Found connection that needs to complete handshake", - zap.String("src_connection_id", k.ConnectionID), - zap.String("src_client_id", k.ClientID), - zap.String("dst_connection_id", k.CounterpartyConnID), - zap.String("dst_client_id", k.CounterpartyClientID), - zap.String("src_state", s.StateSrc.String()), - zap.String("dst_state", s.StateDst.String()), - ) - - switch { - case s.StateSrc == conntypes.INIT && s.StateDst == conntypes.UNINITIALIZED: - pp.log.Info("Conn is init on src but no try yet on dst") - populateConnectionHandshake(pathEnd1Cache, pathEnd1CacheMu, conntypes.EventTypeConnectionOpenInit, - k, s.PrefixDst) - case s.StateSrc == conntypes.UNINITIALIZED && s.StateDst == conntypes.INIT: - pp.log.Info("Conn is init on dst but no try yet on src") - populateConnectionHandshake(pathEnd2Cache, pathEnd2CacheMu, conntypes.EventTypeConnectionOpenInit, - k.Counterparty(), s.PrefixSrc) - case s.StateSrc == conntypes.TRYOPEN && s.StateDst == conntypes.INIT: - pp.log.Info("Conn is try on src but no ack yet on dst") - populateConnectionHandshake(pathEnd1Cache, pathEnd1CacheMu, conntypes.EventTypeConnectionOpenTry, - k, s.PrefixDst) - case s.StateSrc == conntypes.INIT && s.StateDst == conntypes.TRYOPEN: - pp.log.Info("Conn is try on dst but no ack yet on src") - populateConnectionHandshake(pathEnd2Cache, pathEnd2CacheMu, conntypes.EventTypeConnectionOpenTry, - k.Counterparty(), s.PrefixSrc) - case s.StateSrc == conntypes.OPEN && s.StateDst == conntypes.TRYOPEN: - pp.log.Info("Conn is ack on src but no confirm yet on dst") - populateConnectionHandshake(pathEnd1Cache, pathEnd1CacheMu, conntypes.EventTypeConnectionOpenAck, - k, s.PrefixDst) - case s.StateSrc == conntypes.TRYOPEN && s.StateDst == conntypes.OPEN: - pp.log.Info("Conn is ack on dst but no confirm yet on src") - populateConnectionHandshake(pathEnd2Cache, pathEnd2CacheMu, conntypes.EventTypeConnectionOpenAck, - k.Counterparty(), s.PrefixSrc) - } - } -} - -func populateConnectionHandshake( - c IBCMessagesCache, - mu sync.Locker, - eventType string, - k ConnectionKey, - counterpartyCommitmentPrefix commitmenttypes.MerklePrefix, -) { - mu.Lock() - defer mu.Unlock() - if _, ok := c.ConnectionHandshake[eventType]; !ok { - c.ConnectionHandshake[eventType] = make(ConnectionMessageCache) - } - c.ConnectionHandshake[eventType][k] = provider.ConnectionInfo{ - ConnID: k.ConnectionID, - ClientID: k.ClientID, - CounterpartyConnID: k.CounterpartyConnID, - CounterpartyClientID: k.CounterpartyClientID, - CounterpartyCommitmentPrefix: counterpartyCommitmentPrefix, - } -}
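The flush entrypoint introduced by this series fans out into three goroutines that write into two shared caches guarded by mutexes, then merges once all three finish. The following is a minimal, self-contained sketch of that fan-out shape, not the patch's implementation: cache and flushStage are hypothetical stand-ins for IBCMessagesCache and the flushPackets/flushChannels/flushConnections stages, and the real merge step is pathEnd.mergeMessageCache.

package main

import (
	"fmt"
	"sync"
)

// cache stands in for IBCMessagesCache: shared state that several
// flush goroutines write into under an external lock.
type cache map[string]int

// flushStage simulates one flush stage: it records its findings under
// the lock and signals the WaitGroup when done.
func flushStage(name string, c cache, mu sync.Locker, wg *sync.WaitGroup) {
	defer wg.Done()
	mu.Lock()
	defer mu.Unlock()
	c[name]++
}

func main() {
	var (
		pathEnd1Cache = make(cache)
		pathEnd2Cache = make(cache)
		mu1, mu2      sync.Mutex
	)
	var wg sync.WaitGroup
	wg.Add(3)
	go flushStage("packets", pathEnd1Cache, &mu1, &wg)
	go flushStage("channels", pathEnd1Cache, &mu1, &wg)
	go flushStage("connections", pathEnd2Cache, &mu2, &wg)
	wg.Wait() // all three stages done; caches are safe to merge/read
	fmt.Println(pathEnd1Cache, pathEnd2Cache)
}

Accepting sync.Locker rather than *sync.Mutex, as the new stage signatures do, keeps the stages usable with any lock implementation and makes the shared-state contract explicit in the signature.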
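flushPackets collects outstanding packet commitments concurrently: one errgroup goroutine per open channel, each recording its results in a shared map under a mutex. A runnable sketch of that collector shape follows; the channel keys and the hard-coded sequence numbers are placeholders for the real on-chain query.

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// queryPacketCommitments is a stand-in for the per-channel query closure:
// it returns a func suitable for errgroup.Go that records the sequences
// with unacknowledged commitments under the shared mutex.
func queryPacketCommitments(k string, out map[string][]uint64, mu *sync.Mutex) func() error {
	return func() error {
		seqs := []uint64{1, 2, 3} // placeholder for the on-chain query result
		mu.Lock()
		defer mu.Unlock()
		out[k] = seqs
		return nil
	}
}

func main() {
	var (
		commitments = make(map[string][]uint64)
		mu          sync.Mutex
		eg          errgroup.Group
	)
	for _, k := range []string{"transfer/channel-0", "transfer/channel-1"} {
		eg.Go(queryPacketCommitments(k, &commitments, &mu))
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("failed to query packet commitments:", err)
		return
	}
	fmt.Println(commitments)
}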
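flushChannels and flushConnections both begin by querying each chain in parallel and bailing out if either query fails; the named return value keeps the closure's assignment tidy. A self-contained sketch of that query pattern, where queryChannels is a placeholder for chainProvider.QueryChannels:

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// queryChannels is a placeholder for chainProvider.QueryChannels.
func queryChannels(ctx context.Context, chain string) ([]string, error) {
	if chain == "" {
		return nil, errors.New("unknown chain")
	}
	return []string{chain + ": transfer/channel-0"}, nil
}

func main() {
	ctx := context.Background()
	var chans1, chans2 []string
	var eg errgroup.Group
	eg.Go(func() (err error) {
		chans1, err = queryChannels(ctx, "chain-1")
		return err
	})
	eg.Go(func() (err error) {
		chans2, err = queryChannels(ctx, "chain-2")
		return err
	})
	if err := eg.Wait(); err != nil {
		fmt.Println("query failed, skipping handshake flush:", err)
		return // mirrors the early return in flushChannels
	}
	fmt.Println(chans1, chans2)
}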
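The DstLoop correlation attempts three matches in order: the exact counterparty key, the counterparty key reduced to its init form (a channel still at INIT does not yet know the counterparty channel ID), and finally each src entry reduced to its init form. A simplified sketch of that idea under stated assumptions: key, counterparty, and msgInitKey are hypothetical stand-ins for ChannelKey, Counterparty, and MsgInitKey in types_internal.go, and the real matching operates on richer state than this.

package main

import "fmt"

// key is a simplified stand-in for ChannelKey.
type key struct {
	ChannelID, PortID                         string
	CounterpartyChannelID, CounterpartyPortID string
}

// counterparty flips the key to the other chain's perspective.
func (k key) counterparty() key {
	return key{
		ChannelID:             k.CounterpartyChannelID,
		PortID:                k.CounterpartyPortID,
		CounterpartyChannelID: k.ChannelID,
		CounterpartyPortID:    k.PortID,
	}
}

// msgInitKey blanks the counterparty channel ID, which is not yet known
// when only the ChannelOpenInit has been observed.
func (k key) msgInitKey() key {
	k.CounterpartyChannelID = ""
	return k
}

// match mirrors the three-step correlation in flushChannels.
func match(src map[key]string, dstKey key) (key, bool) {
	ck := dstKey.counterparty()
	if _, ok := src[ck]; ok { // exact counterparty match
		return ck, true
	}
	if _, ok := src[ck.msgInitKey()]; ok { // src entry recorded at INIT
		return ck.msgInitKey(), true
	}
	for k := range src { // src entry stored under its init form
		if k.msgInitKey() == ck {
			return k, true
		}
	}
	return key{}, false
}

func main() {
	initKey := key{ChannelID: "channel-0", PortID: "transfer", CounterpartyPortID: "transfer"}
	src := map[key]string{initKey: "INIT"}
	dst := key{
		ChannelID: "channel-7", PortID: "transfer",
		CounterpartyChannelID: "channel-0", CounterpartyPortID: "transfer",
	}
	k, ok := match(src, dst)
	fmt.Println(k, ok) // matches the INIT entry via its init-form key
}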
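The six-case switch over (StateSrc, StateDst) is effectively a lookup table from handshake state pairs to the event type that must be cached so the processor enqueues the next message. The same table expressed as data, for reference; the descriptions paraphrase the patch's log lines, and this is not the patch's representation, which deliberately uses an explicit switch.

package main

import "fmt"

// state mirrors the channel handshake states used in the switch.
type state int

const (
	uninitialized state = iota
	initialized         // INIT
	tryopen             // TRYOPEN
	open                // OPEN
)

type pair struct{ src, dst state }

// nextCachedEvent maps a (src, dst) handshake state pair to the event
// that flushChannels caches and the pathEnd cache it goes into. Event
// names follow the chantypes.EventTypeChannelOpen* constants.
var nextCachedEvent = map[pair]string{
	{initialized, uninitialized}: "cache channel_open_init for pathEnd1 (init on src, no try on dst)",
	{uninitialized, initialized}: "cache channel_open_init for pathEnd2 (init on dst, no try on src)",
	{tryopen, initialized}:       "cache channel_open_try for pathEnd1 (try on src, no ack on dst)",
	{initialized, tryopen}:       "cache channel_open_try for pathEnd2 (try on dst, no ack on src)",
	{open, tryopen}:              "cache channel_open_ack for pathEnd1 (ack on src, no confirm on dst)",
	{tryopen, open}:              "cache channel_open_ack for pathEnd2 (ack on dst, no confirm on src)",
}

func main() {
	fmt.Println(nextCachedEvent[pair{tryopen, initialized}])
}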
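populateChannelHandshake and populateConnectionHandshake share one shape: take the lock, create the inner map for the event type on first use, then write the observed message info. A minimal standalone version of that shape, where handshakeCache and info are hypothetical stand-ins for the ChannelHandshake map and provider.ChannelInfo:

package main

import (
	"fmt"
	"sync"
)

// info is a stand-in for provider.ChannelInfo.
type info struct{ channelID string }

// handshakeCache stands in for the ChannelHandshake field of
// IBCMessagesCache: event type -> channel key -> observed message info.
type handshakeCache map[string]map[string]info

// populate inserts under the shared lock, creating the inner map on
// first use, as the populate* helpers in this patch do.
func populate(c handshakeCache, mu sync.Locker, eventType, chanKey string, i info) {
	mu.Lock()
	defer mu.Unlock()
	if _, ok := c[eventType]; !ok {
		c[eventType] = make(map[string]info)
	}
	c[eventType][chanKey] = i
}

func main() {
	var mu sync.Mutex
	c := make(handshakeCache)
	populate(c, &mu, "channel_open_init", "transfer/channel-0", info{"channel-0"})
	fmt.Println(c)
}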
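shouldTerminateForFlushComplete only returns true once no pending messages remain in any cache the flush can still act on. The essence of that predicate, reduced to a hypothetical two-cache model; the real check additionally filters packet caches by channel open state and scans both pathEnds' handshake caches.

package main

import "fmt"

// caches is a stand-in for the per-pathEnd message caches inspected by
// shouldTerminateForFlushComplete.
type caches struct {
	packetMsgs    map[string]int // channel -> pending packet messages
	handshakeMsgs map[string]int // channel -> pending handshake messages
}

// flushComplete returns true only when every tracked cache is empty,
// mirroring the "all caches cleared" exit condition for FlushLifecycle.
func flushComplete(ends ...caches) bool {
	for _, e := range ends {
		for _, n := range e.packetMsgs {
			if n > 0 {
				return false
			}
		}
		for _, n := range e.handshakeMsgs {
			if n > 0 {
				return false
			}
		}
	}
	return true
}

func main() {
	e1 := caches{packetMsgs: map[string]int{"channel-0": 0}, handshakeMsgs: map[string]int{}}
	e2 := caches{packetMsgs: map[string]int{}, handshakeMsgs: map[string]int{}}
	fmt.Println(flushComplete(e1, e2)) // true -> relayer may exit
}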