diff --git a/emerald-grpc b/emerald-grpc index 088e3016e..a81672777 160000 --- a/emerald-grpc +++ b/emerald-grpc @@ -1 +1 @@ -Subproject commit 088e3016e356a201794782a2dee9b2f87cf77d00 +Subproject commit a816727774a4d28693986c580c8bbe9144647cc2 diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt index 601c7b07f..3e3100137 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt @@ -52,6 +52,7 @@ class BlockchainRpc( @Autowired(required = false) private val providerSpanHandler: ProviderSpanHandler?, private val tracer: Tracer, + private val subscribeChainStatus: SubscribeChainStatus, ) : ReactorBlockchainGrpc.BlockchainImplBase() { private val log = LoggerFactory.getLogger(BlockchainRpc::class.java) @@ -164,6 +165,12 @@ class BlockchainRpc( } } + override fun subscribeChainStatus( + request: Mono, + ): Flux { + return subscribeChainStatus.chainStatuses() + } + class RequestMetrics(val chain: Chain) { val nativeCallMetric = Counter.builder("request.grpc.request") .tag("type", "nativeCall") diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/ChainEventMapper.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/ChainEventMapper.kt new file mode 100644 index 000000000..7628db642 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/ChainEventMapper.kt @@ -0,0 +1,151 @@ +package io.emeraldpay.dshackle.rpc + +import com.google.protobuf.ByteString +import io.emeraldpay.api.proto.BlockchainOuterClass +import io.emeraldpay.api.proto.BlockchainOuterClass.SupportedMethodsEvent +import io.emeraldpay.api.proto.Common +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.Capability +import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.finalization.FinalizationData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import org.springframework.stereotype.Component + +@Component +class ChainEventMapper { + + fun mapHead(head: BlockContainer): BlockchainOuterClass.ChainEvent { + return BlockchainOuterClass.ChainEvent.newBuilder() + .setHead( + BlockchainOuterClass.HeadEvent.newBuilder() + .setHeight(head.height) + .setSlot(head.slot) + .setTimestamp(head.timestamp.toEpochMilli()) + .setWeight(ByteString.copyFrom(head.difficulty.toByteArray())) + .setBlockId(head.hash.toHex()) + .setParentBlockId(head.parentHash?.toHex() ?: "") + .build(), + ) + .build() + } + + fun mapCapabilities(capabilities: Collection): BlockchainOuterClass.ChainEvent { + val caps = capabilities.map { + when (it) { + Capability.RPC -> BlockchainOuterClass.Capabilities.CAP_CALLS + Capability.BALANCE -> BlockchainOuterClass.Capabilities.CAP_BALANCE + Capability.WS_HEAD -> BlockchainOuterClass.Capabilities.CAP_WS_HEAD + } + } + + return BlockchainOuterClass.ChainEvent.newBuilder() + .setCapabilitiesEvent( + BlockchainOuterClass.CapabilitiesEvent.newBuilder() + .addAllCapabilities(caps) + .build(), + ) + .build() + } + + fun mapNodeDetails(nodeDetails: Collection): BlockchainOuterClass.ChainEvent { + val details = nodeDetails.map { + BlockchainOuterClass.NodeDetails.newBuilder() + .setQuorum(it.quorum) + .addAllLabels( + it.labels.entries.map { label -> + BlockchainOuterClass.Label.newBuilder() + .setName(label.key) + .setValue(label.value) + .build() + }, + ).build() + } + + return BlockchainOuterClass.ChainEvent.newBuilder() + .setNodesEvent( + BlockchainOuterClass.NodeDetailsEvent.newBuilder() + .addAllNodes(details) + .build(), + ) + .build() + } + + fun mapFinalizationData(finalizationData: Collection): BlockchainOuterClass.ChainEvent { + val data = finalizationData.map { + Common.FinalizationData.newBuilder() + .setHeight(it.height) + .setType(it.type.toProtoFinalizationType()) + .build() + } + + return BlockchainOuterClass.ChainEvent.newBuilder() + .setFinalizationDataEvent( + BlockchainOuterClass.FinalizationDataEvent.newBuilder() + .addAllFinalizationData(data) + .build(), + ) + .build() + } + + fun mapLowerBounds(lowerBounds: Collection): BlockchainOuterClass.ChainEvent { + val data = lowerBounds + .map { + BlockchainOuterClass.LowerBound.newBuilder() + .setLowerBoundTimestamp(it.timestamp) + .setLowerBoundType(mapLowerBoundType(it.type)) + .setLowerBoundValue(it.lowerBound) + .build() + } + + return BlockchainOuterClass.ChainEvent.newBuilder() + .setLowerBoundsEvent( + BlockchainOuterClass.LowerBoundEvent.newBuilder() + .addAllLowerBounds(data) + .build(), + ) + .build() + } + + fun chainStatus(status: UpstreamAvailability): BlockchainOuterClass.ChainEvent { + return BlockchainOuterClass.ChainEvent.newBuilder() + .setStatus( + BlockchainOuterClass.ChainStatus.newBuilder() + .setAvailability(Common.AvailabilityEnum.forNumber(status.grpcId)) + .build(), + ) + .build() + } + + fun supportedMethods(methods: Collection): BlockchainOuterClass.ChainEvent { + return BlockchainOuterClass.ChainEvent.newBuilder() + .setSupportedMethodsEvent( + SupportedMethodsEvent.newBuilder() + .addAllMethods(methods) + .build(), + ) + .build() + } + + fun supportedSubs(subs: Collection): BlockchainOuterClass.ChainEvent { + return BlockchainOuterClass.ChainEvent.newBuilder() + .setSupportedSubscriptionsEvent( + BlockchainOuterClass.SupportedSubscriptionsEvent.newBuilder() + .addAllSubs(subs) + .build(), + ) + .build() + } + + private fun mapLowerBoundType(type: LowerBoundType): BlockchainOuterClass.LowerBoundType { + return when (type) { + LowerBoundType.SLOT -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_SLOT + LowerBoundType.UNKNOWN -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_UNSPECIFIED + LowerBoundType.STATE -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_STATE + LowerBoundType.BLOCK -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK + LowerBoundType.TX -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_TX + LowerBoundType.LOGS -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_LOGS + } + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt index 54e1471a6..bcd773f2a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt @@ -46,6 +46,7 @@ import io.emeraldpay.dshackle.upstream.MultistreamHolder import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods +import io.emeraldpay.dshackle.upstream.calls.DisabledCallMethods import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcResponseError import io.emeraldpay.dshackle.upstream.finalization.FinalizationData @@ -110,7 +111,7 @@ open class NativeCall( if (it is ValidCallContext<*>) { if (it.payload is ParsedCallDetails) { log.error("nativeCallResult method ${it.payload.method} of ${it.upstream.getId()} is not available, disabling") - val cm = (it.upstream.getMethods() as Multistream.DisabledCallMethods) + val cm = (it.upstream.getMethods() as DisabledCallMethods) cm.disableMethodTemporarily(it.payload.method) it.upstream.updateMethods(cm) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt new file mode 100644 index 000000000..e8c468f2b --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt @@ -0,0 +1,128 @@ +package io.emeraldpay.dshackle.rpc + +import io.emeraldpay.api.proto.BlockchainOuterClass +import io.emeraldpay.api.proto.Common +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.MultistreamHolder +import io.emeraldpay.dshackle.upstream.state.MultistreamStateEvent +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Service +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.kotlin.core.publisher.switchIfEmpty + +@Service +class SubscribeChainStatus( + private val multistreamHolder: MultistreamHolder, + private val chainEventMapper: ChainEventMapper, +) { + + companion object { + private val log = LoggerFactory.getLogger(SubscribeChainStatus::class.java) + } + + fun chainStatuses(): Flux { + return Flux.merge( + // we need to track not only multistreams with upstreams but all of them + // because upstreams can be added in runtime with hot config reload + multistreamHolder.all() + .filter { Common.ChainRef.forNumber(it.chain.id) != null } + .map { ms -> + Flux.concat( + // the first event must be filled with all fields + firstFullEvent(ms), + Flux.merge( + // head events are separated from others + headEvents(ms), + multistreamEvents(ms), + ), + ) + }, + ).doOnError { + log.error("Error during sending chain statuses", it) + } + } + + private fun multistreamEvents(ms: Multistream): Flux { + return ms.stateEvents() + .filter { it.isNotEmpty() } + .map { events -> + val response = BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder() + val chainDescription = BlockchainOuterClass.ChainDescription.newBuilder() + .setChain(Common.ChainRef.forNumber(ms.chain.id)) + + events.forEach { + chainDescription.addChainEvent(processMsEvent(it)) + } + + response.setChainDescription(chainDescription.build()) + response.build() + } + } + + private fun processMsEvent(event: MultistreamStateEvent): BlockchainOuterClass.ChainEvent { + return when (event) { + is MultistreamStateEvent.CapabilitiesEvent -> chainEventMapper.mapCapabilities(event.caps) + is MultistreamStateEvent.FinalizationEvent -> chainEventMapper.mapFinalizationData(event.finalizationData) + is MultistreamStateEvent.LowerBoundsEvent -> chainEventMapper.mapLowerBounds(event.lowerBounds) + is MultistreamStateEvent.MethodsEvent -> chainEventMapper.supportedMethods(event.methods) + is MultistreamStateEvent.NodeDetailsEvent -> chainEventMapper.mapNodeDetails(event.details) + is MultistreamStateEvent.StatusEvent -> chainEventMapper.chainStatus(event.status) + is MultistreamStateEvent.SubsEvent -> chainEventMapper.supportedSubs(event.subs) + } + } + + private fun firstFullEvent(ms: Multistream): Mono { + return Mono.justOrEmpty(ms.getHead().getCurrent()) + .map { toFullResponse(it!!, ms) } + .switchIfEmpty { + // in case if there is still no head we mush wait until we get it + ms.getHead() + .getFlux() + .next() + .map { toFullResponse(it!!, ms) } + } + } + + private fun headEvents(ms: Multistream): Flux { + return ms.getHead() + .getFlux() + .skip(1) + .map { + BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder() + .setChainDescription( + BlockchainOuterClass.ChainDescription.newBuilder() + .setChain(Common.ChainRef.forNumber(ms.chain.id)) + .addChainEvent(chainEventMapper.mapHead(it)) + .build(), + ) + .build() + } + } + + private fun toFullResponse(head: BlockContainer, ms: Multistream): BlockchainOuterClass.SubscribeChainStatusResponse { + return BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder() + .setChainDescription( + BlockchainOuterClass.ChainDescription.newBuilder() + .setChain(Common.ChainRef.forNumber(ms.chain.id)) + .addChainEvent(chainEventMapper.chainStatus(ms.getStatus())) + .addChainEvent(chainEventMapper.mapHead(head)) + .addChainEvent(chainEventMapper.supportedMethods(ms.getMethods().getSupportedMethods())) + .addChainEvent(chainEventMapper.supportedSubs(ms.getEgressSubscription().getAvailableTopics())) + .addChainEvent(chainEventMapper.mapCapabilities(ms.getCapabilities())) + .addChainEvent(chainEventMapper.mapLowerBounds(ms.getLowerBounds())) + .addChainEvent(chainEventMapper.mapFinalizationData(ms.getFinalizations())) + .addChainEvent(chainEventMapper.mapNodeDetails(ms.getQuorumLabels())) + .build(), + ) + .setBuildInfo( + BlockchainOuterClass.BuildInfo.newBuilder() + .setVersion(Global.version) + .build(), + ) + .setFullResponse(true) + .build() + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/QuorumForLabels.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/QuorumForLabels.kt index 5b6dc6617..da0d6257e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/QuorumForLabels.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/QuorumForLabels.kt @@ -73,7 +73,7 @@ class QuorumForLabels() { /** * Details for a single element (upstream, node or aggregation) */ - class QuorumItem(val quorum: Int, val labels: UpstreamsConfig.Labels) { + data class QuorumItem(val quorum: Int, val labels: UpstreamsConfig.Labels) { companion object { fun empty(): QuorumItem { return QuorumItem(0, UpstreamsConfig.Labels()) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt index 75868db55..3804e1c77 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt @@ -143,7 +143,7 @@ abstract class AbstractHead @JvmOverloads constructor( ).onBackpressureLatest() } - fun getCurrent(): BlockContainer? { + override fun getCurrent(): BlockContainer? { return forkChoice.getHead() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 16e3e8b5f..7bbc62e12 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -20,6 +20,7 @@ import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.config.UpstreamsConfig.Labels.Companion.fromMap import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.startup.UpstreamChangeEvent @@ -155,7 +156,7 @@ abstract class DefaultUpstream( override fun nodeId(): Byte = hash open fun getQuorumByLabel(): QuorumForLabels { - return node?.let { QuorumForLabels(it) } + return node?.let { QuorumForLabels(it.copy(labels = fromMap(it.labels))) } ?: QuorumForLabels(QuorumForLabels.QuorumItem.empty()) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt index cc00e074b..fee871242 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt @@ -45,4 +45,8 @@ class EmptyHead : Head { } override fun headLiveness(): Flux = Flux.empty() + + override fun getCurrent(): BlockContainer? { + return null + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt index 8ee80e71d..4d8c5f819 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt @@ -47,4 +47,6 @@ interface Head { fun onSyncingNode(isSyncing: Boolean) fun headLiveness(): Flux + + fun getCurrent(): BlockContainer? } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index e4e6515d6..35718784b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -16,26 +16,22 @@ */ package io.emeraldpay.dshackle.upstream -import com.github.benmanes.caffeine.cache.Cache -import com.github.benmanes.caffeine.cache.Caffeine import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.Defaults.Companion.multistreamUnavailableMethodDisableDuration import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.cache.CachesEnabled import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.foundation.ChainOptions -import io.emeraldpay.dshackle.quorum.CallQuorum import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.startup.UpstreamChangeEvent -import io.emeraldpay.dshackle.upstream.calls.AggregatedCallMethods import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.CallSelector import io.emeraldpay.dshackle.upstream.finalization.FinalizationData -import io.emeraldpay.dshackle.upstream.finalization.FinalizationType import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.state.MultistreamState +import io.emeraldpay.dshackle.upstream.state.MultistreamStateEvent import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.Meter import io.micrometer.core.instrument.Metrics @@ -50,7 +46,6 @@ import reactor.core.publisher.Sinks import reactor.core.scheduler.Scheduler import java.time.Duration import java.util.concurrent.ConcurrentHashMap -import kotlin.math.max /** * Aggregation of multiple upstreams responding to a single blockchain @@ -68,29 +63,22 @@ abstract class Multistream( private const val metrics = "upstreams" } + private val state = MultistreamState { onUpstreamsUpdated() } + protected val log = LoggerFactory.getLogger(this::class.java) private var started = false private var cacheSubscription: Disposable? = null - @Volatile - private var callMethods: DisabledCallMethods? = null private var callMethodsFactory: Factory = Factory { - return@Factory callMethods ?: throw FunctorException("Not initialized yet") + return@Factory state.getCallMethods() ?: throw FunctorException("Not initialized yet") } private var stopSignal = Sinks.many().multicast().directBestEffort() private var seq = 0 protected var lagObserver: HeadLagObserver? = null private var subscription: Disposable? = null - @Volatile - private var capabilities: Set = emptySet() - - private val lowerBounds = ConcurrentHashMap() - - @Volatile - private var quorumLabels: List? = null private val meters: MutableMap> = HashMap() private val observedUpstreams = Sinks.many() .multicast() @@ -232,41 +220,7 @@ abstract class Multistream( protected open fun onUpstreamsUpdated() { val upstreams = getAll() - val availableUpstreams = upstreams.filter { it.isAvailable() } - availableUpstreams.map { it.getMethods() }.let { - if (callMethods == null) { - callMethods = DisabledCallMethods(this, multistreamUnavailableMethodDisableDuration, AggregatedCallMethods(it)) - } else { - callMethods = DisabledCallMethods( - this, - multistreamUnavailableMethodDisableDuration, - AggregatedCallMethods(it), - callMethods!!.disabledMethods, - ) - } - } - capabilities = if (upstreams.isEmpty()) { - emptySet() - } else { - availableUpstreams.map { up -> - up.getCapabilities() - }.let { - if (it.isNotEmpty()) { - it.reduce { acc, curr -> acc + curr } - } else { - emptySet() - } - } - } - quorumLabels = getQuorumLabels(availableUpstreams) - - availableUpstreams - .flatMap { it.getLowerBounds() } - .groupBy { it.type } - .forEach { entry -> - val min = entry.value.minBy { it.lowerBound } - lowerBounds[entry.key] = min - } + state.updateState(upstreams, getSubscriptionTopics()) when { upstreams.size == 1 -> { @@ -279,17 +233,7 @@ abstract class Multistream( } } - private fun getQuorumLabels(ups: List): List { - val nodes = QuorumForLabels() - ups.forEach { up -> - if (up is DefaultUpstream) { - nodes.add(up.getQuorumByLabel()) - } - } - return nodes.getAll() - } - - fun getQuorumLabels(): List = quorumLabels ?: emptyList() + open fun getQuorumLabels(): List = state.getQuorumLabels() ?: emptyList() override fun observeStatus(): Flux { val upstreamsFluxes = getAll().map { up -> @@ -314,12 +258,7 @@ abstract class Multistream( } override fun getStatus(): UpstreamAvailability { - val upstreams = getAll() - return if (upstreams.isEmpty()) { - UpstreamAvailability.UNAVAILABLE - } else { - upstreams.minOf { it.getStatus() } - } + return state.getStatus() } // TODO options for multistream are useless @@ -333,7 +272,7 @@ abstract class Multistream( } override fun getMethods(): CallMethods { - return callMethods ?: throw IllegalStateException("Methods are not initialized yet") + return state.getCallMethods() ?: throw IllegalStateException("Methods are not initialized yet") } override fun updateMethods(m: CallMethods) { @@ -359,11 +298,7 @@ abstract class Multistream( } override fun getFinalizations(): Collection { - return getAll().flatMap { it.getFinalizations() } - .fold(mutableMapOf()) { acc, data -> - acc[data.type] = max(acc[data.type] ?: 0, data.height) - acc - }.toList().map { FinalizationData(it.second, it.first) } + return state.getFinalizationData() } override fun addFinalization(finalization: FinalizationData, upstreamId: String) { @@ -371,11 +306,11 @@ abstract class Multistream( } override fun getLowerBounds(): Collection { - return lowerBounds.values + return state.getLowerBounds() } override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? { - return lowerBounds[lowerBoundType] + return state.getLowerBound(lowerBoundType) } override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? { @@ -447,7 +382,7 @@ abstract class Multistream( } override fun getCapabilities(): Set { - return this.capabilities + return state.getCapabilities() } override fun isGrpc(): Boolean { @@ -482,7 +417,7 @@ abstract class Multistream( val weak = getUpstreams() .filter { it.getStatus() != UpstreamAvailability.OK } .joinToString(", ") { it.getId() } - val lowerBlockData = lowerBounds.entries.joinToString(", ") { "${it.key}=${it.value.lowerBound}" } + val lowerBlockData = state.lowerBoundsToString() val instance = System.identityHashCode(this).toString(16) log.info("State of ${chain.chainCode}: height=${height ?: '?'}, status=[$statuses], lag=[$lag], lower bounds=[$lowerBlockData], weak=[$weak] ($instance)") @@ -571,6 +506,8 @@ abstract class Multistream( fun subscribeUpdatedUpstreams(): Flux = updateUpstreams.asFlux() + fun stateEvents(): Flux> = state.stateEvents() + abstract fun makeLagObserver(): HeadLagObserver open fun tryProxySubscribe( @@ -582,57 +519,6 @@ abstract class Multistream( abstract fun getHead(mather: Selector.Matcher): Head - // -------------------------------------------------------------------------------------------------------- - - class DisabledCallMethods(private val multistream: Multistream, private val defaultDisableTimeout: Long, private val callMethods: CallMethods) : CallMethods { - var disabledMethods: Cache = Caffeine.newBuilder() - .removalListener { key: String?, _: Boolean?, cause -> - if (cause.wasEvicted() && key != null) { - multistream.log.info("${multistream.getId()} restoring method $key") - multistream.onUpstreamsUpdated() - } - } - .expireAfterWrite(Duration.ofMinutes(defaultDisableTimeout)) - .build() - - constructor( - multistream: Multistream, - defaultDisableTimeout: Long, - callMethods: CallMethods, - disabledMethodsCopy: Cache, - ) : this(multistream, defaultDisableTimeout, callMethods) { - disabledMethods = disabledMethodsCopy - } - - override fun createQuorumFor(method: String): CallQuorum { - return callMethods.createQuorumFor(method) - } - - override fun isCallable(method: String): Boolean { - return callMethods.isCallable(method) && disabledMethods.getIfPresent(method) == null - } - - override fun getSupportedMethods(): Set { - return callMethods.getSupportedMethods() - disabledMethods.asMap().keys - } - - override fun isHardcoded(method: String): Boolean { - return callMethods.isHardcoded(method) - } - - override fun executeHardcoded(method: String): ByteArray { - return callMethods.executeHardcoded(method) - } - - override fun getGroupMethods(groupName: String): Set { - return callMethods.getGroupMethods(groupName) - } - - fun disableMethodTemporarily(method: String) { - disabledMethods.put(method, true) - } - } - class UpstreamStatus(val upstream: Upstream, val status: UpstreamAvailability) class FilterBestAvailability : java.util.function.Function { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DisabledCallMethods.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DisabledCallMethods.kt new file mode 100644 index 000000000..3a215267e --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DisabledCallMethods.kt @@ -0,0 +1,58 @@ +package io.emeraldpay.dshackle.upstream.calls + +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine +import io.emeraldpay.dshackle.quorum.CallQuorum +import java.time.Duration + +class DisabledCallMethods( + defaultDisableTimeout: Long, + private val callMethods: CallMethods, + private val onUpdate: () -> Unit, +) : CallMethods { + var disabledMethods: Cache = Caffeine.newBuilder() + .removalListener { key: String?, _: Boolean?, cause -> + if (cause.wasEvicted() && key != null) { + onUpdate.invoke() + } + } + .expireAfterWrite(Duration.ofMinutes(defaultDisableTimeout)) + .build() + + constructor( + onUpdate: () -> Unit, + defaultDisableTimeout: Long, + callMethods: CallMethods, + disabledMethodsCopy: Cache, + ) : this(defaultDisableTimeout, callMethods, onUpdate) { + disabledMethods = disabledMethodsCopy + } + + override fun createQuorumFor(method: String): CallQuorum { + return callMethods.createQuorumFor(method) + } + + override fun isCallable(method: String): Boolean { + return callMethods.isCallable(method) && disabledMethods.getIfPresent(method) == null + } + + override fun getSupportedMethods(): Set { + return callMethods.getSupportedMethods() - disabledMethods.asMap().keys + } + + override fun isHardcoded(method: String): Boolean { + return callMethods.isHardcoded(method) + } + + override fun executeHardcoded(method: String): ByteArray { + return callMethods.executeHardcoded(method) + } + + override fun getGroupMethods(groupName: String): Set { + return callMethods.getGroupMethods(groupName) + } + + fun disableMethodTemporarily(method: String) { + disabledMethods.put(method, true) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt index 93c5fe4bb..47e5a6c84 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt @@ -13,6 +13,7 @@ import io.emeraldpay.dshackle.upstream.finalization.FinalizationType import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.slf4j.LoggerFactory import reactor.core.publisher.Flux +import reactor.core.publisher.Sinks import java.time.Duration import java.util.concurrent.ConcurrentHashMap @@ -23,64 +24,67 @@ class EthereumFinalizationDetector : FinalizationDetector { val data: ConcurrentHashMap = ConcurrentHashMap() + private val finalizationSink = Sinks.many().multicast().directBestEffort() + override fun detectFinalization( upstream: Upstream, blockTime: Duration, ): Flux { - val timer = - Flux.merge( - Flux.just(1), - Flux.interval(Duration.ofSeconds(15)), - ) - return timer.flatMap { - Flux.fromIterable( - listOf( - Pair( - FinalizationType.SAFE_BLOCK, - ChainRequest( - "eth_getBlockByNumber", - ListParams("safe", false), - 1, + return Flux.merge( + finalizationSink.asFlux(), + Flux.interval( + Duration.ofSeconds(0), + Duration.ofSeconds(15), + ).flatMap { + Flux.fromIterable( + listOf( + Pair( + FinalizationType.SAFE_BLOCK, + ChainRequest( + "eth_getBlockByNumber", + ListParams("safe", false), + 1, + ), ), - ), - Pair( - FinalizationType.FINALIZED_BLOCK, - ChainRequest( - "eth_getBlockByNumber", - ListParams("finalized", false), - 2, + Pair( + FinalizationType.FINALIZED_BLOCK, + ChainRequest( + "eth_getBlockByNumber", + ListParams("finalized", false), + 2, + ), ), ), - ), - ).flatMap { (type, req) -> - upstream - .getIngressReader() - .read(req) - .flatMap { - it.requireResult().map { result -> - val block = - Global.objectMapper - .readValue(result, BlockJson::class.java) as BlockJson? - if (block != null) { - FinalizationData(block.number, type) - } else { - throw RpcException(RpcResponseError.CODE_INVALID_JSON, "can't parse block data") + ).flatMap { (type, req) -> + upstream + .getIngressReader() + .read(req) + .flatMap { + it.requireResult().map { result -> + val block = + Global.objectMapper + .readValue(result, BlockJson::class.java) as BlockJson? + if (block != null) { + FinalizationData(block.number, type) + } else { + throw RpcException(RpcResponseError.CODE_INVALID_JSON, "can't parse block data") + } } } - } + }.onErrorResume { + log.error("Error during retrieving — $it") + Flux.empty() + } + }.filter { + it.height > (data[it.type]?.height ?: 0) }.doOnNext { - addFinalization(it) - }.onErrorResume { - log.error("Error during retrieving — $it") - Flux.empty() - } - } + data[it.type] = it + }, + ) } override fun addFinalization(finalization: FinalizationData) { - data[finalization.type] = maxOf(data[finalization.type], finalization) { a, b -> - ((a?.height ?: 0) - (b?.height ?: 0)).toInt() - } ?: finalization + finalizationSink.emitNext(finalization) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } } override fun getFinalizations(): Collection { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index ca1863ee7..2184b50bc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -208,7 +208,11 @@ open class GenericUpstream( } private fun detectSettings() { - settingsDetector?.detectLabels()?.subscribe { label -> updateLabels(label) } + settingsDetector?.detectLabels() + ?.subscribe { label -> + updateLabels(label) + sendUpstreamStateEvent(UPDATED) + } settingsDetector?.detectClientVersion() ?.subscribe { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamState.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamState.kt new file mode 100644 index 000000000..fe6c38c74 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamState.kt @@ -0,0 +1,162 @@ +package io.emeraldpay.dshackle.upstream.state + +import io.emeraldpay.dshackle.Defaults +import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.Capability +import io.emeraldpay.dshackle.upstream.DefaultUpstream +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.calls.AggregatedCallMethods +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.calls.DisabledCallMethods +import io.emeraldpay.dshackle.upstream.finalization.FinalizationData +import io.emeraldpay.dshackle.upstream.finalization.FinalizationType +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import reactor.core.publisher.Flux +import reactor.core.publisher.Sinks +import java.util.concurrent.ConcurrentHashMap + +class MultistreamState( + private val onMsUpdate: () -> Unit, +) { + @Volatile + private var callMethods: DisabledCallMethods? = null + + @Volatile + private var capabilities: Set = emptySet() + + @Volatile + private var quorumLabels: List? = null + + @Volatile + private var status: UpstreamAvailability = UpstreamAvailability.UNAVAILABLE + + @Volatile + private var subs: List = emptyList() + private val lowerBounds = ConcurrentHashMap() + private val finalizationData = ConcurrentHashMap() + + private val stateHandler = MultistreamStateHandler + private val stateEvents = Sinks.many().multicast().directBestEffort>() + + fun getCallMethods(): CallMethods? = callMethods + + fun getQuorumLabels(): List? = quorumLabels + + fun getLowerBounds(): Collection = HashSet(lowerBounds.values) + + fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? = lowerBounds[lowerBoundType] + + fun getFinalizationData(): Collection = HashSet(finalizationData.values) + + fun getCapabilities(): Set = capabilities + + fun lowerBoundsToString(): String = + lowerBounds.entries.joinToString(", ") { "${it.key}=${it.value.lowerBound}" } + + fun getStatus(): UpstreamAvailability { + return status + } + + fun updateState(upstreams: List, subs: List) { + val oldState = CurrentMultistreamState(this) + + val availableUpstreams = upstreams.filter { it.isAvailable() } + updateMethods(availableUpstreams) + updateCapabilities(availableUpstreams) + updateQuorumLabels(availableUpstreams) + updateUpstreamBounds(availableUpstreams) + status = if (upstreams.isEmpty()) UpstreamAvailability.UNAVAILABLE else upstreams.minOf { it.getStatus() } + this.subs = subs + + stateEvents.emitNext( + stateHandler.compareStates(oldState, CurrentMultistreamState(this)), + ) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + } + + fun stateEvents(): Flux> = stateEvents.asFlux() + + private fun updateMethods(upstreams: List) { + upstreams.map { it.getMethods() }.let { + callMethods = if (callMethods == null) { + DisabledCallMethods( + Defaults.multistreamUnavailableMethodDisableDuration, + AggregatedCallMethods(it), + onMsUpdate, + ) + } else { + DisabledCallMethods( + onMsUpdate, + Defaults.multistreamUnavailableMethodDisableDuration, + AggregatedCallMethods(it), + callMethods!!.disabledMethods, + ) + } + } + } + + private fun updateCapabilities(upstreams: List) { + capabilities = if (upstreams.isEmpty()) { + emptySet() + } else { + upstreams.map { up -> + up.getCapabilities() + }.let { + if (it.isNotEmpty()) { + it.reduce { acc, curr -> acc + curr } + } else { + emptySet() + } + } + } + } + + private fun updateQuorumLabels(ups: List) { + val nodes = QuorumForLabels() + ups.forEach { up -> + if (up is DefaultUpstream) { + nodes.add(up.getQuorumByLabel()) + } + } + quorumLabels = nodes.getAll() + } + + private fun updateUpstreamBounds(upstreams: List) { + upstreams + .flatMap { it.getLowerBounds() } + .groupBy { it.type } + .forEach { entry -> + val min = entry.value.minBy { it.lowerBound } + lowerBounds[entry.key] = min + } + + upstreams + .flatMap { it.getFinalizations() } + .groupBy { it.type } + .forEach { entry -> + val max = entry.value.maxBy { it.height } + finalizationData[entry.key] = max + } + } + + data class CurrentMultistreamState( + val status: UpstreamAvailability, + val methods: Collection, + val subs: Collection, + val caps: Collection, + val lowerBounds: Collection, + val finalizationData: Collection, + val nodeDetails: Collection, + ) { + constructor(state: MultistreamState) : this( + state.getStatus(), + state.getCallMethods()?.getSupportedMethods() ?: emptySet(), + state.subs, + state.getCapabilities(), + state.getLowerBounds(), + state.getFinalizationData(), + state.getQuorumLabels() ?: emptySet(), + ) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateEvent.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateEvent.kt new file mode 100644 index 000000000..a2400c6ec --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateEvent.kt @@ -0,0 +1,37 @@ +package io.emeraldpay.dshackle.upstream.state + +import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.Capability +import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.finalization.FinalizationData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData + +sealed class MultistreamStateEvent { + data class StatusEvent( + val status: UpstreamAvailability, + ) : MultistreamStateEvent() + + data class MethodsEvent( + val methods: Collection, + ) : MultistreamStateEvent() + + data class SubsEvent( + val subs: Collection, + ) : MultistreamStateEvent() + + data class CapabilitiesEvent( + val caps: Collection, + ) : MultistreamStateEvent() + + data class LowerBoundsEvent( + val lowerBounds: Collection, + ) : MultistreamStateEvent() + + data class FinalizationEvent( + val finalizationData: Collection, + ) : MultistreamStateEvent() + + data class NodeDetailsEvent( + val details: Collection, + ) : MultistreamStateEvent() +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateHandler.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateHandler.kt new file mode 100644 index 000000000..0cb12ecda --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateHandler.kt @@ -0,0 +1,32 @@ +package io.emeraldpay.dshackle.upstream.state + +object MultistreamStateHandler { + + fun compareStates(old: MultistreamState.CurrentMultistreamState, new: MultistreamState.CurrentMultistreamState): Collection { + val events = mutableListOf() + + if (old.status != new.status) { + events.add(MultistreamStateEvent.StatusEvent(new.status)) + } + if (old.methods != new.methods) { + events.add(MultistreamStateEvent.MethodsEvent(new.methods)) + } + if (old.subs != new.subs) { + events.add(MultistreamStateEvent.SubsEvent(new.subs)) + } + if (old.caps != new.caps) { + events.add(MultistreamStateEvent.CapabilitiesEvent(new.caps)) + } + if (old.lowerBounds != new.lowerBounds) { + events.add(MultistreamStateEvent.LowerBoundsEvent(new.lowerBounds)) + } + if (old.finalizationData != new.finalizationData) { + events.add(MultistreamStateEvent.FinalizationEvent(new.finalizationData)) + } + if (old.nodeDetails != new.nodeDetails) { + events.add(MultistreamStateEvent.NodeDetailsEvent(new.nodeDetails)) + } + + return events + } +} diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy index 45de0592f..0b982bccf 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy @@ -92,4 +92,9 @@ class EthereumHeadMock implements Head { Flux headLiveness() { return Flux.empty() } + + @Override + BlockContainer getCurrent() { + return null + } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt new file mode 100644 index 000000000..37c894920 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt @@ -0,0 +1,319 @@ +package io.emeraldpay.dshackle.rpc + +import io.emeraldpay.api.proto.BlockchainOuterClass +import io.emeraldpay.api.proto.Common +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.cache.Caches +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.data.BlockId +import io.emeraldpay.dshackle.reader.ChainReader +import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.CachingReader +import io.emeraldpay.dshackle.upstream.Capability +import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.HeadLagObserver +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.MultistreamHolder +import io.emeraldpay.dshackle.upstream.Selector +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.finalization.FinalizationData +import io.emeraldpay.dshackle.upstream.finalization.FinalizationType +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.state.MultistreamStateEvent +import org.junit.jupiter.api.Test +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.spy +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import reactor.test.StepVerifier +import java.math.BigInteger +import java.time.Duration +import java.time.Instant + +class SubscribeChainStatusTest { + private val chainEventMapper = ChainEventMapper() + + @Test + fun `terminate stream if an error is thrown`() { + val head = mock { + on { getCurrent() } doReturn null + on { getFlux() } doReturn Flux.error(IllegalStateException()) + } + val ms = mock { + on { chain } doReturn Chain.ETHEREUM__MAINNET + on { getHead() } doReturn head + on { stateEvents() } doReturn Flux.empty() + } + val msHolder = mock { + on { all() } doReturn listOf(ms) + } + val subscribeChainStatus = SubscribeChainStatus(msHolder, chainEventMapper) + + StepVerifier.create(subscribeChainStatus.chainStatuses()) + .expectSubscription() + .expectError() + .verify(Duration.ofSeconds(1)) + } + + @Test + fun `first full event if there is already an ms head`() { + val head = mock { + on { getCurrent() } doReturn head(550) + on { getFlux() } doReturn Flux.empty() + } + val ms = spy { + on { getHead() } doReturn head + on { stateEvents() } doReturn Flux.empty() + } + val msHolder = mock { + on { all() } doReturn listOf(ms) + } + val subscribeChainStatus = SubscribeChainStatus(msHolder, chainEventMapper) + + StepVerifier.create(subscribeChainStatus.chainStatuses()) + .expectSubscription() + .expectNext(response(true)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + } + + @Test + fun `first full event with awaiting a head from a head stream`() { + val head = mock { + on { getCurrent() } doReturn null + on { getFlux() } doReturn Flux.just(head(550)) + } + val ms = spy { + on { getHead() } doReturn head + on { stateEvents() } doReturn Flux.empty() + } + val msHolder = mock { + on { all() } doReturn listOf(ms) + } + val subscribeChainStatus = SubscribeChainStatus(msHolder, chainEventMapper) + + StepVerifier.create(subscribeChainStatus.chainStatuses()) + .expectSubscription() + .expectNext(response(true)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + } + + @Test + fun `first full event with awaiting a head from a head stream and then state events`() { + val head = mock { + on { getCurrent() } doReturn null + on { getFlux() } doReturn Flux.just(head(550)) + } + val ms = spy { + on { getHead() } doReturn head + on { stateEvents() } doReturn Flux.just( + listOf( + MultistreamStateEvent.StatusEvent(UpstreamAvailability.OK), + MultistreamStateEvent.MethodsEvent(setOf("superMethod")), + MultistreamStateEvent.SubsEvent(listOf("heads")), + MultistreamStateEvent.CapabilitiesEvent(setOf(Capability.BALANCE)), + MultistreamStateEvent.LowerBoundsEvent(listOf(LowerBoundData(800, 1000, LowerBoundType.STATE))), + MultistreamStateEvent.FinalizationEvent(listOf(FinalizationData(30, FinalizationType.SAFE_BLOCK))), + MultistreamStateEvent.NodeDetailsEvent(listOf(QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("test" to "val"))))), + ), + ) + } + val msHolder = mock { + on { all() } doReturn listOf(ms) + } + val subscribeChainStatus = SubscribeChainStatus(msHolder, chainEventMapper) + + StepVerifier.create(subscribeChainStatus.chainStatuses()) + .expectSubscription() + .expectNext(response(true)) + .expectNext(response(false)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + } + + @Test + fun `first full event with awaiting a head from a head stream and then head events`() { + val head = mock { + on { getCurrent() } doReturn null + on { getFlux() } doReturn Flux.just(head(550), head(600)) + } + val ms = spy { + on { getHead() } doReturn head + on { stateEvents() } doReturn Flux.empty() + } + val msHolder = mock { + on { all() } doReturn listOf(ms) + } + val subscribeChainStatus = SubscribeChainStatus(msHolder, chainEventMapper) + + StepVerifier.create(subscribeChainStatus.chainStatuses()) + .expectSubscription() + .expectNext(response(true)) + .expectNext( + BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder() + .setChainDescription( + BlockchainOuterClass.ChainDescription.newBuilder() + .setChain(Common.ChainRef.CHAIN_ETHEREUM__MAINNET) + .addChainEvent(chainEventMapper.mapHead(head(600))) + .build(), + ) + .build(), + ) + .thenCancel() + .verify(Duration.ofSeconds(1)) + } + + private fun head(height: Long): BlockContainer { + return BlockContainer( + height, + BlockId.from("0xa6af163aab691919c595e2a466f0a7b01f1dff8cfd9631dee811df57064c2d32"), + BigInteger.ONE, + Instant.ofEpochSecond(1719485864), + false, + null, + null, + BlockId.from("0xa6af163aab691919c595e2a466f0a6b01f1dff8cfd9631dee811df57064c2d32"), + emptyList(), + ) + } + + private fun response(headEvent: Boolean): BlockchainOuterClass.SubscribeChainStatusResponse { + return BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder() + .apply { + if (headEvent) { + setBuildInfo( + BlockchainOuterClass.BuildInfo.newBuilder() + .setVersion("DEV") + .build(), + ) + } + } + .setChainDescription( + BlockchainOuterClass.ChainDescription.newBuilder() + .setChain(Common.ChainRef.CHAIN_ETHEREUM__MAINNET) + .addChainEvent(chainEventMapper.chainStatus(UpstreamAvailability.OK)) + .apply { + if (headEvent) { + addChainEvent(chainEventMapper.mapHead(head(550))) + } + } + .addChainEvent(chainEventMapper.supportedMethods(setOf("superMethod"))) + .addChainEvent(chainEventMapper.supportedSubs(listOf("heads"))) + .addChainEvent(chainEventMapper.mapCapabilities(setOf(Capability.BALANCE))) + .addChainEvent( + chainEventMapper.mapLowerBounds( + listOf(LowerBoundData(800, 1000, LowerBoundType.STATE)), + ), + ) + .addChainEvent( + chainEventMapper.mapFinalizationData( + listOf(FinalizationData(30, FinalizationType.SAFE_BLOCK)), + ), + ) + .addChainEvent( + chainEventMapper.mapNodeDetails( + listOf(QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("test" to "val")))), + ), + ) + .build(), + ) + .apply { + if (headEvent) { + setFullResponse(true) + } + } + .build() + } + + private open class TestMultistream : Multistream(Chain.ETHEREUM__MAINNET, mock(), null, Schedulers.single()) { + companion object { + private const val UNIMPLEMENTED = "UNIMPLEMENTED" + } + + override fun getMethods(): CallMethods { + val callMethods = mock { + on { getSupportedMethods() } doReturn setOf("superMethod") + } + return callMethods + } + + override fun getUpstreams(): MutableList { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun addUpstreamInternal(u: Upstream) { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun getLocalReader(): Mono { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun addHead(upstream: Upstream) { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun removeHead(upstreamId: String) { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun makeLagObserver(): HeadLagObserver { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun getCachingReader(): CachingReader? { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun getHead(mather: Selector.Matcher): Head { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun getHead(): Head { + return mock() + } + + override fun getEgressSubscription(): EgressSubscription { + val sub = mock { + on { getAvailableTopics() } doReturn listOf("heads") + } + return sub + } + + override fun getLabels(): Collection { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun cast(selfType: Class): T { + throw IllegalStateException(UNIMPLEMENTED) + } + + override fun getStatus(): UpstreamAvailability { + return UpstreamAvailability.OK + } + + override fun getCapabilities(): Set { + return setOf(Capability.BALANCE) + } + + override fun getLowerBounds(): Collection { + return listOf(LowerBoundData(800, 1000, LowerBoundType.STATE)) + } + + override fun getFinalizations(): Collection { + return listOf(FinalizationData(30, FinalizationType.SAFE_BLOCK)) + } + + override fun getQuorumLabels(): List { + return listOf(QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("test" to "val")))) + } + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt index 6e5c210ad..c1df16722 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt @@ -10,66 +10,55 @@ import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson import io.emeraldpay.dshackle.upstream.finalization.FinalizationData import io.emeraldpay.dshackle.upstream.finalization.FinalizationType import io.emeraldpay.dshackle.upstream.rpcclient.ListParams -import org.junit.jupiter.api.Assertions -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.mockito.Mockito.mock -import org.mockito.Mockito.`when` +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock import reactor.core.publisher.Mono +import reactor.test.StepVerifier import java.time.Duration import java.time.Instant class EthereumFinalizationDetectorTest { - private lateinit var upstream: Upstream - private lateinit var chainReader: ChainReader - private lateinit var detector: EthereumFinalizationDetector - - @BeforeEach - fun setUp() { - upstream = mock() - chainReader = mock() - `when`(upstream.getIngressReader()).thenReturn(chainReader) - detector = EthereumFinalizationDetector() - } - @Test fun testDetectFinalization() { - `when`(chainReader.read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1))) - .thenReturn( - Mono.just( - ChainResponse( - Global.objectMapper.writeValueAsString( - BlockJson().apply { - number = 1 - timestamp = Instant.now() - }, - ).toByteArray(), - null, - ), - ), - ) - `when`(chainReader.read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2))) - .thenReturn( - Mono.just( - ChainResponse( - Global.objectMapper.writeValueAsString( - BlockJson().apply { - number = 2 - timestamp = Instant.now() - }, - ).toByteArray(), - null, - ), - ), - ) + val reader = mock { + on { + read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1)) + } doReturn response(1) doReturn response(5) doReturn response(3) + on { + read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2)) + } doReturn response(2) doReturn response(10) doReturn response(5) + } + val upstream = mock { + on { getIngressReader() } doReturn reader + } + val detector = EthereumFinalizationDetector() - val flux = detector.detectFinalization(upstream, Duration.ofMillis(200)) - flux.take(2).collectList().block() - val result = detector.getFinalizations().toList() - Assertions.assertEquals(2, result.size) - org.assertj.core.api.Assertions.assertThat(result) - .contains(FinalizationData(2L, FinalizationType.FINALIZED_BLOCK)) - .contains(FinalizationData(1L, FinalizationType.SAFE_BLOCK)) + StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofMillis(200)) } + .expectSubscription() + .thenAwait(Duration.ofSeconds(0)) + .expectNext(FinalizationData(1L, FinalizationType.SAFE_BLOCK)) + .expectNext(FinalizationData(2L, FinalizationType.FINALIZED_BLOCK)) + .thenAwait(Duration.ofSeconds(15)) + .expectNext(FinalizationData(5L, FinalizationType.SAFE_BLOCK)) + .expectNext(FinalizationData(10L, FinalizationType.FINALIZED_BLOCK)) + .thenAwait(Duration.ofSeconds(15)) + .expectNoEvent(Duration.ofMillis(100)) + .thenCancel() + .verify(Duration.ofSeconds(1)) } + + private fun response(blockNumber: Long) = + Mono.just( + ChainResponse( + Global.objectMapper.writeValueAsString( + BlockJson().apply { + number = blockNumber + timestamp = Instant.now() + }, + ).toByteArray(), + null, + ), + ) } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateHandlerTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateHandlerTest.kt new file mode 100644 index 000000000..ffc7ad9e2 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateHandlerTest.kt @@ -0,0 +1,93 @@ +package io.emeraldpay.dshackle.upstream.state + +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.Capability +import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.finalization.FinalizationData +import io.emeraldpay.dshackle.upstream.finalization.FinalizationType +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource + +class MultistreamStateHandlerTest { + private val stateHandler = MultistreamStateHandler + + @ParameterizedTest + @MethodSource("states") + fun `compare states`( + newState: MultistreamState.CurrentMultistreamState, + expectedEvents: Collection, + ) { + val oldState = state() + val events = stateHandler.compareStates(oldState, newState) + + assertThat(events).isEqualTo(expectedEvents) + } + + companion object { + @JvmStatic + fun states(): List = + listOf( + Arguments.of( + state(), + listOf(), + ), + Arguments.of( + state(status = UpstreamAvailability.UNAVAILABLE), + listOf(MultistreamStateEvent.StatusEvent(UpstreamAvailability.UNAVAILABLE)), + ), + Arguments.of( + state(methods = setOf("otherCall")), + listOf(MultistreamStateEvent.MethodsEvent(setOf("otherCall"))), + ), + Arguments.of( + state(subs = setOf("newSub")), + listOf(MultistreamStateEvent.SubsEvent(setOf("newSub"))), + ), + Arguments.of( + state(caps = setOf(Capability.BALANCE)), + listOf(MultistreamStateEvent.CapabilitiesEvent(setOf(Capability.BALANCE))), + ), + Arguments.of( + state(lowerBounds = setOf(LowerBoundData(90, 90, LowerBoundType.STATE))), + listOf(MultistreamStateEvent.LowerBoundsEvent(setOf(LowerBoundData(90, 90, LowerBoundType.STATE)))), + ), + Arguments.of( + state(finalizationData = setOf(FinalizationData(80, FinalizationType.SAFE_BLOCK))), + listOf(MultistreamStateEvent.FinalizationEvent(setOf(FinalizationData(80, FinalizationType.SAFE_BLOCK)))), + ), + Arguments.of( + state(quorumForLabels = setOf(QuorumForLabels.QuorumItem(2, UpstreamsConfig.Labels.fromMap(mapOf("test1" to "val1"))))), + listOf( + MultistreamStateEvent.NodeDetailsEvent( + setOf(QuorumForLabels.QuorumItem(2, UpstreamsConfig.Labels.fromMap(mapOf("test1" to "val1")))), + ), + ), + ), + ) + + private fun state( + status: UpstreamAvailability = UpstreamAvailability.OK, + methods: Set = setOf("method"), + subs: Set = setOf("sub"), + caps: Set = setOf(Capability.RPC), + lowerBounds: Set = setOf(LowerBoundData(55, 55, LowerBoundType.STATE)), + finalizationData: Set = setOf(FinalizationData(20, FinalizationType.SAFE_BLOCK)), + quorumForLabels: Set = setOf(QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("test" to "val")))), + ): MultistreamState.CurrentMultistreamState { + return MultistreamState.CurrentMultistreamState( + status, + methods, + subs, + caps, + lowerBounds, + finalizationData, + quorumForLabels, + ) + } + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateTest.kt new file mode 100644 index 000000000..c6f738fd3 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateTest.kt @@ -0,0 +1,161 @@ +package io.emeraldpay.dshackle.upstream.state + +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.Capability +import io.emeraldpay.dshackle.upstream.DefaultUpstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.finalization.FinalizationData +import io.emeraldpay.dshackle.upstream.finalization.FinalizationType +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import reactor.test.StepVerifier +import java.time.Duration + +class MultistreamStateTest { + + @Test + fun `update state and send events`() { + val up1 = upstream( + UpstreamAvailability.OK, + true, + callMethods(setOf("eth_call", "super_call")), + setOf(Capability.RPC), + QuorumForLabels(QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("test" to "value")))), + listOf(LowerBoundData(55, LowerBoundType.STATE), LowerBoundData(99, LowerBoundType.BLOCK)), + listOf(FinalizationData(32, FinalizationType.SAFE_BLOCK), FinalizationData(80, FinalizationType.FINALIZED_BLOCK)), + ) + val up2 = upstream( + UpstreamAvailability.UNAVAILABLE, + false, + callMethods(setOf("one_more_call")), + setOf(Capability.BALANCE), + QuorumForLabels(QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("new" to "old")))), + listOf(LowerBoundData(22, LowerBoundType.STATE), LowerBoundData(5, LowerBoundType.BLOCK)), + listOf(FinalizationData(3200, FinalizationType.SAFE_BLOCK), FinalizationData(1180, FinalizationType.FINALIZED_BLOCK)), + ) + val up3 = upstream( + UpstreamAvailability.LAGGING, + true, + callMethods(setOf("super_duper_call")), + setOf(Capability.WS_HEAD), + QuorumForLabels(QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("megaTest" to "valTest")))), + listOf(LowerBoundData(40, LowerBoundType.STATE), LowerBoundData(400, LowerBoundType.BLOCK)), + listOf(FinalizationData(70, FinalizationType.SAFE_BLOCK), FinalizationData(15, FinalizationType.FINALIZED_BLOCK)), + ) + val up4 = upstream( + UpstreamAvailability.OK, + true, + callMethods(setOf("super_duper_call", "yet_another_call")), + setOf(Capability.WS_HEAD), + QuorumForLabels(QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("megaTest" to "valTest")))), + listOf(LowerBoundData(1, LowerBoundType.STATE), LowerBoundData(1, LowerBoundType.BLOCK)), + listOf(FinalizationData(990, FinalizationType.SAFE_BLOCK), FinalizationData(880, FinalizationType.FINALIZED_BLOCK)), + ) + + val state = MultistreamState {} + + StepVerifier.create(state.stateEvents()) + .then { state.updateState(listOf(up1, up2, up3), listOf("heads", "notHeads")) } + .assertNext { + assertThat(it).hasSize(7) + assertThat(it.toList()) + .usingRecursiveFieldByFieldElementComparatorIgnoringFields("lowerBounds.timestamp") + .hasSameElementsAs( + listOf( + MultistreamStateEvent.StatusEvent(UpstreamAvailability.OK), + MultistreamStateEvent.MethodsEvent(setOf("eth_call", "super_call", "super_duper_call").toHashSet()), + MultistreamStateEvent.SubsEvent(listOf("heads", "notHeads")), + MultistreamStateEvent.CapabilitiesEvent(setOf(Capability.WS_HEAD, Capability.RPC).toHashSet()), + MultistreamStateEvent.LowerBoundsEvent( + setOf( + LowerBoundData(40, LowerBoundType.STATE), + LowerBoundData(99, LowerBoundType.BLOCK), + ).toHashSet(), + ), + MultistreamStateEvent.FinalizationEvent( + setOf( + FinalizationData(70, FinalizationType.SAFE_BLOCK), + FinalizationData(80, FinalizationType.FINALIZED_BLOCK), + ).toHashSet(), + ), + MultistreamStateEvent.NodeDetailsEvent( + listOf( + QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("test" to "value"))), + QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("megaTest" to "valTest"))), + ), + ), + ), + ) + } + .then { state.updateState(listOf(up1, up2, up3), listOf("heads", "notHeads")) } + .assertNext { + assertThat(it).hasSize(0) + } + .then { state.updateState(listOf(up1, up2, up3, up4), listOf("heads", "notHeads")) } + .assertNext { + assertThat(it).hasSize(4) + assertThat(it.toList()) + .usingRecursiveFieldByFieldElementComparatorIgnoringFields("lowerBounds.timestamp") + .hasSameElementsAs( + listOf( + MultistreamStateEvent.MethodsEvent(setOf("eth_call", "yet_another_call", "super_call", "super_duper_call").toHashSet()), + MultistreamStateEvent.LowerBoundsEvent( + setOf( + LowerBoundData(1, LowerBoundType.STATE), + LowerBoundData(1, LowerBoundType.BLOCK), + ).toHashSet(), + ), + MultistreamStateEvent.FinalizationEvent( + setOf( + FinalizationData(990, FinalizationType.SAFE_BLOCK), + FinalizationData(880, FinalizationType.FINALIZED_BLOCK), + ).toHashSet(), + ), + MultistreamStateEvent.NodeDetailsEvent( + listOf( + QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(mapOf("test" to "value"))), + QuorumForLabels.QuorumItem(2, UpstreamsConfig.Labels.fromMap(mapOf("megaTest" to "valTest"))), + ), + ), + ), + ) + } + .thenCancel() + .verify(Duration.ofSeconds(1)) + } + + private fun upstream( + status: UpstreamAvailability, + isAvailable: Boolean, + callMethods: CallMethods, + capabilities: Set, + quorumForLabels: QuorumForLabels, + lowerBounds: Collection, + finalizationData: Collection, + ): DefaultUpstream { + val upstream = mock { + on { isAvailable() } doReturn isAvailable + on { getMethods() } doReturn callMethods + on { getCapabilities() } doReturn capabilities + on { getQuorumByLabel() } doReturn quorumForLabels + on { getLowerBounds() } doReturn lowerBounds + on { getFinalizations() } doReturn finalizationData + on { getStatus() } doReturn status + } + + return upstream + } + + private fun callMethods(methods: Set): CallMethods { + val callMethods = mock { + on { getSupportedMethods() } doReturn methods + } + return callMethods + } +}