Skip to content

Commit

Permalink
Merge pull request #106 from kaleido-io/auto-backoff-catchup
Browse files Browse the repository at this point in the history
Auto backoff catchup
  • Loading branch information
nguyer authored Jan 12, 2024
2 parents 26854b8 + 321864d commit a70cacf
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 6 deletions.
1 change: 1 addition & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|blockTimestamps|Whether to include the block timestamps in the event information|`boolean`|`true`
|catchupDownscaleRegex|An error pattern to check for from JSON/RPC providers if they limit response sizes to eth_getLogs(). If an error is returned from eth_getLogs() and that error matches the configured pattern, the number of logs requested (catchupPageSize) will be reduced automatically.|string|`Response size is larger than.*limit error.`
|catchupPageSize|Number of blocks to query per poll when catching up to the head of the blockchain|`int`|`500`
|catchupThreshold|How many blocks behind the chain head an event stream or listener must be on startup, to enter catchup mode|`int`|`500`
|checkpointBlockGap|The number of blocks at the head of the chain that should be considered unstable (could be dropped from the canonical chain after a re-org). Unless events with a full set of confirmations are detected, the restart checkpoint will this many blocks behind the chain head.|`int`|`50`
Expand Down
83 changes: 83 additions & 0 deletions go.sum

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions internal/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
BlockCacheSize = "blockCacheSize"
EventsCatchupPageSize = "events.catchupPageSize"
EventsCatchupThreshold = "events.catchupThreshold"
EventsCatchupDownscaleRegex = "events.catchupDownscaleRegex"
EventsCheckpointBlockGap = "events.checkpointBlockGap"
EventsBlockTimestamps = "events.blockTimestamps"
EventsFilterPollingInterval = "events.filterPollingInterval"
Expand All @@ -42,9 +43,10 @@ const (
DefaultListenerPort = 5102
DefaultGasEstimationFactor = 1.5

DefaultCatchupPageSize = 500
DefaultEventsCatchupThreshold = 500
DefaultEventsCheckpointBlockGap = 50
DefaultCatchupPageSize = 500
DefaultEventsCatchupThreshold = 500
DefaultEventsCatchupDownscaleRegex = "Response size is larger than.*limit error."
DefaultEventsCheckpointBlockGap = 50

DefaultRetryInitDelay = "100ms"
DefaultRetryMaxDelay = "30s"
Expand All @@ -61,6 +63,7 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(EventsFilterPollingInterval, "1s")
conf.AddKnownKey(EventsCatchupPageSize, DefaultCatchupPageSize)
conf.AddKnownKey(EventsCatchupThreshold, DefaultEventsCatchupThreshold)
conf.AddKnownKey(EventsCatchupDownscaleRegex, DefaultEventsCatchupDownscaleRegex)
conf.AddKnownKey(EventsCheckpointBlockGap, DefaultEventsCheckpointBlockGap)
conf.AddKnownKey(RetryFactor, DefaultRetryDelayFactor)
conf.AddKnownKey(RetryInitDelay, DefaultRetryInitDelay)
Expand Down
7 changes: 7 additions & 0 deletions internal/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math/big"
"regexp"
"sync"
"time"

Expand All @@ -42,6 +43,7 @@ type ethConnector struct {
gasEstimationFactor *big.Float
catchupPageSize int64
catchupThreshold int64
catchupDownscaleRegex *regexp.Regexp
checkpointBlockGap int64
retry *retry.Retry
eventBlockTimestamps bool
Expand Down Expand Up @@ -87,6 +89,11 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc ffcapi.A
}
c.gasEstimationFactor = big.NewFloat(conf.GetFloat64(ConfigGasEstimationFactor))

c.catchupDownscaleRegex, err = regexp.Compile(conf.GetString(EventsCatchupDownscaleRegex))
if err != nil {
return nil, i18n.WrapError(ctx, err, msgs.MsgInvalidRegex, c.catchupDownscaleRegex)
}

httpClient, err := ffresty.New(ctx, conf)
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions internal/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestConnectorInit(t *testing.T) {
conf.Set(ffresty.HTTPConfigURL, "http://localhost:8545")
conf.Set(EventsCatchupThreshold, 1)
conf.Set(EventsCatchupPageSize, 500)
conf.Set(EventsCatchupDownscaleRegex, "Response size is larger.*error.")

cc, err = NewEthereumConnector(context.Background(), conf)
assert.NoError(t, err)
Expand Down Expand Up @@ -119,4 +120,8 @@ func TestConnectorInit(t *testing.T) {
cc, err = NewEthereumConnector(context.Background(), conf)
assert.Regexp(t, "FF23040", err)

conf.Set(TxCacheSize, "1")
conf.Set(EventsCatchupDownscaleRegex, "[")
cc, err = NewEthereumConnector(context.Background(), conf)
assert.Regexp(t, "FF23051", err)
}
17 changes: 14 additions & 3 deletions internal/ethereum/event_listener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inl.c.
// Copyright © 2024 Kaleido, Inl.c.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -206,8 +206,19 @@ func (l *listener) listenerCatchupLoop() {
toBlock := l.hwmBlock + l.c.catchupPageSize - 1
events, err := l.es.getBlockRangeEvents(ctx, al, fromBlock, toBlock)
if err != nil {
log.L(ctx).Errorf("Failed to query block range fromBlock=%d toBlock=%d: %s", fromBlock, toBlock, err)
failCount++
if l.c.catchupDownscaleRegex.String() != "" && l.c.catchupDownscaleRegex.MatchString(err.Error()) {
log.L(ctx).Warnf("Failed to query block range fromBlock=%d toBlock=%d. Error %s matches configured downscale regex, catchup page size will automatically be reduced", fromBlock, toBlock, err.Error())
if l.c.catchupPageSize > 1 {
l.c.catchupPageSize /= 2

if l.c.catchupPageSize < 20 {
log.L(ctx).Warnf("Catchup page size auto-reduced to extremely low value %d. The connector may never catch up with the head of the chain.", l.c.catchupPageSize)
}
}
} else {
log.L(ctx).Errorf("Failed to query block range fromBlock=%d toBlock=%d: %s", fromBlock, toBlock, err)
failCount++
}
continue
}
log.L(ctx).Infof("Listener catchup fromBlock=%d toBlock=%d events=%d", fromBlock, toBlock, len(events))
Expand Down
171 changes: 171 additions & 0 deletions internal/ethereum/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ethereum

import (
"encoding/json"
"regexp"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -186,6 +187,176 @@ func TestListenerCatchupErrorsThenDeliveryExit(t *testing.T) {

}

func TestListenerCatchupScalesBackOnExpectedError(t *testing.T) {

l, mRPC, cancelCtx := newTestListener(t, false)

l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1001),
}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "Response size is larger than 150MB limit error."}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{sampleTransferLog()}
// Cancel the context here so we exit pushing the event
cancelCtx()
})

