Skip to content

Commit

Permalink
Move fields to atomic (#573)
Browse files Browse the repository at this point in the history
KirillPamPam authored Sep 25, 2024

Verified

This commit was signed with the committer’s verified signature.
bryanculver Bryan Culver
1 parent 89a11b8 commit 2b141f7
Showing 2 changed files with 120 additions and 114 deletions.
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import reactor.kotlin.core.publisher.switchIfEmpty
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

@@ -64,34 +65,34 @@ class GenericWsHead(
}
private val chainIdValidator = chainSpecific.chainSettingsValidator(upstream.getChain(), upstream, jsonRpcWsClient)

private var connectionId: String? = null
private var subscribed = false
private var connected = false
private var isSyncing = false
private val connectionId = AtomicReference<String?>(null)
private val subscribed = AtomicBoolean(false)
private val connected = AtomicBoolean(false)
private val isSyncing = AtomicBoolean(false)

private var subscription: Disposable? = null
private var headResubSubscription: Disposable? = null
private val subscription = AtomicReference<Disposable?>()
private val headResubSubscription = AtomicReference<Disposable?>()
private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort<Boolean>()

private var subscriptionId = AtomicReference("")
private val subscriptionId = AtomicReference("")

override fun isRunning(): Boolean {
return subscription != null
return subscription.get() != null
}

override fun start() {
super.start()
this.subscription?.dispose()
this.subscribed = true
this.subscription.get()?.dispose()
this.subscribed.set(true)
val heads = Flux.merge(
// get the current block, not just wait for the next update
getLatestBlock(api),
listenNewHeads(),
)
this.subscription = super.follow(heads)
this.subscription.set(super.follow(heads))

if (headResubSubscription == null) {
headResubSubscription = registerHeadResubscribeFlux()
if (headResubSubscription.get() == null) {
headResubSubscription.set(registerHeadResubscribeFlux())
}
}

@@ -100,10 +101,10 @@ class GenericWsHead(
}

override fun onSyncingNode(isSyncing: Boolean) {
if (isSyncing && !this.isSyncing) {
if (isSyncing && !this.isSyncing.get()) {
cancelSub()
}
this.isSyncing = isSyncing
this.isSyncing.set(isSyncing)
}

private fun listenNewHeads(): Flux<BlockContainer> {
@@ -129,7 +130,7 @@ class GenericWsHead(
}
UPSTREAM_SETTINGS_ERROR -> {
log.warn("Couldn't check chain settings via ws connection for {}, ws sub will be recreated", upstreamId)
subscribed = false
subscribed.set(false)
Mono.empty()
}
UPSTREAM_FATAL_SETTINGS_ERROR -> {
@@ -144,16 +145,15 @@ class GenericWsHead(
override fun stop() {
super.stop()
cancelSub()
headResubSubscription?.dispose()
headResubSubscription = null
headResubSubscription.getAndSet(null)?.dispose()
}

override fun chainIdValidator(): SingleValidator<ValidateUpstreamSettingsResult>? {
return chainIdValidator
}

private fun unsubscribe(): Mono<BlockContainer> {
subscribed = false
subscribed.set(false)
return wsSubscriptions.unsubscribe(chainSpecific.unsubscribeNewHeadsRequest(subscriptionId.get()).copy(id = ids.getAndIncrement()))
.flatMap { it.requireResult() }
.doOnNext { log.warn("{} has just unsubscribed from newHeads", upstreamId) }
@@ -170,10 +170,10 @@ class GenericWsHead(
return try {
wsSubscriptions.subscribe(chainSpecific.listenNewHeadsRequest().copy(id = ids.getAndIncrement()))
.also {
connectionId = it.connectionId
subscriptionId = it.subId
if (!connected) {
connected = true
connectionId.set(it.connectionId)
subscriptionId.set(it.subId.get())
if (!connected.get()) {
connected.set(true)
}
}.data
} catch (e: Exception) {
@@ -184,13 +184,13 @@ class GenericWsHead(
private fun registerHeadResubscribeFlux(): Disposable {
val connectionStates = wsSubscriptions.connectionInfoFlux()
.map {
if (it.connectionId == connectionId && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) {
if (it.connectionId == connectionId.get() && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) {
headLivenessSink.emitNext(HeadLivenessState.DISCONNECTED) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
subscribed = false
connected = false
connectionId = null
subscribed.set(false)
connected.set(false)
connectionId.set(null)
} else if (it.connectionState == WsConnection.ConnectionState.CONNECTED) {
connected = true
connected.set(true)
return@map true
}
return@map false
@@ -200,16 +200,15 @@ class GenericWsHead(
noHeadUpdatesSink.asFlux(),
connectionStates,
).publishOn(wsConnectionResubscribeScheduler)
.filter { it && !subscribed && connected && !isSyncing }
.filter { it && !subscribed.get() && connected.get() && !isSyncing.get() }
.subscribe {
log.warn("Restart ws head, upstreamId: $upstreamId")
start()
}
}

private fun cancelSub() {
subscription?.dispose()
subscription = null
subscribed = false
subscription.getAndSet(null)?.dispose()
subscribed.set(false)
}
}
Original file line number Diff line number Diff line change
@@ -99,13 +99,14 @@ open class GenericUpstream(
}

private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig, versionRules)
private var validatorSubscription: Disposable? = null
private var validationSettingsSubscription: Disposable? = null
private var lowerBlockDetectorSubscription: Disposable? = null
private val validatorSubscription = AtomicReference<Disposable?>()
private val validationSettingsSubscription = AtomicReference<Disposable?>()
private val lowerBlockDetectorSubscription = AtomicReference<Disposable?>()
private val settingsDetectorSubscription = AtomicReference<Disposable?>()

private val hasLiveSubscriptionHead: AtomicBoolean = AtomicBoolean(false)
protected val connector: GenericConnector = connectorFactory.create(this, chain)
private var livenessSubscription: Disposable? = null
private val livenessSubscription = AtomicReference<Disposable?>()
private val settingsDetector = upstreamSettingsDetectorBuilder(chain, this)
private var rpcMethodsDetector: UpstreamRpcMethodsDetector? = null

@@ -116,7 +117,7 @@ open class GenericUpstream(
private val clientVersion = AtomicReference(UNKNOWN_CLIENT_VERSION)

private val finalizationDetector = finalizationDetectorBuilder()
private var finalizationDetectorSubscription: Disposable? = null
private val finalizationDetectorSubscription = AtomicReference<Disposable?>()

private val headLivenessState = Sinks.many().multicast().directBestEffort<ValidateUpstreamSettingsResult>()

@@ -204,63 +205,67 @@ open class GenericUpstream(

private fun validateUpstreamSettings() {
if (validator != null) {
validationSettingsSubscription = Flux.merge(
Flux.interval(
Duration.ofSeconds(20),
).flatMap {
validator.validateUpstreamSettings()
},
headLivenessState.asFlux(),
)
.subscribeOn(upstreamSettingsScheduler)
.distinctUntilChanged()
.subscribe {
when (it) {
UPSTREAM_FATAL_SETTINGS_ERROR -> {
if (isUpstreamValid.get()) {
log.warn("There is a fatal error after upstream settings validation, removing ${getId()}...")
partialStop()
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.FATAL_SETTINGS_ERROR_REMOVED)
validationSettingsSubscription.set(
Flux.merge(
Flux.interval(
Duration.ofSeconds(20),
).flatMap {
validator.validateUpstreamSettings()
},
headLivenessState.asFlux(),
)
.subscribeOn(upstreamSettingsScheduler)
.distinctUntilChanged()
.subscribe {
when (it) {
UPSTREAM_FATAL_SETTINGS_ERROR -> {
if (isUpstreamValid.get()) {
log.warn("There is a fatal error after upstream settings validation, removing ${getId()}...")
partialStop()
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.FATAL_SETTINGS_ERROR_REMOVED)
}
isUpstreamValid.set(false)
}
isUpstreamValid.set(false)
}

UPSTREAM_VALID -> {
if (!isUpstreamValid.get()) {
log.warn("Upstream ${getId()} is now valid, adding to the multistream...")
upstreamStart()
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.ADDED)
UPSTREAM_VALID -> {
if (!isUpstreamValid.get()) {
log.warn("Upstream ${getId()} is now valid, adding to the multistream...")
upstreamStart()
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.ADDED)
}
isUpstreamValid.set(true)
}
isUpstreamValid.set(true)
}

else -> {
log.warn("Continue validation of upstream ${getId()}")
else -> {
log.warn("Continue validation of upstream ${getId()}")
}
}
}
}
},
)
}
}

private fun detectSettings() {
Flux.interval(
Duration.ZERO,
Duration.ofSeconds(getOptions().validationInterval.toLong() * 5),
).flatMap {
Flux.merge(
settingsDetector?.detectLabels()
?.doOnNext { label ->
updateLabels(label)
sendUpstreamStateEvent(UPDATED)
},
settingsDetector?.detectClientVersion()
?.doOnNext {
log.info("Detected node version $it for upstream ${getId()}")
clientVersion.set(it)
},
)
.subscribeOn(settingsScheduler)
}.subscribe()
settingsDetectorSubscription.set(
Flux.interval(
Duration.ZERO,
Duration.ofSeconds(getOptions().validationInterval.toLong() * 5),
).flatMap {
Flux.merge(
settingsDetector?.detectLabels()
?.doOnNext { label ->
updateLabels(label)
sendUpstreamStateEvent(UPDATED)
},
settingsDetector?.detectClientVersion()
?.doOnNext {
log.info("Detected node version $it for upstream ${getId()}")
clientVersion.set(it)
},
)
.subscribeOn(settingsScheduler)
}.subscribe(),
)
}

private fun detectRpcMethods(
@@ -311,22 +316,26 @@ open class GenericUpstream(
this.setStatus(UpstreamAvailability.OK)
} else {
log.debug("Start validation for upstream ${this.getId()}")
validatorSubscription = validator?.start()
?.subscribe(this::setStatus)
validatorSubscription.set(
validator?.start()
?.subscribe(this::setStatus),
)
}
livenessSubscription = connector.headLivenessEvents().subscribe(
{
val hasSub = it == HeadLivenessState.OK
hasLiveSubscriptionHead.set(hasSub)
if (it == HeadLivenessState.FATAL_ERROR) {
headLivenessState.emitNext(UPSTREAM_FATAL_SETTINGS_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
} else {
sendUpstreamStateEvent(UPDATED)
}
},
{
log.debug("Error while checking live subscription for ${getId()}", it)
},
livenessSubscription.set(
connector.headLivenessEvents().subscribe(
{
val hasSub = it == HeadLivenessState.OK
hasLiveSubscriptionHead.set(hasSub)
if (it == HeadLivenessState.FATAL_ERROR) {
headLivenessState.emitNext(UPSTREAM_FATAL_SETTINGS_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
} else {
sendUpstreamStateEvent(UPDATED)
}
},
{
log.debug("Error while checking live subscription for ${getId()}", it)
},
),
)
detectSettings()

@@ -337,21 +346,17 @@ open class GenericUpstream(

override fun stop() {
partialStop()
validationSettingsSubscription?.dispose()
validationSettingsSubscription = null
validationSettingsSubscription.getAndSet(null)?.dispose()
connector.stop()
started.set(false)
}

private fun partialStop() {
validatorSubscription?.dispose()
validatorSubscription = null
livenessSubscription?.dispose()
livenessSubscription = null
lowerBlockDetectorSubscription?.dispose()
lowerBlockDetectorSubscription = null
finalizationDetectorSubscription?.dispose()
finalizationDetectorSubscription = null
validatorSubscription.getAndSet(null)?.dispose()
livenessSubscription.getAndSet(null)?.dispose()
lowerBlockDetectorSubscription.getAndSet(null)?.dispose()
finalizationDetectorSubscription.getAndSet(null)?.dispose()
settingsDetectorSubscription.getAndSet(null)?.dispose()
connector.getHead().stop()
}

@@ -373,21 +378,23 @@ open class GenericUpstream(
}

private fun detectFinalization() {
finalizationDetectorSubscription =
finalizationDetectorSubscription.set(
finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime, getChain())
.subscribeOn(finalizationScheduler)
.subscribe {
sendUpstreamStateEvent(UPDATED)
}
},
)
}

private fun detectLowerBlock() {
lowerBlockDetectorSubscription =
lowerBlockDetectorSubscription.set(
lowerBoundService.detectLowerBounds()
.subscribeOn(lowerScheduler)
.subscribe {
sendUpstreamStateEvent(UPDATED)
}
},
)
}

fun getIngressSubscription(): IngressSubscription {

0 comments on commit 2b141f7

Please sign in to comment.