diff --git a/internal/ethereum/event_listener.go b/internal/ethereum/event_listener.go index c23f13b..cdd4ff4 100644 --- a/internal/ethereum/event_listener.go +++ b/internal/ethereum/event_listener.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inl.c. +// Copyright © 2023 Kaleido, Inl.c. // // SPDX-License-Identifier: Apache-2.0 // @@ -271,7 +271,7 @@ func (l *listener) matchMethod(ctx context.Context, methods []*abi.Entry, txInfo info.InputArgs = fftypes.JSONAnyPtrBytes(b) } -func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLog *logJSONRPC) (*ffcapi.ListenerEvent, bool) { +func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLog *logJSONRPC) (*ffcapi.ListenerEvent, bool, error) { // Check the block for this event is at our high water mark, as we might have rewound for other listeners blockNumber := ethLog.BlockNumber.BigInt().Int64() @@ -280,7 +280,7 @@ func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLo protoID := getEventProtoID(blockNumber, transactionIndex, logIndex) if blockNumber < l.hwmBlock { log.L(ctx).Debugf("Listener %s already delivered event '%s' hwm=%d", l.id, protoID, l.hwmBlock) - return nil, false + return nil, false, nil } // Apply a post-filter check to the event @@ -288,7 +288,7 @@ func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLo addrMatches := f.Address == nil || bytes.Equal(ethLog.Address[:], f.Address[:]) if !topicMatches || !addrMatches { log.L(ctx).Debugf("Listener %s skipping event '%s' topicMatches=%t addrMatches=%t", l.id, protoID, topicMatches, addrMatches) - return nil, false + return nil, false, nil } log.L(ctx).Infof("Listener %s detected event '%s'", l.id, protoID) @@ -303,22 +303,22 @@ func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLo bi, err := l.c.getBlockInfoByHash(ctx, ethLog.BlockHash.String()) if bi == nil || err != nil { log.L(ctx).Errorf("Failed to get block info timestamp for block '%s': %v", ethLog.BlockHash, err) - } else { - timestamp = fftypes.UnixTime(bi.Timestamp.BigInt().Int64()) + return nil, false, err // This is an error condition, rather than just something we cannot enrich } + timestamp = fftypes.UnixTime(bi.Timestamp.BigInt().Int64()) } if len(l.config.options.Methods) > 0 || l.config.options.Signer { txInfo, err := l.c.getTransactionInfo(ctx, ethLog.TransactionHash) if txInfo == nil || err != nil { log.L(ctx).Errorf("Failed to get transaction info for TX '%s': %v", ethLog.TransactionHash, err) - } else { - if l.config.options.Signer { - info.InputSigner = txInfo.From - } - if len(l.config.options.Methods) > 0 { - l.matchMethod(ctx, l.config.options.Methods, txInfo, &info) - } + return nil, false, err // This is an error condition, rather than just something we cannot enrich + } + if l.config.options.Signer { + info.InputSigner = txInfo.From + } + if len(l.config.options.Methods) > 0 { + l.matchMethod(ctx, l.config.options.Methods, txInfo, &info) } } @@ -343,5 +343,5 @@ func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLo Info: &info, Data: data, }, - }, true + }, true, nil } diff --git a/internal/ethereum/event_listener_test.go b/internal/ethereum/event_listener_test.go index 91aae32..67a897b 100644 --- a/internal/ethereum/event_listener_test.go +++ b/internal/ethereum/event_listener_test.go @@ -248,9 +248,10 @@ func TestFilterEnrichEthLogBlockBelowHWM(t *testing.T) { assert.NoError(t, err) l.hwmBlock = 2 - _, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], &logJSONRPC{ + _, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], &logJSONRPC{ BlockNumber: ethtypes.NewHexInteger64(1), }) + assert.NoError(t, err) assert.False(t, ok) } @@ -263,9 +264,10 @@ func TestFilterEnrichEthLogAddressMismatch(t *testing.T) { err := json.Unmarshal([]byte(abiTransferEvent), &abiEvent) assert.NoError(t, err) - _, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], &logJSONRPC{ + _, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], &logJSONRPC{ Address: ethtypes.MustNewAddress("0x20355f3e852d4b6a9944ada8d5399ddd3409a431"), }) + assert.NoError(t, err) assert.False(t, ok) } @@ -294,8 +296,9 @@ func TestFilterEnrichEthLogMethodInputsOk(t *testing.T) { } }) - ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) + ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) assert.True(t, ok) + assert.NoError(t, err) ei := ev.Event.Info.(*eventInfo) assert.NotNil(t, ei.InputArgs) assert.Equal(t, `{"_to":"0xd0f2f5103fd050739a9fb567251bc460cc24d091","_value":"1000"}`, ei.InputArgs.String()) @@ -319,10 +322,34 @@ func TestFilterEnrichEthLogTXInfoFail(t *testing.T) { return th.String() == "0x1a1f797ee000c529b6a2dd330cedd0d081417a30d16a4eecb3f863ab4657246f" })).Return(&rpcbackend.RPCError{Message: "pop2"}) - ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) - assert.True(t, ok) - ei := ev.Event.Info.(*eventInfo) - assert.Nil(t, ei.InputArgs) + _, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) + assert.False(t, ok) + assert.Regexp(t, "pop1", err) + +} + +func TestFilterEnrichEthLogTXTimestampFail(t *testing.T) { + + l, mRPC, _ := newTestListener(t, true) + + var abiEvent *abi.Entry + err := json.Unmarshal([]byte(abiTransferEvent), &abiEvent) + assert.NoError(t, err) + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool { + return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c" + }), false).Return(nil).Run(func(args mock.Arguments) { + *args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{ + Number: ethtypes.NewHexInteger64(1024), + } + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getTransactionByHash", mock.MatchedBy(func(th ethtypes.HexBytes0xPrefix) bool { + return th.String() == "0x1a1f797ee000c529b6a2dd330cedd0d081417a30d16a4eecb3f863ab4657246f" + })).Return(&rpcbackend.RPCError{Message: "pop2"}) + + _, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) + assert.False(t, ok) + assert.Regexp(t, "pop2", err) } @@ -350,8 +377,9 @@ func TestFilterEnrichEthLogMethodBadInputTooShort(t *testing.T) { } }) - ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) + ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) assert.True(t, ok) + assert.NoError(t, err) ei := ev.Event.Info.(*eventInfo) assert.Nil(t, ei.InputArgs) @@ -381,8 +409,9 @@ func TestFilterEnrichEthLogMethodBadInputTooMismatchFunctionID(t *testing.T) { } }) - ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) + ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) assert.True(t, ok) + assert.NoError(t, err) ei := ev.Event.Info.(*eventInfo) assert.Nil(t, ei.InputArgs) @@ -412,7 +441,8 @@ func TestFilterEnrichEthLogMethodBadInputABIData(t *testing.T) { } }) - ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) + ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) + assert.NoError(t, err) assert.True(t, ok) ei := ev.Event.Info.(*eventInfo) assert.Nil(t, ei.InputArgs) diff --git a/internal/ethereum/event_stream.go b/internal/ethereum/event_stream.go index 5ef9d8f..1c237c6 100644 --- a/internal/ethereum/event_stream.go +++ b/internal/ethereum/event_stream.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -369,7 +369,12 @@ func (es *eventStream) leadGroupSteadyState() bool { filterRPC = "eth_getFilterChanges" // Enrich the events - events := es.filterEnrichSort(es.ctx, ag, ethLogs) + events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs) + if enrichErr != nil { + log.L(es.ctx).Errorf("Failed to enrich events: %s", enrichErr) + failCount++ + continue + } // Dispatch the events if es.dispatchSetHWMCheckExit(ag, events, hwmBlock) { @@ -467,13 +472,16 @@ func getEventProtoID(blockNumber, transactionIndex, logIndex int64) string { return fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, transactionIndex, logIndex) } -func (es *eventStream) filterEnrichSort(ctx context.Context, ag *aggregatedListener, ethLogs []*logJSONRPC) ffcapi.ListenerEvents { +func (es *eventStream) filterEnrichSort(ctx context.Context, ag *aggregatedListener, ethLogs []*logJSONRPC) (ffcapi.ListenerEvents, error) { updates := make(ffcapi.ListenerEvents, 0, len(ethLogs)) for _, ethLog := range ethLogs { listeners := ag.listenersByTopic0[ethLog.Topics[0].String()] for _, l := range listeners { for _, f := range l.config.filters { - lu, matches := l.filterEnrichEthLog(ctx, f, ethLog) + lu, matches, err := l.filterEnrichEthLog(ctx, f, ethLog) + if err != nil { + return nil, err + } if matches { updates = append(updates, lu) break // A single listener cannot emit the event twice @@ -482,7 +490,7 @@ func (es *eventStream) filterEnrichSort(ctx context.Context, ag *aggregatedListe } } sort.Sort(updates) - return updates + return updates, nil } func (es *eventStream) getBlockRangeEvents(ctx context.Context, ag *aggregatedListener, fromBlock, toBlock int64) (ffcapi.ListenerEvents, error) { @@ -503,7 +511,7 @@ func (es *eventStream) getBlockRangeEvents(ctx context.Context, ag *aggregatedLi if rpcErr != nil { return nil, rpcErr.Error() } - return es.filterEnrichSort(ctx, ag, ethLogs), nil + return es.filterEnrichSort(ctx, ag, ethLogs) } func (es *eventStream) getListenerHWM(ctx context.Context, listenerID *fftypes.UUID) (*ffcapi.EventListenerHWMResponse, ffcapi.ErrorReason, error) { diff --git a/internal/ethereum/event_stream_test.go b/internal/ethereum/event_stream_test.go index 54bf05d..13ff6a4 100644 --- a/internal/ethereum/event_stream_test.go +++ b/internal/ethereum/event_stream_test.go @@ -680,6 +680,66 @@ func TestStreamLoopFilterReset(t *testing.T) { } +func TestStreamLoopEnrichFail(t *testing.T) { + + l1req := &ffcapi.EventListenerAddRequest{ + ListenerID: fftypes.NewUUID(), + EventListenerOptions: ffcapi.EventListenerOptions{ + Filters: []fftypes.JSONAny{ + *fftypes.JSONAnyPtr(`{"address":"0x171AE0BDd882F7b4C84D5b7FBFA994E39C5a3129","event":` + abiTransferEvent + `}`), + }, + Options: fftypes.JSONAnyPtr(`{}`), + FromBlock: strconv.Itoa(testHighBlock), + }, + } + ctx, c, mRPC, done := newTestConnector(t) + c.eventBlockTimestamps = true + + errorReturned := make(chan struct{}) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexInteger64(testHighBlock) + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil). + Run(func(args mock.Arguments) { + *args[1].(*string) = "filter_id1" + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]*logJSONRPC) = []*logJSONRPC{ + { + BlockNumber: ethtypes.NewHexInteger64(212122), + TransactionIndex: ethtypes.NewHexInteger64(64), + LogIndex: ethtypes.NewHexInteger64(2), + BlockHash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"), + Address: ethtypes.MustNewAddress("0x171AE0BDd882F7b4C84D5b7FBFA994E39C5a3129"), + Topics: []ethtypes.HexBytes0xPrefix{ + ethtypes.MustNewHexBytes0xPrefix("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), + ethtypes.MustNewHexBytes0xPrefix("0x0000000000000000000000003968ef051b422d3d1cdc182a88bba8dd922e6fa4"), + ethtypes.MustNewHexBytes0xPrefix("0x000000000000000000000000d0f2f5103fd050739a9fb567251bc460cc24d091"), + }, + Data: ethtypes.MustNewHexBytes0xPrefix("0x00000000000000000000000000000000000000000000000000000000000003e8"), + }, + } + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.Anything, false). + Run(func(args mock.Arguments) { + close(errorReturned) + }). + Return(&rpcbackend.RPCError{Message: "pop"}).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*bool) = true + }).Maybe() + + _, _, mRPC, done = testEventStreamExistingConnector(t, ctx, done, c, mRPC, l1req) + defer done() + + <-errorReturned + +} + func TestDispatchListenerDone(t *testing.T) { doneCtx, cancel := context.WithCancel(context.Background())