Skip to content

Commit

Permalink
Merge pull request #57 from hyperledger/fix-53
Browse files Browse the repository at this point in the history
Treat enrichment failures due to JSON/RPC errors as errors, not partial success
  • Loading branch information
peterbroadhurst authored Mar 3, 2023
2 parents 4cd1edc + 88df884 commit a9db899
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 30 deletions.
28 changes: 14 additions & 14 deletions internal/ethereum/event_listener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inl.c.
// Copyright © 2023 Kaleido, Inl.c.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -271,7 +271,7 @@ func (l *listener) matchMethod(ctx context.Context, methods []*abi.Entry, txInfo
info.InputArgs = fftypes.JSONAnyPtrBytes(b)
}

func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLog *logJSONRPC) (*ffcapi.ListenerEvent, bool) {
func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLog *logJSONRPC) (*ffcapi.ListenerEvent, bool, error) {

// Check the block for this event is at our high water mark, as we might have rewound for other listeners
blockNumber := ethLog.BlockNumber.BigInt().Int64()
Expand All @@ -280,15 +280,15 @@ func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLo
protoID := getEventProtoID(blockNumber, transactionIndex, logIndex)
if blockNumber < l.hwmBlock {
log.L(ctx).Debugf("Listener %s already delivered event '%s' hwm=%d", l.id, protoID, l.hwmBlock)
return nil, false
return nil, false, nil
}

// Apply a post-filter check to the event
topicMatches := len(ethLog.Topics) > 0 && bytes.Equal(ethLog.Topics[0], f.Topic0)
addrMatches := f.Address == nil || bytes.Equal(ethLog.Address[:], f.Address[:])
if !topicMatches || !addrMatches {
log.L(ctx).Debugf("Listener %s skipping event '%s' topicMatches=%t addrMatches=%t", l.id, protoID, topicMatches, addrMatches)
return nil, false
return nil, false, nil
}

log.L(ctx).Infof("Listener %s detected event '%s'", l.id, protoID)
Expand All @@ -303,22 +303,22 @@ func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLo
bi, err := l.c.getBlockInfoByHash(ctx, ethLog.BlockHash.String())
if bi == nil || err != nil {
log.L(ctx).Errorf("Failed to get block info timestamp for block '%s': %v", ethLog.BlockHash, err)
} else {
timestamp = fftypes.UnixTime(bi.Timestamp.BigInt().Int64())
return nil, false, err // This is an error condition, rather than just something we cannot enrich
}
timestamp = fftypes.UnixTime(bi.Timestamp.BigInt().Int64())
}

if len(l.config.options.Methods) > 0 || l.config.options.Signer {
txInfo, err := l.c.getTransactionInfo(ctx, ethLog.TransactionHash)
if txInfo == nil || err != nil {
log.L(ctx).Errorf("Failed to get transaction info for TX '%s': %v", ethLog.TransactionHash, err)
} else {
if l.config.options.Signer {
info.InputSigner = txInfo.From
}
if len(l.config.options.Methods) > 0 {
l.matchMethod(ctx, l.config.options.Methods, txInfo, &info)
}
return nil, false, err // This is an error condition, rather than just something we cannot enrich
}
if l.config.options.Signer {
info.InputSigner = txInfo.From
}
if len(l.config.options.Methods) > 0 {
l.matchMethod(ctx, l.config.options.Methods, txInfo, &info)
}
}

