Skip to content

Commit

Permalink
solution: support Ethereum merge
Browse files Browse the repository at this point in the history
  • Loading branch information
splix authored Aug 25, 2022
1 parent 38e9248 commit ff2ec4e
Show file tree
Hide file tree
Showing 58 changed files with 1,631 additions and 231 deletions.
36 changes: 36 additions & 0 deletions docs/04-upstream-config.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ cluster:
upstreams:
- id: us-nodes
chain: auto
priority: 10
connection:
grpc:
host: 35.226.252.117
Expand All @@ -55,6 +56,7 @@ cluster:
key: client.p8.key
- id: infura-eth
chain: ethereum
priority: 5
role: fallback
labels:
provider: infura
Expand Down Expand Up @@ -93,6 +95,40 @@ In the example above we have:
** uses Basic Authentication to authenticate requests on Infura
** label `[provider: infura]` is set for that particular upstream, which can be selected during a request.For example for some requests you may want to use nodes with that label only, i.e. _"send that tx to infura nodes only"_, or _"read only from archive node, with label [archive: true]"_
** upstream validation (peers, sync status, etc) is disabled for that particular upstream
* upstream `us-nodes` has a higher priority than `infura-eth` so if there is a different block on the same height (i.e., a fork) then Dshackle chooses block from `us-nodes`. Note that it is specific only for Proof-of-Stake blockchains, such as Ethereum.

=== Fork Choice configuration

When two different upstream produce a different block it's critical to route the request traffic to an upstream on the right fork.

With Proof-of-Work blockchains Dshackle chooses the block with more works produced, and that's the general consensus for PoW blockchains.

For Proof-of-Stake this is impossible and instead of Total Work you have to configure a priority of upstream, i.e., the process is more manual.
In a situation when Dshackle sees a fork it chooses the upstreams with the block on the side with the highest priority.

.Configuration option for priorities is `priority`:
[source, yaml]
----
- id: eth-one
chain: ethereum
priority: 10
- id: eth-two
chain: ethereum
priority: 20
- id: eth-three
chain: ethereum
priority: 15
----

In this case upstream `eth-two` has the highest priority, and it's blocks are considered as _true_ in case of a fork.
If the upstream is down for a some reason, then blocks from `eth-three` are used instead.

NOTE: Priority is a configuration for a Fork decision, and it doesn't change the priority of the traffic.
Except the situation when Dshackle sees a fork between different upstreams; in this case all traffic is routed only to upstreams on the right side of the fork.

NOTE: Configuration applies only for Proof-of-Stack and for Proof-of-Work blockchains priority options is not used.

=== Roles and Fallback upstream

Expand Down
6 changes: 6 additions & 0 deletions docs/reference-configuration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,12 @@ See link:09-quorum-and-selectors.adoc[Quorum and Selectors]
`not_empty` - accept not _null_ value, otherwise retry another upstream;
`not_lagging` - accept response only from a fully synced upstream.

| `priority`
| no
| Upstream priority used to resolve Fork Conflicts between different upstreams when a Proof-of-Stake blockchain is used.
When two upstreams has different blocks at the same height then the value from Upstreams with higher `priority` value wins.
Configuration has no effect in Proof-of-Work blockchains.