l.listenerCatchupLoop()

// The response size error from an JSON/RPC endpoint should cause us to scale back the catchup page size
assert.Equal(t, int64(250), l.c.catchupPageSize)
}

func TestListenerCatchupScalesBackNTimesOnExpectedError(t *testing.T) {

l, mRPC, cancelCtx := newTestListener(t, false)

l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1001),
}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "Response size is larger than 150MB limit error."}).Times(5)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{sampleTransferLog()}
// Cancel the context here so we exit pushing the event
cancelCtx()
})

l.listenerCatchupLoop()

// The response size error from an JSON/RPC endpoint should cause us to scale back the catchup page size
assert.Equal(t, int64(15), l.c.catchupPageSize)
}

func TestListenerCatchupScalesBackToOne(t *testing.T) {

l, mRPC, cancelCtx := newTestListener(t, false)

l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1001),
}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "Response size is larger than 150MB limit error."}).Times(50)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{sampleTransferLog()}
// Cancel the context here so we exit pushing the event
cancelCtx()
})

l.listenerCatchupLoop()

// The response size error from an JSON/RPC endpoint should cause us to scale back the catchup page size
assert.Equal(t, int64(1), l.c.catchupPageSize)
}

func TestListenerNoCatchupScaleBackOnErrorMismatch(t *testing.T) {

l, mRPC, cancelCtx := newTestListener(t, false)

l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1001),
}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "Response size problem"}).Times(5) // This doesn't match the default regex pattern so scaling back doesn't occur
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{sampleTransferLog()}
// Cancel the context here so we exit pushing the event
cancelCtx()
})

