diff --git a/cmd/config.go b/cmd/config.go index 02ca34d..1db10db 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -29,7 +29,7 @@ func configCommand() *cobra.Command { Use: "docs", Short: "Prints the config info as markdown", Long: "", - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(_ *cobra.Command, _ []string) error { InitConfig() b, err := config.GenerateConfigMarkdown(context.Background(), "", config.GetKnownKeys()) fmt.Println(string(b)) diff --git a/cmd/evmconnect.go b/cmd/evmconnect.go index 2f8dd4a..80ba7a4 100644 --- a/cmd/evmconnect.go +++ b/cmd/evmconnect.go @@ -41,7 +41,7 @@ var rootCmd = &cobra.Command{ Use: "evmconnect", Short: "Hyperledger FireFly Connector for EVM based blockchains", Long: ``, - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(_ *cobra.Command, _ []string) error { return run() }, } diff --git a/cmd/version.go b/cmd/version.go index 3ae8b8f..3d8f20d 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -53,7 +53,7 @@ func versionCommand() *cobra.Command { Use: "version", Short: "Prints the version info", Long: "", - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(_ *cobra.Command, _ []string) error { info := &Info{ Version: BuildVersionOverride, diff --git a/internal/ethereum/blocklistener.go b/internal/ethereum/blocklistener.go index 69d058e..7e9555f 100644 --- a/internal/ethereum/blocklistener.go +++ b/internal/ethereum/blocklistener.go @@ -44,11 +44,15 @@ type blockUpdateConsumer struct { // 1) To establish and keep track of what the head block height of the blockchain is, so event streams know how far from the head they are // 2) To feed new block information to any registered consumers type blockListener struct { - ctx context.Context - c *ethConnector - backend rpcbackend.RPC - wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected - listenLoopDone chan struct{} + ctx context.Context + c *ethConnector + backend rpcbackend.RPC + wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected + listenLoopDone chan struct{} + + isStarted bool + startDone chan struct{} + initialBlockHeightObtained chan struct{} newHeadsTap chan struct{} newHeadsSub rpcbackend.Subscription @@ -73,6 +77,8 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section, ctx: log.WithLogField(ctx, "role", "blocklistener"), c: c, backend: c.backend, // use the HTTP backend - might get overwritten by a connected websocket later + isStarted: false, + startDone: make(chan struct{}), initialBlockHeightObtained: make(chan struct{}), newHeadsTap: make(chan struct{}), highestBlock: -1, @@ -92,6 +98,22 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section, return bl, nil } +// setting block filter status updates that new block filter has been created +func (bl *blockListener) markStarted() { + if !bl.isStarted { + bl.isStarted = true + close(bl.startDone) + } +} + +func (bl *blockListener) waitUntilStarted(ctx context.Context) { + select { + case <-bl.startDone: + case <-bl.ctx.Done(): + case <-ctx.Done(): + } +} + func (bl *blockListener) newHeadsSubListener() { for range bl.newHeadsSub.Notifications() { select { @@ -106,7 +128,7 @@ func (bl *blockListener) newHeadsSubListener() { // getBlockHeightWithRetry keeps retrying attempting to get the initial block height until successful func (bl *blockListener) establishBlockHeightWithRetry() error { wsConnected := false - return bl.c.retry.Do(bl.ctx, "get initial block height", func(attempt int) (retry bool, err error) { + return bl.c.retry.Do(bl.ctx, "get initial block height", func(_ int) (retry bool, err error) { // If we have a WebSocket backend, then we connect it and switch over to using it // (we accept an un-locked update here to backend, as the most important routine that's @@ -136,16 +158,18 @@ func (bl *blockListener) establishBlockHeightWithRetry() error { bl.backend = bl.wsBackend } - // Now get the block heiht + // Now get the block height var hexBlockHeight ethtypes.HexInteger rpcErr := bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber") if rpcErr != nil { log.L(bl.ctx).Warnf("Block height could not be obtained: %s", rpcErr.Message) return true, rpcErr.Error() } + bl.mux.Lock() bl.highestBlock = hexBlockHeight.BigInt().Int64() bl.mux.Unlock() + return false, nil }) } @@ -162,6 +186,7 @@ func (bl *blockListener) listenLoop() { var filter string failCount := 0 gapPotential := true + firstIteration := true for { if failCount > 0 { if bl.c.doFailureDelay(bl.ctx, failCount) { @@ -170,12 +195,16 @@ func (bl *blockListener) listenLoop() { } } else { // Sleep for the polling interval, or until we're shoulder tapped by the newHeads listener - select { - case <-time.After(bl.blockPollingInterval): - case <-bl.newHeadsTap: - case <-bl.ctx.Done(): - log.L(bl.ctx).Debugf("Block listener loop stopping") - return + if !firstIteration { + select { + case <-time.After(bl.blockPollingInterval): + case <-bl.newHeadsTap: + case <-bl.ctx.Done(): + log.L(bl.ctx).Debugf("Block listener loop stopping") + return + } + } else { + firstIteration = false } } @@ -186,6 +215,7 @@ func (bl *blockListener) listenLoop() { failCount++ continue } + bl.markStarted() } var blockHashes []ethtypes.HexBytes0xPrefix @@ -374,7 +404,7 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element { for { var bi *blockInfoJSONRPC var reason ffcapi.ErrorReason - err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(attempt int) (retry bool, err error) { + err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(_ int) (retry bool, err error) { bi, reason, err = bl.getBlockInfoByNumber(bl.ctx, nextBlockNumber, false, "") return reason != ffcapi.ErrorReasonNotFound, err }) @@ -428,7 +458,7 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf currentViewBlock := lastElem.Value.(*minimalBlockInfo) var freshBlockInfo *blockInfoJSONRPC var reason ffcapi.ErrorReason - err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(attempt int) (retry bool, err error) { + err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(_ int) (retry bool, err error) { freshBlockInfo, reason, err = bl.getBlockInfoByNumber(bl.ctx, currentViewBlock.number, false, "") return reason != ffcapi.ErrorReasonNotFound, err }) @@ -471,23 +501,28 @@ func (bl *blockListener) dispatchToConsumers(consumers []*blockUpdateConsumer, u } } -func (bl *blockListener) checkStartedLocked() { +func (bl *blockListener) checkAndStartListenerLoop() { + bl.mux.Lock() + defer bl.mux.Unlock() if bl.listenLoopDone == nil { bl.listenLoopDone = make(chan struct{}) go bl.listenLoop() } } -func (bl *blockListener) addConsumer(c *blockUpdateConsumer) { +func (bl *blockListener) addConsumer(ctx context.Context, c *blockUpdateConsumer) { + bl.checkAndStartListenerLoop() + bl.waitUntilStarted(ctx) // need to make sure the listener is started before adding any consumers bl.mux.Lock() defer bl.mux.Unlock() - bl.checkStartedLocked() bl.consumers[*c.id] = c } func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) { + bl.checkAndStartListenerLoop() + // block height will be established as the first step of listener startup process + // so we don't need to wait for the entire startup process to finish to return the result bl.mux.Lock() - bl.checkStartedLocked() highestBlock := bl.highestBlock bl.mux.Unlock() // if not yet initialized, wait to be initialized @@ -515,6 +550,9 @@ func (bl *blockListener) waitClosed() { bl.wsBackend.Close() } if listenLoopDone != nil { - <-listenLoopDone + select { + case <-listenLoopDone: + case <-bl.ctx.Done(): + } } } diff --git a/internal/ethereum/blocklistener_test.go b/internal/ethereum/blocklistener_test.go index 4d0cb57..53588d9 100644 --- a/internal/ethereum/blocklistener_test.go +++ b/internal/ethereum/blocklistener_test.go @@ -34,9 +34,14 @@ import ( "github.com/stretchr/testify/mock" ) +const testBlockFilterID1 = "block_filter_1" +const testBlockFilterID2 = "block_filter_2" +const testLogsFilterID1 = "log_filter_1" +const testLogsFilterID2 = "log_filter_2" + func TestBlockListenerStartGettingHighestBlockRetry(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber"). @@ -45,6 +50,8 @@ func TestBlockListenerStartGettingHighestBlockRetry(t *testing.T) { hbh := args[1].(*ethtypes.HexInteger) *hbh = *ethtypes.NewHexInteger64(12345) }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe() h, ok := bl.getHighestBlock(bl.ctx) assert.Equal(t, int64(12345), h) @@ -59,12 +66,17 @@ func TestBlockListenerStartGettingHighestBlockRetry(t *testing.T) { func TestBlockListenerStartGettingHighestBlockFailBeforeStop(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + filterID := args[1].(*string) + *filterID = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Maybe() done() // Stop before we start bl := c.blockListener mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber"). - Return(&rpcbackend.RPCError{Message: "pop"}).Maybe() + Return(&rpcbackend.RPCError{Message: "pop"}).Once() h, ok := bl.getHighestBlock(bl.ctx) assert.False(t, ok) @@ -78,7 +90,7 @@ func TestBlockListenerStartGettingHighestBlockFailBeforeStop(t *testing.T) { func TestBlockListenerOKSequential(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond bl.unstableHeadLength = 2 // check wrapping @@ -94,16 +106,19 @@ func TestBlockListenerOKSequential(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001Hash, + block1002Hash, + } + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) *hbh = []ethtypes.HexBytes0xPrefix{ block1003Hash, @@ -140,7 +155,7 @@ func TestBlockListenerOKSequential(t *testing.T) { }) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -179,7 +194,7 @@ func TestBlockListenerWSShoulderTap(t *testing.T) { } }) - ctx, c, _, done := newTestConnector(t, func(conf config.Section) { + ctx, c, _, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t, func(conf config.Section) { conf.Set(wsclient.WSConfigURL, url) conf.Set(wsclient.WSConfigKeyInitialConnectAttempts, 0) conf.Set(WebSocketsEnabled, true) @@ -248,7 +263,7 @@ func TestBlockListenerWSShoulderTap(t *testing.T) { } }() - bl.checkStartedLocked() + bl.checkAndStartListenerLoop() // Wait until we close because it worked <-bl.listenLoopDone @@ -261,7 +276,7 @@ func TestBlockListenerWSShoulderTap(t *testing.T) { func TestBlockListenerOKDuplicates(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -276,22 +291,43 @@ func TestBlockListenerOKDuplicates(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002Hash, + *hbh = testBlockFilterID1 + }) + // wait for consumer to be added before returning get filter changes + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001Hash, + block1002Hash, + } + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + if len(bl.consumers) > 0 { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001Hash, + block1002Hash, + } + } else { + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001Hash, + block1002Hash, + } + }).Once() } }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) *hbh = []ethtypes.HexBytes0xPrefix{ block1003Hash, } }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) *hbh = []ethtypes.HexBytes0xPrefix{ block1002Hash, @@ -330,7 +366,7 @@ func TestBlockListenerOKDuplicates(t *testing.T) { }) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -357,7 +393,7 @@ func TestBlockListenerOKDuplicates(t *testing.T) { func TestBlockListenerReorgKeepLatestHeadInSameBatch(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -371,19 +407,23 @@ func TestBlockListenerReorgKeepLatestHeadInSameBatch(t *testing.T) { hbh := args[1].(*ethtypes.HexInteger) *hbh = *ethtypes.NewHexInteger64(1000) }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001HashA, - block1001HashB, - block1002Hash, - block1003Hash, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001HashA, + block1001HashB, + block1002Hash, + block1003Hash, + } + }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) @@ -425,7 +465,7 @@ func TestBlockListenerReorgKeepLatestHeadInSameBatch(t *testing.T) { }) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -448,7 +488,7 @@ func TestBlockListenerReorgKeepLatestHeadInSameBatch(t *testing.T) { func TestBlockListenerReorgKeepLatestHeadInSameBatchValidHashFirst(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -464,17 +504,20 @@ func TestBlockListenerReorgKeepLatestHeadInSameBatchValidHashFirst(t *testing.T) }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001HashB, // valid hash is in the front of the array, so will need to re-build the chain - block1001HashA, - block1002Hash, - block1003Hash, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001HashB, // valid hash is in the front of the array, so will need to re-build the chain + block1001HashA, + block1002Hash, + block1003Hash, + } + }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) @@ -540,7 +583,7 @@ func TestBlockListenerReorgKeepLatestHeadInSameBatchValidHashFirst(t *testing.T) } }) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -563,7 +606,7 @@ func TestBlockListenerReorgKeepLatestHeadInSameBatchValidHashFirst(t *testing.T) func TestBlockListenerReorgKeepLatestMiddleInSameBatch(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -579,17 +622,21 @@ func TestBlockListenerReorgKeepLatestMiddleInSameBatch(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002HashA, - block1002HashB, - block1003Hash, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001Hash, + block1002HashA, + block1002HashB, + block1003Hash, + } + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { @@ -630,7 +677,7 @@ func TestBlockListenerReorgKeepLatestMiddleInSameBatch(t *testing.T) { } }) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -653,7 +700,7 @@ func TestBlockListenerReorgKeepLatestMiddleInSameBatch(t *testing.T) { func TestBlockListenerReorgKeepLatestTailInSameBatch(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -669,17 +716,21 @@ func TestBlockListenerReorgKeepLatestTailInSameBatch(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002Hash, - block1003HashA, - block1003HashB, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001Hash, + block1002Hash, + block1003HashA, + block1003HashB, + } + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { @@ -720,7 +771,7 @@ func TestBlockListenerReorgKeepLatestTailInSameBatch(t *testing.T) { } }) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -743,7 +794,7 @@ func TestBlockListenerReorgKeepLatestTailInSameBatch(t *testing.T) { func TestBlockListenerReorgReplaceTail(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -759,22 +810,25 @@ func TestBlockListenerReorgReplaceTail(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001Hash, + block1002Hash, + } + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) *hbh = []ethtypes.HexBytes0xPrefix{ block1003HashA, } }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) *hbh = []ethtypes.HexBytes0xPrefix{ block1003HashB, @@ -820,7 +874,7 @@ func TestBlockListenerReorgReplaceTail(t *testing.T) { }) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -858,7 +912,7 @@ func TestBlockListenerGap(t *testing.T) { // needs to cope with this. This means winding back when we find a gap and re-building our canonical // view of the chain. - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -876,16 +930,19 @@ func TestBlockListenerGap(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - block1002HashA, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001Hash, + block1002HashA, + } + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) *hbh = []ethtypes.HexBytes0xPrefix{ block1004Hash, @@ -970,7 +1027,7 @@ func TestBlockListenerGap(t *testing.T) { }), false).Return(nil) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -1001,7 +1058,7 @@ func TestBlockListenerGap(t *testing.T) { func TestBlockListenerReorgWhileRebuilding(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -1018,15 +1075,18 @@ func TestBlockListenerReorgWhileRebuilding(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1001Hash, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1001Hash, + } + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) *hbh = []ethtypes.HexBytes0xPrefix{ block1003HashA, @@ -1081,7 +1141,7 @@ func TestBlockListenerReorgWhileRebuilding(t *testing.T) { }) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -1107,7 +1167,7 @@ func TestBlockListenerReorgWhileRebuilding(t *testing.T) { func TestBlockListenerReorgReplaceWholeCanonicalChain(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -1123,16 +1183,19 @@ func TestBlockListenerReorgReplaceWholeCanonicalChain(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1002HashA, - block1003HashA, - } - }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1002HashA, + block1003HashA, + } + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) *hbh = []ethtypes.HexBytes0xPrefix{ block1003HashB, @@ -1190,7 +1253,7 @@ func TestBlockListenerReorgReplaceWholeCanonicalChain(t *testing.T) { }), false).Return(nil) updates := make(chan *ffcapi.BlockHashEvent) - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: context.Background(), updates: updates, @@ -1219,7 +1282,7 @@ func TestBlockListenerReorgReplaceWholeCanonicalChain(t *testing.T) { func TestBlockListenerClosed(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond block1002Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) @@ -1231,14 +1294,17 @@ func TestBlockListenerClosed(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1003Hash, + } + }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { if len(bl.consumers) == 0 { go done() // Close after we've processed the log @@ -1258,7 +1324,7 @@ func TestBlockListenerClosed(t *testing.T) { updates := make(chan *ffcapi.BlockHashEvent) cancelledCtx, cCancel := context.WithCancel(context.Background()) cCancel() - bl.addConsumer(&blockUpdateConsumer{ + bl.addConsumer(context.Background(), &blockUpdateConsumer{ id: fftypes.NewUUID(), ctx: cancelledCtx, updates: updates, @@ -1271,7 +1337,7 @@ func TestBlockListenerClosed(t *testing.T) { func TestBlockListenerBlockNotFound(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) @@ -1282,14 +1348,17 @@ func TestBlockListenerBlockNotFound(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1003Hash, + } + }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { go done() // Close after we've processed the log }) @@ -1298,7 +1367,7 @@ func TestBlockListenerBlockNotFound(t *testing.T) { return bh == block1003Hash.String() }), false).Return(nil) - bl.checkStartedLocked() + bl.checkAndStartListenerLoop() c.WaitClosed() @@ -1308,7 +1377,7 @@ func TestBlockListenerBlockNotFound(t *testing.T) { func TestBlockListenerBlockHashFailed(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) @@ -1319,14 +1388,17 @@ func TestBlockListenerBlockHashFailed(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1003Hash, + } + }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { go done() // Close after we've processed the log }) @@ -1335,7 +1407,7 @@ func TestBlockListenerBlockHashFailed(t *testing.T) { return bh == block1003Hash.String() }), false).Return(&rpcbackend.RPCError{Message: "pop"}) - bl.checkStartedLocked() + bl.checkAndStartListenerLoop() c.WaitClosed() @@ -1345,7 +1417,7 @@ func TestBlockListenerBlockHashFailed(t *testing.T) { func TestBlockListenerProcessNonStandardHashRejectedWhenNotInHederaCompatibilityMode(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond bl.hederaCompatibilityMode = false @@ -1358,19 +1430,22 @@ func TestBlockListenerProcessNonStandardHashRejectedWhenNotInHederaCompatibility }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1003Hash, + } + }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { go done() // Close after we've processed the log }) - bl.checkStartedLocked() + bl.checkAndStartListenerLoop() c.WaitClosed() @@ -1380,7 +1455,7 @@ func TestBlockListenerProcessNonStandardHashRejectedWhenNotInHederaCompatibility func TestBlockListenerProcessNonStandardHashRejectedWhenWrongSizeForHedera(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond bl.hederaCompatibilityMode = true @@ -1393,19 +1468,22 @@ func TestBlockListenerProcessNonStandardHashRejectedWhenWrongSizeForHedera(t *te }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1003Hash, + } + }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { go done() // Close after we've processed the log }) - bl.checkStartedLocked() + bl.checkAndStartListenerLoop() c.WaitClosed() @@ -1415,7 +1493,7 @@ func TestBlockListenerProcessNonStandardHashRejectedWhenWrongSizeForHedera(t *te func TestBlockListenerProcessNonStandardHashAcceptedWhenInHederaCompatbilityMode(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond bl.hederaCompatibilityMode = true @@ -1429,14 +1507,17 @@ func TestBlockListenerProcessNonStandardHashAcceptedWhenInHederaCompatbilityMode }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" - }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { - hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) - *hbh = []ethtypes.HexBytes0xPrefix{ - block1003Hash, - } - }).Once() + *hbh = testBlockFilterID1 + }) + conditionalMockOnce( + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil), + func() bool { return len(bl.consumers) > 0 }, + func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{ + block1003Hash, + } + }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { go done() // Close after we've processed the log }) @@ -1445,7 +1526,7 @@ func TestBlockListenerProcessNonStandardHashAcceptedWhenInHederaCompatbilityMode return bh == truncatedBlock1003Hash.String() }), false).Return(&rpcbackend.RPCError{Message: "pop"}) - bl.checkStartedLocked() + bl.checkAndStartListenerLoop() c.WaitClosed() @@ -1455,7 +1536,7 @@ func TestBlockListenerProcessNonStandardHashAcceptedWhenInHederaCompatbilityMode func TestBlockListenerReestablishBlockFilter(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -1465,18 +1546,18 @@ func TestBlockListenerReestablishBlockFilter(t *testing.T) { }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" + *hbh = testBlockFilterID1 }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id2" + *hbh = testBlockFilterID2 }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(&rpcbackend.RPCError{Message: "filter not found"}).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(&rpcbackend.RPCError{Message: "filter not found"}).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { go done() // Close after we've processed the log }) - bl.checkStartedLocked() + bl.checkAndStartListenerLoop() c.WaitClosed() @@ -1485,8 +1566,7 @@ func TestBlockListenerReestablishBlockFilter(t *testing.T) { } func TestBlockListenerReestablishBlockFilterFail(t *testing.T) { - - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.blockPollingInterval = 1 * time.Microsecond @@ -1498,7 +1578,7 @@ func TestBlockListenerReestablishBlockFilterFail(t *testing.T) { go done() }) - bl.checkStartedLocked() + bl.checkAndStartListenerLoop() c.WaitClosed() @@ -1506,8 +1586,83 @@ func TestBlockListenerReestablishBlockFilterFail(t *testing.T) { } +func TestBlockListenerWillNotCloseBlockFilterSignalChannelMoreThanOnce(t *testing.T) { + + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) + bl := c.blockListener + bl.blockPollingInterval = 1 * time.Microsecond + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexInteger64(1000) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*string) + *hbh = testBlockFilterID1 + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + go done() + }) + + loopCount := 100 + loopDone := make(chan struct{}, loopCount) + for i := 0; i < loopCount; i++ { + go func() { + bl.checkAndStartListenerLoop() // start block listener loop + loopDone <- struct{}{} + }() + } + + resultCount := 0 + for { + select { + case <-loopDone: + resultCount++ + } + if resultCount == loopCount { + break + } + } + + bl.waitUntilStarted(context.Background()) + c.WaitClosed() + + mRPC.AssertExpectations(t) + +} + +func TestBlockListenerWaitUntilStartedOnlyReturnsAfterEstablishingBlockFilter(t *testing.T) { + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) + bl := c.blockListener + bl.blockPollingInterval = 1 * time.Microsecond + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexInteger64(1000) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*string) + *hbh = testBlockFilterID1 + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + + assert.False(t, bl.isStarted) + bl.checkAndStartListenerLoop() + bl.waitUntilStarted(context.Background()) + assert.True(t, bl.isStarted) + _, ok := <-bl.startDone + if ok { + t.Errorf("Expected new block filter established signal channel to be closed") + } + + done() + c.WaitClosed() + + mRPC.AssertExpectations(t) +} + func TestBlockListenerDispatchStopped(t *testing.T) { - _, c, _, done := newTestConnector(t) + _, c, _, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) done() c.blockListener.dispatchToConsumers([]*blockUpdateConsumer{ @@ -1519,7 +1674,7 @@ func TestBlockListenerDispatchStopped(t *testing.T) { func TestBlockListenerRebuildCanonicalChainEmpty(t *testing.T) { - _, c, _, done := newTestConnector(t) + _, c, _, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) defer done() bl := c.blockListener @@ -1530,7 +1685,7 @@ func TestBlockListenerRebuildCanonicalChainEmpty(t *testing.T) { func TestBlockListenerRebuildCanonicalFailTerminate(t *testing.T) { - _, c, mRPC, done := newTestConnector(t) + _, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t) bl := c.blockListener bl.canonicalChain.PushBack(&minimalBlockInfo{ number: 1000, diff --git a/internal/ethereum/ethereum_test.go b/internal/ethereum/ethereum_test.go index 15d2066..c37575b 100644 --- a/internal/ethereum/ethereum_test.go +++ b/internal/ethereum/ethereum_test.go @@ -27,12 +27,23 @@ import ( "github.com/hyperledger/firefly-signer/pkg/abi" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func strPtr(s string) *string { return &s } func newTestConnector(t *testing.T, confSetup ...func(conf config.Section)) (context.Context, *ethConnector, *rpcbackendmocks.Backend, func()) { + ctx, c, mRPC, done := newTestConnectorWithNoBlockerFilterDefaultMocks(t, confSetup...) + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + filterID := args[1].(*string) + *filterID = testBlockFilterID1 + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Maybe() + return ctx, c, mRPC, done +} +func newTestConnectorWithNoBlockerFilterDefaultMocks(t *testing.T, confSetup ...func(conf config.Section)) (context.Context, *ethConnector, *rpcbackendmocks.Backend, func()) { mRPC := &rpcbackendmocks.Backend{} config.RootConfigReset() conf := config.RootSection("unittest") @@ -55,7 +66,18 @@ func newTestConnector(t *testing.T, confSetup ...func(conf config.Section)) (con mRPC.AssertExpectations(t) c.WaitClosed() } +} +func conditionalMockOnce(call *mock.Call, predicate func() bool, thenRun func(args mock.Arguments)) { + call.Run(func(args mock.Arguments) { + if predicate() { + thenRun(args) + } else { + call.Run(func(args mock.Arguments) { + thenRun(args) + }).Once() + } + }).Once() } func TestConnectorInit(t *testing.T) { diff --git a/internal/ethereum/event_actions.go b/internal/ethereum/event_actions.go index 7660678..5f56190 100644 --- a/internal/ethereum/event_actions.go +++ b/internal/ethereum/event_actions.go @@ -62,7 +62,7 @@ func (c *ethConnector) EventStreamStart(ctx context.Context, req *ffcapi.EventSt go es.streamLoop() // Add the block consumer - c.blockListener.addConsumer(&blockUpdateConsumer{ + c.blockListener.addConsumer(req.StreamContext, &blockUpdateConsumer{ id: es.id, ctx: req.StreamContext, updates: req.BlockListener, diff --git a/internal/ethereum/event_actions_test.go b/internal/ethereum/event_actions_test.go index 6b0ae01..ca21f2d 100644 --- a/internal/ethereum/event_actions_test.go +++ b/internal/ethereum/event_actions_test.go @@ -83,14 +83,11 @@ func mockStreamLoopEmpty(mRPC *rpcbackendmocks.Backend) { *hbh = *ethtypes.NewHexInteger64(testHighBlock) }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - *args[1].(*string) = "filter_id1" + *args[1].(*string) = testLogsFilterID1 }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) }).Maybe() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) - }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() diff --git a/internal/ethereum/event_stream.go b/internal/ethereum/event_stream.go index 0d4fd57..ce73503 100644 --- a/internal/ethereum/event_stream.go +++ b/internal/ethereum/event_stream.go @@ -301,7 +301,6 @@ func (es *eventStream) leadGroupSteadyState() bool { var ag *aggregatedListener lastUpdate := -1 failCount := 0 - filterRPC := "" filterResetRequired := false for { if es.c.doFailureDelay(es.ctx, failCount) { @@ -330,7 +329,6 @@ func (es *eventStream) leadGroupSteadyState() bool { es.uninstallFilter(&filter) } filterResetRequired = false - filterRPC = "eth_getFilterLogs" // first JSON/RPC after getting a new filter ID // Determine the earliest block we need to poll from fromBlock := int64(-1) for _, l := range ag.listeners { @@ -364,19 +362,17 @@ func (es *eventStream) leadGroupSteadyState() bool { } // Get the next batch of logs var ethLogs []*logJSONRPC - rpcErr := es.c.backend.CallRPC(es.ctx, ðLogs, filterRPC, filter) + rpcErr := es.c.backend.CallRPC(es.ctx, ðLogs, "eth_getFilterLogs", filter) // If we fail to query we just retry - setting filter to nil if not found if rpcErr != nil { if mapError(filterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound { log.L(es.ctx).Infof("Filter '%v' reset: %s", filter, rpcErr.Message) filter = "" } - log.L(es.ctx).Errorf("Failed to query filter (%s): %s", filterRPC, rpcErr.Message) + log.L(es.ctx).Errorf("Failed to query filter (eth_getFilterLogs): %s", rpcErr.Message) failCount++ continue } - filterRPC = "eth_getFilterChanges" - // Enrich the events events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs) if enrichErr != nil { diff --git a/internal/ethereum/event_stream_test.go b/internal/ethereum/event_stream_test.go index 53cb2a5..94af303 100644 --- a/internal/ethereum/event_stream_test.go +++ b/internal/ethereum/event_stream_test.go @@ -206,20 +206,8 @@ func TestCatchupThenRejoinLeadGroup(t *testing.T) { }, } - es, events, mRPC, done := testEventStream(t, l1req) - defer done() - - l2req := &ffcapi.EventListenerAddRequest{ - StreamID: es.id, - ListenerID: fftypes.NewUUID(), - EventListenerOptions: ffcapi.EventListenerOptions{ - Filters: []fftypes.JSONAny{ - *fftypes.JSONAnyPtr(`{"event":` + abiTransferEvent + `}`), - }, - Options: fftypes.JSONAnyPtr(`{}`), - FromBlock: "1000", - }, - } + ctx, c, mRPC, done := newTestConnector(t) + mockStreamLoopEmpty(mRPC) closed := false listenerCaughtUp := make(chan struct{}) @@ -241,7 +229,7 @@ func TestCatchupThenRejoinLeadGroup(t *testing.T) { }, Data: ethtypes.MustNewHexBytes0xPrefix("0x00000000000000000000000000000000000000000000000000000000000003e8"), }) - case es.c.catchupPageSize + 1000: + case 500 /*default catch up page size*/ + 1000: if !closed { close(listenerCaughtUp) closed = true @@ -257,6 +245,21 @@ func TestCatchupThenRejoinLeadGroup(t *testing.T) { Hash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"), } }) + es, events, mRPC, done := testEventStreamExistingConnector(t, ctx, done, c, mRPC, l1req) + + defer done() + + l2req := &ffcapi.EventListenerAddRequest{ + StreamID: es.id, + ListenerID: fftypes.NewUUID(), + EventListenerOptions: ffcapi.EventListenerOptions{ + Filters: []fftypes.JSONAny{ + *fftypes.JSONAnyPtr(`{"event":` + abiTransferEvent + `}`), + }, + Options: fftypes.JSONAnyPtr(`{}`), + FromBlock: "1000", + }, + } _, _, err := es.c.EventListenerAdd(es.ctx, l2req) assert.NoError(t, err) @@ -343,7 +346,8 @@ func TestExitDuringCatchup(t *testing.T) { }, } - _, _, mRPC, done := testEventStream(t, l1req) + ctx, c, mRPC, done := newTestConnector(t) + mockStreamLoopEmpty(mRPC) completed := make(chan struct{}) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { @@ -359,7 +363,9 @@ func TestExitDuringCatchup(t *testing.T) { } *args[1].(*[]*logJSONRPC) = ethLogs }).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Maybe() + + _, _, mRPC, done = testEventStreamExistingConnector(t, ctx, done, c, mRPC, l1req) <-completed } @@ -378,18 +384,14 @@ func TestLeadGroupDeliverEvents(t *testing.T) { } ctx, c, mRPC, done := newTestConnector(t) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { *args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexInteger64(testHighBlock) }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { - *args[1].(*string) = "filter_id1" + *args[1].(*string) = testLogsFilterID1 }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) - }).Maybe() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = []*logJSONRPC{ { BlockNumber: ethtypes.NewHexInteger64(212122), @@ -412,9 +414,6 @@ func TestLeadGroupDeliverEvents(t *testing.T) { Hash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"), } }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - *args[1].(*[]*logJSONRPC) = []*logJSONRPC{} - }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() @@ -433,6 +432,7 @@ func TestLeadGroupDeliverEvents(t *testing.T) { assert.Equal(t, "0x3968ef051b422d3d1cdc182a88bba8dd922e6fa4", e.Event.Data.JSONObject().GetString("from")) assert.Equal(t, "0xd0f2f5103fd050739a9fb567251bc460cc24d091", e.Event.Data.JSONObject().GetString("to")) assert.Equal(t, "1000", e.Event.Data.JSONObject().GetString("value")) + mRPC.AssertExpectations(t) } func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) { @@ -457,14 +457,13 @@ func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) { mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { assert.Equal(t, int64(0), args[3].(*logFilterJSONRPC).FromBlock.BigInt().Int64()) - *args[1].(*string) = "filter_id1" + *args[1].(*string) = testLogsFilterID1 }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) }).Once().Run(func(args mock.Arguments) { close(filtered) }) - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() @@ -506,6 +505,32 @@ func TestLeadGroupCatchupRetry(t *testing.T) { <-retried +} +func TestLeadGroupCatchupExitWhenNoBlockHeightEstablished(t *testing.T) { + + l1req := &ffcapi.EventListenerAddRequest{ + ListenerID: fftypes.NewUUID(), + EventListenerOptions: ffcapi.EventListenerOptions{ + Filters: []fftypes.JSONAny{ + *fftypes.JSONAnyPtr(`{"event":` + abiTransferEvent + `}`), + }, + Options: fftypes.JSONAnyPtr(`{}`), + FromBlock: "0", + }, + } + ctx, c, mRPC, cDone := newTestConnector(t) + + attempted := make(chan struct{}) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(&rpcbackend.RPCError{Message: "pop"}).Run(func(args mock.Arguments) { + close(attempted) + cDone() + }).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(&rpcbackend.RPCError{Message: "pop"}).Maybe() + _, _, mRPC, done := testEventStreamExistingConnector(t, ctx, cDone, c, mRPC, l1req) + defer done() + <-attempted + } func TestStreamLoopNewFilterFail(t *testing.T) { @@ -613,19 +638,16 @@ func TestStreamLoopChangeFilter(t *testing.T) { l2req.StreamID = es.id _, _, err := c.EventListenerAdd(ctx, l2req) assert.NoError(t, err) - *args[1].(*string) = "filter_id1" + *args[1].(*string) = testLogsFilterID1 }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { - *args[1].(*string) = "filter_id2" + *args[1].(*string) = testLogsFilterID2 close(reestablishedFilter) }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) }).Maybe() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) - }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() @@ -650,7 +672,6 @@ func TestStreamLoopFilterReset(t *testing.T) { }, } ctx, c, mRPC, done := newTestConnector(t) - reestablishedFilter := make(chan struct{}) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*ethtypes.HexInteger) @@ -658,20 +679,17 @@ func TestStreamLoopFilterReset(t *testing.T) { }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { - *args[1].(*string) = "filter_id1" + *args[1].(*string) = testLogsFilterID1 }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { - *args[1].(*string) = "filter_id2" + *args[1].(*string) = testLogsFilterID2 close(reestablishedFilter) }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "filter not found"}).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) }).Maybe() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) - }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() @@ -705,7 +723,7 @@ func TestStreamLoopEnrichFail(t *testing.T) { }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). Run(func(args mock.Arguments) { - *args[1].(*string) = "filter_id1" + *args[1].(*string) = testLogsFilterID1 }).Once() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = []*logJSONRPC{ @@ -729,9 +747,6 @@ func TestStreamLoopEnrichFail(t *testing.T) { close(errorReturned) }). Return(&rpcbackend.RPCError{Message: "pop"}).Once() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) - }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() diff --git a/internal/ethereum/new_block_listener.go b/internal/ethereum/new_block_listener.go index 7d28e79..8b01ee3 100644 --- a/internal/ethereum/new_block_listener.go +++ b/internal/ethereum/new_block_listener.go @@ -24,7 +24,7 @@ import ( func (c *ethConnector) NewBlockListener(ctx context.Context, req *ffcapi.NewBlockListenerRequest) (*ffcapi.NewBlockListenerResponse, ffcapi.ErrorReason, error) { // Add the block consumer - c.blockListener.addConsumer(&blockUpdateConsumer{ + c.blockListener.addConsumer(req.ListenerContext, &blockUpdateConsumer{ id: req.ID, ctx: req.ListenerContext, updates: req.BlockListener, diff --git a/internal/ethereum/new_block_listener_test.go b/internal/ethereum/new_block_listener_test.go index 0529a40..fe27f7f 100644 --- a/internal/ethereum/new_block_listener_test.go +++ b/internal/ethereum/new_block_listener_test.go @@ -37,9 +37,9 @@ func TestNewBlockListenerOK(t *testing.T) { }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*string) - *hbh = "filter_id1" + *hbh = testBlockFilterID1 }).Maybe() - mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) { + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) *hbh = []ethtypes.HexBytes0xPrefix{} }).Maybe()