Skip to content

Commit

Permalink
ethereum as generic upstream - tests
Browse files Browse the repository at this point in the history
  • Loading branch information
a10zn8 committed Oct 31, 2023
1 parent 6c0e150 commit 02567c4
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 81 deletions.
10 changes: 8 additions & 2 deletions foundation/src/main/kotlin/org/drpc/chainsconfig/ChainsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ data class ChainsConfig(private val chains: List<ChainConfig>) : Iterable<Chains
) {
companion object {
@JvmStatic
fun default() = ChainConfig(
fun default() = defaultWithContract(null)

@JvmStatic
fun defaultWithContract(callLimitContract: String?) = ChainConfig(
Duration.ofSeconds(12),
6,
1,
Expand All @@ -43,11 +46,14 @@ data class ChainsConfig(private val chains: List<ChainConfig>) : Iterable<Chains
0,
"UNKNOWN",
emptyList(),
null,
callLimitContract,
"undefined",
"undefined",
)
}



}

fun resolve(chain: String): ChainConfig {
Expand Down
20 changes: 20 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,26 @@ abstract class Multistream(
onUpstreamsUpdated()
}

init {
UpstreamAvailability.entries.forEach { status ->
Metrics.gauge(
"$metrics.availability",
listOf(Tag.of("chain", chain.chainCode), Tag.of("status", status.name.lowercase())),
this,
) {
getAll().count { it.getStatus() == status }.toDouble()
}
}

Metrics.gauge(
"$metrics.connected",
listOf(Tag.of("chain", chain.chainCode)),
this,
) {
getAll().size.toDouble()
}
}

/**
* Get list of all underlying upstreams
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object EthereumChainSpecific : ChainSpecific {

override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription {
return { ms ->
// val pendingTxes: PendingTxesSource = (ms.upstreams as MutableList<EthereumLikeUpstream>)
// val pendingTxes: PendingTxesSource = (ms.getAll())
// .mapNotNull {
// it.getIngressSubscription().getPendingTxes()
// }.let {
Expand All @@ -54,6 +54,7 @@ object EthereumChainSpecific : ChainSpecific {
// }
// }
// return

EmptyEgressSubscription
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package io.emeraldpay.dshackle.upstream.generic

import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.config.UpstreamsConfig
Expand All @@ -34,14 +35,16 @@ import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.Selector.Matcher
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.forkchoice.PriorityForkChoice
import io.emeraldpay.dshackle.upstream.grpc.GrpcUpstream
import org.springframework.util.ConcurrentReferenceHashMap
import org.springframework.util.ConcurrentReferenceHashMap.ReferenceType.WEAK
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler

open class GenericMultistream(
chain: Chain,
private val upstreams: MutableList<GenericUpstream>,
private val upstreams: MutableList<Upstream>,
caches: Caches,
private val headScheduler: Scheduler,
cachingReaderBuilder: CachingReaderBuilder,
Expand Down Expand Up @@ -179,4 +182,20 @@ open class GenericMultistream(
super.onUpstreamsUpdated()
subscription = subscriptionBuilder(this)
}

override fun tryProxySubscribe(
matcher: Matcher,
request: BlockchainOuterClass.NativeSubscribeRequest,
): Flux<out Any>? =
upstreams.filter {
matcher.matches(it)
}.takeIf { ups ->
ups.size == 1 && ups.all { it.isGrpc() }
}?.map {
it as GrpcUpstream
}?.map {
it.proxySubscribe(request)
}?.let {
Flux.merge(it)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ open class GenericUpstream(
private var validationSettingsSubscription: Disposable? = null

private val hasLiveSubscriptionHead: AtomicBoolean = AtomicBoolean(false)
private val connector: GenericConnector = connectorFactory.create(this, chain, true)
protected val connector: GenericConnector = connectorFactory.create(this, chain, true)
private var livenessSubscription: Disposable? = null
private val labelsDetector = labelsDetectorBuilder(chain, this.getIngressReader())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,4 @@ class CacheConfigReaderSpec extends Specification {
//later may be not null if we support something else besides Redis
act == null
}

def "Local read disabled"() {
setup:
def config = this.class.getClassLoader().getResourceAsStream("cache-local-router-disabled.yaml")
when:
def act = reader.read(config)

then:
!act.requestsCacheEnabled
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class NativeCallSpec extends Specification {

ObjectMapper objectMapper = Global.objectMapper

def nativeCall(MultistreamHolder upstreams = null, ResponseSigner signer = null, Boolean enableCache = true, Boolean passthrough = false) {
def nativeCall(MultistreamHolder upstreams = null, ResponseSigner signer = null, Boolean passthrough = false) {

if (upstreams == null) {
upstreams = Stub(MultistreamHolder)
Expand All @@ -65,7 +65,6 @@ class NativeCallSpec extends Specification {

def config = new MainConfig()
def cacheConfig = new CacheConfig()
cacheConfig.requestsCacheEnabled = enableCache
config.cache = cacheConfig
config.passthrough = passthrough

Expand All @@ -77,7 +76,7 @@ class NativeCallSpec extends Specification {
1 * read(new JsonRpcRequest("eth_test", [])) >> Mono.just(new JsonRpcResponse("1".bytes, null))
}
def upstream = Mock(Multistream) {
1 * getLocalReader(_) >> Mono.just(routedApi)
1 * getLocalReader() >> Mono.just(routedApi)
}

def nativeCall = nativeCall()
Expand All @@ -98,7 +97,7 @@ class NativeCallSpec extends Specification {
1 * read(new JsonRpcRequest("eth_test", [])) >> Mono.error(new RpcException(RpcResponseError.CODE_METHOD_NOT_EXIST, "Test message"))
}
def upstream = Mock(Multistream) {
1 * getLocalReader(_) >> Mono.just(routedApi)
1 * getLocalReader() >> Mono.just(routedApi)
}

def nativeCall = nativeCall()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcUpstream
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
import io.emeraldpay.dshackle.upstream.ethereum.EthereumCachingReader
import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific
import org.jetbrains.annotations.NotNull
import org.springframework.cloud.sleuth.brave.bridge.BraveTracer
import reactor.core.scheduler.Schedulers
Expand All @@ -47,7 +48,10 @@ class MultistreamHolderMock implements MultistreamHolder {
} else if (up instanceof GenericUpstream) {
upstreams[chain] = new GenericMultistream(
chain, [up as GenericUpstream], Caches.default(),
Schedulers.boundedElastic(), TestingCommons.tracerMock()
Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(TestingCommons.tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic())
)
} else {
throw new IllegalArgumentException("Unsupported upstream type ${up.class}")
Expand Down Expand Up @@ -96,7 +100,10 @@ class MultistreamHolderMock implements MultistreamHolder {
Head customHead = null

EthereumMultistreamMock(@NotNull Chain chain, @NotNull List<GenericUpstream> upstreams, @NotNull Caches caches) {
super(chain, upstreams, caches, Schedulers.boundedElastic(), new BraveTracer(null, null, null))
super(chain, upstreams, caches, Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(new BraveTracer(null, null, null)),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()))
}

EthereumMultistreamMock(@NotNull Chain chain, @NotNull List<GenericUpstream> upstreams) {
Expand Down
18 changes: 15 additions & 3 deletions src/test/groovy/io/emeraldpay/dshackle/test/TestingCommons.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import io.emeraldpay.dshackle.upstream.calls.DirectCallMethods
import io.emeraldpay.dshackle.upstream.generic.GenericMultistream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific
import io.emeraldpay.etherjar.domain.BlockHash
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.etherjar.domain.TransactionId
Expand Down Expand Up @@ -90,7 +91,12 @@ class TestingCommons {
}

static Multistream multistream(GenericUpstreamMock up) {
return new EthereumMultistreamMock(Chain.ETHEREUM__MAINNET, [up], Caches.default(), Schedulers.boundedElastic(), tracerMock()).tap {
return new GenericMultistream(Chain.ETHEREUM__MAINNET, [up], Caches.default(),
Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()),
).tap {
start()
}
}
Expand All @@ -111,11 +117,17 @@ class TestingCommons {
}

static Multistream multistreamWithoutUpstreams(Chain chain) {
return new GenericMultistream(chain, [], emptyCaches().getCaches(chain), Schedulers.boundedElastic(), tracerMock())
return new GenericMultistream(chain, [], emptyCaches().getCaches(chain), Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()))
}

static Multistream multistreamClassicWithoutUpstreams(Chain chain) {
return new GenericMultistream(chain, [], emptyCaches().getCaches(chain), Schedulers.boundedElastic(), tracerMock())
return new GenericMultistream(chain, [], emptyCaches().getCaches(chain), Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()))
}

static FileResolver fileResolver() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,14 @@ class FilteredApisSpec extends Specification {
)
new GenericUpstream(
"test",
(byte) 123,
Chain.ETHEREUM__MAINNET,
(byte) 123,
new ChainOptions.PartialOptions().buildOptions(),
UpstreamsConfig.UpstreamRole.PRIMARY,
ethereumTargets,
new QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(it)),
ChainsConfig.ChainConfig.default(),
connectorFactory,
false,
null,
cs.&validator,
cs.&labelDetector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.test.GenericUpstreamMock
import io.emeraldpay.dshackle.test.TestingCommons
import io.emeraldpay.dshackle.upstream.calls.DirectCallMethods
import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific
import io.emeraldpay.dshackle.upstream.generic.GenericUpstream
import io.emeraldpay.dshackle.upstream.generic.GenericMultistream
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
Expand All @@ -53,7 +54,11 @@ class MultistreamSpec extends Specification {
setup:
def up1 = new GenericUpstreamMock("test1", Chain.ETHEREUM__MAINNET, TestingCommons.api(), new DirectCallMethods(["eth_test1", "eth_test2"]))
def up2 = new GenericUpstreamMock("test1", Chain.ETHEREUM__MAINNET, TestingCommons.api(), new DirectCallMethods(["eth_test2", "eth_test3"]))
def aggr = new GenericMultistream(Chain.ETHEREUM__MAINNET, [up1, up2], Caches.default(), Schedulers.boundedElastic(), TestingCommons.tracerMock())
def aggr = new GenericMultistream(Chain.ETHEREUM__MAINNET, [up1, up2], Caches.default(),
Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(TestingCommons.tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()))
when:
aggr.onUpstreamsUpdated()
def act = aggr.getMethods()
Expand Down Expand Up @@ -184,7 +189,11 @@ class MultistreamSpec extends Specification {
def up1 = TestingCommons.upstream("test-1", "internal")
def up2 = TestingCommons.upstream("test-2", "external")
def up3 = TestingCommons.upstream("test-3", "external")
def multistream = new GenericMultistream(Chain.ETHEREUM__MAINNET, [up1, up2, up3], Caches.default(), Schedulers.boundedElastic(), TestingCommons.tracerMock())
def multistream = new GenericMultistream(Chain.ETHEREUM__MAINNET, [up1, up2, up3], Caches.default(),
Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(TestingCommons.tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()))

expect:
multistream.getHead(new Selector.LabelMatcher("provider", ["internal"])).is(up1.ethereumHeadMock)
Expand All @@ -207,12 +216,12 @@ class MultistreamSpec extends Specification {

def up1 = Mock(GenericGrpcUpstream) {
1 * isGrpc() >> true
1 * getId() >> "internal"
_ * getId() >> "internal"
1 * getLabels() >> [UpstreamsConfig.Labels.fromMap(Collections.singletonMap("provider", "internal"))]
1 * proxySubscribe(call) >> Flux.just("{}")
}
def up2 = Mock(GenericGrpcUpstream) {
1 * getId() >> "external"
def up2 = Mock(GenericUpstream) {
_ * getId() >> "external"
1 * getLabels() >> [UpstreamsConfig.Labels.fromMap(Collections.singletonMap("provider", "external"))]
}
def multiStream = new TestEthereumPosMultistream(Chain.ETHEREUM__MAINNET, [up1, up2], Caches.default())
Expand All @@ -235,9 +244,9 @@ class MultistreamSpec extends Specification {
.setMethod("newHeads")
.build()

def up2 = Mock(GenericGrpcUpstream) {
def up2 = Mock(GenericUpstream) {
1 * isGrpc() >> false
1 * getId() >> "2"
_ * getId() >> "2"
1 * getLabels() >> [UpstreamsConfig.Labels.fromMap(Collections.singletonMap("provider", "internal"))]
}
def multiStream = new TestEthereumPosMultistream(Chain.ETHEREUM__MAINNET, [up2], Caches.default())
Expand All @@ -253,7 +262,11 @@ class MultistreamSpec extends Specification {
setup:
def up1 = new GenericUpstreamMock("test1", Chain.ETHEREUM__MAINNET, TestingCommons.api(), new DirectCallMethods(["eth_test1", "eth_test2", "eth_test3"]))
def up2 = new GenericUpstreamMock("test2", Chain.ETHEREUM__MAINNET, TestingCommons.api(), new DirectCallMethods(["eth_test1", "eth_test2"]))
def ms = new GenericMultistream(Chain.ETHEREUM__MAINNET, new ArrayList<GenericMultistream>(), Caches.default(), Schedulers.boundedElastic(), TestingCommons.tracerMock())
def ms = new GenericMultistream(Chain.ETHEREUM__MAINNET, new ArrayList<GenericMultistream>(), Caches.default(),
Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(TestingCommons.tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()))
when:
ms.onUpstreamChange(
new UpstreamChangeEvent(Chain.ETHEREUM__MAINNET, up1, UpstreamChangeEvent.ChangeType.ADDED)
Expand All @@ -280,7 +293,11 @@ class MultistreamSpec extends Specification {
setup:
def up1 = new GenericUpstreamMock("test1", Chain.ETHEREUM__MAINNET, TestingCommons.api(), new DirectCallMethods(["eth_test1", "eth_test2", "eth_test3"]))
def up2 = new GenericUpstreamMock("test2", Chain.ETHEREUM__MAINNET, TestingCommons.api(), new DirectCallMethods(["eth_test1", "eth_test2"]))
def ms = new GenericMultistream(Chain.ETHEREUM__MAINNET, new ArrayList<GenericMultistream>(), Caches.default(), Schedulers.boundedElastic(), TestingCommons.tracerMock())
def ms = new GenericMultistream(Chain.ETHEREUM__MAINNET, new ArrayList<GenericMultistream>(), Caches.default(),
Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(TestingCommons.tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()))
def head1 = createBlock(250, "0x0d050c785de17179f935b9b93aca09c442964cc59972c71ae68e74731448401b")
def head2 = createBlock(270, "0x0d050c785de17179f935b9b93aca09c442964cc59972c71ae68e74731448402b")
def head3 = createBlock(100, "0x0d050c785de17179f935b9b93aca09c442964cc59972c71ae68e74731448412b")
Expand Down Expand Up @@ -311,7 +328,11 @@ class MultistreamSpec extends Specification {
def up1 = TestingCommons.upstream("test-1", "internal")
def up2 = TestingCommons.upstream("test-2", "external")
def up3 = TestingCommons.upstream("test-3", "external")
def multistream = new GenericMultistream(Chain.ETHEREUM__MAINNET, [up1, up2, up3], Caches.default(), Schedulers.boundedElastic(), TestingCommons.tracerMock())
def multistream = new GenericMultistream(Chain.ETHEREUM__MAINNET, [up1, up2, up3], Caches.default(),
Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(TestingCommons.tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()))
def observer = multistream.lagObserver
multistream.onUpstreamsUpdated()

Expand All @@ -338,7 +359,11 @@ class MultistreamSpec extends Specification {
class TestEthereumPosMultistream extends GenericMultistream {

TestEthereumPosMultistream(@NotNull Chain chain, @NotNull List<GenericUpstream> upstreams, @NotNull Caches caches) {
super(chain, upstreams, caches, Schedulers.boundedElastic(), TestingCommons.tracerMock())
super(chain, upstreams, caches,
Schedulers.boundedElastic(),
EthereumChainSpecific.INSTANCE.makeCachingReaderBuilder(TestingCommons.tracerMock()),
EthereumChainSpecific.INSTANCE.&localReaderBuilder,
EthereumChainSpecific.INSTANCE.subscriptionBuilder(Schedulers.boundedElastic()))
}

@NotNull
Expand Down
Loading

0 comments on commit 02567c4

Please sign in to comment.