l.listenerCatchupLoop()

// The response size error doesn't match what we expect, catchup page size remains 500
assert.Equal(t, int64(500), l.c.catchupPageSize)
}

func TestListenerCatchupScalesBackCustomRegex(t *testing.T) {

var err error
l, mRPC, cancelCtx := newTestListener(t, false)

l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0
l.c.catchupDownscaleRegex, err = regexp.Compile("ACME JSON/RPC.*too large")

assert.NoError(t, err)

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1001),
}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "ACME JSON/RPC endpoint error - eth_getLogs response size is too large"}).Times(5)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{sampleTransferLog()}
// Cancel the context here so we exit pushing the event
cancelCtx()
})

l.listenerCatchupLoop()

// The response size error from an JSON/RPC endpoint should cause us to scale back the catchup page size
assert.Equal(t, int64(15), l.c.catchupPageSize)
}

func TestListenerCatchupNoScaleBackEmptyRegex(t *testing.T) {

var err error
l, mRPC, cancelCtx := newTestListener(t, false)

l.catchupLoopDone = make(chan struct{})
l.hwmBlock = 0
l.c.catchupDownscaleRegex, err = regexp.Compile("")

assert.NoError(t, err)

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1001),
}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(&rpcbackend.RPCError{Message: "ACME JSON/RPC endpoint error - eth_getLogs response size is too large"}).Times(5)
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{sampleTransferLog()}
// Cancel the context here so we exit pushing the event
cancelCtx()
})

l.listenerCatchupLoop()

// The response size error from an JSON/RPC endpoint should cause us to scale back the catchup page size
assert.Equal(t, int64(500), l.c.catchupPageSize)
}

func TestListenerCatchupErrorThenExit(t *testing.T) {

l, mRPC, cancelCtx := newTestListener(t, false)
Expand Down
1 change: 1 addition & 0 deletions internal/msgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
ConfigEventsBlockTimestamps = ffc("config.connector.events.blockTimestamps", "Whether to include the block timestamps in the event information", i18n.BooleanType)
ConfigEventsCatchupPageSize = ffc("config.connector.events.catchupPageSize", "Number of blocks to query per poll when catching up to the head of the blockchain", i18n.IntType)
ConfigEventsCatchupThreshold = ffc("config.connector.events.catchupThreshold", "How many blocks behind the chain head an event stream or listener must be on startup, to enter catchup mode", i18n.IntType)
ConfigEventsCatchupDownscaleRegex = ffc("config.connector.events.catchupDownscaleRegex", "An error pattern to check for from JSON/RPC providers if they limit response sizes to eth_getLogs(). If an error is returned from eth_getLogs() and that error matches the configured pattern, the number of logs requested (catchupPageSize) will be reduced automatically.", "string")
ConfigEventsCheckpointBlockGap = ffc("config.connector.events.checkpointBlockGap", "The number of blocks at the head of the chain that should be considered unstable (could be dropped from the canonical chain after a re-org). Unless events with a full set of confirmations are detected, the restart checkpoint will this many blocks behind the chain head.", i18n.IntType)
ConfigEventsFilterPollingInterval = ffc("config.connector.events.filterPollingInterval", "The interval between polling calls to a filter, when checking for newly arrived events", i18n.TimeDurationType)
ConfigTxCacheSize = ffc("config.connector.txCacheSize", "Maximum of transactions to hold in the transaction info cache", i18n.IntType)
Expand Down
1 change: 1 addition & 0 deletions internal/msgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@ var (
MsgInvalidTXHashReturned = ffe("FF23048", "Received invalid transaction hash from node len=%d")
MsgUnmarshalErrorFail = ffe("FF23049", "Failed to parse error %d: %s")
MsgUnmarshalABIErrorsFail = ffe("FF23050", "Failed to parse errors ABI: %s")
MsgInvalidRegex = ffe("FF23051", "Invalid regular expression for auto-backoff catchup error: %s")
)

0 comments on commit a70cacf

Please sign in to comment.