From 8b5e3bdff69d4d5c94730e050e8257a2cde8ec66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Tue, 24 Sep 2024 16:58:09 +0400 Subject: [PATCH] Move fields to atomic --- .../upstream/ethereum/GenericWsHead.kt | 32 ++-- .../upstream/generic/GenericUpstream.kt | 171 +++++++++--------- 2 files changed, 104 insertions(+), 99 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt index 0849e908b..c1347fab3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt @@ -65,34 +65,34 @@ class GenericWsHead( } private val chainIdValidator = chainSpecific.chainSettingsValidator(upstream.getChain(), upstream, jsonRpcWsClient) - private var connectionId = AtomicReference("") - private var subscribed = AtomicBoolean(false) - private var connected = AtomicBoolean(false) - private var isSyncing = AtomicBoolean(false) + private val connectionId = AtomicReference("") + private val subscribed = AtomicBoolean(false) + private val connected = AtomicBoolean(false) + private val isSyncing = AtomicBoolean(false) - private var subscription: Disposable? = null - private var headResubSubscription: Disposable? = null + private val subscription = AtomicReference() + private val headResubSubscription = AtomicReference() private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort() - private var subscriptionId = AtomicReference("") + private val subscriptionId = AtomicReference("") override fun isRunning(): Boolean { - return subscription != null + return subscription.get() != null } override fun start() { super.start() - this.subscription?.dispose() + this.subscription.get()?.dispose() this.subscribed.set(true) val heads = Flux.merge( // get the current block, not just wait for the next update getLatestBlock(api), listenNewHeads(), ) - this.subscription = super.follow(heads) + this.subscription.set(super.follow(heads)) - if (headResubSubscription == null) { - headResubSubscription = registerHeadResubscribeFlux() + if (headResubSubscription.get() == null) { + headResubSubscription.set(registerHeadResubscribeFlux()) } } @@ -145,8 +145,7 @@ class GenericWsHead( override fun stop() { super.stop() cancelSub() - headResubSubscription?.dispose() - headResubSubscription = null + headResubSubscription.getAndSet(null)?.dispose() } override fun chainIdValidator(): SingleValidator? { @@ -172,7 +171,7 @@ class GenericWsHead( wsSubscriptions.subscribe(chainSpecific.listenNewHeadsRequest().copy(id = ids.getAndIncrement())) .also { connectionId.set(it.connectionId) - subscriptionId = it.subId + subscriptionId.set(it.subId.get()) if (!connected.get()) { connected.set(true) } @@ -209,8 +208,7 @@ class GenericWsHead( } private fun cancelSub() { - subscription?.dispose() - subscription = null + subscription.getAndSet(null)?.dispose() subscribed.set(false) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 0969b1924..a662317c4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -99,13 +99,14 @@ open class GenericUpstream( } private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig, versionRules) - private var validatorSubscription: Disposable? = null - private var validationSettingsSubscription: Disposable? = null - private var lowerBlockDetectorSubscription: Disposable? = null + private val validatorSubscription = AtomicReference() + private val validationSettingsSubscription = AtomicReference() + private val lowerBlockDetectorSubscription = AtomicReference() + private val settingsDetectorSubscription = AtomicReference() private val hasLiveSubscriptionHead: AtomicBoolean = AtomicBoolean(false) protected val connector: GenericConnector = connectorFactory.create(this, chain) - private var livenessSubscription: Disposable? = null + private val livenessSubscription = AtomicReference() private val settingsDetector = upstreamSettingsDetectorBuilder(chain, this) private var rpcMethodsDetector: UpstreamRpcMethodsDetector? = null @@ -116,7 +117,7 @@ open class GenericUpstream( private val clientVersion = AtomicReference(UNKNOWN_CLIENT_VERSION) private val finalizationDetector = finalizationDetectorBuilder() - private var finalizationDetectorSubscription: Disposable? = null + private val finalizationDetectorSubscription = AtomicReference() private val headLivenessState = Sinks.many().multicast().directBestEffort() @@ -204,63 +205,67 @@ open class GenericUpstream( private fun validateUpstreamSettings() { if (validator != null) { - validationSettingsSubscription = Flux.merge( - Flux.interval( - Duration.ofSeconds(20), - ).flatMap { - validator.validateUpstreamSettings() - }, - headLivenessState.asFlux(), - ) - .subscribeOn(upstreamSettingsScheduler) - .distinctUntilChanged() - .subscribe { - when (it) { - UPSTREAM_FATAL_SETTINGS_ERROR -> { - if (isUpstreamValid.get()) { - log.warn("There is a fatal error after upstream settings validation, removing ${getId()}...") - partialStop() - sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.FATAL_SETTINGS_ERROR_REMOVED) + validationSettingsSubscription.set( + Flux.merge( + Flux.interval( + Duration.ofSeconds(20), + ).flatMap { + validator.validateUpstreamSettings() + }, + headLivenessState.asFlux(), + ) + .subscribeOn(upstreamSettingsScheduler) + .distinctUntilChanged() + .subscribe { + when (it) { + UPSTREAM_FATAL_SETTINGS_ERROR -> { + if (isUpstreamValid.get()) { + log.warn("There is a fatal error after upstream settings validation, removing ${getId()}...") + partialStop() + sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.FATAL_SETTINGS_ERROR_REMOVED) + } + isUpstreamValid.set(false) } - isUpstreamValid.set(false) - } - UPSTREAM_VALID -> { - if (!isUpstreamValid.get()) { - log.warn("Upstream ${getId()} is now valid, adding to the multistream...") - upstreamStart() - sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.ADDED) + UPSTREAM_VALID -> { + if (!isUpstreamValid.get()) { + log.warn("Upstream ${getId()} is now valid, adding to the multistream...") + upstreamStart() + sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.ADDED) + } + isUpstreamValid.set(true) } - isUpstreamValid.set(true) - } - else -> { - log.warn("Continue validation of upstream ${getId()}") + else -> { + log.warn("Continue validation of upstream ${getId()}") + } } - } - } + }, + ) } } private fun detectSettings() { - Flux.interval( - Duration.ZERO, - Duration.ofSeconds(getOptions().validationInterval.toLong() * 5), - ).flatMap { - Flux.merge( - settingsDetector?.detectLabels() - ?.doOnNext { label -> - updateLabels(label) - sendUpstreamStateEvent(UPDATED) - }, - settingsDetector?.detectClientVersion() - ?.doOnNext { - log.info("Detected node version $it for upstream ${getId()}") - clientVersion.set(it) - }, - ) - .subscribeOn(settingsScheduler) - }.subscribe() + settingsDetectorSubscription.set( + Flux.interval( + Duration.ZERO, + Duration.ofSeconds(getOptions().validationInterval.toLong() * 5), + ).flatMap { + Flux.merge( + settingsDetector?.detectLabels() + ?.doOnNext { label -> + updateLabels(label) + sendUpstreamStateEvent(UPDATED) + }, + settingsDetector?.detectClientVersion() + ?.doOnNext { + log.info("Detected node version $it for upstream ${getId()}") + clientVersion.set(it) + }, + ) + .subscribeOn(settingsScheduler) + }.subscribe(), + ) } private fun detectRpcMethods( @@ -311,22 +316,26 @@ open class GenericUpstream( this.setStatus(UpstreamAvailability.OK) } else { log.debug("Start validation for upstream ${this.getId()}") - validatorSubscription = validator?.start() - ?.subscribe(this::setStatus) + validatorSubscription.set( + validator?.start() + ?.subscribe(this::setStatus), + ) } - livenessSubscription = connector.headLivenessEvents().subscribe( - { - val hasSub = it == HeadLivenessState.OK - hasLiveSubscriptionHead.set(hasSub) - if (it == HeadLivenessState.FATAL_ERROR) { - headLivenessState.emitNext(UPSTREAM_FATAL_SETTINGS_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } - } else { - sendUpstreamStateEvent(UPDATED) - } - }, - { - log.debug("Error while checking live subscription for ${getId()}", it) - }, + livenessSubscription.set( + connector.headLivenessEvents().subscribe( + { + val hasSub = it == HeadLivenessState.OK + hasLiveSubscriptionHead.set(hasSub) + if (it == HeadLivenessState.FATAL_ERROR) { + headLivenessState.emitNext(UPSTREAM_FATAL_SETTINGS_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + } else { + sendUpstreamStateEvent(UPDATED) + } + }, + { + log.debug("Error while checking live subscription for ${getId()}", it) + }, + ), ) detectSettings() @@ -337,21 +346,17 @@ open class GenericUpstream( override fun stop() { partialStop() - validationSettingsSubscription?.dispose() - validationSettingsSubscription = null + validationSettingsSubscription.getAndSet(null)?.dispose() connector.stop() started.set(false) } private fun partialStop() { - validatorSubscription?.dispose() - validatorSubscription = null - livenessSubscription?.dispose() - livenessSubscription = null - lowerBlockDetectorSubscription?.dispose() - lowerBlockDetectorSubscription = null - finalizationDetectorSubscription?.dispose() - finalizationDetectorSubscription = null + validatorSubscription.getAndSet(null)?.dispose() + livenessSubscription.getAndSet(null)?.dispose() + lowerBlockDetectorSubscription.getAndSet(null)?.dispose() + finalizationDetectorSubscription.getAndSet(null)?.dispose() + settingsDetectorSubscription.getAndSet(null)?.dispose() connector.getHead().stop() } @@ -373,21 +378,23 @@ open class GenericUpstream( } private fun detectFinalization() { - finalizationDetectorSubscription = + finalizationDetectorSubscription.set( finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime, getChain()) .subscribeOn(finalizationScheduler) .subscribe { sendUpstreamStateEvent(UPDATED) - } + }, + ) } private fun detectLowerBlock() { - lowerBlockDetectorSubscription = + lowerBlockDetectorSubscription.set( lowerBoundService.detectLowerBounds() .subscribeOn(lowerScheduler) .subscribe { sendUpstreamStateEvent(UPDATED) - } + }, + ) } fun getIngressSubscription(): IngressSubscription {