Skip to content

Commit

Permalink
Merge pull request #73 from kaleido-io/tx-cache
Browse files Browse the repository at this point in the history
add LRU cache for transaction info
  • Loading branch information
awrichar authored Apr 7, 2023
2 parents dd7b14d + 27008e3 commit 2b1a0be
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ $(eval $(call makemock, $$(FF_SIGNER_PATH), Backend, rpcbackendmocks))
$(eval $(call makemock, $$(FFTM_PATH), Manager, fftmmocks))

firefly-evmconnect: ${GOFILES}
$(VGO) build -o ./firefly-evmconnect -ldflags "-X main.buildDate=`date -u +\"%Y-%m-%dT%H:%M:%SZ\"` -X main.buildVersion=$(BUILD_VERSION)" -tags=prod -tags=prod -v ./evmconnect
$(VGO) build -o ./firefly-evmconnect -ldflags "-X main.buildDate=`date -u +\"%Y-%m-%dT%H:%M:%SZ\"` -X main.buildVersion=$(BUILD_VERSION)" -tags=prod -tags=prod -v ./evmconnect
go-mod-tidy: .ALWAYS
$(VGO) mod tidy
build: firefly-evmconnect
Expand Down
2 changes: 1 addition & 1 deletion config.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|blockCacheSize|Maximum of blocks to hold in the block info cache|`int`|`250`
|blockCacheTTL|Time to live for the block info cache|[`time.Duration`](https://pkg.go.dev/time#Duration)|`5m`
|blockPollingInterval|Interval for polling to check for new blocks|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|dataFormat|Configure the JSON data format for query output and events|map,flat_array,self_describing|`map`
Expand All @@ -62,6 +61,7 @@
|passthroughHeadersEnabled|Enable passing through the set of allowed HTTP request headers|`boolean`|`false`
|requestTimeout|The maximum amount of time that a request is allowed to remain open|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|tlsHandshakeTimeout|The maximum amount of time to wait for a successful TLS handshake|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10s`
|txCacheSize|Maximum of transactions to hold in the transaction info cache|`int`|`250`
|url|URL of JSON/RPC endpoint for the Ethereum node/gateway|string|`<nil>`

## connector.auth
Expand Down
6 changes: 3 additions & 3 deletions internal/ethereum/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -26,7 +26,6 @@ const (
ConfigDataFormat = "dataFormat"
BlockPollingInterval = "blockPollingInterval"
BlockCacheSize = "blockCacheSize"
BlockCacheTTL = "blockCacheTTL"
EventsCatchupPageSize = "events.catchupPageSize"
EventsCatchupThreshold = "events.catchupThreshold"
EventsCheckpointBlockGap = "events.checkpointBlockGap"
Expand All @@ -35,6 +34,7 @@ const (
RetryInitDelay = "retry.initialDelay"
RetryMaxDelay = "retry.maxDelay"
RetryFactor = "retry.factor"
TxCacheSize = "txCacheSize"
)

const (
Expand All @@ -53,7 +53,6 @@ const (
func InitConfig(conf config.Section) {
ffresty.InitConfig(conf)
conf.AddKnownKey(BlockCacheSize, 250)
conf.AddKnownKey(BlockCacheTTL, "5m")
conf.AddKnownKey(BlockPollingInterval, "1s")
conf.AddKnownKey(ConfigDataFormat, "map")
conf.AddKnownKey(ConfigGasEstimationFactor, DefaultGasEstimationFactor)
Expand All @@ -65,4 +64,5 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(RetryFactor, DefaultRetryDelayFactor)
conf.AddKnownKey(RetryInitDelay, DefaultRetryInitDelay)
conf.AddKnownKey(RetryMaxDelay, DefaultRetryMaxDelay)
conf.AddKnownKey(TxCacheSize, 250)
}
10 changes: 8 additions & 2 deletions internal/ethereum/ethereum.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -51,6 +51,7 @@ type ethConnector struct {
mux sync.Mutex
eventStreams map[fftypes.UUID]*eventStream
blockCache *lru.Cache
txCache *lru.Cache
}

func NewEthereumConnector(ctx context.Context, conf config.Section) (cc ffcapi.API, err error) {
Expand All @@ -73,7 +74,12 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc ffcapi.A
}
c.blockCache, err = lru.New(conf.GetInt(BlockCacheSize))
if err != nil {
return nil, i18n.WrapError(ctx, err, msgs.MsgCacheInitFail)
return nil, i18n.WrapError(ctx, err, msgs.MsgCacheInitFail, "block")
}

c.txCache, err = lru.New(conf.GetInt(TxCacheSize))
if err != nil {
return nil, i18n.WrapError(ctx, err, msgs.MsgCacheInitFail, "transaction")
}

if conf.GetString(ffresty.HTTPConfigURL) == "" {
Expand Down
4 changes: 4 additions & 0 deletions internal/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,8 @@ func TestConnectorInit(t *testing.T) {
cc, err = NewEthereumConnector(context.Background(), conf)
assert.Regexp(t, "FF23040", err)

conf.Set(BlockCacheSize, "1")
conf.Set(TxCacheSize, "-1")
cc, err = NewEthereumConnector(context.Background(), conf)
assert.Regexp(t, "FF23040", err)
}
8 changes: 6 additions & 2 deletions internal/ethereum/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,13 @@ func TestFilterEnrichEthLogMethodInputsOk(t *testing.T) {
From: ethtypes.MustNewAddress("0x3968ef051b422d3d1cdc182a88bba8dd922e6fa4"),
Input: ethtypes.MustNewHexBytes0xPrefix("0xa9059cbb000000000000000000000000d0f2f5103fd050739a9fb567251bc460cc24d09100000000000000000000000000000000000000000000000000000000000003e8"),
}
})
}).Once() // 1 cache miss and hit

ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog())
ev, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) // cache miss
assert.True(t, ok)
assert.NoError(t, err)

ev, ok, err = l.filterEnrichEthLog(context.Background(), l.config.filters[0], sampleTransferLog()) // cache hit
assert.True(t, ok)
assert.NoError(t, err)
ei := ev.Event.Info.(*eventInfo)
Expand Down
7 changes: 7 additions & 0 deletions internal/ethereum/get_receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,17 @@ type txInfoJSONRPC struct {

func (c *ethConnector) getTransactionInfo(ctx context.Context, hash ethtypes.HexBytes0xPrefix) (*txInfoJSONRPC, error) {
var txInfo *txInfoJSONRPC
cached, ok := c.txCache.Get(hash.String())
if ok {
return cached.(*txInfoJSONRPC), nil
}

rpcErr := c.backend.CallRPC(ctx, &txInfo, "eth_getTransactionByHash", hash)
var err error
if rpcErr != nil {
err = rpcErr.Error()
} else {
c.txCache.Add(hash.String(), txInfo)
}
return txInfo, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/msgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -31,11 +31,11 @@ var (
ConfigEthereumDataFormat = ffc("config.connector.dataFormat", "Configure the JSON data format for query output and events", "map,flat_array,self_describing")
ConfigEthereumGasEstimationFactor = ffc("config.connector.gasEstimationFactor", "The factor to apply to the gas estimation to determine the gas limit", "float")
ConfigBlockCacheSize = ffc("config.connector.blockCacheSize", "Maximum of blocks to hold in the block info cache", i18n.IntType)
ConfigBlockCacheTTL = ffc("config.connector.blockCacheTTL", "Time to live for the block info cache", i18n.TimeDurationType)
ConfigBlockPollingInterval = ffc("config.connector.blockPollingInterval", "Interval for polling to check for new blocks", i18n.TimeDurationType)
ConfigEventsBlockTimestamps = ffc("config.connector.events.blockTimestamps", "Whether to include the block timestamps in the event information", i18n.BooleanType)
ConfigEventsCatchupPageSize = ffc("config.connector.events.catchupPageSize", "Number of blocks to query per poll when catching up to the head of the blockchain", i18n.IntType)
ConfigEventsCatchupThreshold = ffc("config.connector.events.catchupThreshold", "How many blocks behind the chain head an event stream or listener must be on startup, to enter catchup mode", i18n.IntType)
ConfigEventsCheckpointBlockGap = ffc("config.connector.events.checkpointBlockGap", "The number of blocks at the head of the chain that should be considered unstable (could be dropped from the canonical chain after a re-org). Unless events with a full set of confirmations are detected, the restart checkpoint will this many blocks behind the chain head.", i18n.IntType)
ConfigEventsFilterPollingInterval = ffc("config.connector.events.filterPollingInterval", "The interval between polling calls to a filter, when checking for newly arrived events", i18n.TimeDurationType)
ConfigTxCacheSize = ffc("config.connector.txCacheSize", "Maximum of transactions to hold in the transaction info cache", i18n.IntType)
)
2 changes: 1 addition & 1 deletion internal/msgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var (
MsgMissingEventInFilter = ffe("FF23037", "Each filter must have an 'event' child containing the ABI definition of the event")
MsgListenerAlreadyStarted = ffe("FF23038", "Listener already started: %s")
MsgInvalidCheckpoint = ffe("FF23039", "Invalid checkpoint: %s")
MsgCacheInitFail = ffe("FF23040", "Failed to initialize cache")
MsgCacheInitFail = ffe("FF23040", "Failed to initialize %s cache")
MsgStreamNotStarted = ffe("FF23041", "Event stream %s not started")
MsgStreamAlreadyStarted = ffe("FF23042", "Event stream %s already started")
MsgListenerNotStarted = ffe("FF23043", "Event listener %s not started in event stream %s")
Expand Down

0 comments on commit 2b1a0be

Please sign in to comment.