Skip to content

Commit

Permalink
Turn off subs if conn type is rpc (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Apr 18, 2024
1 parent c377083 commit d76486b
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -1,59 +1,14 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.ThrottledLogger
import io.emeraldpay.dshackle.upstream.Head
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.scheduler.Scheduler
import java.time.Duration

class HeadLivenessValidator(
private val head: Head,
private val expectedBlockTime: Duration,
private val scheduler: Scheduler,
private val upstreamId: String,
) {
companion object {
const val CHECKED_BLOCKS_UNTIL_LIVE = 3
private val log = LoggerFactory.getLogger(HeadLivenessValidator::class.java)
}
interface HeadLivenessValidator {
fun getFlux(): Flux<Boolean>
}

fun getFlux(): Flux<Boolean> {
val headLiveness = head.headLiveness()
// first we have moving window of 2 blocks and check that they are consecutive ones
val headFlux = head.getFlux().map { it.height }.buffer(2, 1).map {
it.last() - it.first() == 1L
}.scan(Pair(0, true)) { acc, value ->
// then we accumulate consecutive true events, false resets counter
if (value) {
Pair(acc.first + 1, true)
} else {
if (log.isDebugEnabled) {
log.debug("non consecutive blocks in head for $upstreamId")
} else {
ThrottledLogger.log(log, "non consecutive blocks in head for $upstreamId")
}
Pair(0, false)
}
}.flatMap { (count, value) ->
// we emit when we have false or checked CHECKED_BLOCKS_UNTIL_LIVE blocks
// CHECKED_BLOCKS_UNTIL_LIVE blocks == (CHECKED_BLOCKS_UNTIL_LIVE - 1) consecutive true
when {
count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true)
!value -> Flux.just(false)
else -> Flux.empty()
}
}.timeout(
expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2),
Flux.just(false).doOnNext {
if (log.isDebugEnabled) {
log.debug("head liveness check broken with timeout in $upstreamId")
} else {
ThrottledLogger.log(log, "head liveness check broken with timeout in $upstreamId")
}
},
).repeat().subscribeOn(scheduler)
class NoHeadLivenessValidator : HeadLivenessValidator {

return Flux.merge(headFlux, headLiveness)
override fun getFlux(): Flux<Boolean> {
return Flux.just(false)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.ThrottledLogger
import io.emeraldpay.dshackle.upstream.Head
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.scheduler.Scheduler
import java.time.Duration

class HeadLivenessValidatorImpl(
private val head: Head,
private val expectedBlockTime: Duration,
private val scheduler: Scheduler,
private val upstreamId: String,
) : HeadLivenessValidator {
companion object {
const val CHECKED_BLOCKS_UNTIL_LIVE = 3
private val log = LoggerFactory.getLogger(HeadLivenessValidatorImpl::class.java)
}

override fun getFlux(): Flux<Boolean> {
val headLiveness = head.headLiveness()
// first we have moving window of 2 blocks and check that they are consecutive ones
val headFlux = head.getFlux().map { it.height }.buffer(2, 1).map {
it.last() - it.first() == 1L
}.scan(Pair(0, true)) { acc, value ->
// then we accumulate consecutive true events, false resets counter
if (value) {
Pair(acc.first + 1, true)
} else {
if (log.isDebugEnabled) {
log.debug("non consecutive blocks in head for $upstreamId")
} else {
ThrottledLogger.log(log, "non consecutive blocks in head for $upstreamId")
}
Pair(0, false)
}
}.flatMap { (count, value) ->
// we emit when we have false or checked CHECKED_BLOCKS_UNTIL_LIVE blocks
// CHECKED_BLOCKS_UNTIL_LIVE blocks == (CHECKED_BLOCKS_UNTIL_LIVE - 1) consecutive true
when {
count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true)
!value -> Flux.just(false)
else -> Flux.empty()
}
}.timeout(
expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2),
Flux.just(false).doOnNext {
if (log.isDebugEnabled) {
log.debug("head liveness check broken with timeout in $upstreamId")
} else {
ThrottledLogger.log(log, "head liveness check broken with timeout in $upstreamId")
}
},
).repeat().subscribeOn(scheduler)

return Flux.merge(headFlux, headLiveness)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import io.emeraldpay.dshackle.upstream.MergedHead
import io.emeraldpay.dshackle.upstream.NoIngressSubscription
import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidatorImpl
import io.emeraldpay.dshackle.upstream.ethereum.NoHeadLivenessValidator
import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool
import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory
import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions
Expand Down Expand Up @@ -123,7 +125,11 @@ class GenericRpcConnector(
)
}
}
liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, id)

