From 6746f40d73f67dbc7678c16a901e3219addca21e Mon Sep 17 00:00:00 2001 From: vendelieu Date: Sun, 5 Jan 2025 10:46:40 +0300 Subject: [PATCH 01/14] improve logging --- src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt | 12 ++++++++++-- .../eu/vendeli/rethis/types/core/ConnectionPool.kt | 11 ++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt index f12fe9e..8f1d748 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt @@ -35,8 +35,6 @@ class ReThis( internal val connectionPool by lazy { ConnectionPool(this, address.socket).also { it.prepare() } } init { - logger.info("Created client (RESP $protocol)") - if (address is Url) { cfg.db = address.db if (address.credentials.isNotEmpty()) { @@ -46,6 +44,16 @@ class ReThis( ) } } + + buildString { + append("Created ReThis client.\n") + append("Address: $address\n") + append("DB: ${cfg.db}\n") + append("Auth: ${cfg.auth != null}\n") + append("TLS: ${cfg.tlsConfig != null}\n") + append("Pool size: ${cfg.poolConfiguration.poolSize}\n") + append("Protocol: ${protocol.literal}\n") + }.let { logger.info(it) } } val subscriptions = ActiveSubscriptions() diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index dd2fdaa..ace6008 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -26,7 +26,9 @@ internal class ConnectionPool( private val job = SupervisorJob(client.rootJob) private val connections = Channel(client.cfg.poolConfiguration.poolSize) - private val selector = SelectorManager(client.cfg.poolConfiguration.dispatcher + job + CoroutineName("ReThis Pool")) + private val selector = SelectorManager( + client.cfg.poolConfiguration.dispatcher + job + CoroutineName("ReThis-ConnectionPool"), + ) internal suspend fun createConn(): Connection { logger.trace("Creating connection to $address") @@ -43,19 +45,21 @@ internal class ConnectionPool( var requests = 0 if (client.cfg.auth != null) client.cfg.auth?.run { - logger.debug("Authenticating to $address with $this") + logger.trace("Authenticating to $address.") reqBuffer.writeRedisValue(listOfNotNull("AUTH".toArg(), username?.toArg(), password.toArg())) requests++ } client.cfg.db?.takeIf { it > 0 }?.let { - requests++ + logger.trace("Selecting database $it to $address.") reqBuffer.writeRedisValue(listOf("SELECT".toArg(), it.toArg())) + requests++ } reqBuffer.writeRedisValue(listOf("HELLO".toArg(), client.protocol.literal.toArg())) requests++ + logger.trace("Sending connection establishment requests ($requests)") conn.sendRequest(reqBuffer) repeat(requests) { logger.trace("Connection establishment response: " + conn.input.readRedisMessage(client.cfg.charset)) @@ -74,6 +78,7 @@ internal class ConnectionPool( suspend fun acquire(): Connection = connections.receive() suspend fun release(connection: Connection) { + logger.trace("Releasing connection ${connection.socket}, health status: ${!connection.input.isClosedForRead}") connections.send(connection) } From 0e06b8e7feb84d1470f807c94c72d267728708ed Mon Sep 17 00:00:00 2001 From: vendelieu Date: Sun, 5 Jan 2025 12:42:28 +0300 Subject: [PATCH 02/14] use direct connection in subscriptions --- src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt index 8ba1a50..ae14325 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt @@ -37,7 +37,7 @@ internal suspend inline fun ReThis.registerSubscription( messageMarker: String, handler: SubscriptionHandler, ) { - val connection = connectionPool.acquire() + val connection = connectionPool.createConn() val handlerJob = rethisCoScope.launch(CoLocalConn(connection)) { val conn = currentCoroutineContext()[CoLocalConn]!!.connection try { @@ -76,7 +76,6 @@ internal suspend inline fun ReThis.registerSubscription( subscriptions.eventHandler?.onException(target, e) } finally { conn.sendRequest(bufferValues(listOf(unRegCommand.toArg(), target.toArg()), cfg.charset)) - connectionPool.release(conn) subscriptions.unsubscribe(target) } } From 0287c74df5a93b71141e7aa1bb822b77f8cc6298 Mon Sep 17 00:00:00 2001 From: vendelieu Date: Sun, 5 Jan 2025 12:44:01 +0300 Subject: [PATCH 03/14] add conn close statement --- src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt index ae14325..d5fbd63 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt @@ -77,6 +77,7 @@ internal suspend inline fun ReThis.registerSubscription( } finally { conn.sendRequest(bufferValues(listOf(unRegCommand.toArg(), target.toArg()), cfg.charset)) subscriptions.unsubscribe(target) + connection.socket.close() } } From 672bcad4379daeea66428f6d6814abf09cb0b89b Mon Sep 17 00:00:00 2001 From: vendelieu Date: Sun, 5 Jan 2025 13:21:09 +0300 Subject: [PATCH 04/14] add to connection pool health check and refilling with retry strategy --- .../kotlin/eu/vendeli/rethis/ReThis.kt | 4 +- .../rethis/types/core/ClientConfiguration.kt | 14 ++++- .../rethis/types/core/ConnectionPool.kt | 51 +++++++++++++++---- 3 files changed, 56 insertions(+), 13 deletions(-) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt index 8f1d748..0c23488 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt @@ -17,7 +17,7 @@ import kotlin.jvm.JvmName @ReThisDSL class ReThis( - address: Address = Host(DEFAULT_HOST, DEFAULT_PORT), + internal val address: Address = Host(DEFAULT_HOST, DEFAULT_PORT), val protocol: RespVer = RespVer.V3, configurator: ClientConfiguration.() -> Unit = {}, ) { @@ -32,7 +32,7 @@ class ReThis( internal val cfg: ClientConfiguration = ClientConfiguration().apply(configurator) internal val rootJob = SupervisorJob() internal val rethisCoScope = CoroutineScope(rootJob + cfg.poolConfiguration.dispatcher + CoroutineName("ReThis")) - internal val connectionPool by lazy { ConnectionPool(this, address.socket).also { it.prepare() } } + internal val connectionPool by lazy { ConnectionPool(this).also { it.prepare() } } init { if (address is Url) { diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt index 377db4a..1c05b57 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt @@ -13,7 +13,8 @@ data class ClientConfiguration( var charset: Charset = Charsets.UTF_8, var tlsConfig: TLSConfig? = null, internal var auth: AuthConfiguration? = null, - internal var poolConfiguration: PoolConfiguration = PoolConfiguration(), + internal val poolConfiguration: PoolConfiguration = PoolConfiguration(), + internal val reconnectionStrategy: ReconnectionStrategyConfiguration = ReconnectionStrategyConfiguration(), ) { fun auth(password: String, username: String? = null) { auth = AuthConfiguration(password, username) @@ -22,6 +23,10 @@ data class ClientConfiguration( fun pool(block: PoolConfiguration.() -> Unit) { poolConfiguration.block() } + + fun reconnectionStrategy(block: ReconnectionStrategyConfiguration.() -> Unit) { + reconnectionStrategy.block() + } } @ConfigurationDSL @@ -30,6 +35,13 @@ data class AuthConfiguration( var username: String? = null, ) +@ConfigurationDSL +data class ReconnectionStrategyConfiguration( + var doHealthCheck: Boolean = true, + var reconnectAttempts: Int = 3, + var reconnectDelay: Long = 3000L, +) + @ConfigurationDSL data class PoolConfiguration( var poolSize: Int = 50, diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index ace6008..b9b0501 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -17,7 +17,6 @@ import kotlin.contracts.contract internal class ConnectionPool( internal val client: ReThis, - private val address: SocketAddress, ) { internal val logger = KtorSimpleLogger("eu.vendeli.rethis.ConnectionPool") @@ -25,16 +24,17 @@ internal class ConnectionPool( internal val isEmpty: Boolean get() = connections.isEmpty private val job = SupervisorJob(client.rootJob) - private val connections = Channel(client.cfg.poolConfiguration.poolSize) - private val selector = SelectorManager( + private val poolScope = CoroutineScope( client.cfg.poolConfiguration.dispatcher + job + CoroutineName("ReThis-ConnectionPool"), ) + private val connections = Channel(client.cfg.poolConfiguration.poolSize) + private val selector = SelectorManager(poolScope.coroutineContext) internal suspend fun createConn(): Connection { - logger.trace("Creating connection to $address") + logger.trace("Creating connection to ${client.address}") val conn = aSocket(selector) .tcp() - .connect(address) + .connect(client.address.socket) .let { socket -> client.cfg.tlsConfig?.let { socket.tls(selector.coroutineContext, it) @@ -45,13 +45,13 @@ internal class ConnectionPool( var requests = 0 if (client.cfg.auth != null) client.cfg.auth?.run { - logger.trace("Authenticating to $address.") + logger.trace("Authenticating to ${client.address}.") reqBuffer.writeRedisValue(listOfNotNull("AUTH".toArg(), username?.toArg(), password.toArg())) requests++ } client.cfg.db?.takeIf { it > 0 }?.let { - logger.trace("Selecting database $it to $address.") + logger.trace("Selecting database $it to ${client.address}.") reqBuffer.writeRedisValue(listOf("SELECT".toArg(), it.toArg())) requests++ } @@ -77,9 +77,40 @@ internal class ConnectionPool( suspend fun acquire(): Connection = connections.receive() - suspend fun release(connection: Connection) { - logger.trace("Releasing connection ${connection.socket}, health status: ${!connection.input.isClosedForRead}") - connections.send(connection) + fun release(connection: Connection) { + handle(connection) + } + + private fun handle(connection: Connection) = poolScope.launch { + logger.trace("Releasing connection ${connection.socket}") + val cfg = client.cfg.reconnectionStrategy + if (cfg.doHealthCheck && connection.input.isClosedForRead) { // connection is corrupted + logger.warn("Connection ${connection.socket} is corrupted, refilling") + connection.socket.close() + refill() + } else { + connections.send(connection) + } + } + + private tailrec suspend fun refill(attempt: Int = 1) { + val cfg = client.cfg.reconnectionStrategy + var ex: Throwable? = null + if (cfg.reconnectAttempts >= attempt) { + if (ex == null) logger.warn("Connection refills failed, maximum attempts reached") + else logger.warn("Connection refills failed, maximum attempts reached", ex) + return + } + delay(cfg.reconnectDelay) + logger.trace("Refilling ConnectionPool. Attempt $attempt") + runCatching { createConn() }.onSuccess { + connections.send(it) + return + }.onFailure { + if (ex != null) ex.addSuppressed(it) else ex = it + } + logger.debug("Connection refill failed, remaining attempts: ${cfg.reconnectAttempts - attempt}") + refill(attempt + 1) } @OptIn(ExperimentalCoroutinesApi::class) From 87497792ca21292cba74212258ffc80a3a40d10e Mon Sep 17 00:00:00 2001 From: vendelieu Date: Sun, 5 Jan 2025 13:27:37 +0300 Subject: [PATCH 05/14] multiply delay on each refilling attempt --- .../kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index b9b0501..45f92d5 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -101,7 +101,7 @@ internal class ConnectionPool( else logger.warn("Connection refills failed, maximum attempts reached", ex) return } - delay(cfg.reconnectDelay) + delay(attempt * cfg.reconnectDelay) logger.trace("Refilling ConnectionPool. Attempt $attempt") runCatching { createConn() }.onSuccess { connections.send(it) From 66a6f155b9a920b6d4e59cec606d244b6eb82f6a Mon Sep 17 00:00:00 2001 From: vendelieu Date: Sun, 5 Jan 2025 13:28:55 +0300 Subject: [PATCH 06/14] lint --- .../eu/vendeli/rethis/types/core/ConnectionPool.kt | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index 45f92d5..980a6a4 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -103,12 +103,13 @@ internal class ConnectionPool( } delay(attempt * cfg.reconnectDelay) logger.trace("Refilling ConnectionPool. Attempt $attempt") - runCatching { createConn() }.onSuccess { - connections.send(it) - return - }.onFailure { - if (ex != null) ex.addSuppressed(it) else ex = it - } + runCatching { createConn() } + .onSuccess { + connections.send(it) + return + }.onFailure { + if (ex != null) ex.addSuppressed(it) else ex = it + } logger.debug("Connection refill failed, remaining attempts: ${cfg.reconnectAttempts - attempt}") refill(attempt + 1) } From f9981f7d4b4627cb2da166826117d700757fc5aa Mon Sep 17 00:00:00 2001 From: vendelieu Date: Sun, 5 Jan 2025 13:31:49 +0300 Subject: [PATCH 07/14] log successful refill --- .../kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index 980a6a4..76f3d6d 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -106,6 +106,7 @@ internal class ConnectionPool( runCatching { createConn() } .onSuccess { connections.send(it) + logger.trace("Connection refilled with $it") return }.onFailure { if (ex != null) ex.addSuppressed(it) else ex = it From 2d365d8ea75cdff23f75a9ebda6221e7800d6095 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Sun, 5 Jan 2025 22:58:17 +0000 Subject: [PATCH 08/14] Update dependency ch.qos.logback:logback-classic to v1.5.16 --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 5c60b41..53f9793 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -15,7 +15,7 @@ binvalid = "0.17.0" kover = "0.9.0" kotest = "5.9.1" -logback = "1.5.15" +logback = "1.5.16" [libraries] kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "io" } From e908496365878de30bf9fbcfeb96493aeb173e02 Mon Sep 17 00:00:00 2001 From: vendelieu Date: Mon, 6 Jan 2025 20:56:36 +0300 Subject: [PATCH 09/14] improve refill logic --- .../rethis/types/core/ConnectionPool.kt | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index 76f3d6d..89ed45b 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -93,26 +93,31 @@ internal class ConnectionPool( } } - private tailrec suspend fun refill(attempt: Int = 1) { + private suspend fun refill() { val cfg = client.cfg.reconnectionStrategy + if (cfg.reconnectAttempts <= 0) return + var attempt = 0 var ex: Throwable? = null - if (cfg.reconnectAttempts >= attempt) { - if (ex == null) logger.warn("Connection refills failed, maximum attempts reached") - else logger.warn("Connection refills failed, maximum attempts reached", ex) - return + + while (attempt < cfg.reconnectAttempts) { + attempt++ + logger.trace("Refilling ConnectionPool. Attempt $attempt") + runCatching { createConn() } + .onSuccess { + connections.send(it) + logger.trace("Connection refilled with $it") + return + }.onFailure { + if (ex != null) ex.addSuppressed(it) else ex = it + } + + logger.debug("Connection refill failed, remaining attempts: ${cfg.reconnectAttempts - attempt}") + delay(attempt * cfg.reconnectDelay) } - delay(attempt * cfg.reconnectDelay) - logger.trace("Refilling ConnectionPool. Attempt $attempt") - runCatching { createConn() } - .onSuccess { - connections.send(it) - logger.trace("Connection refilled with $it") - return - }.onFailure { - if (ex != null) ex.addSuppressed(it) else ex = it - } - logger.debug("Connection refill failed, remaining attempts: ${cfg.reconnectAttempts - attempt}") - refill(attempt + 1) + + val logMsg = "Connection refills failed, maximum attempts reached" + if (ex == null) logger.warn(logMsg) + else logger.warn(logMsg, ex) } @OptIn(ExperimentalCoroutinesApi::class) From 6c0befb3210c417273f97c5ec2971893add10951 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:24:41 +0000 Subject: [PATCH 10/14] Update dependency org.jetbrains.kotlinx.kover to v0.9.1 --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 53f9793..62a5ed9 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,7 +12,7 @@ kotlinter = "5.0.1" deteKT = "1.23.7" publisher = "0.30.0" binvalid = "0.17.0" -kover = "0.9.0" +kover = "0.9.1" kotest = "5.9.1" logback = "1.5.16" From bb0986333404d0bd64d5515cdcc6f7c3ff59cd43 Mon Sep 17 00:00:00 2001 From: vendelieu Date: Sun, 12 Jan 2025 20:23:30 +0300 Subject: [PATCH 11/14] add more tests --- .../kotlin/eu/vendeli/rethis/ReThis.kt | 4 +- .../eu/vendeli/rethis/types/core/Address.kt | 7 + .../eu/vendeli/rethis/utils/ResponseUtils.kt | 4 +- .../vendeli/rethis/types/core/Address.jvm.kt | 2 + .../rethis/tests/utils/RTypeResponseTest.kt | 192 ++++++++++++++++++ .../rethis/tests/utils/ResponseUtilsTest.kt | 181 +++++++++++++++++ .../rethis/types/core/Address.native.kt | 2 + 7 files changed, 388 insertions(+), 4 deletions(-) create mode 100644 src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/RTypeResponseTest.kt create mode 100644 src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/ResponseUtilsTest.kt diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt index 0c23488..d6cf1e1 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt @@ -48,11 +48,11 @@ class ReThis( buildString { append("Created ReThis client.\n") append("Address: $address\n") - append("DB: ${cfg.db}\n") + append("DB: ${cfg.db ?: 0}\n") append("Auth: ${cfg.auth != null}\n") append("TLS: ${cfg.tlsConfig != null}\n") append("Pool size: ${cfg.poolConfiguration.poolSize}\n") - append("Protocol: ${protocol.literal}\n") + append("Protocol: ${protocol}\n") }.let { logger.info(it) } } diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt index abf23f7..857448e 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt @@ -9,6 +9,13 @@ expect sealed class Address() { internal abstract val socket: SocketAddress } +@Suppress("NOTHING_TO_INLINE") +internal inline fun Address.stringify() = when (val thiz = socket) { + is InetSocketAddress -> "InetSocket(${thiz.hostname}:${thiz.port})" + is UnixSocketAddress -> "UnixSocket(${thiz.path})" + else -> socket.toString() +} + class Host( host: String, port: Int, diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt b/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt index 813cccc..d601e6a 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt @@ -74,7 +74,7 @@ internal suspend fun ByteReadChannel.readRedisMessage(charset: Charset, rawOnly: val content = readRemaining(size).readText(charset) readShort() // Skip CRLF val encoding = content.subSequence(0, 3) // First 3 bytes are encoding - val data = content.subSequence(4, size.toInt() - 4) // Skip encoding and colon (:) + val data = content.subSequence(4, size.toInt()) VerbatimString(encoding.toString(), data.toString()) } @@ -156,7 +156,7 @@ internal suspend inline fun ByteReadChannel.processRedisSimpleResponse( val content = readRemaining(size).readText(charset) readShort() // Skip CRLF val encoding = content.subSequence(0, 3) // First 3 bytes are encoding - val data = content.subSequence(4, size.toInt() - 4) // Skip encoding and colon (:) + val data = content.subSequence(4, size.toInt()) "$encoding:$data" } diff --git a/src/jvmMain/kotlin/eu/vendeli/rethis/types/core/Address.jvm.kt b/src/jvmMain/kotlin/eu/vendeli/rethis/types/core/Address.jvm.kt index ba266be..3458c13 100644 --- a/src/jvmMain/kotlin/eu/vendeli/rethis/types/core/Address.jvm.kt +++ b/src/jvmMain/kotlin/eu/vendeli/rethis/types/core/Address.jvm.kt @@ -5,6 +5,8 @@ import io.ktor.network.sockets.* @Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") actual sealed class Address actual constructor() { internal actual abstract val socket: SocketAddress + + override fun toString() = stringify() } class UnixSocket( diff --git a/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/RTypeResponseTest.kt b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/RTypeResponseTest.kt new file mode 100644 index 0000000..e544a80 --- /dev/null +++ b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/RTypeResponseTest.kt @@ -0,0 +1,192 @@ +package eu.vendeli.rethis.tests.utils + +import com.ionspin.kotlin.bignum.integer.toBigInteger +import eu.vendeli.rethis.ReThisTestCtx +import eu.vendeli.rethis.types.core.* +import eu.vendeli.rethis.utils.readRedisMessage +import eu.vendeli.rethis.utils.safeCast +import io.kotest.matchers.shouldBe +import io.ktor.utils.io.* +import io.ktor.utils.io.core.* +import kotlinx.io.Buffer + +class RTypeResponseTest : ReThisTestCtx() { + @Test + suspend fun `test readRedisMessage with simple string`() { + val channel = ByteReadChannel { + writeFully("+Hello, World!\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe PlainString("Hello, World!") + } + + @Test + suspend fun `test readRedisMessage with verbatim string`() { + val channel = ByteReadChannel { + writeFully("=15\r\ntxt:Some string\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe VerbatimString("txt", "Some string") + } + + @Test + suspend fun `test readRedisMessage with error`() { + val channel = ByteReadChannel { + writeFully("-Error message\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + + result.safeCast()?.exception?.message shouldBe "Error message" + } + + @Test + suspend fun `test readRedisMessage with bulk error`() { + val channel = ByteReadChannel { + writeFully("!21\r\nSYNTAX invalid syntax\r\n".encodeToByteArray()) + } + + channel.readRedisMessage(charset).safeCast()?.exception?.message shouldBe "SYNTAX invalid syntax" + } + + @Test + suspend fun `test readRedisMessage with integer`() { + val channel = ByteReadChannel { + writeFully(":123\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe Int64(123) + } + + @Test + suspend fun `test readRedisMessage with bulk string`() { + val channel = ByteReadChannel { + writeFully("$5\r\nHello\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe BulkString("Hello") + } + + @Test + suspend fun `test readRedisMessage with array`() { + val channel = ByteReadChannel { + writeFully("*2\r\n+first\r\n+second\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RArray( + listOf( + PlainString("first"), PlainString("second"), + ), + ) + } + + @Test + suspend fun `test readRedisMessage with null bulk string`() { + val channel = ByteReadChannel { + writeFully("$-1\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RType.Null + } + + @Test + suspend fun `test readRedisMessage with null`() { + val channel = ByteReadChannel { + writeFully("_\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RType.Null + } + + @Test + suspend fun `test readRedisMessage with null array`() { + val channel = ByteReadChannel { + writeFully("*-1\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RType.Null + } + + @Test + suspend fun `test readRedisMessage with boolean`() { + val channel = ByteReadChannel { + writeFully("#t\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe Bool(true) + } + + @Test + suspend fun `test readRedisMessage with double`() { + val channel = ByteReadChannel { + writeFully(",3.14\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe F64(3.14) + } + + @Test + suspend fun `test readRedisMessage with big number`() { + val channel = ByteReadChannel { + writeFully("(12345678901234567890\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe BigNumber("12345678901234567890".toBigInteger()) + } + + @Test + suspend fun `test readRedisMessage with set`() { + val channel = ByteReadChannel { + writeFully("~2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RSet(setOf(BulkString("Hello"), BulkString("World"))) + } + + @Test + suspend fun `test processRedisListResponse with push`() { + val channel = ByteReadChannel { + writeFully(">2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe Push(listOf(BulkString("Hello"), BulkString("World"))) + } + + @Test + suspend fun `test processRedisListResponse with map`() { + val channel = ByteReadChannel { + writeFully("%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RMap( + mapOf( + PlainString("first") to Int64(1), + PlainString("second") to Int64(2), + ), + ) + } + + private val charset = Charsets.UTF_8 + + @Suppress("TestFunctionName") + private suspend fun ByteReadChannel(block: suspend Buffer.() -> Unit): ByteReadChannel { + val buff = Buffer() + buff.block() + + return ByteReadChannel(buff) + } +} diff --git a/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/ResponseUtilsTest.kt b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/ResponseUtilsTest.kt new file mode 100644 index 0000000..30b76ab --- /dev/null +++ b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/ResponseUtilsTest.kt @@ -0,0 +1,181 @@ +package eu.vendeli.rethis.tests.utils + +import com.ionspin.kotlin.bignum.integer.BigInteger +import com.ionspin.kotlin.bignum.integer.toBigInteger +import eu.vendeli.rethis.ReThisTestCtx +import eu.vendeli.rethis.utils.processRedisListResponse +import eu.vendeli.rethis.utils.processRedisMapResponse +import eu.vendeli.rethis.utils.processRedisSimpleResponse +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.shouldBe +import io.ktor.utils.io.* +import io.ktor.utils.io.core.* +import kotlinx.io.Buffer + +class ResponseUtilsTest : ReThisTestCtx() { + private val charset = Charsets.UTF_8 + + @Test + suspend fun `test processRedisSimpleResponse with simple string`() { + val channel = ByteReadChannel { + writeFully("+Hello, World!\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe "Hello, World!" + } + + @Test + suspend fun `test processRedisSimpleResponse with verbatim string`() { + val channel = ByteReadChannel { + writeFully("=15\r\ntxt:Some string\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe "txt:Some string" + } + + @Test + suspend fun `test processRedisSimpleResponse with error`() { + val channel = ByteReadChannel { + writeFully("-Error message\r\n".encodeToByteArray()) + } + + shouldThrow { + channel.processRedisSimpleResponse(charset) + }.message shouldBe "Error message" + } + + @Test + suspend fun `test processRedisSimpleResponse with bulk error`() { + val channel = ByteReadChannel { + writeFully("!21\r\nSYNTAX invalid syntax\r\n".encodeToByteArray()) + } + + shouldThrow { + channel.processRedisSimpleResponse(charset) + }.message shouldBe "SYNTAX invalid syntax" + } + + @Test + suspend fun `test processRedisSimpleResponse with integer`() { + val channel = ByteReadChannel { + writeFully(":123\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe 123L + } + + @Test + suspend fun `test processRedisSimpleResponse with bulk string`() { + val channel = ByteReadChannel { + writeFully("$5\r\nHello\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe "Hello" + } + + @Test + suspend fun `test processRedisSimpleResponse with null bulk string`() { + val channel = ByteReadChannel { + writeFully("$-1\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result.shouldBeNull() + } + + @Test + suspend fun `test processRedisSimpleResponse with null`() { + val channel = ByteReadChannel { + writeFully("_\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result.shouldBeNull() + } + + @Test + suspend fun `test processRedisSimpleResponse with boolean`() { + val channel = ByteReadChannel { + writeFully("#t\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe true + } + + @Test + suspend fun `test processRedisSimpleResponse with double`() { + val channel = ByteReadChannel { + writeFully(",3.14\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe 3.14 + } + + @Test + suspend fun `test processRedisSimpleResponse with bignumber`() { + val channel = ByteReadChannel { + writeFully("(12345678901234567890\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe "12345678901234567890".toBigInteger() + } + + @Test + suspend fun `test processRedisListResponse with array`() { + val channel = ByteReadChannel { + writeFully("*2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.processRedisListResponse(charset) + result shouldBe listOf("Hello", "World") + } + + @Test + suspend fun `test processRedisListResponse with set`() { + val channel = ByteReadChannel { + writeFully("~2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.processRedisListResponse(charset) + result shouldBe listOf("Hello", "World") + } + + @Test + suspend fun `test processRedisListResponse with push`() { + val channel = ByteReadChannel { + writeFully(">2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.processRedisListResponse(charset) + result shouldBe listOf("Hello", "World") + } + + @Test + suspend fun `test processRedisListResponse with map`() { + val channel = ByteReadChannel { + writeFully("%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n".encodeToByteArray()) + } + + val result = channel.processRedisMapResponse(charset) + result shouldBe mapOf( + "first" to 1L, + "second" to 2L, + ) + } + + @Suppress("TestFunctionName") + private suspend fun ByteReadChannel(block: suspend Buffer.() -> Unit): ByteReadChannel { + val buff = Buffer() + buff.block() + + return ByteReadChannel(buff) + } +} diff --git a/src/nativeMain/kotlin/eu/vendeli/rethis/types/core/Address.native.kt b/src/nativeMain/kotlin/eu/vendeli/rethis/types/core/Address.native.kt index ba266be..3458c13 100644 --- a/src/nativeMain/kotlin/eu/vendeli/rethis/types/core/Address.native.kt +++ b/src/nativeMain/kotlin/eu/vendeli/rethis/types/core/Address.native.kt @@ -5,6 +5,8 @@ import io.ktor.network.sockets.* @Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") actual sealed class Address actual constructor() { internal actual abstract val socket: SocketAddress + + override fun toString() = stringify() } class UnixSocket( From 38bbc8260d6ed8a0b8c34e9b07494ae4b76d9443 Mon Sep 17 00:00:00 2001 From: vendelieu Date: Wed, 15 Jan 2025 18:41:26 +0300 Subject: [PATCH 12/14] add socket configuration --- .../rethis/types/core/ClientConfiguration.kt | 72 +++++++++++++++++++ .../rethis/types/core/ConnectionPool.kt | 9 ++- 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt index 1c05b57..06b5144 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt @@ -7,6 +7,16 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.IO +/** + * A configuration class for the client. + * + * @property db The database index to switch to after connecting. + * @property charset The character set to use for communication. + * @property tlsConfig The TLS configuration to use when connecting. + * @property auth The authentication options. + * @property poolConfiguration The pool configuration. + * @property reconnectionStrategy The reconnection strategy to use. + */ @ConfigurationDSL data class ClientConfiguration( var db: Int? = null, @@ -15,26 +25,82 @@ data class ClientConfiguration( internal var auth: AuthConfiguration? = null, internal val poolConfiguration: PoolConfiguration = PoolConfiguration(), internal val reconnectionStrategy: ReconnectionStrategyConfiguration = ReconnectionStrategyConfiguration(), + internal val socketConfiguration: SocketConfiguration = SocketConfiguration() ) { + /** + * Configures authentication for the client. + * + * @param password The password to use for authentication. + * @param username The username to use for authentication, defaults to null. + */ fun auth(password: String, username: String? = null) { auth = AuthConfiguration(password, username) } + /** + * Configures the connection pool settings. + * + * @param block A lambda to configure the pool settings. + */ fun pool(block: PoolConfiguration.() -> Unit) { poolConfiguration.block() } + /** + * Configures the reconnection strategy. + * + * @param block A lambda to configure the reconnection strategy. + */ fun reconnectionStrategy(block: ReconnectionStrategyConfiguration.() -> Unit) { reconnectionStrategy.block() } + + /** + * Configures the socket options. + * + * @param block A lambda to configure the socket settings. + */ + fun socket(block: SocketConfiguration.() -> Unit) { + socketConfiguration.block() + } } +/** + * Configuration for redis connection authentication. + * + * @property password the password to use for authentication + * @property username the username to use for authentication, defaults to null + */ @ConfigurationDSL data class AuthConfiguration( var password: String, var username: String? = null, ) +/** + * Configuration for redis connection socket options. + * + * @property timeout the timeout in milliseconds for the redis connection, defaults to null + * @property linger SO_LINGER option for the redis connection, defaults to null, + * [see](https://api.ktor.io/ktor-network/io.ktor.network.sockets/-socket-options/-t-c-p-client-socket-options/linger-seconds.html). + * @property noDelay TCP_NODELAY option for the redis connection, defaults to true + * @property keepAlive TCP_KEEPALIVE option for the redis connection, defaults to true + */ +@ConfigurationDSL +data class SocketConfiguration( + var timeout: Long? = null, + var linger: Int? = null, + var noDelay: Boolean = true, + var keepAlive: Boolean = true, +) + +/** + * Configuration for redis connection reconnection strategy. + * + * @property doHealthCheck if true, performs health checks on the connection before adding it to the pool, defaults to true + * @property reconnectAttempts the number of times to attempt reconnecting to the redis server on failure, defaults to 3 + * @property reconnectDelay the delay in milliseconds between reconnecting, defaults to 3000L + */ @ConfigurationDSL data class ReconnectionStrategyConfiguration( var doHealthCheck: Boolean = true, @@ -42,6 +108,12 @@ data class ReconnectionStrategyConfiguration( var reconnectDelay: Long = 3000L, ) +/** + * Configuration for redis connection pool. + * + * @property poolSize the size of the connection pool, defaults to 50 + * @property dispatcher the dispatcher to use for connection pool coroutines, defaults to [Dispatchers.IO] + */ @ConfigurationDSL data class PoolConfiguration( var poolSize: Int = 50, diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index 89ed45b..ef341f0 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -34,7 +34,14 @@ internal class ConnectionPool( logger.trace("Creating connection to ${client.address}") val conn = aSocket(selector) .tcp() - .connect(client.address.socket) + .connect(client.address.socket) { + client.cfg.socketConfiguration.run { + timeout?.let { socketTimeout = it } + linger?.let { lingerSeconds = it } + this@connect.keepAlive = keepAlive + this@connect.noDelay = noDelay + } + } .let { socket -> client.cfg.tlsConfig?.let { socket.tls(selector.coroutineContext, it) From 49d53d90f15be1074e5626db8dcb7106e5fa4e60 Mon Sep 17 00:00:00 2001 From: vendelieu Date: Wed, 15 Jan 2025 18:53:10 +0300 Subject: [PATCH 13/14] note changes --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2aa4f9..50b057d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Re.this Changelog +## 0.2.1 + +* Added connection retrying. +* Added socket configuration. + ## 0.2.0 * Implemented Bitmap, Stream commands. From 0eaec7d3971e8208d49ccfd2130f35edeb06684f Mon Sep 17 00:00:00 2001 From: vendelieu Date: Wed, 15 Jan 2025 19:11:31 +0300 Subject: [PATCH 14/14] update jvm target ver --- CHANGELOG.md | 1 + buildSrc/src/main/kotlin/ConfigureKotlin.kt | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 50b057d..de210e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * Added connection retrying. * Added socket configuration. +* Upgraded JVM target version to 17 ## 0.2.0 diff --git a/buildSrc/src/main/kotlin/ConfigureKotlin.kt b/buildSrc/src/main/kotlin/ConfigureKotlin.kt index 450b234..058ef21 100644 --- a/buildSrc/src/main/kotlin/ConfigureKotlin.kt +++ b/buildSrc/src/main/kotlin/ConfigureKotlin.kt @@ -15,7 +15,7 @@ fun Project.configureKotlin(block: KotlinMultiplatformExtension.() -> Unit) { freeCompilerArgs = listOf("-opt-in=eu.vendeli.rethis.annotations.ReThisInternal") } - val jvmTargetVer = 11 + val jvmTargetVer = 17 jvm { withJava() compilations.all {