Expand All @@ -343,5 +343,5 @@ func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLo
Info: &info,
Data: data,
},
}, true
}, true, nil
}
50 changes: 40 additions & 10 deletions internal/ethereum/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,10 @@ func TestFilterEnrichEthLogBlockBelowHWM(t *testing.T) {
assert.NoError(t, err)

l.hwmBlock = 2
_, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], &logJSONRPC{
_, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], &logJSONRPC{
BlockNumber: ethtypes.NewHexInteger64(1),
})
assert.NoError(t, err)
assert.False(t, ok)

}
Expand All @@ -263,9 +264,10 @@ func TestFilterEnrichEthLogAddressMismatch(t *testing.T) {
err := json.Unmarshal([]byte(abiTransferEvent), &abiEvent)
assert.NoError(t, err)

_, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], &logJSONRPC{
_, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], &logJSONRPC{
Address: ethtypes.MustNewAddress("0x20355f3e852d4b6a9944ada8d5399ddd3409a431"),
})
assert.NoError(t, err)
assert.False(t, ok)

}
Expand Down Expand Up @@ -294,8 +296,9 @@ func TestFilterEnrichEthLogMethodInputsOk(t *testing.T) {
}
})

ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
assert.True(t, ok)
assert.NoError(t, err)
ei := ev.Event.Info.(*eventInfo)
assert.NotNil(t, ei.InputArgs)
assert.Equal(t, `{"_to":"0xd0f2f5103fd050739a9fb567251bc460cc24d091","_value":"1000"}`, ei.InputArgs.String())
Expand All @@ -319,10 +322,34 @@ func TestFilterEnrichEthLogTXInfoFail(t *testing.T) {
return th.String() == "0x1a1f797ee000c529b6a2dd330cedd0d081417a30d16a4eecb3f863ab4657246f"
})).Return(&rpcbackend.RPCError{Message: "pop2"})

ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
assert.True(t, ok)
ei := ev.Event.Info.(*eventInfo)
assert.Nil(t, ei.InputArgs)
_, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
assert.False(t, ok)
assert.Regexp(t, "pop1", err)

}

func TestFilterEnrichEthLogTXTimestampFail(t *testing.T) {

l, mRPC, _ := newTestListener(t, true)

var abiEvent *abi.Entry
err := json.Unmarshal([]byte(abiTransferEvent), &abiEvent)
assert.NoError(t, err)

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1024),
}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getTransactionByHash", mock.MatchedBy(func(th ethtypes.HexBytes0xPrefix) bool {
return th.String() == "0x1a1f797ee000c529b6a2dd330cedd0d081417a30d16a4eecb3f863ab4657246f"
})).Return(&rpcbackend.RPCError{Message: "pop2"})

_, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
assert.False(t, ok)
assert.Regexp(t, "pop2", err)

}

Expand Down Expand Up @@ -350,8 +377,9 @@ func TestFilterEnrichEthLogMethodBadInputTooShort(t *testing.T) {
}
})

ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
assert.True(t, ok)
assert.NoError(t, err)
ei := ev.Event.Info.(*eventInfo)
assert.Nil(t, ei.InputArgs)

Expand Down Expand Up @@ -381,8 +409,9 @@ func TestFilterEnrichEthLogMethodBadInputTooMismatchFunctionID(t *testing.T) {
}
})

ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
assert.True(t, ok)
assert.NoError(t, err)
ei := ev.Event.Info.(*eventInfo)
assert.Nil(t, ei.InputArgs)

Expand Down Expand Up @@ -412,7 +441,8 @@ func TestFilterEnrichEthLogMethodBadInputABIData(t *testing.T) {
}
})

ev, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
assert.NoError(t, err)
assert.True(t, ok)
ei := ev.Event.Info.(*eventInfo)
assert.Nil(t, ei.InputArgs)
Expand Down
20 changes: 14 additions & 6 deletions internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -369,7 +369,12 @@ func (es *eventStream) leadGroupSteadyState() bool {
filterRPC = "eth_getFilterChanges"

// Enrich the events
events := es.filterEnrichSort(es.ctx, ag, ethLogs)
events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs)
if enrichErr != nil {
log.L(es.ctx).Errorf("Failed to enrich events: %s", enrichErr)
failCount++
continue
}

// Dispatch the events
if es.dispatchSetHWMCheckExit(ag, events, hwmBlock) {
Expand Down Expand Up @@ -467,13 +472,16 @@ func getEventProtoID(blockNumber, transactionIndex, logIndex int64) string {
return fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, transactionIndex, logIndex)
}

