From adf9fd6eeef6e4e6aaed9011d1d1e2645eaac6ab Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Wed, 15 Jan 2025 19:41:41 -0500 Subject: [PATCH] Added control knob for substreams tier1 max active requests --- CHANGELOG.md | 18 ++++++++++++++++ cmd/apps/substreams_tier1.go | 41 ++++++++++++++++++++++++++---------- devel/standard/standard.yaml | 4 +++- go.mod | 6 +++--- go.sum | 12 +++++------ test/integration_test.go | 2 +- utils.go | 4 +++- 7 files changed, 64 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c1fa75..5c2a833 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,24 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s * Add support for ConnectWeb firehose requests. * Always use gzip compression on firehose requests for clients that support it (instead of always answering with the same compression as the request). +### Substreams + +- The `substreams-tier1` app now has two new configuration flags named respectively `substreams-tier1-active-requests-soft-limit` and `substreams-tier1-active-requests-hard-limit` +helping better load balance active requests across a pool of `tier1` instances. + + The `substreams-tier1-active-requests-soft-limit` limits the number of client active requests that a tier1 accepts before starting + to be report itself as 'unready' within the health check endpoint. A limit of 0 or less means no limit. + + This is useful to load balance active requests more easily across a pool of tier1 instance. When the instance reaches the soft + limit, it will start to be unready from the load balancer standpoint. The load balancer in return will remove it from the list + of available instances, and new connections will be routed to remaining clients, spreading the load. + + The `substreams-tier1-active-requests-hard-limit` limits the number of client active requests that a tier1 accepts before + rejecting incoming gRPC requests with 'Unavailable' code and setting itself as unready. A limit of 0 or less means no limit. + + This is useful to prevent the tier1 from being overwhelmed by too many requests, most client auto-reconnects on 'Unavailable' code + so they should end up on another tier1 instance, assuming you have proper auto-scaling of the number of instances available. + ## v1.6.9 ### Substreams diff --git a/cmd/apps/substreams_tier1.go b/cmd/apps/substreams_tier1.go index 04d9eff..792e560 100644 --- a/cmd/apps/substreams_tier1.go +++ b/cmd/apps/substreams_tier1.go @@ -22,6 +22,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/streamingfast/cli" "github.com/streamingfast/dauth" discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service" firecore "github.com/streamingfast/firehose-core" @@ -52,7 +53,21 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root cmd.Flags().Bool("substreams-tier1-enforce-compression", true, "Reject any request that does not accept gzip or zstd encoding in their GRPC/Connect header") cmd.Flags().Int("substreams-tier1-max-subrequests", 4, "number of parallel subrequests that the tier1 can make to the tier2 per request") cmd.Flags().String("substreams-tier1-block-type", "", "Block type to use for the substreams tier1 (Ex: sf.ethereum.type.v2.Block)") - + cmd.Flags().Int("substreams-tier1-active-requests-soft-limit", 0, cli.FlagDescription(` + The number of client active requests that a tier1 accepts before starting to be report itself as 'unready' within the health + check endpoint. A limit of 0 or less means no limit. + + This is useful to load balance active requests more easily across a pool of tier1 instance. When the instance reaches the soft + limit, it will start to be unready from the load balancer standpoint. The load balancer in return will remove it from the list + of available instances, and new connections will be routed to remaining clients, spreading the load. + `)) + cmd.Flags().Int("substreams-tier1-active-requests-hard-limit", 0, cli.FlagDescription(` + The maximum number of client active requests that a tier1 accepts before rejecting incoming gRPC requests with 'Unavailable' code + and setting itself as unready. A limit of 0 or less means no limit. + + This is useful to prevent the tier1 from being overwhelmed by too many requests, most client auto-reconnects on 'Unavailable' code + so they should end up on another tier1 instance, assuming you have proper auto-scaling of the number of instances available. + `)) // all substreams registerCommonSubstreamsFlags(cmd) return nil @@ -86,6 +101,8 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root subrequestsInsecure := viper.GetBool("substreams-tier1-subrequests-insecure") subrequestsPlaintext := viper.GetBool("substreams-tier1-subrequests-plaintext") maxSubrequests := viper.GetUint64("substreams-tier1-max-subrequests") + activeRequestsSoftLimit := viper.GetInt("substreams-tier1-active-requests-soft-limit") + activeRequestsHardLimit := viper.GetInt("substreams-tier1-active-requests-hard-limit") var blockType string if chain.DefaultBlockType != "" { @@ -137,16 +154,18 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root BlockStreamAddr: blockstreamAddr, TmpDir: tmpDir, - StateStoreURL: stateStoreURL, - StateStoreDefaultTag: stateStoreDefaultTag, - StateBundleSize: stateBundleSize, - MaxSubrequests: maxSubrequests, - SubrequestsEndpoint: subrequestsEndpoint, - SubrequestsInsecure: subrequestsInsecure, - SubrequestsPlaintext: subrequestsPlaintext, - BlockType: blockType, - WASMExtensions: wasmExtensions, - BlockExecutionTimeout: executionTimeout, + StateStoreURL: stateStoreURL, + StateStoreDefaultTag: stateStoreDefaultTag, + StateBundleSize: stateBundleSize, + MaxSubrequests: maxSubrequests, + SubrequestsEndpoint: subrequestsEndpoint, + ActiveRequestsSoftLimit: activeRequestsSoftLimit, + ActiveRequestsHardLimit: activeRequestsHardLimit, + SubrequestsInsecure: subrequestsInsecure, + SubrequestsPlaintext: subrequestsPlaintext, + BlockType: blockType, + WASMExtensions: wasmExtensions, + BlockExecutionTimeout: executionTimeout, Tracing: tracing, diff --git a/devel/standard/standard.yaml b/devel/standard/standard.yaml index 7736f8a..26a2f79 100644 --- a/devel/standard/standard.yaml +++ b/devel/standard/standard.yaml @@ -4,6 +4,8 @@ start: - merger - relayer - firehose + - substreams-tier1 + - substreams-tier2 flags: advertise-block-id-encoding: "hex" advertise-chain-name: "acme-dummy-blockchain" @@ -20,4 +22,4 @@ start: --store-dir="{node-data-dir}" --block-rate=120 --genesis-height=0 - --genesis-block-burst=1000 + --genesis-block-burst=100 diff --git a/go.mod b/go.mod index 39d8fb9..1c5c4b4 100644 --- a/go.mod +++ b/go.mod @@ -18,11 +18,11 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d - github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 + github.com/streamingfast/cli v0.0.4-0.20250116003948-fbf66c930cce github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 - github.com/streamingfast/dgrpc v0.0.0-20250109212433-ae21a7f7a01a + github.com/streamingfast/dgrpc v0.0.0-20250115215805-6f4ad2be7eef github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 github.com/streamingfast/dmetering v0.0.0-20241101155221-489f5a9d9139 github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 @@ -31,7 +31,7 @@ require ( github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 github.com/streamingfast/pbgo v0.0.6-0.20250114182320-0b43084f4000 github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 - github.com/streamingfast/substreams v1.11.4-0.20250113142113-36c2750be692 + github.com/streamingfast/substreams v1.11.4-0.20250116174758-7b0afb88692e github.com/stretchr/testify v1.9.0 github.com/test-go/testify v1.1.4 go.uber.org/multierr v1.10.0 diff --git a/go.sum b/go.sum index 99f5082..bb62cd5 100644 --- a/go.sum +++ b/go.sum @@ -2138,16 +2138,16 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d h1:5cGG1t9rwbAwXeTq9epU7hm6cBsC2V8DM2jVCIN6JSo= github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= -github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 h1:nwuFSEJtQfqTuN62WvysfAtDT4qqwQ6ghFX0i2VY1fY= -github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng= +github.com/streamingfast/cli v0.0.4-0.20250116003948-fbf66c930cce h1:RWla1PaRrlDf/MOwVoN/dJhIM/dXa9O4rmKZkv9T5bg= +github.com/streamingfast/cli v0.0.4-0.20250116003948-fbf66c930cce/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng= github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU= github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84/go.mod h1:cwfI5vaMd+CiwZIL0H0JdP5UDWCZOVFz/ex3L0+o/j4= github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c h1:6WjE2yInE+5jnI7cmCcxOiGZiEs2FQm9Zsg2a9Ivp0Q= github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c/go.mod h1:dbfiy9ORrL8c6ldSq+L0H9pg8TOqqu/FsghsgUEWK54= github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLesosMjfwWsEL2i/40mFSkzenEb3M0qTyM= github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y= -github.com/streamingfast/dgrpc v0.0.0-20250109212433-ae21a7f7a01a h1:yrxCZ7Py0FdMu80cWPv/EpDvBLyumPlfhehD7iJ5VJM= -github.com/streamingfast/dgrpc v0.0.0-20250109212433-ae21a7f7a01a/go.mod h1:bxRfCxRKQ0ZH2BGi6UcYdlH0nkj8yERm3kpP1jPLQLY= +github.com/streamingfast/dgrpc v0.0.0-20250115215805-6f4ad2be7eef h1:He9qXjmnDtxVrJcHAOfFiWFA6An48zTezpU5iMnNHuY= +github.com/streamingfast/dgrpc v0.0.0-20250115215805-6f4ad2be7eef/go.mod h1:bxRfCxRKQ0ZH2BGi6UcYdlH0nkj8yERm3kpP1jPLQLY= github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 h1:HKi8AIkLBzxZWmbCRUo1RxoOLK33iXO6gZprfsE9rf4= github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4/go.mod h1:ehPytv7E4rI65iLcrwTes4rNGGqPPiugnH+20nDQyp4= github.com/streamingfast/dmetering v0.0.0-20241101155221-489f5a9d9139 h1:a22XzjeY7n9Xv+0yJMV2pzuPptALtOu6jdg69pOwuO4= @@ -2181,8 +2181,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= -github.com/streamingfast/substreams v1.11.4-0.20250113142113-36c2750be692 h1:YrIj24iHkkdhzWHVNqdjdL76BqEOs9PxMjW8ejbEGnk= -github.com/streamingfast/substreams v1.11.4-0.20250113142113-36c2750be692/go.mod h1:gl4g6eqMV3tAvir2J+3tY/JfXwm3TThHe7VL53glywE= +github.com/streamingfast/substreams v1.11.4-0.20250116174758-7b0afb88692e h1:9pk6d5QKvVLMl5TXSXKb8b0VMmEVh6e3kca200yIuk8= +github.com/streamingfast/substreams v1.11.4-0.20250116174758-7b0afb88692e/go.mod h1:Dgbt37alWqMyahFQ4rdhX8iFLZHn2qD8TBhcP3NIuW8= github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed h1:LU6/c376zP1cMAo9L6rFLyjo0W7RU+hIh7BegH8Zo5M= github.com/streamingfast/wazero v0.0.0-20241202185309-91287c3640ed/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/test/integration_test.go b/test/integration_test.go index 62f1fff..621b2d9 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -75,7 +75,7 @@ func TestIntegration(t *testing.T) { meteringServer.Run() }() - clientConfig := client.NewSubstreamsClientConfig("localhost:9003", "", 0, false, true) + clientConfig := client.NewSubstreamsClientConfig("localhost:9003", "", 0, false, true, "substreams-test") substreamsClient, _, _, _, err := client.NewSubstreamsClient(clientConfig) require.NoError(t, err) diff --git a/utils.go b/utils.go index 458e66e..a481dd6 100644 --- a/utils.go +++ b/utils.go @@ -50,7 +50,9 @@ func MakeDirs(directories []string) error { } // MustReplaceDataDir replaces `{data-dir}` from within the `in` received argument by the -// `dataDir` argument +// `dataDir` argument. +// +// MustReplaceDataDir("/tmp/data", "{data-dir}/subdir") == "/tmp/data/subdir" func MustReplaceDataDir(dataDir, in string) string { d, err := filepath.Abs(dataDir) if err != nil {