Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove unused specific tracks #326

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 0 additions & 96 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import io.emeraldpay.api.proto.Common
import io.emeraldpay.api.proto.ReactorBlockchainGrpc
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.ChainValue
import io.emeraldpay.dshackle.SilentException
import io.emeraldpay.dshackle.config.spans.ProviderSpanHandler
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Metrics
Expand All @@ -36,7 +35,6 @@ import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

Expand All @@ -46,11 +44,8 @@ class BlockchainRpc(
private val nativeCallStream: NativeCallStream,
private val nativeSubscribe: NativeSubscribe,
private val streamHead: StreamHead,
private val trackTx: List<TrackTx>,
private val trackAddress: List<TrackAddress>,
private val describe: Describe,
private val subscribeStatus: SubscribeStatus,
private val estimateFee: EstimateFee,
private val subscribeNodeStatus: SubscribeNodeStatus,
@Qualifier("rpcScheduler")
private val scheduler: Scheduler,
Expand Down Expand Up @@ -129,97 +124,6 @@ class BlockchainRpc(
).doOnError { failMetric.increment() }
}

override fun subscribeTxStatus(requestMono: Mono<BlockchainOuterClass.TxStatusRequest>): Flux<BlockchainOuterClass.TxStatus> {
return requestMono.subscribeOn(scheduler).flatMapMany { request ->
val chain = Chain.byId(request.chainValue)
val metrics = chainMetrics.get(chain)
metrics.subscribeTxMetric.increment()
try {
trackTx.find { it.isSupported(chain) }?.let { track ->
track.subscribe(request)
.doOnNext { metrics.subscribeHeadRespMetric.increment() }
.doOnError { failMetric.increment() }
} ?: Flux.error(SilentException.UnsupportedBlockchain(chain))
} catch (t: Throwable) {
log.error("Internal error during Tx Subscription", t)
failMetric.increment()
Flux.error(IllegalStateException("Internal Error"))
}
}
}

override fun subscribeBalance(requestMono: Mono<BlockchainOuterClass.BalanceRequest>): Flux<BlockchainOuterClass.AddressBalance> {
return requestMono.subscribeOn(scheduler).flatMapMany { request ->
val chain = Chain.byId(request.asset.chainValue)
val metrics = chainMetrics.get(chain)
metrics.subscribeBalanceMetric.increment()
val asset = request.asset.code.lowercase(Locale.getDefault())
try {
trackAddress.find { it.isSupported(chain, asset) }?.let { track ->
track.subscribe(request)
.doOnNext { metrics.subscribeBalanceRespMetric.increment() }
.doOnError { failMetric.increment() }
} ?: Flux.error<BlockchainOuterClass.AddressBalance>(SilentException.UnsupportedBlockchain(chain))
.doOnSubscribe {
log.error("Balance for $chain:$asset is not supported")
}
} catch (t: Throwable) {
log.error("Internal error during Balance Subscription", t)
failMetric.increment()
Flux.error(IllegalStateException("Internal Error"))
}
}
}

override fun getBalance(requestMono: Mono<BlockchainOuterClass.BalanceRequest>): Flux<BlockchainOuterClass.AddressBalance> {
return requestMono.subscribeOn(scheduler).flatMapMany { request ->
val chain = Chain.byId(request.asset.chainValue)
val metrics = chainMetrics.get(chain)
metrics.getBalanceMetric.increment()
val asset = request.asset.code.lowercase(Locale.getDefault())
val startTime = System.currentTimeMillis()
try {
trackAddress.find { it.isSupported(chain, asset) }?.let { track ->
track.getBalance(request)
.doOnNext {
metrics.getBalanceRespMetric.record(
System.currentTimeMillis() - startTime,
TimeUnit.MILLISECONDS,
)
}
} ?: Flux.error<BlockchainOuterClass.AddressBalance>(SilentException.UnsupportedBlockchain(chain))
.doOnSubscribe {
log.error("Balance for $chain:$asset is not supported")
}
} catch (t: Throwable) {
log.error("Internal error during Balance Request", t)
failMetric.increment()
Flux.error<BlockchainOuterClass.AddressBalance>(IllegalStateException("Internal Error"))
}
}
}