func (es *eventStream) filterEnrichSort(ctx context.Context, ag *aggregatedListener, ethLogs []*logJSONRPC) ffcapi.ListenerEvents {
func (es *eventStream) filterEnrichSort(ctx context.Context, ag *aggregatedListener, ethLogs []*logJSONRPC) (ffcapi.ListenerEvents, error) {
updates := make(ffcapi.ListenerEvents, 0, len(ethLogs))
for _, ethLog := range ethLogs {
listeners := ag.listenersByTopic0[ethLog.Topics[0].String()]
for _, l := range listeners {
for _, f := range l.config.filters {
lu, matches := l.filterEnrichEthLog(ctx, f, ethLog)
lu, matches, err := l.filterEnrichEthLog(ctx, f, ethLog)
if err != nil {
return nil, err
}
if matches {
updates = append(updates, lu)
break // A single listener cannot emit the event twice
Expand All @@ -482,7 +490,7 @@ func (es *eventStream) filterEnrichSort(ctx context.Context, ag *aggregatedListe
}
}
sort.Sort(updates)
return updates
return updates, nil
}

func (es *eventStream) getBlockRangeEvents(ctx context.Context, ag *aggregatedListener, fromBlock, toBlock int64) (ffcapi.ListenerEvents, error) {
Expand All @@ -503,7 +511,7 @@ func (es *eventStream) getBlockRangeEvents(ctx context.Context, ag *aggregatedLi
if rpcErr != nil {
return nil, rpcErr.Error()
}
return es.filterEnrichSort(ctx, ag, ethLogs), nil
return es.filterEnrichSort(ctx, ag, ethLogs)
}

func (es *eventStream) getListenerHWM(ctx context.Context, listenerID *fftypes.UUID) (*ffcapi.EventListenerHWMResponse, ffcapi.ErrorReason, error) {
Expand Down
60 changes: 60 additions & 0 deletions internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,66 @@ func TestStreamLoopFilterReset(t *testing.T) {

}

func TestStreamLoopEnrichFail(t *testing.T) {

l1req := &ffcapi.EventListenerAddRequest{
ListenerID: fftypes.NewUUID(),
EventListenerOptions: ffcapi.EventListenerOptions{
Filters: []fftypes.JSONAny{
*fftypes.JSONAnyPtr(`{"address":"0x171AE0BDd882F7b4C84D5b7FBFA994E39C5a3129","event":` + abiTransferEvent + `}`),
},
Options: fftypes.JSONAnyPtr(`{}`),
FromBlock: strconv.Itoa(testHighBlock),
},
}
ctx, c, mRPC, done := newTestConnector(t)
c.eventBlockTimestamps = true

errorReturned := make(chan struct{})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(testHighBlock)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
*args[1].(*string) = "filter_id1"
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{
{
BlockNumber: ethtypes.NewHexInteger64(212122),
TransactionIndex: ethtypes.NewHexInteger64(64),
LogIndex: ethtypes.NewHexInteger64(2),
BlockHash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"),
Address: ethtypes.MustNewAddress("0x171AE0BDd882F7b4C84D5b7FBFA994E39C5a3129"),
Topics: []ethtypes.HexBytes0xPrefix{
ethtypes.MustNewHexBytes0xPrefix("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
ethtypes.MustNewHexBytes0xPrefix("0x0000000000000000000000003968ef051b422d3d1cdc182a88bba8dd922e6fa4"),
ethtypes.MustNewHexBytes0xPrefix("0x000000000000000000000000d0f2f5103fd050739a9fb567251bc460cc24d091"),
},
Data: ethtypes.MustNewHexBytes0xPrefix("0x00000000000000000000000000000000000000000000000000000000000003e8"),
},
}
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.Anything, false).
Run(func(args mock.Arguments) {
close(errorReturned)
}).
Return(&rpcbackend.RPCError{Message: "pop"}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()

_, _, mRPC, done = testEventStreamExistingConnector(t, ctx, done, c, mRPC, l1req)
defer done()

<-errorReturned

}

func TestDispatchListenerDone(t *testing.T) {

doneCtx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit a9db899

Please sign in to comment.