Skip to content

Commit

Permalink
New reactive endpoint (#514)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Jul 1, 2024
1 parent a5590f6 commit eddaf98
Show file tree
Hide file tree
Showing 22 changed files with 1,276 additions and 232 deletions.
2 changes: 1 addition & 1 deletion emerald-grpc
7 changes: 7 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class BlockchainRpc(
@Autowired(required = false)
private val providerSpanHandler: ProviderSpanHandler?,
private val tracer: Tracer,
private val subscribeChainStatus: SubscribeChainStatus,
) : ReactorBlockchainGrpc.BlockchainImplBase() {

private val log = LoggerFactory.getLogger(BlockchainRpc::class.java)
Expand Down Expand Up @@ -164,6 +165,12 @@ class BlockchainRpc(
}
}

override fun subscribeChainStatus(
request: Mono<BlockchainOuterClass.SubscribeChainStatusRequest>,
): Flux<BlockchainOuterClass.SubscribeChainStatusResponse> {
return subscribeChainStatus.chainStatuses()
}

class RequestMetrics(val chain: Chain) {
val nativeCallMetric = Counter.builder("request.grpc.request")
.tag("type", "nativeCall")
Expand Down
151 changes: 151 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/ChainEventMapper.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.emeraldpay.dshackle.rpc

import com.google.protobuf.ByteString
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.BlockchainOuterClass.SupportedMethodsEvent
import io.emeraldpay.api.proto.Common
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import org.springframework.stereotype.Component

@Component
class ChainEventMapper {

fun mapHead(head: BlockContainer): BlockchainOuterClass.ChainEvent {
return BlockchainOuterClass.ChainEvent.newBuilder()
.setHead(
BlockchainOuterClass.HeadEvent.newBuilder()
.setHeight(head.height)
.setSlot(head.slot)
.setTimestamp(head.timestamp.toEpochMilli())
.setWeight(ByteString.copyFrom(head.difficulty.toByteArray()))
.setBlockId(head.hash.toHex())
.setParentBlockId(head.parentHash?.toHex() ?: "")
.build(),
)
.build()
}

fun mapCapabilities(capabilities: Collection<Capability>): BlockchainOuterClass.ChainEvent {
val caps = capabilities.map {
when (it) {
Capability.RPC -> BlockchainOuterClass.Capabilities.CAP_CALLS
Capability.BALANCE -> BlockchainOuterClass.Capabilities.CAP_BALANCE
Capability.WS_HEAD -> BlockchainOuterClass.Capabilities.CAP_WS_HEAD
}
}

return BlockchainOuterClass.ChainEvent.newBuilder()
.setCapabilitiesEvent(
BlockchainOuterClass.CapabilitiesEvent.newBuilder()
.addAllCapabilities(caps)
.build(),
)
.build()
}

fun mapNodeDetails(nodeDetails: Collection<QuorumForLabels.QuorumItem>): BlockchainOuterClass.ChainEvent {
val details = nodeDetails.map {
BlockchainOuterClass.NodeDetails.newBuilder()
.setQuorum(it.quorum)
.addAllLabels(
it.labels.entries.map { label ->
BlockchainOuterClass.Label.newBuilder()
.setName(label.key)
.setValue(label.value)
.build()
},
).build()
}

return BlockchainOuterClass.ChainEvent.newBuilder()
.setNodesEvent(
BlockchainOuterClass.NodeDetailsEvent.newBuilder()
.addAllNodes(details)
.build(),
)
.build()
}

fun mapFinalizationData(finalizationData: Collection<FinalizationData>): BlockchainOuterClass.ChainEvent {
val data = finalizationData.map {
Common.FinalizationData.newBuilder()
.setHeight(it.height)
.setType(it.type.toProtoFinalizationType())
.build()
}

return BlockchainOuterClass.ChainEvent.newBuilder()
.setFinalizationDataEvent(
BlockchainOuterClass.FinalizationDataEvent.newBuilder()
.addAllFinalizationData(data)
.build(),
)
.build()
}

fun mapLowerBounds(lowerBounds: Collection<LowerBoundData>): BlockchainOuterClass.ChainEvent {
val data = lowerBounds
.map {
BlockchainOuterClass.LowerBound.newBuilder()
.setLowerBoundTimestamp(it.timestamp)
.setLowerBoundType(mapLowerBoundType(it.type))
.setLowerBoundValue(it.lowerBound)
.build()
}

return BlockchainOuterClass.ChainEvent.newBuilder()
.setLowerBoundsEvent(
BlockchainOuterClass.LowerBoundEvent.newBuilder()
.addAllLowerBounds(data)
.build(),
)
.build()
}

fun chainStatus(status: UpstreamAvailability): BlockchainOuterClass.ChainEvent {
return BlockchainOuterClass.ChainEvent.newBuilder()
.setStatus(
BlockchainOuterClass.ChainStatus.newBuilder()
.setAvailability(Common.AvailabilityEnum.forNumber(status.grpcId))
.build(),
)
.build()
}

fun supportedMethods(methods: Collection<String>): BlockchainOuterClass.ChainEvent {
return BlockchainOuterClass.ChainEvent.newBuilder()
.setSupportedMethodsEvent(
SupportedMethodsEvent.newBuilder()
.addAllMethods(methods)
.build(),
)
.build()
}

fun supportedSubs(subs: Collection<String>): BlockchainOuterClass.ChainEvent {
return BlockchainOuterClass.ChainEvent.newBuilder()
.setSupportedSubscriptionsEvent(
BlockchainOuterClass.SupportedSubscriptionsEvent.newBuilder()
.addAllSubs(subs)
.build(),
)
.build()
}

private fun mapLowerBoundType(type: LowerBoundType): BlockchainOuterClass.LowerBoundType {
return when (type) {
LowerBoundType.SLOT -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_SLOT
LowerBoundType.UNKNOWN -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_UNSPECIFIED
LowerBoundType.STATE -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_STATE
LowerBoundType.BLOCK -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK
LowerBoundType.TX -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_TX
LowerBoundType.LOGS -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_LOGS
}
}
}
3 changes: 2 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import io.emeraldpay.dshackle.upstream.MultistreamHolder
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
import io.emeraldpay.dshackle.upstream.calls.DisabledCallMethods
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcResponseError
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
Expand Down Expand Up @@ -110,7 +111,7 @@ open class NativeCall(
if (it is ValidCallContext<*>) {
if (it.payload is ParsedCallDetails) {
log.error("nativeCallResult method ${it.payload.method} of ${it.upstream.getId()} is not available, disabling")
val cm = (it.upstream.getMethods() as Multistream.DisabledCallMethods)
val cm = (it.upstream.getMethods() as DisabledCallMethods)
cm.disableMethodTemporarily(it.payload.method)
it.upstream.updateMethods(cm)
}
Expand Down
128 changes: 128 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package io.emeraldpay.dshackle.rpc

import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.Common
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.MultistreamHolder
import io.emeraldpay.dshackle.upstream.state.MultistreamStateEvent
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.switchIfEmpty

@Service
class SubscribeChainStatus(
private val multistreamHolder: MultistreamHolder,
private val chainEventMapper: ChainEventMapper,
) {

companion object {
private val log = LoggerFactory.getLogger(SubscribeChainStatus::class.java)
}

fun chainStatuses(): Flux<BlockchainOuterClass.SubscribeChainStatusResponse> {
return Flux.merge(
// we need to track not only multistreams with upstreams but all of them
// because upstreams can be added in runtime with hot config reload
multistreamHolder.all()
.filter { Common.ChainRef.forNumber(it.chain.id) != null }
.map { ms ->
Flux.concat(
// the first event must be filled with all fields
firstFullEvent(ms),
Flux.merge(
// head events are separated from others
headEvents(ms),
multistreamEvents(ms),
),
)
},
).doOnError {
log.error("Error during sending chain statuses", it)
}
}

private fun multistreamEvents(ms: Multistream): Flux<BlockchainOuterClass.SubscribeChainStatusResponse> {
return ms.stateEvents()
.filter { it.isNotEmpty() }
.map { events ->
val response = BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder()
val chainDescription = BlockchainOuterClass.ChainDescription.newBuilder()
.setChain(Common.ChainRef.forNumber(ms.chain.id))

events.forEach {
chainDescription.addChainEvent(processMsEvent(it))
}

response.setChainDescription(chainDescription.build())
response.build()
}
}

private fun processMsEvent(event: MultistreamStateEvent): BlockchainOuterClass.ChainEvent {
return when (event) {
is MultistreamStateEvent.CapabilitiesEvent -> chainEventMapper.mapCapabilities(event.caps)
is MultistreamStateEvent.FinalizationEvent -> chainEventMapper.mapFinalizationData(event.finalizationData)
is MultistreamStateEvent.LowerBoundsEvent -> chainEventMapper.mapLowerBounds(event.lowerBounds)
is MultistreamStateEvent.MethodsEvent -> chainEventMapper.supportedMethods(event.methods)
is MultistreamStateEvent.NodeDetailsEvent -> chainEventMapper.mapNodeDetails(event.details)
is MultistreamStateEvent.StatusEvent -> chainEventMapper.chainStatus(event.status)
is MultistreamStateEvent.SubsEvent -> chainEventMapper.supportedSubs(event.subs)
}
}

private fun firstFullEvent(ms: Multistream): Mono<BlockchainOuterClass.SubscribeChainStatusResponse> {
return Mono.justOrEmpty(ms.getHead().getCurrent())
.map { toFullResponse(it!!, ms) }
.switchIfEmpty {
// in case if there is still no head we mush wait until we get it
ms.getHead()
.getFlux()
.next()
.map { toFullResponse(it!!, ms) }
}
}

private fun headEvents(ms: Multistream): Flux<BlockchainOuterClass.SubscribeChainStatusResponse> {
return ms.getHead()
.getFlux()
.skip(1)
.map {
BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder()
.setChainDescription(
BlockchainOuterClass.ChainDescription.newBuilder()
.setChain(Common.ChainRef.forNumber(ms.chain.id))
.addChainEvent(chainEventMapper.mapHead(it))
.build(),
)
.build()
}
}

private fun toFullResponse(head: BlockContainer, ms: Multistream): BlockchainOuterClass.SubscribeChainStatusResponse {
return BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder()
.setChainDescription(
BlockchainOuterClass.ChainDescription.newBuilder()
.setChain(Common.ChainRef.forNumber(ms.chain.id))
.addChainEvent(chainEventMapper.chainStatus(ms.getStatus()))
.addChainEvent(chainEventMapper.mapHead(head))
.addChainEvent(chainEventMapper.supportedMethods(ms.getMethods().getSupportedMethods()))
.addChainEvent(chainEventMapper.supportedSubs(ms.getEgressSubscription().getAvailableTopics()))
.addChainEvent(chainEventMapper.mapCapabilities(ms.getCapabilities()))
.addChainEvent(chainEventMapper.mapLowerBounds(ms.getLowerBounds()))
.addChainEvent(chainEventMapper.mapFinalizationData(ms.getFinalizations()))
.addChainEvent(chainEventMapper.mapNodeDetails(ms.getQuorumLabels()))
.build(),
)
.setBuildInfo(
BlockchainOuterClass.BuildInfo.newBuilder()
.setVersion(Global.version)
.build(),
)
.setFullResponse(true)
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class QuorumForLabels() {
/**
* Details for a single element (upstream, node or aggregation)
*/
class QuorumItem(val quorum: Int, val labels: UpstreamsConfig.Labels) {
data class QuorumItem(val quorum: Int, val labels: UpstreamsConfig.Labels) {
companion object {
fun empty(): QuorumItem {
return QuorumItem(0, UpstreamsConfig.Labels())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ abstract class AbstractHead @JvmOverloads constructor(
).onBackpressureLatest()
}

fun getCurrent(): BlockContainer? {
override fun getCurrent(): BlockContainer? {
return forkChoice.getHead()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig.Labels.Companion.fromMap
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
Expand Down Expand Up @@ -155,7 +156,7 @@ abstract class DefaultUpstream(
override fun nodeId(): Byte = hash

open fun getQuorumByLabel(): QuorumForLabels {
return node?.let { QuorumForLabels(it) }
return node?.let { QuorumForLabels(it.copy(labels = fromMap(it.labels))) }
?: QuorumForLabels(QuorumForLabels.QuorumItem.empty())
}

Expand Down
4 changes: 4 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,8 @@ class EmptyHead : Head {
}

override fun headLiveness(): Flux<Boolean> = Flux.empty()

override fun getCurrent(): BlockContainer? {
return null
}
}
2 changes: 2 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,6 @@ interface Head {
fun onSyncingNode(isSyncing: Boolean)

fun headLiveness(): Flux<Boolean>

fun getCurrent(): BlockContainer?
}
Loading

0 comments on commit eddaf98

Please sign in to comment.