Skip to content

Commit

Permalink
Move fields to atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
Кирилл committed Sep 24, 2024
1 parent dc88bd1 commit 8b5e3bd
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,34 +65,34 @@ class GenericWsHead(
}
private val chainIdValidator = chainSpecific.chainSettingsValidator(upstream.getChain(), upstream, jsonRpcWsClient)

private var connectionId = AtomicReference("")
private var subscribed = AtomicBoolean(false)
private var connected = AtomicBoolean(false)
private var isSyncing = AtomicBoolean(false)
private val connectionId = AtomicReference("")
private val subscribed = AtomicBoolean(false)
private val connected = AtomicBoolean(false)
private val isSyncing = AtomicBoolean(false)

private var subscription: Disposable? = null
private var headResubSubscription: Disposable? = null
private val subscription = AtomicReference<Disposable?>()
private val headResubSubscription = AtomicReference<Disposable?>()
private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort<Boolean>()

private var subscriptionId = AtomicReference("")
private val subscriptionId = AtomicReference("")

override fun isRunning(): Boolean {
return subscription != null
return subscription.get() != null
}

override fun start() {
super.start()
this.subscription?.dispose()
this.subscription.get()?.dispose()
this.subscribed.set(true)
val heads = Flux.merge(
// get the current block, not just wait for the next update
getLatestBlock(api),
listenNewHeads(),
)
this.subscription = super.follow(heads)
this.subscription.set(super.follow(heads))

if (headResubSubscription == null) {
headResubSubscription = registerHeadResubscribeFlux()
if (headResubSubscription.get() == null) {
headResubSubscription.set(registerHeadResubscribeFlux())
}
}

Expand Down Expand Up @@ -145,8 +145,7 @@ class GenericWsHead(
override fun stop() {
super.stop()
cancelSub()
headResubSubscription?.dispose()
headResubSubscription = null
headResubSubscription.getAndSet(null)?.dispose()
}

override fun chainIdValidator(): SingleValidator<ValidateUpstreamSettingsResult>? {
Expand All @@ -172,7 +171,7 @@ class GenericWsHead(
wsSubscriptions.subscribe(chainSpecific.listenNewHeadsRequest().copy(id = ids.getAndIncrement()))
.also {
connectionId.set(it.connectionId)
subscriptionId = it.subId
subscriptionId.set(it.subId.get())
if (!connected.get()) {
connected.set(true)
}
Expand Down Expand Up @@ -209,8 +208,7 @@ class GenericWsHead(
}

private fun cancelSub() {
subscription?.dispose()
subscription = null
subscription.getAndSet(null)?.dispose()
subscribed.set(false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ open class GenericUpstream(
}

private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig, versionRules)
private var validatorSubscription: Disposable? = null
private var validationSettingsSubscription: Disposable? = null
private var lowerBlockDetectorSubscription: Disposable? = null
private val validatorSubscription = AtomicReference<Disposable?>()
private val validationSettingsSubscription = AtomicReference<Disposable?>()
private val lowerBlockDetectorSubscription = AtomicReference<Disposable?>()
private val settingsDetectorSubscription = AtomicReference<Disposable?>()

private val hasLiveSubscriptionHead: AtomicBoolean = AtomicBoolean(false)
protected val connector: GenericConnector = connectorFactory.create(this, chain)
private var livenessSubscription: Disposable? = null
private val livenessSubscription = AtomicReference<Disposable?>()
private val settingsDetector = upstreamSettingsDetectorBuilder(chain, this)
private var rpcMethodsDetector: UpstreamRpcMethodsDetector? = null

Expand All @@ -116,7 +117,7 @@ open class GenericUpstream(
private val clientVersion = AtomicReference(UNKNOWN_CLIENT_VERSION)

private val finalizationDetector = finalizationDetectorBuilder()
private var finalizationDetectorSubscription: Disposable? = null
private val finalizationDetectorSubscription = AtomicReference<Disposable?>()

private val headLivenessState = Sinks.many().multicast().directBestEffort<ValidateUpstreamSettingsResult>()

Expand Down Expand Up @@ -204,63 +205,67 @@ open class GenericUpstream(

private fun validateUpstreamSettings() {
if (validator != null) {
validationSettingsSubscription = Flux.merge(
Flux.interval(
Duration.ofSeconds(20),
).flatMap {
validator.validateUpstreamSettings()
},
headLivenessState.asFlux(),
)
.subscribeOn(upstreamSettingsScheduler)
.distinctUntilChanged()
.subscribe {
when (it) {
UPSTREAM_FATAL_SETTINGS_ERROR -> {
if (isUpstreamValid.get()) {
log.warn("There is a fatal error after upstream settings validation, removing ${getId()}...")
partialStop()
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.FATAL_SETTINGS_ERROR_REMOVED)
validationSettingsSubscription.set(
Flux.merge(
Flux.interval(
Duration.ofSeconds(20),
).flatMap {
validator.validateUpstreamSettings()
},
headLivenessState.asFlux(),
)
.subscribeOn(upstreamSettingsScheduler)
.distinctUntilChanged()
.subscribe {
when (it) {
UPSTREAM_FATAL_SETTINGS_ERROR -> {
if (isUpstreamValid.get()) {
log.warn("There is a fatal error after upstream settings validation, removing ${getId()}...")
partialStop()
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.FATAL_SETTINGS_ERROR_REMOVED)
}
isUpstreamValid.set(false)
}
isUpstreamValid.set(false)
}

UPSTREAM_VALID -> {
if (!isUpstreamValid.get()) {
log.warn("Upstream ${getId()} is now valid, adding to the multistream...")
upstreamStart()
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.ADDED)
UPSTREAM_VALID -> {
if (!isUpstreamValid.get()) {
log.warn("Upstream ${getId()} is now valid, adding to the multistream...")
upstreamStart()
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.ADDED)
}
isUpstreamValid.set(true)
}
isUpstreamValid.set(true)
}

else -> {
log.warn("Continue validation of upstream ${getId()}")
else -> {
log.warn("Continue validation of upstream ${getId()}")
}
}
}
}
},
)
}
}

private fun detectSettings() {
Flux.interval(
Duration.ZERO,
Duration.ofSeconds(getOptions().validationInterval.toLong() * 5),
).flatMap {
Flux.merge(
settingsDetector?.detectLabels()
?.doOnNext { label ->
updateLabels(label)
sendUpstreamStateEvent(UPDATED)
},
settingsDetector?.detectClientVersion()
?.doOnNext {
log.info("Detected node version $it for upstream ${getId()}")
clientVersion.set(it)
},
)
.subscribeOn(settingsScheduler)
}.subscribe()
settingsDetectorSubscription.set(
Flux.interval(
Duration.ZERO,
Duration.ofSeconds(getOptions().validationInterval.toLong() * 5),
).flatMap {
Flux.merge(
settingsDetector?.detectLabels()
?.doOnNext { label ->
updateLabels(label)
sendUpstreamStateEvent(UPDATED)
},
settingsDetector?.detectClientVersion()
?.doOnNext {
log.info("Detected node version $it for upstream ${getId()}")
clientVersion.set(it)
},
)
.subscribeOn(settingsScheduler)
}.subscribe(),
)
}

private fun detectRpcMethods(
Expand Down Expand Up @@ -311,22 +316,26 @@ open class GenericUpstream(
this.setStatus(UpstreamAvailability.OK)
} else {
log.debug("Start validation for upstream ${this.getId()}")
validatorSubscription = validator?.start()
?.subscribe(this::setStatus)
validatorSubscription.set(
validator?.start()
?.subscribe(this::setStatus),
)
}
livenessSubscription = connector.headLivenessEvents().subscribe(
{
val hasSub = it == HeadLivenessState.OK
hasLiveSubscriptionHead.set(hasSub)
if (it == HeadLivenessState.FATAL_ERROR) {
headLivenessState.emitNext(UPSTREAM_FATAL_SETTINGS_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
} else {
sendUpstreamStateEvent(UPDATED)
}
},
{
log.debug("Error while checking live subscription for ${getId()}", it)
},
livenessSubscription.set(
connector.headLivenessEvents().subscribe(
{
val hasSub = it == HeadLivenessState.OK
hasLiveSubscriptionHead.set(hasSub)
if (it == HeadLivenessState.FATAL_ERROR) {
headLivenessState.emitNext(UPSTREAM_FATAL_SETTINGS_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
} else {
sendUpstreamStateEvent(UPDATED)
}
},
{
log.debug("Error while checking live subscription for ${getId()}", it)
},
),
)
detectSettings()

Expand All @@ -337,21 +346,17 @@ open class GenericUpstream(

override fun stop() {
partialStop()
validationSettingsSubscription?.dispose()
validationSettingsSubscription = null
validationSettingsSubscription.getAndSet(null)?.dispose()
connector.stop()
started.set(false)
}

private fun partialStop() {
validatorSubscription?.dispose()
validatorSubscription = null
livenessSubscription?.dispose()
livenessSubscription = null
lowerBlockDetectorSubscription?.dispose()
lowerBlockDetectorSubscription = null
finalizationDetectorSubscription?.dispose()
finalizationDetectorSubscription = null
validatorSubscription.getAndSet(null)?.dispose()
livenessSubscription.getAndSet(null)?.dispose()
lowerBlockDetectorSubscription.getAndSet(null)?.dispose()
finalizationDetectorSubscription.getAndSet(null)?.dispose()
settingsDetectorSubscription.getAndSet(null)?.dispose()
connector.getHead().stop()
}

Expand All @@ -373,21 +378,23 @@ open class GenericUpstream(
}

private fun detectFinalization() {
finalizationDetectorSubscription =
finalizationDetectorSubscription.set(
finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime, getChain())
.subscribeOn(finalizationScheduler)
.subscribe {
sendUpstreamStateEvent(UPDATED)
}
},
)
}

private fun detectLowerBlock() {
lowerBlockDetectorSubscription =
lowerBlockDetectorSubscription.set(
lowerBoundService.detectLowerBounds()
.subscribeOn(lowerScheduler)
.subscribe {
sendUpstreamStateEvent(UPDATED)
}
},
)
}

fun getIngressSubscription(): IngressSubscription {
Expand Down

0 comments on commit 8b5e3bd

Please sign in to comment.