diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt index 5657097e3..f5676c94c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt @@ -18,7 +18,6 @@ package io.emeraldpay.dshackle.rpc import com.google.protobuf.ByteString import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.api.proto.BlockchainOuterClass.NativeSubscribeReplyItem -import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.SilentException @@ -57,8 +56,11 @@ open class NativeSubscribe( fun start(request: BlockchainOuterClass.NativeSubscribeRequest): Publisher { val chain = Chain.byId(request.chainValue) - if (chain.type != BlockchainType.ETHEREUM) { - return Mono.error(UnsupportedOperationException("Native subscribe is not supported for ${chain.chainCode}")) + + val multistream = getUpstream(chain) + + if (!multistream.getSubscriptionTopics().contains(request.method)) { + return Mono.error(UnsupportedOperationException("subscribe ${request.method} is not supported for ${chain.chainCode}")) } val nonce = request.nonce.takeIf { it != 0L } @@ -69,7 +71,7 @@ open class NativeSubscribe( * If not possible - performs subscription logic on the current instance * @see EthereumLikeMultistream.tryProxySubscribe */ - val publisher = getUpstream(chain).tryProxySubscribe(matcher, request) ?: run { + val publisher = multistream.tryProxySubscribe(matcher, request) ?: run { val method = request.method val params: Any? = request.payload?.takeIf { !it.isEmpty }?.let { objectMapper.readValue(it.newInput(), Map::class.java) @@ -110,7 +112,13 @@ open class NativeSubscribe( if (holder.response is NativeSubscribeReplyItem) { return holder.response } - val result = objectMapper.writeValueAsBytes(holder.response) + + val result = if (holder.response is ByteArray) { + holder.response + } else { + objectMapper.writeValueAsBytes(holder.response) + } + val builder = NativeSubscribeReplyItem.newBuilder() .setPayload(ByteString.copyFrom(result)) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HasEgressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HasEgressSubscription.kt deleted file mode 100644 index 4fd0f34d3..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HasEgressSubscription.kt +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright (c) 2022 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream - -interface HasEgressSubscription { - fun getEgressSubscription(): EgressSubscription -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 431498f59..41ce2c572 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -52,7 +52,7 @@ import kotlin.concurrent.withLock abstract class Multistream( val chain: Chain, val caches: Caches, -) : Upstream, Lifecycle, HasEgressSubscription { +) : Upstream, Lifecycle { abstract fun getUpstreams(): MutableList abstract fun addUpstreamInternal(u: Upstream) @@ -498,4 +498,6 @@ abstract class Multistream( return map.values.min() } } + + abstract fun getEgressSubscription(): EgressSubscription } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt index 274843dd8..467206b63 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt @@ -27,6 +27,17 @@ import io.emeraldpay.etherjar.rpc.RpcException */ class DefaultPolkadotMethods : CallMethods { + companion object { + val subs = setOf( + Pair("subscribe_newHead", "unsubscribe_newHead"), + Pair("chain_subscribeAllHeads", "chain_unsubscribeAllHeads"), + Pair("chain_subscribeFinalizedHeads", "chain_unsubscribeFinalizedHeads"), + Pair("chain_subscribeNewHeads", "chain_unsubscribeNewHeads"), + Pair("chain_subscribeRuntimeVersion", "chain_unsubscribeNewHeads"), + Pair("chain_subscribeRuntimeVersion", "chain_unsubscribeRuntimeVersion"), + ) + } + private val all = setOf( "author_pendingExtrinsics", "author_removeExtrinsic", @@ -37,16 +48,6 @@ class DefaultPolkadotMethods : CallMethods { "chain_getHead", "chain_getHeader", "chain_getRuntimeVersion", - "chain_subscribeAllHeads", - "chain_subscribeFinalizedHeads", - "chain_subscribeNewHeads", - "chain_subscribeRuntimeVersion", - "chain_unsubscribeAllHeads", - "chain_unsubscribeFinalisedHeads", - "chain_unsubscribeFinalizedHeads", - "chain_unsubscribeNewHead", - "chain_unsubscribeNewHeads", - "chain_unsubscribeRuntimeVersion", "childstate_getKeys", "childstate_getKeysPaged", "childstate_getKeysPagedAt", @@ -87,9 +88,7 @@ class DefaultPolkadotMethods : CallMethods { "state_queryStorageAt", "state_traceBlock", "state_trieMigrationStatus", - "subscribe_newHead", "system_chain", - "unsubscribe_newHead", ) private val add = setOf( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 008329b0d..3428023d7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -9,6 +9,7 @@ import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LabelsDetector import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Upstream @@ -16,6 +17,7 @@ import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.ethereum.subscribe.AggregatedPendingTxes import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumLabelsDetector +import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumWsIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.subscribe.NoPendingTxes import io.emeraldpay.dshackle.upstream.ethereum.subscribe.PendingTxesSource import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder @@ -91,4 +93,8 @@ object EthereumChainSpecific : ChainSpecific { } return upstream.getIngressSubscription().getAvailableTopics().plus(subs).toSet().toList() } + + override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { + return EthereumWsIngressSubscription(ws) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/AggregatedPendingTxes.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/AggregatedPendingTxes.kt index 6d271af8d..386a77fe8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/AggregatedPendingTxes.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/AggregatedPendingTxes.kt @@ -18,7 +18,6 @@ package io.emeraldpay.dshackle.upstream.ethereum.subscribe import com.github.benmanes.caffeine.cache.Caffeine import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.etherjar.domain.TransactionId -import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import java.time.Duration @@ -26,10 +25,6 @@ class AggregatedPendingTxes( private val sources: List, ) : PendingTxesSource { - companion object { - private val log = LoggerFactory.getLogger(AggregatedPendingTxes::class.java) - } - private val track = Caffeine.newBuilder() .expireAfterWrite(Duration.ofSeconds(30)) .maximumSize(10_000) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index dccab9666..d8fac2f0b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -14,12 +14,14 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LabelsDetector import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific +import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.polkadot.PolkadotChainSpecific import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific @@ -52,6 +54,8 @@ interface ChainSpecific { fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? fun subscriptionTopics(upstream: GenericUpstream): List + + fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription } object ChainSpecificRegistry { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt index ea885a943..d0f268dcd 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt @@ -5,13 +5,11 @@ import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription -import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptionsImpl -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumWsIngressSubscription import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcWsClient @@ -32,7 +30,7 @@ class GenericWsConnector( private val pool: WsConnectionPool private val reader: JsonRpcReader private val head: GenericWsHead - private val subscriptions: EthereumIngressSubscription + private val subscriptions: IngressSubscription private val liveness: HeadLivenessValidator init { pool = wsFactory.create(upstream) @@ -49,7 +47,7 @@ class GenericWsConnector( chainSpecific, ) liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, upstream.getId()) - subscriptions = EthereumWsIngressSubscription(wsSubscriptions) + subscriptions = chainSpecific.makeIngressSubscription(wsSubscriptions) } override fun hasLiveSubscriptionHead(): Flux { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/subscribe/GenericPersistentConnect.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/subscribe/GenericPersistentConnect.kt new file mode 100644 index 000000000..8e19319d5 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/subscribe/GenericPersistentConnect.kt @@ -0,0 +1,41 @@ +/** + * Copyright (c) 2022 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.generic.subscribe + +import io.emeraldpay.dshackle.commons.DurableFlux +import io.emeraldpay.dshackle.commons.SharedFluxHolder +import io.emeraldpay.dshackle.upstream.Selector +import io.emeraldpay.dshackle.upstream.SubscriptionConnect +import reactor.core.publisher.Flux +import java.time.Duration + +abstract class GenericPersistentConnect : SubscriptionConnect { + + private val connectionSource = DurableFlux + .newBuilder() + .using(::createConnection) + .backoffOnError(Duration.ofMillis(100), 1.5, Duration.ofSeconds(60)) + .build() + private val holder = SharedFluxHolder( + connectionSource::connect, + ) + + override fun connect(matcher: Selector.Matcher): Flux { + return holder.get() + } + + abstract fun createConnection(): Flux +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt index 714d19d34..89eb81a0a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt @@ -11,14 +11,15 @@ import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.EgressSubscription -import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LabelsDetector import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.NoopCachingReader import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericUpstream @@ -73,7 +74,7 @@ object PolkadotChainSpecific : ChainSpecific { } override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { - return { _ -> EmptyEgressSubscription } + return { ms -> PolkadotEgressSubscription(ms, headScheduler) } } override fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder { @@ -96,6 +97,10 @@ object PolkadotChainSpecific : ChainSpecific { override fun subscriptionTopics(upstream: GenericUpstream): List { return emptyList() } + + override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { + return PolkadotIngressSubscription(ws) + } } @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotEgressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotEgressSubscription.kt new file mode 100644 index 000000000..c4e8f3df1 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotEgressSubscription.kt @@ -0,0 +1,23 @@ +package io.emeraldpay.dshackle.upstream.polkadot + +import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.Selector.Matcher +import io.emeraldpay.dshackle.upstream.calls.DefaultPolkadotMethods +import io.emeraldpay.dshackle.upstream.generic.GenericUpstream +import reactor.core.publisher.Flux +import reactor.core.scheduler.Scheduler + +class PolkadotEgressSubscription( + val upstream: Multistream, + val scheduler: Scheduler, +) : EgressSubscription { + override fun getAvailableTopics(): List { + return DefaultPolkadotMethods.subs.map { it.first } + } + + override fun subscribe(topic: String, params: Any?, matcher: Matcher): Flux { + val up = upstream.getUpstreams().shuffled().first { matcher.matches(it) } as GenericUpstream + return up.getIngressSubscription().get(topic)?.connect(matcher) ?: Flux.empty() + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotIngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotIngressSubscription.kt new file mode 100644 index 000000000..bcccde21c --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotIngressSubscription.kt @@ -0,0 +1,35 @@ +package io.emeraldpay.dshackle.upstream.polkadot + +import io.emeraldpay.dshackle.upstream.IngressSubscription +import io.emeraldpay.dshackle.upstream.SubscriptionConnect +import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions +import io.emeraldpay.dshackle.upstream.generic.subscribe.GenericPersistentConnect +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.time.Duration + +class PolkadotIngressSubscription(val conn: WsSubscriptions) : IngressSubscription { + override fun getAvailableTopics(): List { + return emptyList() // not used now + } + + @Suppress("UNCHECKED_CAST") + override fun get(topic: String): SubscriptionConnect { + return PolkaConnect(conn, topic) as SubscriptionConnect + } +} + +class PolkaConnect( + val conn: WsSubscriptions, + val topic: String, +) : GenericPersistentConnect() { + + @Suppress("UNCHECKED_CAST") + override fun createConnection(): Flux { + return conn.subscribe(JsonRpcRequest(topic, listOf())) + .data + .timeout(Duration.ofSeconds(60), Mono.empty()) + .onErrorResume { Mono.empty() } as Flux + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt index fb1178bb1..bc42903a0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt @@ -13,12 +13,15 @@ import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LabelsDetector import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.NoIngressSubscription import io.emeraldpay.dshackle.upstream.NoopCachingReader import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericUpstream @@ -91,6 +94,10 @@ object StarknetChainSpecific : ChainSpecific { override fun subscriptionTopics(upstream: GenericUpstream): List { return emptyList() } + + override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { + return NoIngressSubscription() + } } @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy index 2a0887b76..aa5059a90 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy @@ -45,6 +45,7 @@ class NativeSubscribeSpec extends Specification { def up = Mock(GenericMultistream) { 1 * it.tryProxySubscribe(_ as Selector.AnyLabelMatcher, call) >> null 1 * it.getEgressSubscription() >> subscribe + 1 * it.getSubscriptionTopics() >> ["newHeads"] } def nativeSubscribe = new NativeSubscribe(new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, up), signer) @@ -84,6 +85,7 @@ class NativeSubscribeSpec extends Specification { def up = Mock(GenericMultistream) { 1 * it.tryProxySubscribe(_ as Selector.AnyLabelMatcher, call) >> null 1 * it.getEgressSubscription() >> subscribe + 1 * it.getSubscriptionTopics() >> ["logs"] } def nativeSubscribe = new NativeSubscribe(new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, up), signer) @@ -107,6 +109,7 @@ class NativeSubscribeSpec extends Specification { def up = Mock(GenericMultistream) { 1 * it.tryProxySubscribe(_ as Selector.AnyLabelMatcher, call) >> Flux.just("{}") 0 * it.getEgressSubscription() + 1 * it.getSubscriptionTopics() >> ["newHeads"] } def nativeSubscribe = new NativeSubscribe(new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, up), signer)