diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt index 5fb130dfc..a62864e8a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt @@ -22,7 +22,6 @@ import io.emeraldpay.api.proto.Common import io.emeraldpay.api.proto.ReactorBlockchainGrpc import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.ChainValue -import io.emeraldpay.dshackle.SilentException import io.emeraldpay.dshackle.config.spans.ProviderSpanHandler import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Metrics @@ -36,7 +35,6 @@ import org.springframework.stereotype.Service import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler -import java.util.Locale import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit @@ -46,11 +44,8 @@ class BlockchainRpc( private val nativeCallStream: NativeCallStream, private val nativeSubscribe: NativeSubscribe, private val streamHead: StreamHead, - private val trackTx: List, - private val trackAddress: List, private val describe: Describe, private val subscribeStatus: SubscribeStatus, - private val estimateFee: EstimateFee, private val subscribeNodeStatus: SubscribeNodeStatus, @Qualifier("rpcScheduler") private val scheduler: Scheduler, @@ -129,97 +124,6 @@ class BlockchainRpc( ).doOnError { failMetric.increment() } } - override fun subscribeTxStatus(requestMono: Mono): Flux { - return requestMono.subscribeOn(scheduler).flatMapMany { request -> - val chain = Chain.byId(request.chainValue) - val metrics = chainMetrics.get(chain) - metrics.subscribeTxMetric.increment() - try { - trackTx.find { it.isSupported(chain) }?.let { track -> - track.subscribe(request) - .doOnNext { metrics.subscribeHeadRespMetric.increment() } - .doOnError { failMetric.increment() } - } ?: Flux.error(SilentException.UnsupportedBlockchain(chain)) - } catch (t: Throwable) { - log.error("Internal error during Tx Subscription", t) - failMetric.increment() - Flux.error(IllegalStateException("Internal Error")) - } - } - } - - override fun subscribeBalance(requestMono: Mono): Flux { - return requestMono.subscribeOn(scheduler).flatMapMany { request -> - val chain = Chain.byId(request.asset.chainValue) - val metrics = chainMetrics.get(chain) - metrics.subscribeBalanceMetric.increment() - val asset = request.asset.code.lowercase(Locale.getDefault()) - try { - trackAddress.find { it.isSupported(chain, asset) }?.let { track -> - track.subscribe(request) - .doOnNext { metrics.subscribeBalanceRespMetric.increment() } - .doOnError { failMetric.increment() } - } ?: Flux.error(SilentException.UnsupportedBlockchain(chain)) - .doOnSubscribe { - log.error("Balance for $chain:$asset is not supported") - } - } catch (t: Throwable) { - log.error("Internal error during Balance Subscription", t) - failMetric.increment() - Flux.error(IllegalStateException("Internal Error")) - } - } - } - - override fun getBalance(requestMono: Mono): Flux { - return requestMono.subscribeOn(scheduler).flatMapMany { request -> - val chain = Chain.byId(request.asset.chainValue) - val metrics = chainMetrics.get(chain) - metrics.getBalanceMetric.increment() - val asset = request.asset.code.lowercase(Locale.getDefault()) - val startTime = System.currentTimeMillis() - try { - trackAddress.find { it.isSupported(chain, asset) }?.let { track -> - track.getBalance(request) - .doOnNext { - metrics.getBalanceRespMetric.record( - System.currentTimeMillis() - startTime, - TimeUnit.MILLISECONDS, - ) - } - } ?: Flux.error(SilentException.UnsupportedBlockchain(chain)) - .doOnSubscribe { - log.error("Balance for $chain:$asset is not supported") - } - } catch (t: Throwable) { - log.error("Internal error during Balance Request", t) - failMetric.increment() - Flux.error(IllegalStateException("Internal Error")) - } - } - } - - override fun estimateFee(request: Mono): Mono { - return request - .subscribeOn(scheduler) - .flatMap { - val chain = Chain.byId(it.chainValue) - val metrics = chainMetrics.get(chain) - metrics.estimateFeeMetric.increment() - val startTime = System.currentTimeMillis() - estimateFee.estimateFee(it).doFinally { - metrics.estimateFeeRespMetric.record( - System.currentTimeMillis() - startTime, - TimeUnit.MILLISECONDS, - ) - } - } - .doOnError { t -> - log.error("Internal error during Fee Estimation", t) - failMetric.increment() - } - } - override fun describe(request: Mono): Mono { describeMetric.increment() return describe.describe(request) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/EstimateFee.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/EstimateFee.kt deleted file mode 100644 index 87fd567b9..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/EstimateFee.kt +++ /dev/null @@ -1,38 +0,0 @@ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.upstream.ChainFees -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.grpc.Status -import io.grpc.StatusException -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.stereotype.Service -import reactor.core.publisher.Mono - -@Service -class EstimateFee( - @Autowired private val multistreamHolder: MultistreamHolder, -) { - - companion object { - private val log = LoggerFactory.getLogger(EstimateFee::class.java) - } - - fun estimateFee(req: BlockchainOuterClass.EstimateFeeRequest): Mono { - val chain = Chain.byId(req.chainValue) - val up = multistreamHolder.getUpstream(chain) ?: return Mono.error( - StatusException( - Status.UNAVAILABLE.withDescription("BLOCKCHAIN UNAVAILABLE: ${req.chainValue}"), - ), - ) - val mode = ChainFees.extractMode(req) ?: return Mono.error( - StatusException( - Status.UNAVAILABLE.withDescription("UNSUPPORTED MODE: ${req.mode.number}"), - ), - ) - return up.getFeeEstimation() - .estimate(mode, req.blocks) - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt index 71dc04db6..536bdec9e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.google.protobuf.ByteString import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.dshackle.BlockchainType +import io.emeraldpay.dshackle.BlockchainType.EVM_POS import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.Global.Companion.nullValue @@ -44,9 +45,6 @@ import io.emeraldpay.dshackle.upstream.MultistreamHolder import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods import io.emeraldpay.dshackle.upstream.calls.EthereumCallSelector -import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream -import io.emeraldpay.dshackle.upstream.ethereum.EthereumMultistream -import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest @@ -84,17 +82,10 @@ open class NativeCall( var rpcReaderFactory: RpcReaderFactory = RpcReaderFactory.default() private val ethereumCallSelectors = EnumMap(Chain::class.java) - companion object { - val casting: Map> = mapOf( - BlockchainType.EVM_POS to EthereumPosMultiStream::class.java, - BlockchainType.EVM_POW to EthereumMultistream::class.java, - ) - } - @EventListener fun onUpstreamChangeEvent(event: UpstreamChangeEvent) { - casting[BlockchainType.from(event.chain)]?.let { cast -> - multistreamHolder.getUpstream(event.chain).let { up -> + multistreamHolder.getUpstream(event.chain).let { up -> + if (BlockchainType.from(up.chain) == EVM_POS) { ethereumCallSelectors.putIfAbsent( event.chain, EthereumCallSelector(up.caches), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt index f9b37973f..92ad5a6f5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt @@ -22,9 +22,9 @@ import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.SilentException +import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.MultistreamHolder import io.emeraldpay.dshackle.upstream.Selector -import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.HasUpstream import io.emeraldpay.dshackle.upstream.signature.ResponseSigner import io.grpc.Status @@ -67,9 +67,9 @@ open class NativeSubscribe( /** * Try to proxy request subscription directly to the upstream dshackle instance. * If not possible - performs subscription logic on the current instance - * @see EthereumLikeMultistream.tryProxy + * @see EthereumLikeMultistream.tryProxySubscribe */ - val publisher = getUpstream(chain).tryProxy(matcher, request) ?: run { + val publisher = getUpstream(chain).tryProxySubscribe(matcher, request) ?: run { val method = request.method val params: Any? = request.payload?.takeIf { !it.isEmpty }?.let { objectMapper.readValue(it.newInput(), Map::class.java) @@ -103,8 +103,8 @@ open class NativeSubscribe( log.error("Error during subscription to $method, chain $chain, params $params", it) } - private fun getUpstream(chain: Chain): EthereumLikeMultistream = - multistreamHolder.getUpstream(chain).let { it as EthereumLikeMultistream } + private fun getUpstream(chain: Chain): Multistream = + multistreamHolder.getUpstream(chain) fun convertToProto(holder: ResponseHolder): NativeSubscribeReplyItem { if (holder.response is NativeSubscribeReplyItem) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackAddress.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackAddress.kt deleted file mode 100644 index 005aed428..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackAddress.kt +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.Chain -import reactor.core.publisher.Flux - -/** - * Base interface to tracking balance on a single blockchain - */ -interface TrackAddress { - - fun isSupported(chain: Chain, asset: String): Boolean - fun getBalance(request: BlockchainOuterClass.BalanceRequest): Flux - fun subscribe(request: BlockchainOuterClass.BalanceRequest): Flux -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinAddress.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinAddress.kt deleted file mode 100644 index 8cc6964e7..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinAddress.kt +++ /dev/null @@ -1,303 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.api.proto.ReactorBlockchainGrpc -import io.emeraldpay.dshackle.BlockchainType -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.Defaults -import io.emeraldpay.dshackle.SilentException -import io.emeraldpay.dshackle.startup.UpstreamChangeEvent -import io.emeraldpay.dshackle.upstream.Capability -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.dshackle.upstream.Selector -import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinMultistream -import io.emeraldpay.dshackle.upstream.bitcoin.data.SimpleUnspent -import io.emeraldpay.dshackle.upstream.grpc.BitcoinGrpcUpstream -import org.apache.commons.lang3.StringUtils -import org.bitcoinj.params.MainNetParams -import org.bitcoinj.params.TestNet3Params -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.context.event.EventListener -import org.springframework.stereotype.Service -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import java.math.BigInteger -import java.util.concurrent.ConcurrentHashMap - -@Service -class TrackBitcoinAddress( - @Autowired private val multistreamHolder: MultistreamHolder, -) : TrackAddress { - - companion object { - private val log = LoggerFactory.getLogger(TrackBitcoinAddress::class.java) - } - - override fun isSupported(chain: Chain, asset: String): Boolean { - return (asset == "bitcoin" || asset == "btc" || asset == "satoshi") && - BlockchainType.from(chain) == BlockchainType.BITCOIN && multistreamHolder.isAvailable(chain) - } - - /** - * Keep tracking of the current state of local upstreams. True for a chain that has an upstream with balance data. - */ - private val localBalanceAvailable: MutableMap = ConcurrentHashMap() - - /** - * Criteria for a remote grpc upstream that can provide a balance - */ - private val balanceUpstreamMatcher = Selector.MultiMatcher( - listOf( - Selector.GrpcMatcher(), - Selector.CapabilityMatcher(Capability.BALANCE), - ), - ) - - @EventListener - fun onUpstreamChangeEvent(event: UpstreamChangeEvent) { - multistreamHolder.getUpstream(event.chain)?.let { mup -> - val available = mup.getAll().any { up -> - !up.isGrpc() && up.getCapabilities().contains(Capability.BALANCE) - } - setBalanceAvailability(event.chain, available) - } - } - - fun setBalanceAvailability(chain: Chain, enabled: Boolean) { - localBalanceAvailable[chain] = enabled - } - - /** - * @return true if the current instance has data sources to provide the balance - */ - fun isBalanceAvailable(chain: Chain): Boolean { - return localBalanceAvailable[chain] ?: false - } - - fun allAddresses(api: BitcoinMultistream, request: BlockchainOuterClass.BalanceRequest): Flux { - if (!request.hasAddress()) { - return Flux.empty() - } - return when { - request.address.hasAddressXpub() -> { - val xpubAddresses = api.getXpubAddresses() - ?: return Flux.error(IllegalStateException("Xpub verification is not available")) - - val addressXpub = request.address.addressXpub - if (StringUtils.isEmpty(addressXpub.xpub)) { - return Flux.error(IllegalArgumentException("xpub string is empty")) - } - val xpub = addressXpub.xpub - val start = Math.max(0, addressXpub.start).toInt() - val limit = Math.min(100, Math.max(1, addressXpub.limit)).toInt() - xpubAddresses.activeAddresses(xpub, start, limit) - .map { it.toString() } - .doOnError { t -> log.error("Failed to process xpub. ${t.javaClass}:${t.message}") } - } - request.address.hasAddressSingle() -> { - Flux.just(request.address.addressSingle.address) - } - request.address.hasAddressMulti() -> { - Flux.fromIterable( - request.address.addressMulti.addressesList - .map { addr -> addr.address } - // TODO why sorted? - .sorted(), - ) - } - else -> Flux.error(IllegalArgumentException("Unsupported address type")) - } - } - - fun requestBalances( - chain: Chain, - api: BitcoinMultistream, - addresses: Flux, - includeUtxo: Boolean, - ): Flux { - return addresses - .map { Address(chain, it) } - .flatMap { address -> - balanceForAddress(api, address, includeUtxo) - } - } - - fun balanceForAddress(api: BitcoinMultistream, address: Address, includeUtxo: Boolean): Mono { - return api.getReader() - .listUnspent(address.bitcoinAddress) - .map { unspent -> - totalUnspent(address, includeUtxo, unspent) - } - .switchIfEmpty( - Mono.just(0).map { - AddressBalance(address, BigInteger.ZERO) - }, - ) - .onErrorResume { t -> - log.error("Failed to get unspent", t) - Mono.empty() - } - } - - fun totalUnspent(address: Address, includeUtxo: Boolean, unspent: List): AddressBalance { - return if (unspent.isEmpty()) { - AddressBalance(address, BigInteger.ZERO) - } else { - unspent.map { - AddressBalance( - address, - BigInteger.valueOf(it.value), - if (includeUtxo) { - listOf(BalanceUtxo(it.txid, it.vout, it.value)) - } else { - emptyList() - }, - ) - }.reduce { a, b -> a.plus(b) } - } - } - - fun getBalanceGrpc(api: BitcoinMultistream): Mono { - val ups = api.getApiSource(balanceUpstreamMatcher) - ups.request(1) - return Mono.from(ups) - .map { up -> - up.cast(BitcoinGrpcUpstream::class.java).remote - } - .timeout(Defaults.timeoutInternal, Mono.empty()) - .switchIfEmpty( - Mono.fromCallable { - log.warn("No upstream providing balance for ${api.chain}") - } - .then(Mono.error(SilentException.DataUnavailable("BALANCE"))), - ) - } - - fun getRemoteBalance( - api: BitcoinMultistream, - request: BlockchainOuterClass.BalanceRequest, - ): Flux { - return getBalanceGrpc(api).flatMapMany { remote -> - remote.getBalance(request) - } - } - - fun subscribeRemoteBalance( - api: BitcoinMultistream, - request: BlockchainOuterClass.BalanceRequest, - ): Flux { - return getBalanceGrpc(api).flatMapMany { remote -> - remote.subscribeBalance(request) - } - } - - override fun getBalance(request: BlockchainOuterClass.BalanceRequest): Flux { - val chain = Chain.byId(request.asset.chainValue) - val upstream = multistreamHolder.getUpstream(chain)?.cast(BitcoinMultistream::class.java) - ?: return Flux.error(SilentException.UnsupportedBlockchain(request.asset.chainValue)) - return if (isBalanceAvailable(chain)) { - val addresses = allAddresses(upstream, request) - requestBalances(chain, upstream, addresses, request.includeUtxo) - .map(this@TrackBitcoinAddress::buildResponse) - .doOnError { t -> - log.error("Failed to get balance", t) - } - } else { - getRemoteBalance(upstream, request) - .doOnError { t -> - log.error("Failed to get balance from remote", t) - } - } - } - - override fun subscribe(request: BlockchainOuterClass.BalanceRequest): Flux { - val chain = Chain.byId(request.asset.chainValue) - val upstream = multistreamHolder.getUpstream(chain)?.cast(BitcoinMultistream::class.java) - ?: return Flux.error(SilentException.UnsupportedBlockchain(request.asset.chainValue)) - if (isBalanceAvailable(chain)) { - val addresses = allAddresses(upstream, request).cache() - val following = upstream.getHead().getFlux() - .flatMap { - requestBalances(chain, upstream, Flux.from(addresses), request.includeUtxo) - } - val last = HashMap() - val result = following - .filter { curr -> - val prev = last[curr.address.address] - // TODO utxo can change without changing balance - val changed = prev == null || curr.balance != prev - if (changed) { - last[curr.address.address] = curr.balance - } - changed - } - - return result.map(this@TrackBitcoinAddress::buildResponse) - } else { - return subscribeRemoteBalance(upstream, request) - } - } - - private fun buildResponse(address: AddressBalance): BlockchainOuterClass.AddressBalance { - return BlockchainOuterClass.AddressBalance.newBuilder() - .setBalance(address.balance.toString(10)) - .setAsset( - Common.Asset.newBuilder() - .setChainValue(address.address.chain.id) - .setCode("BTC"), - ) - .setAddress(Common.SingleAddress.newBuilder().setAddress(address.address.address)) - .addAllUtxo( - address.utxo.map { utxo -> - BlockchainOuterClass.Utxo.newBuilder() - .setBalance(utxo.value.toString()) - .setIndex(utxo.vout.toLong()) - .setTxId(utxo.txid) - .build() - }, - ) - .build() - } - - open class AddressBalance( - val address: Address, - var balance: BigInteger = BigInteger.ZERO, - var utxo: List = emptyList(), - ) { - constructor(chain: Chain, address: String, balance: BigInteger) : this(Address(chain, address), balance) - - fun plus(other: AddressBalance) = AddressBalance(address, balance + other.balance, utxo.plus(other.utxo)) - } - - open class BalanceUtxo(val txid: String, val vout: Int, val value: Long) - - // TODO use bitcoin class for address - class Address(val chain: Chain, val address: String) { - val network = if (chain == Chain.BITCOIN__MAINNET) { - MainNetParams() - } else { - TestNet3Params() - } - val bitcoinAddress = org.bitcoinj.core.Address.fromString( - network, - address, - ) - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinTx.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinTx.kt deleted file mode 100644 index fc5a27885..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinTx.kt +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.BlockchainType -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.SilentException -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinMultistream -import io.emeraldpay.dshackle.upstream.bitcoin.ExtractBlock -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.stereotype.Service -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import java.math.BigInteger -import java.time.Duration -import java.time.Instant -import kotlin.math.max -import kotlin.math.min - -@Service -class TrackBitcoinTx( - @Autowired private val multistreamHolder: MultistreamHolder, -) : TrackTx { - - companion object { - private val log = LoggerFactory.getLogger(TrackBitcoinTx::class.java) - } - - override fun isSupported(chain: Chain): Boolean { - return BlockchainType.from(chain) == BlockchainType.BITCOIN && multistreamHolder.isAvailable(chain) - } - - override fun subscribe(request: BlockchainOuterClass.TxStatusRequest): Flux { - val chain = Chain.byId(request.chainValue) - val upstream = multistreamHolder.getUpstream(chain)?.cast(BitcoinMultistream::class.java) - ?: return Flux.error(SilentException.UnsupportedBlockchain(chain)) - val txid = request.txId - val confirmations = max(min(1, request.confirmationLimit), 12) - return subscribe(chain, upstream, txid) - .takeUntil { tx -> - tx.confirmations >= confirmations - }.map(this::asProto) - } - - fun subscribe(chain: Chain, upstream: BitcoinMultistream, txid: String): Flux { - return loadExisting(upstream, txid) - .flatMapMany { status -> - if (status.mined) { - // Head almost always knows the current height, so it can continue with calculating confirmations - // without publishing an empty TxStatus first - continueWithMined(upstream, status) - } else { - loadMempool(upstream, txid) - .flatMapMany { tx -> - val next = if (tx.found) { - untilMined(upstream, tx) - } else { - untilFound(chain, upstream, txid) - } - // fist provide the current status, then updates - Flux.concat(Mono.just(tx), next) - } - } - } - } - - fun continueWithMined(upstream: BitcoinMultistream, status: TxStatus): Flux { - return upstream.getReader().getBlock(status.blockHash!!) - .map { block -> - TxStatus( - status.txid, - true, - ExtractBlock.getHeight(block), - true, - status.blockHash, - ExtractBlock.getTime(block), - ExtractBlock.getDifficulty(block), - ) - }.flatMapMany { tx -> - withConfirmations(upstream, tx) - } - } - - fun untilFound(chain: Chain, upstream: BitcoinMultistream, txid: String): Flux { - return Flux.interval(Duration.ofSeconds(1)) - .take(Duration.ofMinutes(10)) - .flatMap { loadMempool(upstream, txid) } - .skipUntil { it.found } - .flatMap { subscribe(chain, upstream, txid) } - .doOnError { t -> - log.error("Failed to wait until found", t) - } - } - - fun untilMined(upstream: BitcoinMultistream, tx: TxStatus): Mono { - return upstream.getHead().getFlux().flatMap { - loadExisting(upstream, tx.txid) - .filter { it.mined } - }.single() - } - - fun withConfirmations(upstream: BitcoinMultistream, tx: TxStatus): Flux { - return upstream.getHead().getFlux().map { - tx.withHead(it.height) - } - } - - fun loadExisting(api: BitcoinMultistream, txid: String): Mono { - val mined = api.getReader().getTx(txid) - return mined.map { - val block = it["blockhash"] as String? - TxStatus(txid, found = true, mined = block != null, blockHash = block, height = ExtractBlock.getHeight(it)) - } - } - - fun loadMempool(upstream: BitcoinMultistream, txid: String): Mono { - val mempool = upstream.getReader().getMempool().get() - return mempool.map { - if (it.contains(txid)) { - TxStatus(txid, found = true, mined = false) - } else { - TxStatus(txid, found = false, mined = false) - } - } - } - - private fun asProto(tx: TxStatus): BlockchainOuterClass.TxStatus { - val data = BlockchainOuterClass.TxStatus.newBuilder() - .setTxId(tx.txid) - .setConfirmations(tx.confirmations.toInt()) - - data.broadcasted = tx.found - val isMined = tx.mined - data.mined = isMined - if (isMined) { - data.setBlock( - Common.BlockInfo.newBuilder() - .setBlockId(tx.blockHash!!.substring(2)) - .setTimestamp(tx.blockTime!!.toEpochMilli()) - .setHeight(tx.height!!), - ) - } - return data.build() - } - - class TxStatus( - val txid: String, - val found: Boolean = false, - val height: Long? = null, - val mined: Boolean = false, - val blockHash: String? = null, - val blockTime: Instant? = null, - val blockTotalDifficulty: BigInteger? = null, - val confirmations: Long = 0, - ) { - - fun withHead(headHeight: Long) = - TxStatus(txid, found, height, mined, blockHash, blockTime, blockTotalDifficulty, headHeight - height!! + 1) - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackERC20Address.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackERC20Address.kt deleted file mode 100644 index 2d5b724b0..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackERC20Address.kt +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Copyright (c) 2021 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.BlockchainType -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.SilentException -import io.emeraldpay.dshackle.config.TokensConfig -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.dshackle.upstream.Selector -import io.emeraldpay.dshackle.upstream.ethereum.ERC20Balance -import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream -import io.emeraldpay.etherjar.domain.Address -import io.emeraldpay.etherjar.domain.EventId -import io.emeraldpay.etherjar.erc20.ERC20Token -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.stereotype.Service -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import java.math.BigInteger -import java.util.Locale -import javax.annotation.PostConstruct - -@Service -class TrackERC20Address( - @Autowired private val multistreamHolder: MultistreamHolder, - @Autowired private val tokensConfig: TokensConfig, -) : TrackAddress { - - companion object { - private val log = LoggerFactory.getLogger(TrackERC20Address::class.java) - } - - var erc20Balance: ERC20Balance = ERC20Balance() - - private val ethereumAddresses = EthereumAddresses() - private val tokens: MutableMap = HashMap() - - @PostConstruct - fun init() { - tokensConfig.tokens.forEach { token -> - val chain = token.blockchain!! - val asset = token.name!!.lowercase(Locale.getDefault()) - val id = TokenId(chain, asset) - val definition = TokenDefinition( - chain, - asset, - ERC20Token(Address.from(token.address)), - ) - tokens[id] = definition - log.info("Enable ERC20 balance for $chain:$asset") - } - } - - override fun isSupported(chain: Chain, asset: String): Boolean { - return tokens.containsKey(TokenId(chain, asset.lowercase(Locale.getDefault()))) && - (BlockchainType.from(chain) == BlockchainType.EVM_POS || BlockchainType.from(chain) == BlockchainType.EVM_POW) && multistreamHolder.isAvailable(chain) - } - - override fun getBalance(request: BlockchainOuterClass.BalanceRequest): Flux { - val chain = Chain.byId(request.asset.chainValue) - val asset = request.asset.code.lowercase(Locale.getDefault()) - val tokenDefinition = tokens[TokenId(chain, asset)] ?: return Flux.empty() - return ethereumAddresses.extract(request.address) - .map { TrackedAddress(chain, it, tokenDefinition.token, tokenDefinition.name) } - .flatMap { addr -> getBalance(addr).map(addr::withBalance) } - .map { buildResponse(it) } - } - - override fun subscribe(request: BlockchainOuterClass.BalanceRequest): Flux { - val chain = Chain.byId(request.asset.chainValue) - val asset = request.asset.code.lowercase(Locale.getDefault()) - val tokenDefinition = tokens[TokenId(chain, asset)] ?: return Flux.empty() - val logs = getUpstream(chain) - .getEgressSubscription().logs - .create( - listOf(tokenDefinition.token.contract), - listOf(EventId.fromSignature("Transfer", "address", "address", "uint256")), - ).connect(Selector.empty) - - return ethereumAddresses.extract(request.address) - .map { TrackedAddress(chain, it, tokenDefinition.token, tokenDefinition.name) } - .flatMap { addr -> - val current = getBalance(addr) - - val updates = logs - .filter { - it.topics.size >= 3 && (Address.extract(it.topics[1]) == addr.address || Address.extract(it.topics[2]) == addr.address) - } - .distinctUntilChanged { - // check it once per block - it.blockHash - } - .flatMap { - // make sure we use actual balance, don't trust event blindly - getBalance(addr) - } - Flux.concat(current, updates) - .distinctUntilChanged() - .map { addr.withBalance(it) } - } - .map { buildResponse(it) } - } - - fun getBalance(addr: TrackedAddress): Mono { - val upstream = getUpstream(addr.chain) - return erc20Balance.getBalance(upstream, addr.token, addr.address) - } - - fun getUpstream(chain: Chain): EthereumPosMultiStream { - return multistreamHolder.getUpstream(chain)?.cast(EthereumPosMultiStream::class.java) - ?: throw SilentException.UnsupportedBlockchain(chain) - } - - private fun buildResponse(address: TrackedAddress): BlockchainOuterClass.AddressBalance { - return BlockchainOuterClass.AddressBalance.newBuilder() - .setBalance(address.balance!!.toString(10)) - .setAsset( - Common.Asset.newBuilder() - .setChainValue(address.chain.id) - .setCode(address.tokenName.uppercase(Locale.getDefault())), - ) - .setAddress(Common.SingleAddress.newBuilder().setAddress(address.address.toHex())) - .build() - } - - class TrackedAddress( - val chain: Chain, - val address: Address, - val token: ERC20Token, - val tokenName: String, - val balance: BigInteger? = null, - ) { - fun withBalance(balance: BigInteger) = TrackedAddress(chain, address, token, tokenName, balance) - } - - data class TokenId(val chain: Chain, val name: String) - data class TokenDefinition(val chain: Chain, val name: String, val token: ERC20Token) -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackEthereumAddress.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackEthereumAddress.kt deleted file mode 100644 index 454e614e5..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackEthereumAddress.kt +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * Copyright (c) 2019 ETCDEV GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.BlockchainType -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.Defaults -import io.emeraldpay.dshackle.SilentException -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream -import io.emeraldpay.etherjar.domain.Address -import io.emeraldpay.etherjar.domain.Wei -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.stereotype.Service -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import java.util.Locale - -@Service -class TrackEthereumAddress( - @Autowired private val multistreamHolder: MultistreamHolder, -) : TrackAddress { - - private val log = LoggerFactory.getLogger(TrackEthereumAddress::class.java) - private val ethereumAddresses = EthereumAddresses() - - override fun isSupported(chain: Chain, asset: String): Boolean { - return asset == "ether" && - (BlockchainType.from(chain) == BlockchainType.EVM_POS || BlockchainType.from(chain) == BlockchainType.EVM_POW) && multistreamHolder.isAvailable(chain) - } - - override fun getBalance(request: BlockchainOuterClass.BalanceRequest): Flux { - return initAddress(request) - .flatMap { a -> getBalance(a).map { a.withBalance(it) } } - .map { buildResponse(it) } - } - - override fun subscribe(request: BlockchainOuterClass.BalanceRequest): Flux { - val chain = Chain.byId(request.asset.chainValue) - val head = multistreamHolder.getUpstream(chain)?.getHead()?.getFlux() ?: Flux.empty() - val balances = initAddress(request) - .flatMap { tracked -> - val current = getBalance(tracked) - .map { - tracked.withBalance(it) - } - val updates = head - .flatMap { - getBalance(tracked) - }.map { - tracked.withBalance(it) - } - - Flux.concat(current, updates) - .distinctUntilChanged { - it.balance ?: Wei.ZERO - } - } - .doOnError { t -> - if (t is SilentException) { - if (t is SilentException.UnsupportedBlockchain) { - log.warn("Unsupported blockchain: ${t.blockchainId}") - } - log.debug("Failed to process subscription", t) - } else { - log.warn("Failed to process subscription", t) - } - } - - return balances.map { - buildResponse(it) - } - } - - fun getUpstream(chain: Chain): EthereumPosMultiStream { - return multistreamHolder.getUpstream(chain)?.cast(EthereumPosMultiStream::class.java) - ?: throw SilentException.UnsupportedBlockchain(chain) - } - - private fun initAddress(request: BlockchainOuterClass.BalanceRequest): Flux { - val chain = Chain.byId(request.asset.chainValue) - if (!multistreamHolder.isAvailable(chain)) { - return Flux.error(SilentException.UnsupportedBlockchain(request.asset.chainValue)) - } - if (request.asset.code.lowercase(Locale.getDefault()) != "ether") { - return Flux.error(SilentException("Unsupported asset ${request.asset.code}")) - } - return ethereumAddresses.extract(request.address).map { - TrackedAddress(chain, it) - } - } - - private fun createAddress(address: Common.SingleAddress, chain: Chain): TrackedAddress { - val addressParsed = Address.from(address.address) - return TrackedAddress( - chain, - addressParsed, - ) - } - - fun getBalance(addr: TrackedAddress): Mono { - return getUpstream(addr.chain) - .getReader() - .balance() - .read(addr.address) - .timeout(Defaults.timeout) - .map { it.data } - } - - private fun buildResponse(address: TrackedAddress): BlockchainOuterClass.AddressBalance { - return BlockchainOuterClass.AddressBalance.newBuilder() - .setBalance(address.balance!!.amount!!.toString(10)) - .setAsset( - Common.Asset.newBuilder() - .setChainValue(address.chain.id) - .setCode("ETHER"), - ) - .setAddress(Common.SingleAddress.newBuilder().setAddress(address.address.toHex())) - .build() - } - - class TrackedAddress( - val chain: Chain, - val address: Address, - val balance: Wei? = null, - ) { - fun withBalance(balance: Wei) = TrackedAddress(chain, address, balance) - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackEthereumTx.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackEthereumTx.kt deleted file mode 100644 index 90580ab26..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackEthereumTx.kt +++ /dev/null @@ -1,390 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * Copyright (c) 2019 ETCDEV GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.BlockchainType -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.SilentException -import io.emeraldpay.dshackle.data.BlockContainer -import io.emeraldpay.dshackle.data.TxId -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson -import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot -import io.emeraldpay.etherjar.domain.BlockHash -import io.emeraldpay.etherjar.domain.TransactionId -import io.emeraldpay.etherjar.rpc.RpcException -import io.emeraldpay.etherjar.rpc.json.TransactionRefJson -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Qualifier -import org.springframework.stereotype.Service -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.scheduler.Scheduler -import reactor.util.retry.Retry -import java.math.BigInteger -import java.time.Duration -import java.time.Instant -import kotlin.math.max -import kotlin.math.min - -@Service -class TrackEthereumTx( - private val multistreamHolder: MultistreamHolder, - @Qualifier("trackTxScheduler") - private val scheduler: Scheduler, -) : TrackTx { - - companion object { - private val ZERO_BLOCK = BlockHash.from("0x0000000000000000000000000000000000000000000000000000000000000000") - private val TRACK_TTL = Duration.ofHours(1) - private val NOT_FOUND_TRACK_TTL = Duration.ofMinutes(1) - private val NOT_MINED_TRACK_TTL = NOT_FOUND_TRACK_TTL.multipliedBy(2) - } - - private val log = LoggerFactory.getLogger(TrackEthereumTx::class.java) - - override fun isSupported(chain: Chain): Boolean { - return (BlockchainType.from(chain) == BlockchainType.EVM_POS || BlockchainType.from(chain) == BlockchainType.EVM_POW) && multistreamHolder.isAvailable(chain) - } - - override fun subscribe(request: BlockchainOuterClass.TxStatusRequest): Flux { - val base = prepareTracking(request) - val up = getUpstream(base.chain) - return update(base) - .defaultIfEmpty(base) - .flatMapMany { - Flux.concat(Mono.just(it), subscribe(it, up)) - .distinctUntilChanged(TxDetails::status) - .map(this@TrackEthereumTx::asProto) - .subscribeOn(scheduler) - } - .doOnError { t -> - log.error("Subscription error", t) - } - } - - fun getUpstream(chain: Chain): EthereumPosMultiStream { - return multistreamHolder.getUpstream(chain)?.cast(EthereumPosMultiStream::class.java) - ?: throw SilentException.UnsupportedBlockchain(chain) - } - - fun subscribe(base: TxDetails, up: EthereumPosMultiStream): Flux { - var latestTx = base - - val untilFound = Mono.just(latestTx) - .subscribeOn(scheduler) - .map { - // replace with the latest value, it may be already found - latestTx - } - .flatMap { latest -> - if (!latest.status.found) { - update(latest).defaultIfEmpty(latestTx) - } else { - Mono.just(latest) - } - } - .flatMap { received -> - if (!received.status.found) { - Mono.error(SilentException("Retry not found")) - } else { - Mono.just(received) - } - } - .retryWhen( - Retry.fixedDelay(10, Duration.ofSeconds(2)), - ) - .onErrorResume { Mono.empty() } - - val inBlocks = up.getHead().getFlux() - .subscribeOn(scheduler) - .flatMap { block -> - onNewBlock(latestTx, block) - } - - return Flux.merge(untilFound, inBlocks) - .takeUntil(TxDetails::shouldClose) - .doOnNext { newTx -> - latestTx = newTx - } - } - - fun onNewBlock(tx: TxDetails, block: BlockContainer): Mono { - val txid = TxId.from(tx.txid) - if (!tx.status.mined) { - val justMined = block.transactions.contains(txid) - return if (justMined) { - Mono.just( - tx.withStatus( - mined = true, - found = true, - confirmations = 1, - height = block.height, - blockTime = block.timestamp, - blockTotalDifficulty = block.difficulty, - blockHash = BlockHash(block.hash.value), - ), - ) - } else { - update(tx) - } - } else { - // verify if it's still on chain - // TODO head is supposed to erase block when it was replaced, so can safely recalc here - return update(tx) - } - } - - private fun update(tx: TxDetails): Mono { - val initialStatus = tx.status - val upstream = getUpstream(tx.chain) - return upstream.getReader() - .txByHash().read(tx.txid) - .onErrorResume(RpcException::class.java) { t -> - log.warn("Upstream error, ignoring. {}", t.rpcMessage) - Mono.empty() - } - .flatMap { updateFromBlock(upstream, tx, it) } - .doOnError { t -> - log.error("Failed to load tx block", t) - } - .switchIfEmpty(Mono.just(tx.withStatus(found = false))) - .filter { current -> - initialStatus != current.status || current.shouldClose() - } - } - - fun prepareTracking(request: BlockchainOuterClass.TxStatusRequest): TxDetails { - val chain = Chain.byId(request.chainValue) - if (!isSupported(chain)) { - throw SilentException.UnsupportedBlockchain(request.chainValue) - } - val details = TxDetails( - chain, - Instant.now(), - TransactionId.from(request.txId), - min(max(1, request.confirmationLimit), 100), - ) - return details - } - - fun setBlockDetails(tx: TxDetails, block: BlockJson): TxDetails { - return if (block.number != null && block.totalDifficulty != null) { - tx.withStatus( - blockTotalDifficulty = block.totalDifficulty, - blockTime = block.timestamp, - ) - } else { - tx.withStatus( - mined = false, - ) - } - } - - private fun loadWeight(tx: TxDetails): Mono { - val upstream = getUpstream(tx.chain) - if (tx.status.blockHash == null) { - return Mono.empty() - } - return upstream.getReader() - .blocksByHashParsed().read(tx.status.blockHash) - .map { block -> - setBlockDetails(tx, block) - }.doOnError { t -> - log.warn("Failed to update weight", t) - } - } - - fun updateFromBlock(upstream: EthereumPosMultiStream, tx: TxDetails, blockTx: TransactionJsonSnapshot): Mono { - return if (blockTx.blockNumber != null && blockTx.blockHash != null && blockTx.blockHash != ZERO_BLOCK) { - val updated = tx.withStatus( - blockHash = blockTx.blockHash, - height = blockTx.blockNumber, - found = true, - mined = true, - confirmations = 1, - ) - upstream.getHead().getFlux().next().map { head -> - val height = updated.status.height - if (height == null || head.height < height) { - updated - } else { - updated.withStatus( - confirmations = head.height - height + 1, - ) - } - }.doOnError { t -> - log.error("Unable to load head details", t) - }.flatMap(this::loadWeight) - } else { - Mono.just( - tx.withStatus( - found = true, - mined = false, - ), - ) - } - } - - private fun asProto(tx: TxDetails): BlockchainOuterClass.TxStatus { - val data = BlockchainOuterClass.TxStatus.newBuilder() - .setTxId(tx.txid.toHex()) - .setConfirmations(tx.status.confirmations.toInt()) - - data.broadcasted = tx.status.found - val isMined = tx.status.mined - data.mined = isMined - if (isMined) { - data.setBlock( - Common.BlockInfo.newBuilder() - .setBlockId(tx.status.blockHash!!.toHex().substring(2)) - .setTimestamp(tx.status.blockTime!!.toEpochMilli()) - .setHeight(tx.status.height!!), - ) - } - return data.build() - } - - class TxDetails( - val chain: Chain, - val since: Instant, - val txid: TransactionId, - val maxConfirmations: Int, - val status: TxStatus, - ) { - - constructor( - chain: Chain, - since: Instant, - txid: TransactionId, - maxConfirmations: Int, - ) : this(chain, since, txid, maxConfirmations, TxStatus()) - - fun copy( - since: Instant = this.since, - status: TxStatus = this.status, - ) = TxDetails(chain, since, txid, maxConfirmations, status) - - fun withStatus( - found: Boolean = this.status.found, - height: Long? = this.status.height, - mined: Boolean = this.status.mined, - blockHash: BlockHash? = this.status.blockHash, - blockTime: Instant? = this.status.blockTime, - blockTotalDifficulty: BigInteger? = this.status.blockTotalDifficulty, - confirmations: Long = this.status.confirmations, - ): TxDetails { - return copy( - status = this.status.copy( - found, - height, - mined, - blockHash, - blockTime, - blockTotalDifficulty, - confirmations, - ), - ) - } - - fun shouldClose(): Boolean { - return maxConfirmations <= this.status.confirmations || - since.isBefore(Instant.now().minus(TRACK_TTL)) || - (!status.found && since.isBefore(Instant.now().minus(NOT_FOUND_TRACK_TTL))) || - (!status.mined && since.isBefore(Instant.now().minus(NOT_MINED_TRACK_TTL))) - } - - override fun toString(): String { - return "TxDetails(chain=$chain, txid=$txid, status=$status)" - } - - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (other !is TxDetails) return false - - if (chain != other.chain) return false - if (since != other.since) return false - if (txid != other.txid) return false - if (maxConfirmations != other.maxConfirmations) return false - if (status != other.status) return false - - return true - } - - override fun hashCode(): Int { - var result = chain.hashCode() - result = 31 * result + since.hashCode() - result = 31 * result + txid.hashCode() - result = 31 * result + status.hashCode() - return result - } - } - - class TxStatus( - val found: Boolean = false, - val height: Long? = null, - val mined: Boolean = false, - val blockHash: BlockHash? = null, - val blockTime: Instant? = null, - val blockTotalDifficulty: BigInteger? = null, - val confirmations: Long = 0, - ) { - - fun copy( - found: Boolean = this.found, - height: Long? = this.height, - mined: Boolean = this.mined, - blockHash: BlockHash? = this.blockHash, - blockTime: Instant? = this.blockTime, - blockTotalDifficulty: BigInteger? = this.blockTotalDifficulty, - confirmation: Long = this.confirmations, - ) = TxStatus(found, height, mined, blockHash, blockTime, blockTotalDifficulty, confirmation) - - fun clean() = TxStatus(false, null, false, null, null, null, 0) - - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (javaClass != other?.javaClass) return false - - other as TxStatus - - if (found != other.found) return false - if (height != other.height) return false - if (mined != other.mined) return false - if (blockHash != other.blockHash) return false - if (blockTime != other.blockTime) return false - if (blockTotalDifficulty != other.blockTotalDifficulty) return false - if (confirmations != other.confirmations) return false - - return true - } - - override fun hashCode(): Int { - var result = found.hashCode() - result = 31 * result + (height?.hashCode() ?: 0) - result = 31 * result + (blockHash?.hashCode() ?: 0) - return result - } - - override fun toString(): String { - return "TxStatus(found=$found, height=$height, mined=$mined, blockHash=$blockHash, blockTime=$blockTime, blockTotalDifficulty=$blockTotalDifficulty, confirmations=$confirmations)" - } - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackTx.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackTx.kt deleted file mode 100644 index 3b8511520..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackTx.kt +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.Chain -import reactor.core.publisher.Flux - -interface TrackTx { - fun isSupported(chain: Chain): Boolean - fun subscribe(request: BlockchainOuterClass.TxStatusRequest): Flux -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CachingReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CachingReader.kt new file mode 100644 index 000000000..ce67e5ee7 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CachingReader.kt @@ -0,0 +1,3 @@ +package io.emeraldpay.dshackle.upstream + +interface CachingReader diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 629ea3920..5552619fc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -16,6 +16,7 @@ */ package io.emeraldpay.dshackle.upstream +import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.cache.CachesEnabled @@ -198,8 +199,6 @@ abstract class Multistream( return FilteredApis(chain, upstreams, matcher, i) } - abstract fun getFeeEstimation(): ChainFees - /** * Finds an API that leverages caches and other optimizations/transformations of the request. */ @@ -475,6 +474,14 @@ abstract class Multistream( abstract fun makeLagObserver(): HeadLagObserver + open fun tryProxySubscribe(matcher: Selector.Matcher, request: BlockchainOuterClass.NativeSubscribeRequest): Flux? = null + + abstract fun getCachingReader(): CachingReader? + + abstract fun getHead(mather: Selector.Matcher): Head + + abstract fun getEnrichedHead(mather: Selector.Matcher): Head + // -------------------------------------------------------------------------------------------------------- class UpstreamStatus(val upstream: Upstream, val status: UpstreamAvailability) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinFees.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinFees.kt deleted file mode 100644 index c33a66ba5..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinFees.kt +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (c) 2021 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.bitcoin - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.upstream.AbstractChainFees -import io.emeraldpay.dshackle.upstream.ChainFees -import org.slf4j.LoggerFactory -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import java.math.BigDecimal -import java.util.function.Function - -class BitcoinFees( - upstreams: BitcoinMultistream, - private val reader: BitcoinReader, - heightLimit: Int, -) : AbstractChainFees, String, Map>(heightLimit, upstreams, extractTx), ChainFees { - - companion object { - private val log = LoggerFactory.getLogger(BitcoinFees::class.java) - - private val extractTx = { block: Map -> - block["tx"] - ?.let { it as List } - // drop first tx which is a miner reward - ?.let { if (it.isEmpty()) it else it.drop(1) } - ?: emptyList() - } - } - - fun calculateFee(tx: Map): Mono { - val outAmount = extractVOuts(tx).reduce { acc, l -> (acc ?: 0L) + (l ?: 0L) } ?: 0 - val vinsData = tx.get("vin")?.let { it as List> } ?: return Mono.empty() - return Flux.fromIterable(vinsData) - .flatMap { - val txid = it["txid"] as String? - val vout = (it["vout"] as Number?)?.toInt() - if (txid == null || vout == null) { - Mono.empty() - } else { - getTxOutAmount(txid, vout) - } - } - .reduce { t, u -> t + u } - .map { inAmount -> - (inAmount - outAmount).coerceAtLeast(0) - } - } - - fun extractSize(tx: Map): Int { - val size: Number = if (tx.containsKey("vsize")) { - tx["vsize"] as Number - } else if (tx.containsKey("size")) { - tx["size"] as Number - } else { - 0 - } - return size.toInt() - } - - fun getTxOutAmount(txid: String, vout: Int): Mono { - return reader.getTx(txid) - .switchIfEmpty( - Mono.fromCallable { log.warn("No tx $txid") } - .then(Mono.empty()), - ) - .flatMap { - extractVOuts(it).let { - if (vout < it.size) { - Mono.justOrEmpty(it[vout]) - } else { - Mono.empty() - } - } - } - } - - fun extractVOuts(tx: Map): List { - val voutsData = tx.get("vout")?.let { it as List> } ?: return emptyList() - return voutsData.map { - val amount = it["value"] ?: return@map null - BigDecimal(amount.toString()) - .multiply(BitcoinConst.COIN_DEC) - .longValueExact() - } - } - - override fun readFeesAt(height: Long, selector: TxAt, String>): Mono { - return reader.getBlock(height) - .flatMap { block -> - Mono.justOrEmpty(selector.get(block)) - .flatMap { txid -> reader.getTx(txid!!) } - .flatMap { tx -> - calculateFee(tx) - .map { fee -> - TxFee(1, fee * 1024 / extractSize(tx)) - } - } - } - } - - override fun feeAggregation(mode: ChainFees.Mode): Function, Mono> { - if (mode == ChainFees.Mode.MIN_ALWAYS) { - return Function { src -> - src.reduce { a, b -> - if (a.fee > b.fee) a else b - } - } - } - return Function { src -> - src.reduce { a, b -> - TxFee(a.count + b.count, a.fee + b.fee) - } - } - } - - override fun getResponseBuilder(): Function { - return Function { - val fee = (it.fee / it.count).coerceAtLeast(1) - BlockchainOuterClass.EstimateFeeResponse.newBuilder() - .setBitcoinStd( - BlockchainOuterClass.BitcoinStdFees.newBuilder() - .setSatPerKb(fee), - ) - .build() - } - } - - // ------- - - data class TxFee(val count: Int, val fee: Long) -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt index 99436623d..4e5ef5791 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt @@ -19,7 +19,7 @@ import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.reader.JsonRpcReader -import io.emeraldpay.dshackle.upstream.ChainFees +import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.DistanceExtractor import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription @@ -30,6 +30,7 @@ import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.MergedHead import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Selector +import io.emeraldpay.dshackle.upstream.Selector.Matcher import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.calls.DefaultBitcoinMethods import io.emeraldpay.dshackle.upstream.forkchoice.MostWorkForkChoice @@ -49,7 +50,6 @@ open class BitcoinMultistream( private var reader = BitcoinReader(this, head, esplora) private var addressActiveCheck: AddressActiveCheck? = null private var xpubAddresses: XpubAddresses? = null - private val feeEstimation = BitcoinFees(this, reader, 6) private var callRouter: LocalCallRouter = LocalCallRouter(DefaultBitcoinMethods(), reader) override fun init() { @@ -64,10 +64,6 @@ open class BitcoinMultistream( return sourceUpstreams } - override fun getFeeEstimation(): ChainFees { - return feeEstimation - } - open fun getXpubAddresses(): XpubAddresses? { return xpubAddresses } @@ -133,6 +129,10 @@ open class BitcoinMultistream( return head } + override fun getEnrichedHead(mather: Matcher): Head { + TODO("Not yet implemented") + } + override fun getLabels(): Collection { return sourceUpstreams.flatMap { it.getLabels() } } @@ -157,6 +157,14 @@ open class BitcoinMultistream( return HeadLagObserver(head, sourceUpstreams, DistanceExtractor::extractPowDistance, headScheduler, 3) } + override fun getCachingReader(): CachingReader? { + return null + } + + override fun getHead(mather: Matcher): Head { + return getHead() + } + override fun start() { super.start() reader.start() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt index 66f8d0732..4af3d69ee 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt @@ -33,6 +33,7 @@ import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.reader.RekeyingReader import io.emeraldpay.dshackle.reader.SpannedReader import io.emeraldpay.dshackle.reader.TransformingReader +import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.calls.CallMethods @@ -57,7 +58,7 @@ open class EthereumCachingReader( private val caches: Caches, callMethodsFactory: Factory, private val tracer: Tracer, -) : Lifecycle { +) : Lifecycle, CachingReader { private val objectMapper: ObjectMapper = Global.objectMapper private val balanceCache = CurrentBlockCache() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt index 1dddce471..3979a5501 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscription.kt @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.ethereum.subscribe.ConnectLogs import io.emeraldpay.dshackle.upstream.ethereum.subscribe.ConnectNewHeads @@ -13,7 +14,7 @@ import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler open class EthereumEgressSubscription( - val upstream: EthereumLikeMultistream, + val upstream: Multistream, val scheduler: Scheduler, val pendingTxesSource: PendingTxesSource?, ) : EgressSubscription { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeMultistream.kt deleted file mode 100644 index 4fc989db1..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeMultistream.kt +++ /dev/null @@ -1,25 +0,0 @@ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.upstream.HasEgressSubscription -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.Selector -import io.emeraldpay.dshackle.upstream.Upstream -import reactor.core.publisher.Flux - -interface EthereumLikeMultistream : Upstream, HasEgressSubscription { - fun getReader(): EthereumCachingReader - - fun getHead(mather: Selector.Matcher): Head - - fun getEnrichedHead(mather: Selector.Matcher): Head - - /** - * Tries to proxy the native subscribe request to the managed upstreams if - * - any of them matches the matcher criteria - * - all of matching above are gRPC ones - * in this case the upstream dshackle instances can sign the results and they will just proxied as is with original signs - * Otherwise return null - */ - fun tryProxy(matcher: Selector.Matcher, request: BlockchainOuterClass.NativeSubscribeRequest): Flux? -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt index b93e586f7..79986fa5c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt @@ -23,7 +23,6 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.reader.Reader -import io.emeraldpay.dshackle.upstream.ChainFees import io.emeraldpay.dshackle.upstream.DistanceExtractor import io.emeraldpay.dshackle.upstream.DynamicMergedHead import io.emeraldpay.dshackle.upstream.EgressSubscription @@ -55,7 +54,7 @@ open class EthereumMultistream( caches: Caches, private val headScheduler: Scheduler, tracer: Tracer, -) : Multistream(chain, upstreams as MutableList, caches), EthereumLikeMultistream { +) : Multistream(chain, upstreams as MutableList, caches) { private var head: DynamicMergedHead = DynamicMergedHead( PriorityForkChoice(), @@ -69,29 +68,6 @@ open class EthereumMultistream( private val reader: EthereumCachingReader = EthereumCachingReader(this, this.caches, getMethodsFactory(), tracer) private var subscribe = EthereumEgressSubscription(this, headScheduler, NoPendingTxes()) - private val supportsEIP1559set = setOf( - Chain.ETHEREUM__MAINNET, - Chain.ETHEREUM__GOERLI, - Chain.ETHEREUM__SEPOLIA, - Chain.ARBITRUM__MAINNET, - Chain.OPTIMISM__MAINNET, - Chain.ARBITRUM__GOERLI, - Chain.OPTIMISM__GOERLI, - Chain.POLYGON_ZKEVM__MAINNET, - Chain.POLYGON_ZKEVM__TESTNET, - Chain.ZKSYNC__MAINNET, - Chain.ZKSYNC__TESTNET, - Chain.ARBITRUM_NOVA__MAINNET, - ) - - private val supportsEIP1559 = supportsEIP1559set.contains(chain) - - private val feeEstimation = if (supportsEIP1559) { - EthereumPriorityFees(this, reader, 256) - } else { - EthereumLegacyFees(this, reader, 256) - } - init { this.init() } @@ -155,7 +131,7 @@ open class EthereumMultistream( return super.isRunning() || reader.isRunning() } - override fun getReader(): EthereumCachingReader { + override fun getCachingReader(): EthereumCachingReader { return reader } @@ -163,7 +139,7 @@ open class EthereumMultistream( return head } - override fun tryProxy( + override fun tryProxySubscribe( matcher: Selector.Matcher, request: BlockchainOuterClass.NativeSubscribeRequest, ): Flux? = @@ -236,8 +212,4 @@ open class EthereumMultistream( ) } } - - override fun getFeeEstimation(): ChainFees { - return feeEstimation - } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectBlockUpdates.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectBlockUpdates.kt index 39f1fda2d..20b6ba147 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectBlockUpdates.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectBlockUpdates.kt @@ -19,9 +19,9 @@ import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.data.TxId import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.SubscriptionConnect -import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler import java.time.Duration @@ -32,7 +32,7 @@ import kotlin.concurrent.read import kotlin.concurrent.write class ConnectBlockUpdates( - private val upstream: EthereumLikeMultistream, + private val upstream: Multistream, private val scheduler: Scheduler, ) : SubscriptionConnect { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectLogs.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectLogs.kt index 46869e69a..d8efc432a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectLogs.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectLogs.kt @@ -15,9 +15,9 @@ */ package io.emeraldpay.dshackle.upstream.ethereum.subscribe +import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.SubscriptionConnect -import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.LogMessage import io.emeraldpay.etherjar.domain.Address import io.emeraldpay.etherjar.hex.Hex32 @@ -27,7 +27,7 @@ import reactor.core.scheduler.Scheduler import java.util.function.Function open class ConnectLogs( - upstream: EthereumLikeMultistream, + upstream: Multistream, private val connectBlockUpdates: ConnectBlockUpdates, ) { @@ -36,7 +36,7 @@ open class ConnectLogs( private val TOPIC_COMPARATOR = HexDataComparator() } - constructor(upstream: EthereumLikeMultistream, scheduler: Scheduler) : this(upstream, ConnectBlockUpdates(upstream, scheduler)) + constructor(upstream: Multistream, scheduler: Scheduler) : this(upstream, ConnectBlockUpdates(upstream, scheduler)) private val produceLogs = ProduceLogs(upstream) fun start(matcher: Selector.Matcher): Flux { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectNewHeads.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectNewHeads.kt index f1a5b3d0e..83f88559d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectNewHeads.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectNewHeads.kt @@ -15,9 +15,9 @@ */ package io.emeraldpay.dshackle.upstream.ethereum.subscribe +import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.SubscriptionConnect -import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.NewHeadMessage import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap * Connects/reconnects to the upstream to produce NewHeads messages */ class ConnectNewHeads( - private val upstream: EthereumLikeMultistream, + private val upstream: Multistream, private val scheduler: Scheduler, ) : SubscriptionConnect { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceLogs.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceLogs.kt index 44b6477ec..5fa72d26d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceLogs.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceLogs.kt @@ -22,8 +22,8 @@ import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.data.TxId import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.ethereum.EthereumCachingReader import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Result -import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.LogMessage import io.emeraldpay.etherjar.hex.HexData import io.emeraldpay.etherjar.rpc.json.TransactionReceiptJson @@ -42,8 +42,8 @@ class ProduceLogs( private val log = LoggerFactory.getLogger(ProduceLogs::class.java) } - constructor(upstream: EthereumLikeMultistream) : - this(upstream.getReader().receipts(), (upstream as Multistream).chain) + constructor(upstream: Multistream) : + this((upstream.getCachingReader() as EthereumCachingReader).receipts(), (upstream as Multistream).chain) private val objectMapper = Global.objectMapper diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt index f50aaf3ea..46b14e47b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt @@ -23,7 +23,6 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.reader.Reader -import io.emeraldpay.dshackle.upstream.ChainFees import io.emeraldpay.dshackle.upstream.DistanceExtractor import io.emeraldpay.dshackle.upstream.DynamicMergedHead import io.emeraldpay.dshackle.upstream.EmptyHead @@ -53,7 +52,7 @@ open class EthereumPosMultiStream( caches: Caches, private val headScheduler: Scheduler, tracer: Tracer, -) : Multistream(chain, upstreams as MutableList, caches), EthereumLikeMultistream { +) : Multistream(chain, upstreams as MutableList, caches) { private var head: DynamicMergedHead = DynamicMergedHead( PriorityForkChoice(), @@ -63,7 +62,6 @@ open class EthereumPosMultiStream( private val reader: EthereumCachingReader = EthereumCachingReader(this, this.caches, getMethodsFactory(), tracer) private var subscribe = EthereumEgressSubscription(this, headScheduler, NoPendingTxes()) - private val feeEstimation = EthereumPriorityFees(this, reader, 256) private val filteredHeads: MutableMap = ConcurrentReferenceHashMap(16, ConcurrentReferenceHashMap.ReferenceType.WEAK) @@ -112,7 +110,7 @@ open class EthereumPosMultiStream( start() } - override fun getReader(): EthereumCachingReader { + override fun getCachingReader(): EthereumCachingReader { return reader } @@ -120,7 +118,7 @@ open class EthereumPosMultiStream( return head } - override fun tryProxy( + override fun tryProxySubscribe( matcher: Selector.Matcher, request: BlockchainOuterClass.NativeSubscribeRequest, ): Flux? = @@ -204,10 +202,6 @@ open class EthereumPosMultiStream( } } - override fun getFeeEstimation(): ChainFees { - return feeEstimation - } - override fun onUpstreamsUpdated() { super.onUpstreamsUpdated() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt index e638e644b..3e351ce2c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt @@ -20,7 +20,7 @@ import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.reader.JsonRpcReader -import io.emeraldpay.dshackle.upstream.ChainFees +import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.DistanceExtractor import io.emeraldpay.dshackle.upstream.DynamicMergedHead import io.emeraldpay.dshackle.upstream.EgressSubscription @@ -29,6 +29,7 @@ import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.HeadLagObserver import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.Selector.Matcher import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.forkchoice.PriorityForkChoice import reactor.core.publisher.Mono @@ -82,10 +83,22 @@ open class GenericMultistream( start() } + override fun getCachingReader(): CachingReader? { + return null + } + + override fun getHead(mather: Matcher): Head { + return getHead() + } + override fun getHead(): Head { return head } + override fun getEnrichedHead(mather: Matcher): Head { + return getHead() + } + override fun getLabels(): Collection { return upstreams.flatMap { it.getLabels() } } @@ -105,8 +118,4 @@ open class GenericMultistream( override fun getEgressSubscription(): EgressSubscription { return EmptyEgressSubscription() } - - override fun getFeeEstimation(): ChainFees { - throw NotImplementedError() - } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy index 6f3861d55..1ac4713c4 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy @@ -43,7 +43,7 @@ class NativeSubscribeSpec extends Specification { 1 * it.subscribe("newHeads", null, _ as Selector.AnyLabelMatcher) >> Flux.just("{}") } def up = Mock(EthereumPosMultiStream) { - 1 * it.tryProxy(_ as Selector.AnyLabelMatcher, call) >> null + 1 * it.tryProxySubscribe(_ as Selector.AnyLabelMatcher, call) >> null 1 * it.getEgressSubscription() >> subscribe } @@ -82,7 +82,7 @@ class NativeSubscribeSpec extends Specification { }, _ as Selector.AnyLabelMatcher) >> Flux.just("{}") } def up = Mock(EthereumPosMultiStream) { - 1 * it.tryProxy(_ as Selector.AnyLabelMatcher, call) >> null + 1 * it.tryProxySubscribe(_ as Selector.AnyLabelMatcher, call) >> null 1 * it.getEgressSubscription() >> subscribe } @@ -105,7 +105,7 @@ class NativeSubscribeSpec extends Specification { .setMethod("newHeads") .build() def up = Mock(EthereumPosMultiStream) { - 1 * it.tryProxy(_ as Selector.AnyLabelMatcher, call) >> Flux.just("{}") + 1 * it.tryProxySubscribe(_ as Selector.AnyLabelMatcher, call) >> Flux.just("{}") 0 * it.getEgressSubscription() } diff --git a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackBitcoinAddressSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackBitcoinAddressSpec.groovy deleted file mode 100644 index 2ffa0007e..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackBitcoinAddressSpec.groovy +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import com.fasterxml.jackson.databind.ObjectMapper -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.Global -import io.emeraldpay.dshackle.data.BlockContainer -import io.emeraldpay.dshackle.data.BlockId -import io.emeraldpay.dshackle.test.MultistreamHolderMock -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinMultistream -import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinReader -import io.emeraldpay.dshackle.upstream.bitcoin.XpubAddresses -import io.emeraldpay.dshackle.upstream.bitcoin.data.SimpleUnspent -import org.bitcoinj.core.Address -import org.bitcoinj.params.MainNetParams -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.Sinks -import reactor.test.StepVerifier -import spock.lang.Specification - -import java.time.Duration -import java.time.Instant - -class TrackBitcoinAddressSpec extends Specification { - - String hash1 = "0xa0e65cbc1b52a8ca60562112c6060552d882f16f34a9dba2ccdc05c0a6a27c22" - ObjectMapper objectMapper = Global.objectMapper - - def "Correct sum for single"() { - setup: - def unspents = [ - new SimpleUnspent("f14b222e652c58d11435fa9172ddea000c6f5e20e6b715eb940fc28d1c4adeef", 0, 100L) - ] - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def address = new TrackBitcoinAddress.Address( - Chain.BITCOIN__MAINNET, "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK" - ) - when: - def total = track.totalUnspent(address, false, unspents) - - then: - total.balance == 100 - total.utxo.isEmpty() - } - - def "Correct sum for few"() { - setup: - def unspents = [ - new SimpleUnspent("f14b222e652c58d11435fa9172ddea000c6f5e20e6b715eb940fc28d1c4adeef", 0, 100L), - new SimpleUnspent("17d1c4adf14b222e652c58d11435fa9ee2ddea000c6f5e20e6b715eb940fc28f", 0, 123L), - ] - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def address = new TrackBitcoinAddress.Address( - Chain.BITCOIN__MAINNET, "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK" - ) - when: - def total = track.totalUnspent(address, false, unspents) - - then: - total.balance == 223 - total.utxo.isEmpty() - } - - def "Correct sum for none"() { - setup: - def unspents = [] - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def address = new TrackBitcoinAddress.Address( - Chain.BITCOIN__MAINNET, "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK" - ) - when: - def total = track.totalUnspent(address, false, unspents) - - then: - total.balance == 0 - total.utxo.isEmpty() - } - - def "Correct sum for none read"() { - setup: - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def address = new TrackBitcoinAddress.Address( - Chain.BITCOIN__MAINNET, "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK" - ) - def api = Mock(BitcoinMultistream) { - 1 * getReader() >> Mock(BitcoinReader) { - 1 * listUnspent(_) >> Mono.empty() - } - } - when: - def total = track.balanceForAddress(api, address, false).block() - - then: - total.balance == 0 - total.utxo.isEmpty() - } - - def "Correct sum for few with utxo"() { - setup: - def unspents = [ - new SimpleUnspent("f14b222e652c58d11435fa9172ddea000c6f5e20e6b715eb940fc28d1c4adeef", 0, 100L), - new SimpleUnspent("17d1c4adf14b222e652c58d11435fa9ee2ddea000c6f5e20e6b715eb940fc28f", 0, 123L), - ] - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def address = new TrackBitcoinAddress.Address( - Chain.BITCOIN__MAINNET, "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK" - ) - when: - def total = track.totalUnspent(address, true, unspents) - - then: - total.balance == 223 - total.utxo.size() == 2 - total.utxo[0].txid == "f14b222e652c58d11435fa9172ddea000c6f5e20e6b715eb940fc28d1c4adeef" - total.utxo[1].txid == "17d1c4adf14b222e652c58d11435fa9ee2ddea000c6f5e20e6b715eb940fc28f" - } - - def "One address for single provided"() { - setup: - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def req = BlockchainOuterClass.BalanceRequest.newBuilder() - .setAddress( - Common.AnyAddress.newBuilder() - .setAddressSingle( - Common.SingleAddress.newBuilder() - .setAddress("16rCmCmbuWDhPjWTrpQGaU3EPdZF7MTdUk") - ) - ) - .build() - when: - def act = track.allAddresses(Stub(BitcoinMultistream), req).collectList().block() - then: - act == ["16rCmCmbuWDhPjWTrpQGaU3EPdZF7MTdUk"] - } - - def "Sorted addresses for multiple provided"() { - setup: - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def req = BlockchainOuterClass.BalanceRequest.newBuilder() - .setAddress( - Common.AnyAddress.newBuilder() - .setAddressMulti( - Common.MultiAddress.newBuilder() - .addAddresses(Common.SingleAddress.newBuilder().setAddress("16rCmCmbuWDhPjWTrpQGaU3EPdZF7MTdUk")) - .addAddresses(Common.SingleAddress.newBuilder().setAddress("3BMqADKWoWHPASsUdHvnUL6E1jpZkMnLZz")) - .addAddresses(Common.SingleAddress.newBuilder().setAddress("1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK")) - .addAddresses(Common.SingleAddress.newBuilder().setAddress("bc1qdthqvt6cllzej7uhdddrltdfsmnt7d0gl5ue5n")) - ) - ) - .build() - when: - def act = track.allAddresses(Stub(BitcoinMultistream), req).collectList().block() - then: - act == ["16rCmCmbuWDhPjWTrpQGaU3EPdZF7MTdUk", "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK", "3BMqADKWoWHPASsUdHvnUL6E1jpZkMnLZz", "bc1qdthqvt6cllzej7uhdddrltdfsmnt7d0gl5ue5n"] - } - - def "Empty for no address provided"() { - setup: - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def req = BlockchainOuterClass.BalanceRequest.newBuilder() - .build() - when: - def act = track.allAddresses(Stub(BitcoinMultistream), req).collectList().block() - then: - act == [] - } - - def "Use active for xpub"() { - setup: - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def req = BlockchainOuterClass.BalanceRequest.newBuilder() - .setAddress( - Common.AnyAddress.newBuilder() - .setAddressXpub( - // seed: chimney battle code relief era plug finish video patch dream pumpkin govern destroy fresh color - Common.XpubAddress.newBuilder() - .setXpub("zpub6tz4F49K5B4m7r7EyBKYM9K44eGECaQ2AfrCybq1w7ALFatz9856vrXxAPSrteDA4d5sjUPW3ACNq8wB2V3ugXVJxvAPAYPAYHsVm3VAncL") - .setLimit(25) - ) - ) - .build() - def xpubAddresses = Mock(XpubAddresses) { - 1 * activeAddresses( - "zpub6tz4F49K5B4m7r7EyBKYM9K44eGECaQ2AfrCybq1w7ALFatz9856vrXxAPSrteDA4d5sjUPW3ACNq8wB2V3ugXVJxvAPAYPAYHsVm3VAncL", - 0, - 25 - ) >> Flux.fromIterable([ - "bc1q25590fu8djhw9lvxxqz8ufjyfwup9h54u8fl6t", - "bc1q3k6e6vawd5l5syu9nlxn2xsch9afgunl8dnz94", - "bc1qu7hd6wycy686kakfps9c093szufjpwnh6rjs9s" - ]).map { Address.fromString(MainNetParams.get(), it) } - } - def multistream = Mock(BitcoinMultistream) { - 1 * getXpubAddresses() >> xpubAddresses - } - when: - def act = track.allAddresses(multistream, req).collectList().block() - then: - act == [ - "bc1q25590fu8djhw9lvxxqz8ufjyfwup9h54u8fl6t", - "bc1q3k6e6vawd5l5syu9nlxn2xsch9afgunl8dnz94", - "bc1qu7hd6wycy686kakfps9c093szufjpwnh6rjs9s" - ] - } - - def "Build proto for common balance"() { - setup: - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def balance = new TrackBitcoinAddress.AddressBalance(Chain.BITCOIN__MAINNET, "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK", BigInteger.valueOf(123456)) - when: - def act = track.buildResponse(balance) - then: - act.address.address == "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK" - act.balance == "123456" - act.asset.chain.number == Chain.BITCOIN__MAINNET.id - act.asset.code == "BTC" - } - - def "Build proto for zero balance"() { - setup: - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def balance = new TrackBitcoinAddress.AddressBalance(Chain.BITCOIN__MAINNET, "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK", BigInteger.ZERO) - when: - def act = track.buildResponse(balance) - then: - act.address.address == "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK" - act.balance == "0" - act.asset.chain.number == Chain.BITCOIN__MAINNET.id - act.asset.code == "BTC" - } - - def "Build proto for all bitcoins"() { - setup: - TrackBitcoinAddress track = new TrackBitcoinAddress(Stub(MultistreamHolder)) - def balance = new TrackBitcoinAddress.AddressBalance(Chain.BITCOIN__MAINNET, "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK", BigInteger.valueOf(21_000_000).multiply(BigInteger.TEN.pow(8))) - when: - def act = track.buildResponse(balance) - then: - act.address.address == "1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK" - act.balance == "2100000000000000" - act.asset.chain.number == Chain.BITCOIN__MAINNET.id - act.asset.code == "BTC" - } - - def "Get update for a balance"() { - setup: - - def blocks = Sinks.many().unicast().onBackpressureBuffer() - Head head = Mock(Head) { - 1 * getFlux() >> Flux.concat( - Flux.just( - new BlockContainer(0L, BlockId.from(hash1), BigInteger.ZERO, Instant.now(), false, null, null, BlockId.from(hash1), [], 0, "TrackBitcoinAddressSpec") - ), - blocks.asFlux() - ) - } - def upstream = null - upstream = Mock(BitcoinMultistream) { - _ * getReader() >> Mock(BitcoinReader) { - 2 * listUnspent(_) >>> [ - Mono.just([]), - Mono.just([ - new SimpleUnspent("f14b222e652c58d11435fa9172ddea000c6f5e20e6b715eb940fc28d1c4adeef", 0, 1230000L) - ]) - ] - } - _ * getHead() >> head - _ * cast(_) >> { - upstream - } - } - MultistreamHolder upstreams = new MultistreamHolderMock(Chain.BITCOIN__MAINNET, upstream) - TrackBitcoinAddress track = new TrackBitcoinAddress(upstreams) - track.setBalanceAvailability(Chain.BITCOIN__MAINNET, true) - - when: - def resp = track.subscribe(BlockchainOuterClass.BalanceRequest.newBuilder() - .setAsset(Common.Asset.newBuilder().setChain(Common.ChainRef.CHAIN_BITCOIN__MAINNET)) - .setAddress( - Common.AnyAddress.newBuilder().setAddressSingle( - Common.SingleAddress.newBuilder().setAddress("1K7xkspJg7DDKNwzXgoRSDCUxiFsRegsSK") - ) - ) - .build() - ).map { - it.balance - } - - then: - StepVerifier.create(resp) - .expectNext("0") - .then { - blocks.tryEmitNext(new BlockContainer(1L, BlockId.from(hash1), BigInteger.ONE, Instant.now(), false, null, null, BlockId.from(hash1), [], 0, "TrackBitcoinAddressSpec")) - } - .expectNext("1230000") - .then { - blocks.tryEmitComplete() - } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } -} diff --git a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackBitcoinTxSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackBitcoinTxSpec.groovy deleted file mode 100644 index 1c1e9baf5..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackBitcoinTxSpec.groovy +++ /dev/null @@ -1,295 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.data.BlockContainer -import io.emeraldpay.dshackle.data.BlockId -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinMultistream -import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinReader -import io.emeraldpay.dshackle.upstream.bitcoin.CachingMempoolData -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.test.StepVerifier -import spock.lang.Specification - -import java.time.Duration -import java.time.Instant - -class TrackBitcoinTxSpec extends Specification { - - def "loadMempool() returns not found when not found"() { - setup: - TrackBitcoinTx track = new TrackBitcoinTx(Stub(MultistreamHolder)) - - CachingMempoolData mempoolAccess = Mock(CachingMempoolData) { - 1 * get() >> Mono.just([ - "69cd44d7c641db82e69824523c7ac0c5c1e5628f025474529cf5ffe64527efc9", - "d296c6d47335a7f283574b06f1d6303b30ac75631e081ab128346a549ad93350" - ]) - } - BitcoinMultistream upstream = Mock(BitcoinMultistream) { - _ * getReader() >> Mock(BitcoinReader) { - _ * getMempool() >> mempoolAccess - } - } - when: - def act = track.loadMempool(upstream, "65ce58db064bd105b14dc76a0bce0df14653cf5263d22d17e78864cf272ee367") - - then: - StepVerifier.create(act) - .expectNextMatches { - it.found == false && it.mined == false && it.blockHash == null - } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - - def "loadMempool() returns ok when found"() { - setup: - TrackBitcoinTx track = new TrackBitcoinTx(Stub(MultistreamHolder)) - CachingMempoolData mempoolAccess = Mock(CachingMempoolData) { - 1 * get() >> Mono.just([ - "69cd44d7c641db82e69824523c7ac0c5c1e5628f025474529cf5ffe64527efc9", - "d296c6d47335a7f283574b06f1d6303b30ac75631e081ab128346a549ad93350" - ]) - } - BitcoinMultistream upstream = Mock(BitcoinMultistream) { - _ * getReader() >> Mock(BitcoinReader) { - _ * getMempool() >> mempoolAccess - } - } - when: - def act = track.loadMempool(upstream, "69cd44d7c641db82e69824523c7ac0c5c1e5628f025474529cf5ffe64527efc9") - - then: - StepVerifier.create(act) - .expectNextMatches { - it.found == true && it.mined == false && it.blockHash == null - } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - - def "loadExiting() returns not found if not mined"() { - setup: - TrackBitcoinTx track = new TrackBitcoinTx(Stub(MultistreamHolder)) - def txid = "69cd44d7c641db82e69824523c7ac0c5c1e5628f025474529cf5ffe64527efc9" - BitcoinMultistream upstream = Mock(BitcoinMultistream) { - _ * getReader() >> Mock(BitcoinReader) { - 1 * getTx(txid) >> Mono.just([ - txid: txid - ]) - } - } - when: - def act = track.loadExisting(upstream, txid) - - then: - StepVerifier.create(act) - .expectNextMatches { - it.found == true && it.mined == false && it.blockHash == null - } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - - def "loadExiting() returns block if mined"() { - setup: - TrackBitcoinTx track = new TrackBitcoinTx(Stub(MultistreamHolder)) - def txid = "69cd44d7c641db82e69824523c7ac0c5c1e5628f025474529cf5ffe64527efc9" - BitcoinMultistream upstream = Mock(BitcoinMultistream) { - _ * getReader() >> Mock(BitcoinReader) { - 1 * getTx(txid) >> Mono.just([ - txid : txid, - blockhash: "0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f", - height : 100 - ]) - } - } - when: - def act = track.loadExisting(upstream, txid) - - then: - StepVerifier.create(act) - .expectNextMatches { - it.found == true && it.mined == true && - it.blockHash == "0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f" && - it.height == 100 - } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - - def "Goes with confirmations"() { - setup: - TrackBitcoinTx track = new TrackBitcoinTx(Stub(MultistreamHolder)) - def txid = "69cd44d7c641db82e69824523c7ac0c5c1e5628f025474529cf5ffe64527efc9" - // start with the current block - def next = Flux.fromIterable([10, 12, 13, 14, 15]).map { h -> - def hash = BlockId.from("0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f") - new BlockContainer(h.longValue(), hash, BigInteger.ONE, Instant.now(), false, null, null, hash, [], 0, "unknown") - } - Head head = Mock(Head) { - 1 * getFlux() >> next - } - BitcoinMultistream upstream = Mock(BitcoinMultistream) { - 1 * getHead() >> head - } - def status = new TrackBitcoinTx.TxStatus( - txid, true, 10, true, "0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f", Instant.now(), BigInteger.ONE, 0 - ) - when: - def act = track.withConfirmations(upstream, status) - - then: - StepVerifier.create(act) - .expectNextMatches { it.confirmations == 1 } - .expectNextMatches { it.confirmations == 3 } - .expectNextMatches { it.confirmations == 4 } - .expectNextMatches { it.confirmations == 5 } - .expectNextMatches { it.confirmations == 6 } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - - def "Wait until mined"() { - setup: - TrackBitcoinTx track = new TrackBitcoinTx(Stub(MultistreamHolder)) - def txid = "69cd44d7c641db82e69824523c7ac0c5c1e5628f025474529cf5ffe64527efc9" - // start with the current block - def next = Flux.fromIterable([10, 12, 13]).map { h -> - def hash = BlockId.from("0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f") - new BlockContainer(h.longValue(), hash, BigInteger.ONE, Instant.now(), false, null, null, hash, [], 0, "unknown") - } - Head head = Mock(Head) { - 1 * getFlux() >> next - } - BitcoinReader api = Mock(BitcoinReader) { - 3 * getTx(txid) >>> [ - Mono.just([ - txid: txid - ]), - Mono.just([ - txid: txid - ]), - Mono.just([ - txid : txid, - blockhash: "0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f", - height : 100 - ]) - ] - } - BitcoinMultistream upstream = Mock(BitcoinMultistream) { - 1 * getHead() >> head - _ * getReader() >> api - } - def status = new TrackBitcoinTx.TxStatus( - txid, false, null, false, null, null, null, 0 - ) - when: - def act = track.untilMined(upstream, status) - - then: - StepVerifier.create(act) - .expectNextMatches { it.mined && it.height == 100 && it.blockHash == "0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f" } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - - def "Check mempool until found"() { - setup: - TrackBitcoinTx track = new TrackBitcoinTx(Stub(MultistreamHolder)) - def txid = "69cd44d7c641db82e69824523c7ac0c5c1e5628f025474529cf5ffe64527efc9" - - Head head = Mock(Head) { - _ * getFlux() >> Flux.empty() - } - CachingMempoolData mempoolAccess = Mock(CachingMempoolData) { - 4 * get() >>> [ - Mono.just([]), - Mono.just(["4523c7ac0c5c1e5628f025474529c69cd44d7c641db82e6982f5ffe64527efc9"]), - Mono.just(["4523c7ac0c5c1e5628f025474529c69cd44d7c641db82e6982f5ffe64527efc9", txid]), - Mono.just(["4523c7ac0c5c1e5628f025474529c69cd44d7c641db82e6982f5ffe64527efc9", txid]) //second call when started over - ] - } - BitcoinReader api = Mock(BitcoinReader) { - 1 * getTx(txid) >> Mono.just([ - txid: txid - ]) - _ * getMempool() >> mempoolAccess - } - BitcoinMultistream upstream = Mock(BitcoinMultistream) { - _ * getHead() >> head - _ * getReader() >> api - } - - when: - def steps = StepVerifier.withVirtualTime { - track.untilFound(Chain.BITCOIN__MAINNET, upstream, txid).take(1) - } - - then: - steps - .expectSubscription() - .expectNoEvent(Duration.ofSeconds(3)) - .expectNextMatches { it.found && !it.mined } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - - def "Subscribe to an existing tx"() { - setup: - TrackBitcoinTx track = new TrackBitcoinTx(Stub(MultistreamHolder)) - def txid = "69cd44d7c641db82e69824523c7ac0c5c1e5628f025474529cf5ffe64527efc9" - BitcoinReader api = Mock(BitcoinReader) { - _ * getTx(txid) >> Mono.just([ - txid : txid, - blockhash: "0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f", - height : 1 - ]) - _ * getBlock("0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f") >> Mono.just([ - height : 1, - chainwork: "01", - time : 10000 - ]) - } - def next = Flux.fromIterable([10, 11, 12]).map { h -> - def hash = BlockId.from("0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f") - new BlockContainer(h.longValue(), hash, BigInteger.ONE, Instant.now(), false, null, null, hash, [], 0, "unknown") - } - Head head = Mock(Head) { - _ * getFlux() >> next - } - BitcoinMultistream upstream = Mock(BitcoinMultistream) { - _ * getReader() >> api - _ * getHead() >> head - } - - when: - def act = track.subscribe(Chain.BITCOIN__MAINNET, upstream, txid) - - then: - StepVerifier.create(act) - .expectNextMatches { it.found && it.mined && it.confirmations == 10 && it.blockHash == "0000000000000000000895d1b9d3898700e1deecc3b0e69f439aa77875e6042f" } - .expectNextMatches { it.found && it.mined && it.confirmations == 11 } - .expectNextMatches { it.found && it.mined && it.confirmations == 12 } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } -} diff --git a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackERC20AddressSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackERC20AddressSpec.groovy deleted file mode 100644 index 162fd20fe..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackERC20AddressSpec.groovy +++ /dev/null @@ -1,243 +0,0 @@ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.config.TokensConfig -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.dshackle.upstream.Selector -import io.emeraldpay.dshackle.upstream.SubscriptionConnect -import io.emeraldpay.dshackle.upstream.ethereum.ERC20Balance -import io.emeraldpay.dshackle.upstream.ethereum.EthereumEgressSubscription -import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.ConnectLogs -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.LogMessage -import io.emeraldpay.etherjar.domain.Address -import io.emeraldpay.etherjar.domain.BlockHash -import io.emeraldpay.etherjar.domain.TransactionId -import io.emeraldpay.etherjar.erc20.ERC20Token -import io.emeraldpay.etherjar.hex.Hex32 -import io.emeraldpay.etherjar.hex.HexData -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.test.StepVerifier -import spock.lang.Specification - -import java.time.Duration - -class TrackERC20AddressSpec extends Specification { - - def "Init with single token"() { - setup: - MultistreamHolder ups = Mock(MultistreamHolder) { - _ * isAvailable(Chain.ETHEREUM__MAINNET) >> true - } - TokensConfig tokens = new TokensConfig([ - new TokensConfig.Token().tap { - id = "dai" - blockchain = Chain.ETHEREUM__MAINNET - name = "DAI" - type = TokensConfig.Type.ERC20 - address = Address.from("0x6B175474E89094C44Da98b954EedeAC495271d0F") - } - ]) - TrackERC20Address track = new TrackERC20Address(ups, tokens) - when: - track.init() - def act = track.tokens - def supportDai = track.isSupported(Chain.ETHEREUM__MAINNET, "DAI") - def supportSai = track.isSupported(Chain.ETHEREUM__MAINNET, "SAI") - - then: - act.size() == 1 - with(act.keySet().first()) { - chain == Chain.ETHEREUM__MAINNET - name == "dai" - } - with(act.values().first()) { - chain == Chain.ETHEREUM__MAINNET - name == "dai" - token != null - } - supportDai - !supportSai - } - - def "Init without tokens"() { - setup: - MultistreamHolder ups = Mock(MultistreamHolder) { - _ * isAvailable(Chain.ETHEREUM__MAINNET) >> true - } - - TokensConfig tokens = new TokensConfig([]) - TrackERC20Address track = new TrackERC20Address(ups, tokens) - when: - track.init() - def act = track.tokens - def supportDai = track.isSupported(Chain.ETHEREUM__MAINNET, "dai") - def supportSai = track.isSupported(Chain.ETHEREUM__MAINNET, "sai") - - then: - act.size() == 0 - !supportDai - !supportSai - } - - def "Init with two tokens"() { - setup: - MultistreamHolder ups = Mock(MultistreamHolder) { - _ * isAvailable(Chain.ETHEREUM__MAINNET) >> true - } - - TokensConfig tokens = new TokensConfig([ - new TokensConfig.Token().tap { - id = "dai" - blockchain = Chain.ETHEREUM__MAINNET - name = "DAI" - type = TokensConfig.Type.ERC20 - address = Address.from("0x6B175474E89094C44Da98b954EedeAC495271d0F") - }, - new TokensConfig.Token().tap { - id = "sai" - blockchain = Chain.ETHEREUM__MAINNET - name = "SAI" - type = TokensConfig.Type.ERC20 - address = Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9") - } - ]) - TrackERC20Address track = new TrackERC20Address(ups, tokens) - when: - track.init() - def act = track.tokens - def supportDai = track.isSupported(Chain.ETHEREUM__MAINNET, "dai") - def supportSai = track.isSupported(Chain.ETHEREUM__MAINNET, "sai") - - then: - act.size() == 2 - with(act[act.keySet().find { it.name == "dai" }]) { - chain == Chain.ETHEREUM__MAINNET - name == "dai" - } - with(act[act.keySet().find { it.name == "sai" }]) { - chain == Chain.ETHEREUM__MAINNET - name == "sai" - } - supportDai - supportSai - } - - def "Builds response"() { - setup: - TrackERC20Address track = new TrackERC20Address(Stub(MultistreamHolder), new TokensConfig([])) - TrackERC20Address.TrackedAddress address = new TrackERC20Address.TrackedAddress( - Chain.ETHEREUM__MAINNET, - Address.from("0x16c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b"), - new ERC20Token(Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9")), - "test", - BigInteger.valueOf(1234) - ) - when: - def act = track.buildResponse(address) - then: - act == BlockchainOuterClass.AddressBalance.newBuilder() - .setAddress(Common.SingleAddress.newBuilder().setAddress("0x16c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b")) - .setAsset(Common.Asset.newBuilder() - .setChain(Common.ChainRef.CHAIN_ETHEREUM__MAINNET) - .setCode("TEST") - ) - .setBalance("1234") - .build() - } - - def "Check balance when event happens"() { - setup: - def events = [ - new LogMessage( - Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9"), - BlockHash.from("0x0c0d2969c843d0b61fbab1b2302cf24d6681b2ae0a140a3c2908990d048f7631"), - 13668750, - HexData.from("0x0000000000000000000000000000000000000000000000000000000048f2fc7b"), - 1, - [ - Hex32.from("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), - Hex32.from("0x000000000000000000000000b02f1329d6a6acef07a763258f8509c2847a0a3e"), - Hex32.from("0x00000000000000000000000016c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b") - ], - TransactionId.from("0x5a7898e27120575c33d3d0179af3b6353c7268bbad4255df079ed26b743a21a5"), - 1, - false, - "unknown" - ) - ] - - def connect = Mock(SubscriptionConnect) { - 1 * connect(Selector.empty) >> { - Flux.fromIterable(events) - } - } - def logs = Mock(ConnectLogs) { - 1 * create( - [Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9")], - [Hex32.from("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")], - ) >> { args -> - println("ConnectLogs.start $args") - connect - } - } - def sub = Mock(EthereumEgressSubscription) { - 1 * getLogs() >> logs - } - def up = Mock(EthereumPosMultiStream) { - 1 * getEgressSubscription() >> sub - _ * cast(EthereumPosMultiStream) >> { args -> - it - } - } - def mup = Mock(MultistreamHolder) { - _ * getUpstream(Chain.ETHEREUM__MAINNET) >> up - } - TokensConfig tokens = new TokensConfig([ - new TokensConfig.Token().tap { - id = "test" - blockchain = Chain.ETHEREUM__MAINNET - name = "TEST" - type = TokensConfig.Type.ERC20 - address = Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9") - } - ]) - TrackERC20Address track = new TrackERC20Address(mup, tokens) - track.init() - track.erc20Balance = Mock(ERC20Balance) { - 2 * it.getBalance(_, _, _) >>> [ - Mono.just(100000.toBigInteger()), - Mono.just(150000.toBigInteger()) - ] - } - def request = BlockchainOuterClass.BalanceRequest.newBuilder() - .setAddress( - Common.AnyAddress.newBuilder() - .setAddressSingle(Common.SingleAddress.newBuilder().setAddress("0x16c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b")) - ) - .setAsset( - Common.Asset.newBuilder() - .setChain(Common.ChainRef.CHAIN_ETHEREUM__MAINNET) - .setCode("TEST") - ) - .build() - when: - def act = track.subscribe(request) - - then: - StepVerifier.create(act) - .expectNextMatches { - println("Received: $it") - it.getBalance() == "100000" - } - .expectNextMatches { - println("Received: $it") - it.getBalance() == "150000" - } - .expectComplete() - .verify(Duration.ofSeconds(1)) - } -} diff --git a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackEthereumAddressSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackEthereumAddressSpec.groovy deleted file mode 100644 index 91910e7ff..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackEthereumAddressSpec.groovy +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Copyright (c) 2019 ETCDEV GmbH - * Copyright (c) 2020 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.data.BlockContainer -import io.emeraldpay.dshackle.test.MultistreamHolderMock -import io.emeraldpay.dshackle.test.TestingCommons -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.etherjar.domain.BlockHash -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson -import reactor.test.StepVerifier -import spock.lang.Specification - -import java.time.Duration -import java.time.Instant -import java.time.temporal.ChronoUnit - -class TrackEthereumAddressSpec extends Specification { - - def chain = Common.ChainRef.CHAIN_ETHEREUM__MAINNET - def address1 = "0xe2c8fa8120d813cd0b5e6add120295bf20cfa09f" - def address1Proto = Common.SingleAddress.newBuilder() - .setAddress(address1) - def etherAsset = Common.Asset.newBuilder() - .setChain(chain) - .setCode("ETHER") - - - def "get balance"() { - setup: - def req = BlockchainOuterClass.BalanceRequest.newBuilder() - .setAsset(etherAsset) - .setAddress(Common.AnyAddress.newBuilder().setAddressSingle(address1Proto).build()) - .build() - def exp = BlockchainOuterClass.AddressBalance.newBuilder() - .setAddress(address1Proto) - .setAsset(etherAsset) - .setBalance("1234567890") - .build() - - def apiMock = TestingCommons.api() - def upstreamMock = TestingCommons.upstream(apiMock) - MultistreamHolder upstreams = new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, upstreamMock) - TrackEthereumAddress trackAddress = new TrackEthereumAddress(upstreams) - - apiMock.answer("eth_getBalance", ["0xe2c8fa8120d813cd0b5e6add120295bf20cfa09f", "latest"], "0x499602D2") - when: - def flux = trackAddress.getBalance(req) - then: - StepVerifier.create(flux) - .expectNext(exp) - .expectComplete() - .verify(Duration.ofSeconds(3)) - } - - def "recheck address after each block"() { - setup: - def req = BlockchainOuterClass.BalanceRequest.newBuilder() - .setAsset(etherAsset) - .setAddress(Common.AnyAddress.newBuilder().setAddressSingle(address1Proto).build()) - .build() - def exp1 = BlockchainOuterClass.AddressBalance.newBuilder() - .setAddress(address1Proto) - .setAsset(etherAsset) - .setBalance("1234567890") - .build() - def exp2 = BlockchainOuterClass.AddressBalance.newBuilder() - .setAddress(address1Proto) - .setAsset(etherAsset) - .setBalance("65432") - .build() - - def hash = BlockHash.from("0xa0e65cbc1b52a8ca60562112c6060552d882f16f34a9dba2ccdc05c0a6a27c22") - def block2 = new BlockJson().with { - it.number = 1 - it.totalDifficulty = 100 - it.hash = hash - it.parentHash = hash - it.timestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS) - return it - } - - def apiMock = TestingCommons.api() - def upstreamMock = TestingCommons.upstream(apiMock) - MultistreamHolder upstreams = new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, upstreamMock) - TrackEthereumAddress trackAddress = new TrackEthereumAddress(upstreams) - - apiMock.answerOnce("eth_getBalance", ["0xe2c8fa8120d813cd0b5e6add120295bf20cfa09f", "latest"], "0x499602D2") - apiMock.answerOnce("eth_getBalance", ["0xe2c8fa8120d813cd0b5e6add120295bf20cfa09f", "0x1"], "0xff98") - when: - def flux = trackAddress.subscribe(req) - then: - StepVerifier.create(flux) - .expectNext(exp1).as("First block") - .then { - upstreamMock.nextBlock(BlockContainer.from(block2)) - } - .expectNext(exp2).as("Second block") - .thenCancel() - .verify(Duration.ofSeconds(2)) - } -} diff --git a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackEthereumTxSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackEthereumTxSpec.groovy deleted file mode 100644 index f4b81ebb7..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/rpc/TrackEthereumTxSpec.groovy +++ /dev/null @@ -1,325 +0,0 @@ -/** - * Copyright (c) 2019 ETCDEV GmbH - * Copyright (c) 2020 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.rpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.data.BlockContainer -import io.emeraldpay.dshackle.data.BlockId -import io.emeraldpay.dshackle.data.TxId -import io.emeraldpay.dshackle.test.MultistreamHolderMock -import io.emeraldpay.dshackle.test.TestingCommons -import io.emeraldpay.dshackle.upstream.DynamicMergedHead -import io.emeraldpay.dshackle.upstream.MultistreamHolder -import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream -import io.emeraldpay.etherjar.domain.BlockHash -import io.emeraldpay.etherjar.domain.TransactionId -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson -import io.emeraldpay.etherjar.rpc.json.TransactionJson -import io.emeraldpay.etherjar.rpc.json.TransactionRefJson -import reactor.core.publisher.Flux -import reactor.core.scheduler.Schedulers -import reactor.test.StepVerifier -import reactor.test.scheduler.VirtualTimeScheduler -import spock.lang.Ignore -import spock.lang.Specification - -import java.time.Duration -import java.time.Instant - -class TrackEthereumTxSpec extends Specification { - - def chain = Common.ChainRef.CHAIN_ETHEREUM__MAINNET - def txId = "0xba61ce4672751fd6086a9ac2b55547a5555af17535b6c0334ede2ecb6d64070a" - def parent = BlockHash.from("0xa0e65cbc1b52a8ca60562112c6060552d882f16f34a9dba2ccdc05c0a6a27c22") - - def "Gives details for an old transaction"() { - setup: - def req = BlockchainOuterClass.TxStatusRequest.newBuilder() - .setChain(chain) - .setConfirmationLimit(6) - .setTxId(txId) - .build() - - def blockJson = new BlockJson().with { - it.hash = BlockHash.from("0xa0e65cbc1b52a8ca60562112c6060552d882f16f34a9dba2ccdc05c0a6a27c22") - it.timestamp = Instant.ofEpochMilli(156400000000) - it.number = 100 - it.parentHash = parent - it.totalDifficulty = BigInteger.valueOf(500) - it - } - - def blockHeadJson = new BlockJson().with { - it.hash = BlockHash.from("0x552d882f16f34a9dba2ccdc05c0a6a27c22a0e65cbc1b52a8ca60562112c6060") - it.timestamp = Instant.ofEpochMilli(156400200000) - it.number = 108 - it.totalDifficulty = BigInteger.valueOf(800) - it.parentHash = parent - it.transactions = [] - it - } - - - def txJson = new TransactionJson().with { - it.hash = TransactionId.from("0xba61ce4672751fd6086a9ac2b55547a5555af17535b6c0334ede2ecb6d64070a") - it.blockHash = blockJson.hash - it.blockNumber = blockJson.number - it.nonce = 1 - it - } - - blockJson.transactions = [new TransactionRefJson(txJson.hash)] - - def exp1 = BlockchainOuterClass.TxStatus.newBuilder() - .setTxId(txId) - .setBroadcasted(true) - .setMined(true) - .setConfirmations(8 + 1) - .setBlock( - Common.BlockInfo.newBuilder() - .setHeight(blockJson.number) - .setBlockId(blockJson.hash.toHex().substring(2)) - .setTimestamp(blockJson.timestamp.toEpochMilli()) - ).build() - - def apiMock = TestingCommons.api() - def upstreamMock = TestingCommons.upstream(apiMock) - MultistreamHolder upstreams = new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, upstreamMock) - TrackEthereumTx trackTx = new TrackEthereumTx(upstreams, Schedulers.boundedElastic()) - - apiMock.answer("eth_getTransactionByHash", [txId], txJson) - apiMock.answer("eth_getBlockByHash", [blockJson.hash.toHex(), false], blockJson) - upstreamMock.nextBlock(BlockContainer.from(blockHeadJson)) - - when: - def flux = trackTx.subscribe(req) - then: - StepVerifier.create(flux) - .expectNext(exp1) - .expectComplete() - .verify(Duration.ofSeconds(4)) - } - - def "Wait for unknown transaction"() { - setup: - def apiMock = TestingCommons.api() - def upstreamMock = TestingCommons.upstream(apiMock) - MultistreamHolder upstreams = new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, upstreamMock) - ((EthereumPosMultiStream) upstreams.getUpstream(Chain.ETHEREUM__MAINNET)).head = Mock(DynamicMergedHead) { - _ * getFlux() >> Flux.empty() - } - def scheduler = VirtualTimeScheduler.create(true) - TrackEthereumTx trackTx = new TrackEthereumTx(upstreams, scheduler) - - apiMock.answer("eth_getTransactionByHash", [txId], null) - - when: - def tx = new TrackEthereumTx.TxDetails(Chain.ETHEREUM__MAINNET, Instant.now(), TransactionId.from(txId), 6) - def act = StepVerifier.withVirtualTime( - { trackTx.subscribe(tx, upstreams.getUpstream(Chain.ETHEREUM__MAINNET).cast(EthereumPosMultiStream)) }, - { scheduler }, - 5) - - then: - act - .expectSubscription() - .expectNoEvent(Duration.ofSeconds(20)).as("Waited for updates") - .thenAwait(Duration.ofSeconds(2)) - .expectComplete() - .verify(Duration.ofSeconds(3)) - } - - def "Known transaction when not mined"() { - setup: - def req = BlockchainOuterClass.TxStatusRequest.newBuilder() - .setChain(chain) - .setConfirmationLimit(6) - .setTxId(txId) - .build() - def exp1 = BlockchainOuterClass.TxStatus.newBuilder() - .setTxId(txId) - .setBroadcasted(false) - .setMined(false) - .build() - def exp2 = BlockchainOuterClass.TxStatus.newBuilder() - .setTxId(txId) - .setBroadcasted(true) - .setMined(false) - .build() - def txJson = new TransactionJson().with { - it.hash = TransactionId.from(txId) - it.nonce = 1 - it - } - - def apiMock = TestingCommons.api() - def upstreamMock = TestingCommons.upstream(apiMock) - MultistreamHolder upstreams = new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, upstreamMock) - TrackEthereumTx trackTx = new TrackEthereumTx(upstreams, Schedulers.boundedElastic()) - - apiMock.answer("eth_getTransactionByHash", [txId], null, 2) - apiMock.answer("eth_getTransactionByHash", [txId], txJson) - - when: - def act = trackTx.subscribe(req).take(2) - then: - StepVerifier.create(act) - .expectSubscription() - .expectNext(exp1).as("Unknown tx") - .expectNext(exp2).as("Found in mempool") - .expectComplete() - .verify(Duration.ofSeconds(4)) - } - - def "New block makes tx mined"() { - setup: - def apiMock = TestingCommons.api() - def upstreamMock = TestingCommons.upstream(apiMock) - MultistreamHolder upstreams = new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, upstreamMock) - TrackEthereumTx trackTx = new TrackEthereumTx(upstreams, Schedulers.boundedElastic()) - - def tx = new TrackEthereumTx.TxDetails(Chain.ETHEREUM__MAINNET, Instant.now(), TransactionId.from(txId), 6) - def block = new BlockContainer( - 100, BlockId.from(txId), BigInteger.ONE, Instant.now(), false, "".bytes, null, - BlockId.from(txId), [TxId.from(txId)], 0, "unknown" - ) - - when: - def act = trackTx.onNewBlock(tx, block) - then: - StepVerifier.create(act) - .expectNext(tx.withStatus(true, 100, true, BlockHash.from(txId), block.timestamp, BigInteger.ONE, 1)) - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - - def "New block without current tx requires a call"() { - setup: - def apiMock = TestingCommons.api() - def upstreamMock = TestingCommons.upstream(apiMock) - MultistreamHolder upstreams = new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, upstreamMock) - TrackEthereumTx trackTx = new TrackEthereumTx(upstreams, Schedulers.boundedElastic()) - - def tx = new TrackEthereumTx.TxDetails(Chain.ETHEREUM__MAINNET, Instant.now(), TransactionId.from(txId), 6) - def block = new BlockContainer( - 100, BlockId.from(txId), BigInteger.ONE, Instant.now(), false, "".bytes, null, BlockId.from(txId), - [TxId.from("0xa0e65cbc1b52a8ca60562112c6060552d882f16f34a9dba2ccdc05c0a6a27c22")], - 0, "unknown" - ) - apiMock.answer("eth_getTransactionByHash", [txId], null) - - when: - def act = trackTx.onNewBlock(tx, block) - then: - StepVerifier.create(act) - .expectComplete() - .verify(Duration.ofSeconds(1)) - } - - @Ignore - def "Starts to follow new transaction"() { - setup: - def req = BlockchainOuterClass.TxStatusRequest.newBuilder() - .setChain(chain) - .setConfirmationLimit(4) - .setTxId(txId) - .build() - - List> blocks = (0..9).collect { i -> - return new BlockJson().with { - it.hash = BlockHash.from("0xa0e65cbc1b52a8ca60562112c6060552d882f16f34a9dba2ccdc05c0a6a2000${i}") - it.timestamp = Instant.ofEpochMilli(156400000000 + i * 10000) - it.setNumber(100L + i.longValue()) - it.parentHash = parent - it.totalDifficulty = BigInteger.valueOf(500 + i) - it - } - } - - def txJsonBroadcasted = new TransactionJson().with { - it.hash = TransactionId.from("0xba61ce4672751fd6086a9ac2b55547a5555af17535b6c0334ede2ecb6d64070a") - it.blockHash = null - it.blockNumber = null - it.nonce = 1 - it - } - - def txJsonMined = new TransactionJson().with { - it.hash = TransactionId.from("0xba61ce4672751fd6086a9ac2b55547a5555af17535b6c0334ede2ecb6d64070a") - it.blockHash = blocks[2].hash - it.blockNumber = blocks[2].number - it.nonce = 1 - it - } - - def exp1 = BlockchainOuterClass.TxStatus.newBuilder() - .setTxId(txId) - .setBroadcasted(false) - .setMined(false) - .setConfirmations(0) - - def exp2 = BlockchainOuterClass.TxStatus.newBuilder() - .setTxId(txId) - .setBroadcasted(true) - .setMined(true) - .setBlock( - Common.BlockInfo.newBuilder() - .setHeight(blocks[2].number) - .setBlockId(blocks[2].hash.toHex().substring(2)) - .setTimestamp(blocks[2].timestamp.toEpochMilli()) - ) - - - def apiMock = TestingCommons.api() - def upstreamMock = TestingCommons.upstream(apiMock) - def multi = Mock(EthereumPosMultiStream) { - _ * getHead() >> upstreamMock.getHead() - } - MultistreamHolder upstreams = new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, multi) - - TrackEthereumTx trackTx = new TrackEthereumTx(upstreams, Schedulers.boundedElastic()) - - apiMock.answerOnce("eth_getTransactionByHash", [txId], null) - apiMock.answerOnce("eth_getTransactionByHash", [txId], txJsonBroadcasted) - apiMock.answerOnce("eth_getTransactionByHash", [txId], txJsonBroadcasted) - apiMock.answer("eth_getTransactionByHash", [txId], txJsonMined) - blocks.forEach { block -> - apiMock.answer("eth_getBlockByHash", [block.hash.toHex(), false], block) - } - - upstreamMock.blocks = Flux.fromIterable(blocks) - .map { block -> - BlockContainer.from(block) - } - - when: - def flux = trackTx.subscribe(req) - then: - StepVerifier.create(flux) - .expectNext(exp1.build()).as("Just empty") - .expectNext(exp1.setBroadcasted(true).build()).as("Found in mempool") - .expectNext(exp2.setConfirmations(1).build()).as("Mined") - .expectNext(exp2.setConfirmations(2).build()).as("Confirmed 2") - .expectNext(exp2.setConfirmations(3).build()).as("Confirmed 3") - .expectNext(exp2.setConfirmations(4).build()).as("Confirmed 4") - .expectComplete() - .verify(Duration.ofSeconds(10)) - } - -} diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy index 06be71607..88b7e6244 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy @@ -109,11 +109,11 @@ class MultistreamHolderMock implements MultistreamHolder { } @Override - EthereumCachingReader getReader() { + EthereumCachingReader getCachingReader() { if (customReader != null) { return customReader } - return super.getReader() + return super.getCachingReader() } @Override diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy index a51ebe9be..09328b419 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy @@ -218,7 +218,7 @@ class MultistreamSpec extends Specification { def multiStream = new TestEthereumPosMultistream(Chain.ETHEREUM__MAINNET, [up1, up2], Caches.default()) when: - def act = multiStream.tryProxy(new Selector.LabelMatcher("provider", ["internal"]), call) + def act = multiStream.tryProxySubscribe(new Selector.LabelMatcher("provider", ["internal"]), call) then: StepVerifier.create(act) @@ -243,7 +243,7 @@ class MultistreamSpec extends Specification { def multiStream = new TestEthereumPosMultistream(Chain.ETHEREUM__MAINNET, [up2], Caches.default()) when: - def act = multiStream.tryProxy(new Selector.LabelMatcher("provider", ["internal"]), call) + def act = multiStream.tryProxySubscribe(new Selector.LabelMatcher("provider", ["internal"]), call) then: !act @@ -356,11 +356,6 @@ class MultistreamSpec extends Specification { return this } - @Override - ChainFees getFeeEstimation() { - return null - } - @Override void init() { diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinFeesSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinFeesSpec.groovy deleted file mode 100644 index 1cfcf8919..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinFeesSpec.groovy +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Copyright (c) 2021 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.bitcoin - -import io.emeraldpay.dshackle.upstream.ChainFees -import io.emeraldpay.dshackle.upstream.Head -import reactor.core.publisher.Mono -import spock.lang.Specification - -import java.time.Duration - -class BitcoinFeesSpec extends Specification { - - def block3 = [ - hash: "3300000000033327aa200b1cecaad478d2b00432346c3f1f3986da1afd33e506", - tx : [ - "d5a67d8f99ad05ece282bc8714da55cbd2266d9d7bae98fd5dda3b55d1696fb8", - "e17eca6b595da9c1c7b883fe505184d7ff586315d5b8a0c476198f0d49a75ad3", - "1735dfef80bcfdc443943dcbc8d26a4133da87c5c81d9e2a1e0a0e5f59d1ef96", - "c83c99e50946a6d30bc344a24401ad0ff0598425d73a5f74d6545e15ae588321", - "0d4f85f840f09f317d0b202725acee868b2e969078bb6a6b89fc3e0bc1216c6a", - "4d56b1a0992a3fa3132083ce29e203165ef4768f028d98a738da79dd26ff91e2", - ] - ] - def block2 = [ - hash: "2200000000022227aa200b1cecaad478d2b00432346c3f1f3986da1afd33e506", - tx : [ - "87cc7d37851af41da60a5d14e40983d0a6750ffcdf1f2ae1291bfc1e712f0712", - "3bde80635c62e81ba86d68a1c78ac47b1a02d8ef334fdf6a7b6e8acc0a475f20", - "2f4f413c07ac072918e12fcac4afd8d30a76dfe49ac68d46d582c3a130772734", - "f29ecfa5b692cfc46b1eae29416a35782657f0bde2ae44acde882e477e8624d8", - ] - ] - - def tx4D5 = [ - hash : "4d56b1a0992a3fa3132083ce29e203165ef4768f028d98a738da79dd26ff91e2", - size : 223, - vsize: 142, - vin : [ - [txid: "3bde80635c62e81ba86d68a1c78ac47b1a02d8ef334fdf6a7b6e8acc0a475f20", vout: 1] - ], - vout : [ - ["value": 0.00029163, "n": 0], - ["value": 0.00144221, "n": 1] - ] - ] - def txF29 = [ - hash : "f29ecfa5b692cfc46b1eae29416a35782657f0bde2ae44acde882e477e8624d8", - size : 2900, - vsize: 1366, - vin : [ - [txid: "37e19fad6c3e5fafc431ee731428452dc01f56b0bc4d9fe876dcee58384934ec", "vout": 0], - [txid: "3da079b10451ba1cde42d9a0f485e79151e5d4216930561227be5471921af17f", "vout": 0] - ], - vout : [ - ["value": 1.20100000, "n": 0], - ["value": 0.00210984, "n": 1] - ] - ] - - def tx3BD = [ - hash: "3bde80635c62e81ba86d68a1c78ac47b1a02d8ef334fdf6a7b6e8acc0a475f20", - size: 292, - vin : [ - ], - vout: [ - ["value": 0.00400000, "n": 0], - ["value": 0.00200000, "n": 1], - ] - ] - def tx37E = [ - hash : "37e19fad6c3e5fafc431ee731428452dc01f56b0bc4d9fe876dcee58384934ec", - size : 292, - vsize: 200, - vin : [ - ], - vout : [ - ["value": 1.00000000, "n": 0], - ] - ] - def tx3DA = [ - hash : "3da079b10451ba1cde42d9a0f485e79151e5d4216930561227be5471921af17f", - size : 297, - vsize: 201, - vin : [ - ], - vout : [ - ["value": 0.21000000, "n": 0], - ] - ] - - - def "fetch tx amount"() { - setup: - def reader = Mock(BitcoinReader) { - 1 * it.getTx("4d56b1a0992a3fa3132083ce29e203165ef4768f028d98a738da79dd26ff91e2") >> Mono.just(tx4D5) - } - def fees = new BitcoinFees(Stub(BitcoinMultistream), reader, 3) - - when: - def amount = fees.getTxOutAmount("4d56b1a0992a3fa3132083ce29e203165ef4768f028d98a738da79dd26ff91e2", 0) - .block(Duration.ofSeconds(1)) - then: - amount == 29163 - } - - def "extract amounts"() { - setup: - def fees = new BitcoinFees(Stub(BitcoinMultistream), Stub(BitcoinReader), 3) - - when: - def act = fees.extractVOuts(txF29) - then: - act == [120100000L, 210984L] - } - - def "extract vsize when available"() { - setup: - def fees = new BitcoinFees(Stub(BitcoinMultistream), Stub(BitcoinReader), 3) - - when: - def act = fees.extractSize(tx4D5) - then: - act == 142 - } - - def "extract size when vsize unavailable"() { - setup: - def fees = new BitcoinFees(Stub(BitcoinMultistream), Stub(BitcoinReader), 3) - - when: - def act = fees.extractSize(tx3BD) - then: - act == 292 - } - - def "calculates fee"() { - setup: - def reader = Mock(BitcoinReader) { - 1 * it.getTx("3bde80635c62e81ba86d68a1c78ac47b1a02d8ef334fdf6a7b6e8acc0a475f20") >> Mono.just(tx3BD) - } - def fees = new BitcoinFees(Stub(BitcoinMultistream), reader, 3) - when: - def act = fees.calculateFee(tx4D5).block(Duration.ofSeconds(1)) - then: - act == 200000 - (29163 + 144221) - } - - def "calculates fee with multiple inputs"() { - setup: - def reader = Mock(BitcoinReader) { - 1 * it.getTx("37e19fad6c3e5fafc431ee731428452dc01f56b0bc4d9fe876dcee58384934ec") >> Mono.just(tx37E) - 1 * it.getTx("3da079b10451ba1cde42d9a0f485e79151e5d4216930561227be5471921af17f") >> Mono.just(tx3DA) - } - def fees = new BitcoinFees(Stub(BitcoinMultistream), reader, 3) - when: - def act = fees.calculateFee(txF29).block(Duration.ofSeconds(1)) - then: - act == (100000000 + 21000000) - (120100000 + 210984) - } - - def "get average bottom fee"() { - setup: - def ups = Mock(BitcoinMultistream) { - _ * getHead() >> Mock(Head) { - _ * getCurrentHeight() >> 100 - } - } - def reader = Mock(BitcoinReader) { - 1 * it.getBlock(100) >> Mono.just(block3) - 1 * it.getBlock(99) >> Mono.just(block2) - // last txes on those blocks - 1 * it.getTx("4d56b1a0992a3fa3132083ce29e203165ef4768f028d98a738da79dd26ff91e2") >> Mono.just(tx4D5) - 1 * it.getTx("f29ecfa5b692cfc46b1eae29416a35782657f0bde2ae44acde882e477e8624d8") >> Mono.just(txF29) - // their inputs - 1 * it.getTx("3bde80635c62e81ba86d68a1c78ac47b1a02d8ef334fdf6a7b6e8acc0a475f20") >> Mono.just(tx3BD) - 1 * it.getTx("37e19fad6c3e5fafc431ee731428452dc01f56b0bc4d9fe876dcee58384934ec") >> Mono.just(tx37E) - 1 * it.getTx("3da079b10451ba1cde42d9a0f485e79151e5d4216930561227be5471921af17f") >> Mono.just(tx3DA) - } - def fees = new BitcoinFees(ups, reader, 3) - - when: - def act = fees.estimate(ChainFees.Mode.AVG_LAST, 2).block(Duration.ofSeconds(1)) - - then: - act.hasBitcoinStd() - // first tx fee: 187.43661971830985915493 - // second tx fee: 504.40409956076134699854 - // average is 345 - // but if we calculate original fees per KB it's 354222 - act.bitcoinStd.satPerKb == 354222 - } - -}