Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if config was reloaded, remove all upstream metrics if it was r… #327

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ class ReloadConfigSetup(
try {
log.info("Reloading config...")

reloadConfig()

log.info("Config is reloaded")
if (reloadConfig()) {
log.info("Config is reloaded")
} else {
log.info("There is nothing to reload, config is the same")
}
} finally {
reloadLock.unlock()
}
Expand All @@ -54,10 +56,14 @@ class ReloadConfigSetup(
}
}

private fun reloadConfig() {
private fun reloadConfig(): Boolean {
val newUpstreamsConfig = reloadConfigService.readUpstreamsConfig()
val currentUpstreamsConfig = reloadConfigService.currentUpstreamsConfig()

if (newUpstreamsConfig == currentUpstreamsConfig) {
return false
}

val chainsToReload = analyzeDefaultOptions(
currentUpstreamsConfig.defaultOptions,
newUpstreamsConfig.defaultOptions,
Expand All @@ -76,6 +82,8 @@ class ReloadConfigSetup(
reloadConfigUpstreamService.reloadUpstreams(chainsToReload, upstreamsToRemove, upstreamsToAdd, newUpstreamsConfig)

reloadConfigService.updateUpstreamsConfig(newUpstreamsConfig)

return true
}

private fun analyzeUpstreams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Component

@Component
class ReloadConfigUpstreamService(
open class ReloadConfigUpstreamService(
private val eventPublisher: ApplicationEventPublisher,
private val multistreamHolder: CurrentMultistreamHolder,
private val configuredUpstreams: ConfiguredUpstreams,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse

typealias JsonRpcReader = Reader<JsonRpcRequest, JsonRpcResponse>

interface JsonRpcHttpReader : JsonRpcReader {
fun onStop()
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig.HttpEndpoint
import io.emeraldpay.dshackle.config.UpstreamsConfig.RpcConnection
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.foundation.ChainOptions.Options
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.CallTargetsHolder
import io.emeraldpay.dshackle.upstream.Head
Expand Down Expand Up @@ -343,7 +342,7 @@ open class ConfiguredUpstreams(
log.warn("Upstream doesn't have API configuration")
return null
}
val directApi: JsonRpcReader = httpFactory.create(config.id, chain)
val directApi = httpFactory.create(config.id, chain)
val esplora = conn.esplora?.let { endpoint ->
val tls = endpoint.tls?.let { tls ->
tls.ca?.let { ca ->
Expand Down
23 changes: 14 additions & 9 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.emeraldpay.dshackle.upstream
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.Meter
import io.micrometer.core.instrument.Metrics
import org.slf4j.LoggerFactory
import reactor.core.Disposable
Expand All @@ -32,7 +33,6 @@ import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock

abstract class AbstractHead @JvmOverloads constructor(
Expand All @@ -57,14 +57,24 @@ abstract class AbstractHead @JvmOverloads constructor(
private var future: Future<*>? = null
private val delayed = AtomicBoolean(false)

private val metrics = mutableSetOf<Meter>()

init {
val className = this.javaClass.simpleName
Gauge.builder("stuck_head", delayed) {
if (it.get()) 1.0 else 0.0
}.tag("upstream", upstreamId).tag("class", className).register(Metrics.globalRegistry)
}
.tag("upstream", upstreamId)
.tag("class", className)
.register(Metrics.globalRegistry)
.also { metrics.add(it) }
Gauge.builder("current_head", forkChoice) {
it.getHead()?.height?.toDouble() ?: 0.0
}.tag("upstream", upstreamId).tag("class", className).register(Metrics.globalRegistry)
}
.tag("upstream", upstreamId)
.tag("class", className)
.register(Metrics.globalRegistry)
.also { metrics.add(it) }
}

fun follow(source: Flux<BlockContainer>): Disposable {
Expand Down Expand Up @@ -147,6 +157,7 @@ abstract class AbstractHead @JvmOverloads constructor(
it.cancel(true)
}
future = null
metrics.forEach { Metrics.globalRegistry.remove(it) }
}

protected open fun onNoHeadUpdates() {
Expand Down Expand Up @@ -176,10 +187,4 @@ abstract class AbstractHead @JvmOverloads constructor(
)
}
}

private fun toHeadCountMetric(counter: AtomicInteger, status: String) {
Gauge.builder("head_count", counter) {
it.get().toDouble()
}.tag("class", this.javaClass.simpleName).tag("status", status).register(Metrics.globalRegistry)
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader

interface HttpFactory {
fun create(id: String?, chain: Chain): JsonRpcReader
fun create(id: String?, chain: Chain): JsonRpcHttpReader
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.config.AuthConfig
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcHttpClient
import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics
import io.micrometer.core.instrument.Counter
Expand All @@ -15,7 +15,7 @@ open class HttpRpcFactory(
private val basicAuth: AuthConfig.ClientBasicAuth?,
private val tls: ByteArray?,
) : HttpFactory {
override fun create(id: String?, chain: Chain): JsonRpcReader {
override fun create(id: String?, chain: Chain): JsonRpcHttpReader {
val metricsTags = listOf(
// "unknown" is not supposed to happen
Tag.of("upstream", id ?: "unknown"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.Capability
Expand All @@ -32,7 +33,7 @@ import reactor.core.Disposable
open class BitcoinRpcUpstream(
id: String,
chain: Chain,
private val directApi: JsonRpcReader,
private val directApi: JsonRpcHttpReader,
private val head: Head,
options: ChainOptions.Options,
role: UpstreamsConfig.UpstreamRole,
Expand Down Expand Up @@ -115,5 +116,6 @@ open class BitcoinRpcUpstream(
head.stop()
}
validatorSubscription?.dispose()
directApi.onStop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.upstream.generic.connectors

import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.cache.CachesEnabled
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
Expand Down Expand Up @@ -31,7 +32,7 @@ import java.time.Duration

class GenericRpcConnector(
connectorType: ConnectorMode,
private val directReader: JsonRpcReader,
private val directReader: JsonRpcHttpReader,
wsFactory: WsConnectionPoolFactory?,
upstream: DefaultUpstream,
forkChoice: ForkChoice,
Expand Down Expand Up @@ -137,6 +138,7 @@ class GenericRpcConnector(
head.stop()
}
pool?.close()
directReader.onStop()
}

override fun getIngressReader(): JsonRpcReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
package io.emeraldpay.dshackle.upstream.rpcclient

import io.emeraldpay.dshackle.config.AuthConfig
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.etherjar.rpc.RpcException
import io.emeraldpay.etherjar.rpc.RpcResponseError
import io.micrometer.core.instrument.Metrics
import io.netty.buffer.Unpooled
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaders
Expand Down Expand Up @@ -47,7 +48,7 @@ class JsonRpcHttpClient(
private val metrics: RpcMetrics,
basicAuth: AuthConfig.ClientBasicAuth? = null,
tlsCAAuth: ByteArray? = null,
) : JsonRpcReader {
) : JsonRpcHttpReader {

private val parser = ResponseRpcParser()
private val httpClient: HttpClient
Expand Down Expand Up @@ -104,6 +105,11 @@ class JsonRpcHttpClient(
}.single()
}

override fun onStop() {
Metrics.globalRegistry.remove(metrics.timer)
Metrics.globalRegistry.remove(metrics.fails)
}

override fun read(key: JsonRpcRequest): Mono<JsonRpcResponse> {
val startTime = StopWatch()
return Mono.just(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.emeraldpay.dshackle.upstream
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.test.EthereumApiStub
import io.emeraldpay.dshackle.test.TestingCommons
Expand Down Expand Up @@ -51,7 +52,7 @@ class FilteredApisSpec extends Specification {
[test: "baz"]
].collect {
def httpFactory = Mock(HttpFactory) {
create(_, _) >> TestingCommons.api().tap { it.id = "${i++}" }
create(_, _) >> Stub(JsonRpcHttpReader)
}
def connectorFactory = new GenericConnectorFactory(
GenericConnectorFactory.ConnectorMode.RPC_ONLY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.ArgumentCaptor
import org.mockito.kotlin.any
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
Expand Down Expand Up @@ -99,7 +101,7 @@ class ReloadConfigTest {
}

@Test
fun `stop multistream of there are no upstreams left`() {
fun `stop multistream if there are no upstreams left`() {
val up1 = upstream("local1")
val up2 = upstream("local2")
val up3 = upstream("local3")
Expand Down Expand Up @@ -154,6 +156,25 @@ class ReloadConfigTest {
)
}

@Test
fun `reload the same config cause to nothing`() {
val initialConfigFile = ResourceUtils.getFile("classpath:configs/upstreams-initial.yaml")
val initialConfig = upstreamsConfigReader.read(initialConfigFile.inputStream())!!
mainConfig.upstreams = initialConfig

val reloadConfigUpstreamService = mock<ReloadConfigUpstreamService>()

val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService)

whenever(config.getConfigPath()).thenReturn(initialConfigFile)

reloadConfig.handle(Signal("HUP"))

verify(reloadConfigUpstreamService, never()).reloadUpstreams(any(), any(), any(), any())

assertEquals(initialConfig, mainConfig.upstreams)
}

private fun upstream(id: String): Upstream =
mock {
on { getId() } doReturn id
Expand Down