diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt index 1677130c2..c5a3b03f0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt @@ -1,59 +1,14 @@ package io.emeraldpay.dshackle.upstream.ethereum -import io.emeraldpay.dshackle.ThrottledLogger -import io.emeraldpay.dshackle.upstream.Head -import org.slf4j.LoggerFactory import reactor.core.publisher.Flux -import reactor.core.scheduler.Scheduler -import java.time.Duration -class HeadLivenessValidator( - private val head: Head, - private val expectedBlockTime: Duration, - private val scheduler: Scheduler, - private val upstreamId: String, -) { - companion object { - const val CHECKED_BLOCKS_UNTIL_LIVE = 3 - private val log = LoggerFactory.getLogger(HeadLivenessValidator::class.java) - } +interface HeadLivenessValidator { + fun getFlux(): Flux +} - fun getFlux(): Flux { - val headLiveness = head.headLiveness() - // first we have moving window of 2 blocks and check that they are consecutive ones - val headFlux = head.getFlux().map { it.height }.buffer(2, 1).map { - it.last() - it.first() == 1L - }.scan(Pair(0, true)) { acc, value -> - // then we accumulate consecutive true events, false resets counter - if (value) { - Pair(acc.first + 1, true) - } else { - if (log.isDebugEnabled) { - log.debug("non consecutive blocks in head for $upstreamId") - } else { - ThrottledLogger.log(log, "non consecutive blocks in head for $upstreamId") - } - Pair(0, false) - } - }.flatMap { (count, value) -> - // we emit when we have false or checked CHECKED_BLOCKS_UNTIL_LIVE blocks - // CHECKED_BLOCKS_UNTIL_LIVE blocks == (CHECKED_BLOCKS_UNTIL_LIVE - 1) consecutive true - when { - count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true) - !value -> Flux.just(false) - else -> Flux.empty() - } - }.timeout( - expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2), - Flux.just(false).doOnNext { - if (log.isDebugEnabled) { - log.debug("head liveness check broken with timeout in $upstreamId") - } else { - ThrottledLogger.log(log, "head liveness check broken with timeout in $upstreamId") - } - }, - ).repeat().subscribeOn(scheduler) +class NoHeadLivenessValidator : HeadLivenessValidator { - return Flux.merge(headFlux, headLiveness) + override fun getFlux(): Flux { + return Flux.just(false) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorImpl.kt new file mode 100644 index 000000000..17dc28c87 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorImpl.kt @@ -0,0 +1,59 @@ +package io.emeraldpay.dshackle.upstream.ethereum + +import io.emeraldpay.dshackle.ThrottledLogger +import io.emeraldpay.dshackle.upstream.Head +import org.slf4j.LoggerFactory +import reactor.core.publisher.Flux +import reactor.core.scheduler.Scheduler +import java.time.Duration + +class HeadLivenessValidatorImpl( + private val head: Head, + private val expectedBlockTime: Duration, + private val scheduler: Scheduler, + private val upstreamId: String, +) : HeadLivenessValidator { + companion object { + const val CHECKED_BLOCKS_UNTIL_LIVE = 3 + private val log = LoggerFactory.getLogger(HeadLivenessValidatorImpl::class.java) + } + + override fun getFlux(): Flux { + val headLiveness = head.headLiveness() + // first we have moving window of 2 blocks and check that they are consecutive ones + val headFlux = head.getFlux().map { it.height }.buffer(2, 1).map { + it.last() - it.first() == 1L + }.scan(Pair(0, true)) { acc, value -> + // then we accumulate consecutive true events, false resets counter + if (value) { + Pair(acc.first + 1, true) + } else { + if (log.isDebugEnabled) { + log.debug("non consecutive blocks in head for $upstreamId") + } else { + ThrottledLogger.log(log, "non consecutive blocks in head for $upstreamId") + } + Pair(0, false) + } + }.flatMap { (count, value) -> + // we emit when we have false or checked CHECKED_BLOCKS_UNTIL_LIVE blocks + // CHECKED_BLOCKS_UNTIL_LIVE blocks == (CHECKED_BLOCKS_UNTIL_LIVE - 1) consecutive true + when { + count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true) + !value -> Flux.just(false) + else -> Flux.empty() + } + }.timeout( + expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2), + Flux.just(false).doOnNext { + if (log.isDebugEnabled) { + log.debug("head liveness check broken with timeout in $upstreamId") + } else { + ThrottledLogger.log(log, "head liveness check broken with timeout in $upstreamId") + } + }, + ).repeat().subscribeOn(scheduler) + + return Flux.merge(headFlux, headLiveness) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index 93ffaecb7..dfa42cdb8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -13,6 +13,8 @@ import io.emeraldpay.dshackle.upstream.MergedHead import io.emeraldpay.dshackle.upstream.NoIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidatorImpl +import io.emeraldpay.dshackle.upstream.ethereum.NoHeadLivenessValidator import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions @@ -123,7 +125,11 @@ class GenericRpcConnector( ) } } - liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, id) + + liveness = when (connectorType) { + RPC_ONLY -> NoHeadLivenessValidator() + RPC_REQUESTS_WITH_MIXED_HEAD, RPC_REQUESTS_WITH_WS_HEAD, WS_ONLY -> HeadLivenessValidatorImpl(head, expectedBlockTime, headLivenessScheduler, id) + } } override fun setCaches(caches: Caches) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt index e7d250528..b86ffacc7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt @@ -6,7 +6,7 @@ import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead -import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidatorImpl import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptionsImpl @@ -32,7 +32,7 @@ class GenericWsConnector( private val reader: ChainReader private val head: GenericWsHead private val subscriptions: IngressSubscription - private val liveness: HeadLivenessValidator + private val liveness: HeadLivenessValidatorImpl init { pool = wsFactory.create(upstream) reader = JsonRpcWsClient(pool) @@ -48,7 +48,7 @@ class GenericWsConnector( chainSpecific, expectedBlockTime, ) - liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, upstream.getId()) + liveness = HeadLivenessValidatorImpl(head, expectedBlockTime, headLivenessScheduler, upstream.getId()) subscriptions = chainSpecific.makeIngressSubscription(wsSubscriptions) } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy index 3961c8d57..acd2efcf7 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy @@ -14,7 +14,7 @@ class HeadLivenessValidatorSpec extends Specification{ def "emits true"() { when: def head = new EthereumHeadMock() - def checker = new HeadLivenessValidator(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test") + def checker = new HeadLivenessValidatorImpl(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test") then: StepVerifier.create(checker.flux) .then { @@ -30,7 +30,7 @@ class HeadLivenessValidatorSpec extends Specification{ 1 * it.headLiveness() >> Flux.just(false) 1 * it.getFlux() >> Flux.just(TestingCommons.blockForEthereum(1)) } - def checker = new HeadLivenessValidator(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test") + def checker = new HeadLivenessValidatorImpl(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test") then: StepVerifier.create(checker.flux) .expectNext(false) @@ -41,7 +41,7 @@ class HeadLivenessValidatorSpec extends Specification{ def "starts accumulating trues but immediately emits after false"() { when: def head = new EthereumHeadMock() - def checker = new HeadLivenessValidator(head, Duration.ofSeconds(100), Schedulers.boundedElastic(), "test") + def checker = new HeadLivenessValidatorImpl(head, Duration.ofSeconds(100), Schedulers.boundedElastic(), "test") then: StepVerifier.create(checker.flux) .then { @@ -59,7 +59,7 @@ class HeadLivenessValidatorSpec extends Specification{ def "starts accumulating trues but timeouts because head staled"() { when: def head = new EthereumHeadMock() - def checker = new HeadLivenessValidator(head, Duration.ofMillis(100), Schedulers.boundedElastic(), "test") + def checker = new HeadLivenessValidatorImpl(head, Duration.ofMillis(100), Schedulers.boundedElastic(), "test") then: StepVerifier.create(checker.flux) .then { @@ -74,7 +74,7 @@ class HeadLivenessValidatorSpec extends Specification{ def "it recovers after timeout"() { when: def head = new EthereumHeadMock() - def checker = new HeadLivenessValidator(head, Duration.ofMillis(200), Schedulers.boundedElastic(), "test") + def checker = new HeadLivenessValidatorImpl(head, Duration.ofMillis(200), Schedulers.boundedElastic(), "test") then: StepVerifier.create(checker.flux) .then {