Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass all matchers to direct reader #581

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ open class NativeCall(
return ctx.upstream.getLocalReader()
.flatMap { api ->
SpannedReader(api, tracer, LOCAL_READER)
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false))
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false, ctx.upstreamFilter.matcher))
.map {
val result = it.getResult()
val resolvedUpstreamData = it.resolvedUpstreamData.ifEmpty {
Expand Down Expand Up @@ -799,7 +799,16 @@ open class NativeCall(
selector: BlockchainOuterClass.Selector?,
streamRequest: Boolean,
): ChainRequest {
return ChainRequest(method, params, nonce, selector, streamRequest)
return toChainRequest(nonce, selector, streamRequest, Selector.empty)
}

fun toChainRequest(
nonce: Long?,
selector: BlockchainOuterClass.Selector?,
streamRequest: Boolean,
matcher: Selector.Matcher,
): ChainRequest {
return ChainRequest(method, params, nonce, selector, streamRequest, matcher)
}
}
}
10 changes: 9 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ data class ChainRequest(
val nonce: Long?,
val selector: BlockchainOuterClass.Selector?,
val isStreamed: Boolean = false,
val matcher: Selector.Matcher = Selector.empty,
) {

@JvmOverloads constructor(
Expand All @@ -38,7 +39,14 @@ data class ChainRequest(
nonce: Long? = null,
selectors: BlockchainOuterClass.Selector? = null,
isStreamed: Boolean = false,
) : this(method, params, 1, nonce, selectors, isStreamed)
matcher: Selector.Matcher = Selector.empty,
) : this(method, params, 1, nonce, selectors, isStreamed, matcher)

constructor(
method: String,
params: CallParams,
matcher: Selector.Matcher,
) : this(method, params, 1, null, null, false, matcher)

fun toJson(): ByteArray {
return params.toJson(id, method)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.emeraldpay.dshackle.upstream.ethereum

import com.fasterxml.jackson.databind.ObjectMapper
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.cache.CurrentBlockCache
import io.emeraldpay.dshackle.commons.CACHE_BLOCK_BY_HASH_READER
Expand All @@ -34,16 +32,15 @@ import io.emeraldpay.dshackle.reader.RekeyingReader
import io.emeraldpay.dshackle.reader.SpannedReader
import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Request
import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Result
import io.emeraldpay.dshackle.upstream.ethereum.domain.Address
import io.emeraldpay.dshackle.upstream.ethereum.domain.BlockHash
import io.emeraldpay.dshackle.upstream.ethereum.domain.TransactionId
import io.emeraldpay.dshackle.upstream.ethereum.domain.Wei
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionLogJson
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson
import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
import org.apache.commons.collections4.Factory
import org.springframework.cloud.sleuth.Tracer
Expand All @@ -60,58 +57,35 @@ open class EthereumCachingReader(
private val tracer: Tracer,
) : CachingReader {

private val objectMapper: ObjectMapper = Global.objectMapper
private val balanceCache = CurrentBlockCache<Address, Wei>()
private val directReader = EthereumDirectReader(up, caches, balanceCache, callMethodsFactory, tracer)

private val extractBlock = Function<Result<BlockContainer>, BlockJson<TransactionRefJson>> { result ->
val block = result.data
val existing = block.getParsed(BlockJson::class.java)
if (existing != null) {
existing.withoutTransactionDetails()
} else {
objectMapper
.readValue(block.json, BlockJson::class.java)
.withoutTransactionDetails()
}
}

private val extractTx = Function<Result<TxContainer>, TransactionJsonSnapshot> { result ->
result.data.getParsed(TransactionJsonSnapshot::class.java)
?: objectMapper.readValue(result.data.json, TransactionJsonSnapshot::class.java)
}

private val idToBlockHash = Function<BlockId, BlockHash> { id -> BlockHash.from(id.value) }
private val blockHashToId = Function<BlockHash, BlockId> { hash -> BlockId.from(hash) }

private val txHashToId = Function<TransactionId, TxId> { hash -> TxId.from(hash) }
private val idToTxHash = Function<TxId, TransactionId> { id -> TransactionId.from(id.value) }

private val blocksByIdAsCont = CompoundReader(
SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER),
)

open fun blockByFinalization(): Reader<FinalizationType, Result<BlockContainer>> {
return SpannedReader(directReader.blockByFinalizationReader, tracer, DIRECT_QUORUM_RPC_READER)
}

open fun blocksByIdAsCont(): Reader<BlockId, Result<BlockContainer>> {
return blocksByIdAsCont
open fun blocksByIdAsCont(matcher: Selector.Matcher): Reader<BlockId, Result<BlockContainer>> {
val idToBlockHash = Function<BlockId, Request<BlockHash>> { id -> Request(BlockHash.from(id.value), matcher) }
return CompoundReader(
SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER),
)
}

open fun blocksByHeightAsCont(): Reader<Long, Result<BlockContainer>> {
open fun blocksByHeightAsCont(matcher: Selector.Matcher): Reader<Long, Result<BlockContainer>> {
val numToRequest = Function<Long, Request<Long>> { num -> Request(num, matcher) }
return CompoundReader(
SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHeight()), tracer, CACHE_BLOCK_BY_HEIGHT_READER),
SpannedReader(directReader.blockByHeightReader, tracer, DIRECT_QUORUM_RPC_READER),
SpannedReader(RekeyingReader(numToRequest, directReader.blockByHeightReader), tracer, DIRECT_QUORUM_RPC_READER),
)
}

open fun logsByHash(): Reader<BlockId, Result<List<TransactionLogJson>>> {
return directReader.logsByHashReader
}

open fun txByHashAsCont(): Reader<TxId, Result<TxContainer>> {
open fun txByHashAsCont(matcher: Selector.Matcher): Reader<TxId, Result<TxContainer>> {
val idToTxHash = Function<TxId, Request<TransactionId>> { id -> Request(TransactionId.from(id.value), matcher) }
return CompoundReader(
CacheWithUpstreamIdReader(SpannedReader(caches.getTxByHash(), tracer, CACHE_TX_BY_HASH_READER)),
SpannedReader(RekeyingReader(idToTxHash, directReader.txReader), tracer, DIRECT_QUORUM_RPC_READER),
Expand All @@ -126,9 +100,9 @@ open class EthereumCachingReader(
)
}

fun receipts(): Reader<TxId, Result<ByteArray>> {
fun receipts(matcher: Selector.Matcher): Reader<TxId, Result<ByteArray>> {
val requested = RekeyingReader(
{ txid: TxId -> TransactionId.from(txid.value) },
{ txid: TxId -> Request(TransactionId.from(txid.value), matcher) },
directReader.receiptReader,
)
return CompoundReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,32 +61,31 @@ class EthereumDirectReader(
private val objectMapper: ObjectMapper = Global.objectMapper
var requestReaderFactory: RequestReaderFactory = RequestReaderFactory.default()

val blockReader: Reader<BlockHash, Result<BlockContainer>>
val blockByHeightReader: Reader<Long, Result<BlockContainer>>
val txReader: Reader<TransactionId, Result<TxContainer>>
val blockReader: Reader<Request<BlockHash>, Result<BlockContainer>>
val blockByHeightReader: Reader<Request<Long>, Result<BlockContainer>>
val txReader: Reader<Request<TransactionId>, Result<TxContainer>>
val balanceReader: Reader<Address, Result<Wei>>
val receiptReader: Reader<TransactionId, Result<ByteArray>>
val receiptReader: Reader<Request<TransactionId>, Result<ByteArray>>
val logsByHashReader: Reader<BlockId, Result<List<TransactionLogJson>>>
val blockByFinalizationReader: Reader<FinalizationType, Result<BlockContainer>>

init {
blockReader = object : Reader<BlockHash, Result<BlockContainer>> {
override fun read(key: BlockHash): Mono<Result<BlockContainer>> {
val request = ChainRequest("eth_getBlockByHash", ListParams(key.toHex(), false))
return readBlock(request, key.toHex())
blockReader = object : Reader<Request<BlockHash>, Result<BlockContainer>> {
override fun read(key: Request<BlockHash>): Mono<Result<BlockContainer>> {
val request = ChainRequest("eth_getBlockByHash", ListParams(key.requestBy.toHex(), false))
return readBlock(request, key.requestBy.toHex(), key.matcher)
}
}
blockByHeightReader = object : Reader<Long, Result<BlockContainer>> {
override fun read(key: Long): Mono<Result<BlockContainer>> {
val heightMatcher = Selector.HeightMatcher(key)
val request = ChainRequest("eth_getBlockByNumber", ListParams(HexQuantity.from(key).toHex(), false))
return readBlock(request, key.toString(), heightMatcher)
blockByHeightReader = object : Reader<Request<Long>, Result<BlockContainer>> {
override fun read(key: Request<Long>): Mono<Result<BlockContainer>> {
val request = ChainRequest("eth_getBlockByNumber", ListParams(HexQuantity.from(key.requestBy).toHex(), false))
return readBlock(request, key.toString(), key.matcher)
}
}
txReader = object : Reader<TransactionId, Result<TxContainer>> {
override fun read(key: TransactionId): Mono<Result<TxContainer>> {
val request = ChainRequest("eth_getTransactionByHash", ListParams(key.toHex()))
return readWithQuorum(request) // retries were removed because we use NotNullQuorum which handle errors too
txReader = object : Reader<Request<TransactionId>, Result<TxContainer>> {
override fun read(key: Request<TransactionId>): Mono<Result<TxContainer>> {
val request = ChainRequest("eth_getTransactionByHash", ListParams(key.requestBy.toHex()))
return readWithQuorum(request, key.matcher) // retries were removed because we use NotNullQuorum which handle errors too
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Tx not read $key")))
.flatMap { result ->
val tx = objectMapper.readValue(result.data, TransactionJsonSnapshot::class.java)
Expand Down Expand Up @@ -150,10 +149,10 @@ class EthereumDirectReader(
}
}

receiptReader = object : Reader<TransactionId, Result<ByteArray>> {
override fun read(key: TransactionId): Mono<Result<ByteArray>> {
val request = ChainRequest("eth_getTransactionReceipt", ListParams(key.toHex()))
return readWithQuorum(request)
receiptReader = object : Reader<Request<TransactionId>, Result<ByteArray>> {
override fun read(key: Request<TransactionId>): Mono<Result<ByteArray>> {
val request = ChainRequest("eth_getTransactionReceipt", ListParams(key.requestBy.toHex()))
return readWithQuorum(request, key.matcher)
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Receipt not read $key")))
.flatMap { result ->
val receipt = objectMapper.readValue(result.data, TransactionReceiptJson::class.java)
Expand All @@ -164,7 +163,7 @@ class EthereumDirectReader(
caches.cacheReceipt(
Caches.Tag.REQUESTED,
DefaultContainer(
txId = TxId.from(key),
txId = TxId.from(key.requestBy),
blockId = BlockId.from(receipt.blockHash),
height = receipt.blockNumber,
json = result.data,
Expand Down Expand Up @@ -252,14 +251,10 @@ class EthereumDirectReader(
): Mono<Result<ByteArray>> {
return Mono.just(requestReaderFactory)
.map {
val requestMatcher = Selector.Builder()
.withMatcher(matcher)
.forMethod(request.method)
.build()
it.create(
RequestReaderFactory.ReaderData(
up,
Selector.UpstreamFilter(sort, requestMatcher),
Selector.UpstreamFilter(sort, matcher),
callMethodsFactory.create().createQuorumFor(request.method),
null,
tracer,
Expand All @@ -276,4 +271,9 @@ class EthereumDirectReader(
val data: T,
val resolvedUpstreamData: List<Upstream.UpstreamSettingsData>,
)

data class Request<T>(
val requestBy: T,
val matcher: Selector.Matcher,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.LogsOracle
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.hex.HexQuantity
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
Expand Down Expand Up @@ -87,7 +88,7 @@ class EthereumLocalReader(
} catch (e: IllegalArgumentException) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
}
reader.txByHashAsCont()
reader.txByHashAsCont(key.matcher)
.read(hash)
.map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
}
Expand All @@ -106,14 +107,14 @@ class EthereumLocalReader(
if (withTx) {
null
} else {
reader.blocksByIdAsCont().read(hash).map {
reader.blocksByIdAsCont(key.matcher).read(hash).map {
ChainResponse(it.data.json, null, it.resolvedUpstreamData)
}
}
}

method == "eth_getBlockByNumber" -> {
getBlockByNumber(params.list)
getBlockByNumber(params.list, key.matcher)
}

method == "eth_getTransactionReceipt" -> {
Expand All @@ -126,7 +127,7 @@ class EthereumLocalReader(
} catch (e: IllegalArgumentException) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
}
reader.receipts()
reader.receipts(key.matcher)
.read(hash)
.map { ChainResponse(it.data, null, it.resolvedUpstreamData) }
}
Expand All @@ -141,7 +142,7 @@ class EthereumLocalReader(
return null
}

fun getBlockByNumber(params: List<Any?>): Mono<ChainResponse>? {
fun getBlockByNumber(params: List<Any?>, matcher: Selector.Matcher): Mono<ChainResponse>? {
if (params.size != 2 || params[0] == null || params[1] == null) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "Must provide 2 parameters")
}
Expand Down Expand Up @@ -189,7 +190,7 @@ class EthereumLocalReader(
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be a block number")
}

return reader.blocksByHeightAsCont()
return reader.blocksByHeightAsCont(matcher)
.read(number).map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
}

Expand Down
Loading
Loading