| `connection.ethereum`
| yes
| Connection configuration for Ethereum API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class BlocksRedisCache(
return BlockContainer(
meta.height,
BlockId(meta.hash.toByteArray()),
if (meta.parentHash.isEmpty) null else BlockId(meta.parentHash.toByteArray()),
BigInteger(meta.difficulty.toByteArray()),
Instant.ofEpochMilli(meta.timestamp),
false,
Expand All @@ -76,7 +77,7 @@ class BlocksRedisCache(
if (block.timestamp == null || block.hash == null) { // null in unit tests
return Mono.empty()
}
if (block.full) {
if (block.includesFullTransactions) {
return Mono.error(IllegalArgumentException("Full Block is not supposed to be cached"))
}
return super.add(block, block)
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/cache/Caches.kt
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ open class Caches(
} else if (tag == Tag.REQUESTED) {
val blockOnlyContainer: BlockContainer?
var jsonValue: BlockJson<*>? = null
if (block.full) {
if (block.includesFullTransactions) {
jsonValue = Global.objectMapper.readValue<BlockJson<*>>(block.json, BlockJson::class.java)
// shouldn't cache block json with transactions, separate txes and blocks with refs
val blockOnly = jsonValue.withoutTransactionDetails()
Expand Down
22 changes: 18 additions & 4 deletions src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,38 @@ open class UpstreamsConfig {
var defaultOptions: MutableList<DefaultOptions> = ArrayList<DefaultOptions>()
var upstreams: MutableList<Upstream<*>> = ArrayList<Upstream<*>>()

companion object {
private const val MIN_PRIORITY = 0
private const val MAX_PRIORITY = 1_000_000
private const val DEFAULT_PRIORITY = 10
}

open class Options {
var disableValidation: Boolean? = null
var timeout = Defaults.timeout
var providesBalance: Boolean? = null
var priority: Int = DEFAULT_PRIORITY
set(value) {
require(value in MIN_PRIORITY..MAX_PRIORITY) {
"Upstream priority must be in $MIN_PRIORITY..$MAX_PRIORITY. Configured: $value"
}
field = value
}

var minPeers: Int? = 1
set(minPeers) {
if (minPeers != null && minPeers < 0) {
throw IllegalArgumentException("minPeers must be positive number")
set(value) {
require(value != null && value > 0) {
"minPeers must be a positive number: $value"
}
field = minPeers
field = value
}

fun merge(additional: Options?): Options {
if (additional == null) {
return this
}
val copy = Options()
copy.priority = this.priority.coerceAtLeast(additional.priority)
copy.minPeers = if (this.minPeers != null) this.minPeers else additional.minPeers
copy.disableValidation =
if (this.disableValidation != null) this.disableValidation else additional.disableValidation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ class UpstreamsConfigReader(
upstream.id = getValueAsString(upNode, "id")
upstream.options = tryReadOptions(upNode)
upstream.methods = tryReadMethods(upNode)
getValueAsInt(upNode, "priority")?.let {
if (upstream.options == null) {
upstream.options = UpstreamsConfig.Options.getDefaults()
}
upstream.options!!.priority = it
}
getValueAsBool(upNode, "enabled")?.let {
upstream.isEnabled = it
}
Expand Down Expand Up @@ -313,6 +319,9 @@ class UpstreamsConfigReader(
getValueAsBool(values, "balance")?.let {
options.providesBalance = it
}
getValueAsInt(values, "priority")?.let {
options.priority = it
}
return options
}
}
14 changes: 11 additions & 3 deletions src/main/kotlin/io/emeraldpay/dshackle/data/BlockContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,29 @@ import java.time.Instant
class BlockContainer(
val height: Long,
val hash: BlockId,
val parentHash: BlockId?,
val difficulty: BigInteger,
val timestamp: Instant,
val full: Boolean,
val includesFullTransactions: Boolean,
json: ByteArray?,
val parsed: Any?,
val transactions: List<TxId> = emptyList()
) : SourceContainer(json, parsed) {

constructor(height: Long, hash: BlockId, difficulty: BigInteger, timestamp: Instant) :
this(height, hash, null, difficulty, timestamp, false, null, null)

constructor(height: Long, hash: BlockId, difficulty: BigInteger, timestamp: Instant, transactions: List<TxId>) :
this(height, hash, null, difficulty, timestamp, false, null, null, transactions)

companion object {
@JvmStatic
fun from(block: BlockJson<*>, raw: ByteArray): BlockContainer {
val hasTransactions = block.transactions?.filterIsInstance<TransactionJson>()?.count() ?: 0 > 0
val hasTransactions = (block.transactions?.filterIsInstance<TransactionJson>()?.count() ?: 0) > 0
return BlockContainer(
block.number,
BlockId.from(block),
block.parentHash?.let(BlockId.Companion::from),
block.totalDifficulty,
block.timestamp,
hasTransactions,
Expand Down Expand Up @@ -76,7 +84,7 @@ class BlockContainer(
if (hash != other.hash) return false
if (difficulty != other.difficulty) return false
if (timestamp != other.timestamp) return false
if (full != other.full) return false
if (includesFullTransactions != other.includesFullTransactions) return false
if (transactions != other.transactions) return false

return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder
import io.emeraldpay.dshackle.upstream.ForkWatchFactory
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.MergedHead
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcHead
Expand Down Expand Up @@ -62,6 +63,8 @@ open class ConfiguredUpstreams(
private val log = LoggerFactory.getLogger(ConfiguredUpstreams::class.java)
private var seq = AtomicInteger(0)

private val forkWatchFactory = ForkWatchFactory(currentUpstreams)

@PostConstruct
fun start() {
log.debug("Starting upstreams")
Expand Down Expand Up @@ -175,7 +178,7 @@ open class ConfiguredUpstreams(
val upstream = BitcoinRpcUpstream(
config.id
?: "bitcoin-${seq.getAndIncrement()}",
chain, directApi, head,
chain, forkWatchFactory.create(chain), directApi, head,
options, config.role,
QuorumForLabels.QuorumItem(1, config.labels),
methods, esplora
Expand Down Expand Up @@ -223,15 +226,15 @@ open class ConfiguredUpstreams(
val ethereumUpstream = if (wsFactoryApi != null && !conn.preferHttp) {
EthereumWsUpstream(
config.id!!,
chain, directApi, wsFactoryApi,
chain, forkWatchFactory.create(chain), directApi, wsFactoryApi,
options, config.role,
QuorumForLabels.QuorumItem(1, config.labels),
methods
)
} else {
EthereumRpcUpstream(
config.id!!,
chain, directApi, wsFactoryApi,
chain, forkWatchFactory.create(chain), directApi, wsFactoryApi,
options, config.role,
QuorumForLabels.QuorumItem(1, config.labels),
methods
Expand All @@ -249,13 +252,14 @@ open class ConfiguredUpstreams(
val endpoint = config.connection!!
val ds = GrpcUpstreams(
config.id!!,
forkWatchFactory,
config.role,
endpoint.host!!,
endpoint.port,
endpoint.auth,
fileResolver
).apply {
timeout = options.timeout
this.options = options
}
log.info("Using ALL CHAINS (gRPC) upstream, at ${endpoint.host}:${endpoint.port}")
ds.start()
Expand Down
33 changes: 15 additions & 18 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.atomic.AtomicReference

abstract class AbstractHead : Head {
Expand All @@ -44,9 +45,6 @@ abstract class AbstractHead : Head {
return source
.distinctUntilChanged {
it.hash
}.filter { block ->
val curr = head.get()
curr == null || curr.difficulty < block.difficulty
}
.doFinally {
// close internal stream if upstream is finished, otherwise it gets stuck,
Expand All @@ -58,19 +56,11 @@ abstract class AbstractHead : Head {
.subscribeOn(Schedulers.boundedElastic())
.subscribe { block ->
notifyBeforeBlock()
val prev = head.getAndUpdate { curr ->
if (curr == null || curr.difficulty < block.difficulty) {
block
} else {
curr
}
}
if (prev == null || prev.hash != block.hash) {
log.debug("New block ${block.height} ${block.hash}")
val result = stream.tryEmitNext(block)
if (result.isFailure && result != Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER) {
log.warn("Failed to dispatch block: $result as ${this.javaClass}")
}
head.set(block)
log.debug("New block ${block.height} ${block.hash}")
val result = stream.tryEmitNext(block)
if (result.isFailure && result != Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER) {
log.warn("Failed to dispatch block: $result as ${this.javaClass}")
}
}
}
Expand All @@ -92,8 +82,15 @@ abstract class AbstractHead : Head {
override fun getFlux(): Flux<BlockContainer> {
return Flux.concat(
Mono.justOrEmpty(head.get()),
stream.asFlux()
).onBackpressureLatest()
stream.asFlux(),
// when the upstream makes a reconfiguration the head may be restarted,
// i.e. `follow` can be called multiple times and create a new stream each time
// in this case just continue with the new stream for all existing subscribers
Mono.fromCallable { log.warn("Restarting the Head...") }
.delaySubscription(Duration.ofMillis(100))
.thenMany { getFlux() }
)
.onBackpressureLatest()
}

fun getCurrent(): BlockContainer? {
Expand Down
Loading

0 comments on commit ff2ec4e

Please sign in to comment.