liveness = when (connectorType) {
RPC_ONLY -> NoHeadLivenessValidator()
RPC_REQUESTS_WITH_MIXED_HEAD, RPC_REQUESTS_WITH_WS_HEAD, WS_ONLY -> HeadLivenessValidatorImpl(head, expectedBlockTime, headLivenessScheduler, id)
}
}

override fun setCaches(caches: Caches) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidatorImpl
import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool
import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory
import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptionsImpl
Expand All @@ -32,7 +32,7 @@ class GenericWsConnector(
private val reader: ChainReader
private val head: GenericWsHead
private val subscriptions: IngressSubscription
private val liveness: HeadLivenessValidator
private val liveness: HeadLivenessValidatorImpl
init {
pool = wsFactory.create(upstream)
reader = JsonRpcWsClient(pool)
Expand All @@ -48,7 +48,7 @@ class GenericWsConnector(
chainSpecific,
expectedBlockTime,
)
liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, upstream.getId())
liveness = HeadLivenessValidatorImpl(head, expectedBlockTime, headLivenessScheduler, upstream.getId())
subscriptions = chainSpecific.makeIngressSubscription(wsSubscriptions)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class HeadLivenessValidatorSpec extends Specification{
def "emits true"() {
when:
def head = new EthereumHeadMock()
def checker = new HeadLivenessValidator(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test")
def checker = new HeadLivenessValidatorImpl(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test")
then:
StepVerifier.create(checker.flux)
.then {
Expand All @@ -30,7 +30,7 @@ class HeadLivenessValidatorSpec extends Specification{
1 * it.headLiveness() >> Flux.just(false)
1 * it.getFlux() >> Flux.just(TestingCommons.blockForEthereum(1))
}
def checker = new HeadLivenessValidator(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test")
def checker = new HeadLivenessValidatorImpl(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test")
then:
StepVerifier.create(checker.flux)
.expectNext(false)
Expand All @@ -41,7 +41,7 @@ class HeadLivenessValidatorSpec extends Specification{
def "starts accumulating trues but immediately emits after false"() {
when:
def head = new EthereumHeadMock()
def checker = new HeadLivenessValidator(head, Duration.ofSeconds(100), Schedulers.boundedElastic(), "test")
def checker = new HeadLivenessValidatorImpl(head, Duration.ofSeconds(100), Schedulers.boundedElastic(), "test")
then:
StepVerifier.create(checker.flux)
.then {
Expand All @@ -59,7 +59,7 @@ class HeadLivenessValidatorSpec extends Specification{
def "starts accumulating trues but timeouts because head staled"() {
when:
def head = new EthereumHeadMock()
def checker = new HeadLivenessValidator(head, Duration.ofMillis(100), Schedulers.boundedElastic(), "test")
def checker = new HeadLivenessValidatorImpl(head, Duration.ofMillis(100), Schedulers.boundedElastic(), "test")
then:
StepVerifier.create(checker.flux)
.then {
Expand All @@ -74,7 +74,7 @@ class HeadLivenessValidatorSpec extends Specification{
def "it recovers after timeout"() {
when:
def head = new EthereumHeadMock()
def checker = new HeadLivenessValidator(head, Duration.ofMillis(200), Schedulers.boundedElastic(), "test")
def checker = new HeadLivenessValidatorImpl(head, Duration.ofMillis(200), Schedulers.boundedElastic(), "test")
then:
StepVerifier.create(checker.flux)
.then {
Expand Down

0 comments on commit d76486b

Please sign in to comment.