override fun estimateFee(request: Mono<BlockchainOuterClass.EstimateFeeRequest>): Mono<BlockchainOuterClass.EstimateFeeResponse> {
return request
.subscribeOn(scheduler)
.flatMap {
val chain = Chain.byId(it.chainValue)
val metrics = chainMetrics.get(chain)
metrics.estimateFeeMetric.increment()
val startTime = System.currentTimeMillis()
estimateFee.estimateFee(it).doFinally {
metrics.estimateFeeRespMetric.record(
System.currentTimeMillis() - startTime,
TimeUnit.MILLISECONDS,
)
}
}
.doOnError { t ->
log.error("Internal error during Fee Estimation", t)
failMetric.increment()
}
}

override fun describe(request: Mono<BlockchainOuterClass.DescribeRequest>): Mono<BlockchainOuterClass.DescribeResponse> {
describeMetric.increment()
return describe.describe(request)
Expand Down
38 changes: 0 additions & 38 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/EstimateFee.kt

This file was deleted.

15 changes: 3 additions & 12 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.google.protobuf.ByteString
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.BlockchainType
import io.emeraldpay.dshackle.BlockchainType.EVM_POS
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.Global.Companion.nullValue
Expand All @@ -44,9 +45,6 @@ import io.emeraldpay.dshackle.upstream.MultistreamHolder
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
import io.emeraldpay.dshackle.upstream.calls.EthereumCallSelector
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream
import io.emeraldpay.dshackle.upstream.ethereum.EthereumMultistream
import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
Expand Down Expand Up @@ -84,17 +82,10 @@ open class NativeCall(
var rpcReaderFactory: RpcReaderFactory = RpcReaderFactory.default()
private val ethereumCallSelectors = EnumMap<Chain, EthereumCallSelector>(Chain::class.java)

companion object {
val casting: Map<BlockchainType, Class<out EthereumLikeMultistream>> = mapOf(
BlockchainType.EVM_POS to EthereumPosMultiStream::class.java,
BlockchainType.EVM_POW to EthereumMultistream::class.java,
)
}

@EventListener
fun onUpstreamChangeEvent(event: UpstreamChangeEvent) {
casting[BlockchainType.from(event.chain)]?.let { cast ->
multistreamHolder.getUpstream(event.chain).let { up ->
multistreamHolder.getUpstream(event.chain).let { up ->
if (BlockchainType.from(up.chain) == EVM_POS) {
ethereumCallSelectors.putIfAbsent(
event.chain,
EthereumCallSelector(up.caches),
Expand Down
10 changes: 5 additions & 5 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import io.emeraldpay.dshackle.BlockchainType
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.SilentException
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.MultistreamHolder
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.HasUpstream
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import io.grpc.Status
Expand Down Expand Up @@ -67,9 +67,9 @@ open class NativeSubscribe(
/**
* Try to proxy request subscription directly to the upstream dshackle instance.
* If not possible - performs subscription logic on the current instance
* @see EthereumLikeMultistream.tryProxy
* @see EthereumLikeMultistream.tryProxySubscribe
*/
val publisher = getUpstream(chain).tryProxy(matcher, request) ?: run {
val publisher = getUpstream(chain).tryProxySubscribe(matcher, request) ?: run {
val method = request.method
val params: Any? = request.payload?.takeIf { !it.isEmpty }?.let {
objectMapper.readValue(it.newInput(), Map::class.java)
Expand Down Expand Up @@ -103,8 +103,8 @@ open class NativeSubscribe(
log.error("Error during subscription to $method, chain $chain, params $params", it)
}

private fun getUpstream(chain: Chain): EthereumLikeMultistream =
multistreamHolder.getUpstream(chain).let { it as EthereumLikeMultistream }
private fun getUpstream(chain: Chain): Multistream =
multistreamHolder.getUpstream(chain)

fun convertToProto(holder: ResponseHolder): NativeSubscribeReplyItem {
if (holder.response is NativeSubscribeReplyItem) {
Expand Down
30 changes: 0 additions & 30 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackAddress.kt

This file was deleted.

Loading