diff --git a/CHANGELOG.md b/CHANGELOG.md index a2aa4f9a..de210e48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Re.this Changelog +## 0.2.1 + +* Added connection retrying. +* Added socket configuration. +* Upgraded JVM target version to 17 + ## 0.2.0 * Implemented Bitmap, Stream commands. diff --git a/buildSrc/src/main/kotlin/ConfigureKotlin.kt b/buildSrc/src/main/kotlin/ConfigureKotlin.kt index 450b2349..058ef216 100644 --- a/buildSrc/src/main/kotlin/ConfigureKotlin.kt +++ b/buildSrc/src/main/kotlin/ConfigureKotlin.kt @@ -15,7 +15,7 @@ fun Project.configureKotlin(block: KotlinMultiplatformExtension.() -> Unit) { freeCompilerArgs = listOf("-opt-in=eu.vendeli.rethis.annotations.ReThisInternal") } - val jvmTargetVer = 11 + val jvmTargetVer = 17 jvm { withJava() compilations.all { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 5c60b41c..62a5ed92 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,10 +12,10 @@ kotlinter = "5.0.1" deteKT = "1.23.7" publisher = "0.30.0" binvalid = "0.17.0" -kover = "0.9.0" +kover = "0.9.1" kotest = "5.9.1" -logback = "1.5.15" +logback = "1.5.16" [libraries] kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "io" } diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt index f12fe9e0..d6cf1e12 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt @@ -17,7 +17,7 @@ import kotlin.jvm.JvmName @ReThisDSL class ReThis( - address: Address = Host(DEFAULT_HOST, DEFAULT_PORT), + internal val address: Address = Host(DEFAULT_HOST, DEFAULT_PORT), val protocol: RespVer = RespVer.V3, configurator: ClientConfiguration.() -> Unit = {}, ) { @@ -32,11 +32,9 @@ class ReThis( internal val cfg: ClientConfiguration = ClientConfiguration().apply(configurator) internal val rootJob = SupervisorJob() internal val rethisCoScope = CoroutineScope(rootJob + cfg.poolConfiguration.dispatcher + CoroutineName("ReThis")) - internal val connectionPool by lazy { ConnectionPool(this, address.socket).also { it.prepare() } } + internal val connectionPool by lazy { ConnectionPool(this).also { it.prepare() } } init { - logger.info("Created client (RESP $protocol)") - if (address is Url) { cfg.db = address.db if (address.credentials.isNotEmpty()) { @@ -46,6 +44,16 @@ class ReThis( ) } } + + buildString { + append("Created ReThis client.\n") + append("Address: $address\n") + append("DB: ${cfg.db ?: 0}\n") + append("Auth: ${cfg.auth != null}\n") + append("TLS: ${cfg.tlsConfig != null}\n") + append("Pool size: ${cfg.poolConfiguration.poolSize}\n") + append("Protocol: ${protocol}\n") + }.let { logger.info(it) } } val subscriptions = ActiveSubscriptions() diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt index abf23f72..857448ef 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt @@ -9,6 +9,13 @@ expect sealed class Address() { internal abstract val socket: SocketAddress } +@Suppress("NOTHING_TO_INLINE") +internal inline fun Address.stringify() = when (val thiz = socket) { + is InetSocketAddress -> "InetSocket(${thiz.hostname}:${thiz.port})" + is UnixSocketAddress -> "UnixSocket(${thiz.path})" + else -> socket.toString() +} + class Host( host: String, port: Int, diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt index 377db4a4..06b51440 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ClientConfiguration.kt @@ -7,29 +7,113 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.IO +/** + * A configuration class for the client. + * + * @property db The database index to switch to after connecting. + * @property charset The character set to use for communication. + * @property tlsConfig The TLS configuration to use when connecting. + * @property auth The authentication options. + * @property poolConfiguration The pool configuration. + * @property reconnectionStrategy The reconnection strategy to use. + */ @ConfigurationDSL data class ClientConfiguration( var db: Int? = null, var charset: Charset = Charsets.UTF_8, var tlsConfig: TLSConfig? = null, internal var auth: AuthConfiguration? = null, - internal var poolConfiguration: PoolConfiguration = PoolConfiguration(), + internal val poolConfiguration: PoolConfiguration = PoolConfiguration(), + internal val reconnectionStrategy: ReconnectionStrategyConfiguration = ReconnectionStrategyConfiguration(), + internal val socketConfiguration: SocketConfiguration = SocketConfiguration() ) { + /** + * Configures authentication for the client. + * + * @param password The password to use for authentication. + * @param username The username to use for authentication, defaults to null. + */ fun auth(password: String, username: String? = null) { auth = AuthConfiguration(password, username) } + /** + * Configures the connection pool settings. + * + * @param block A lambda to configure the pool settings. + */ fun pool(block: PoolConfiguration.() -> Unit) { poolConfiguration.block() } + + /** + * Configures the reconnection strategy. + * + * @param block A lambda to configure the reconnection strategy. + */ + fun reconnectionStrategy(block: ReconnectionStrategyConfiguration.() -> Unit) { + reconnectionStrategy.block() + } + + /** + * Configures the socket options. + * + * @param block A lambda to configure the socket settings. + */ + fun socket(block: SocketConfiguration.() -> Unit) { + socketConfiguration.block() + } } +/** + * Configuration for redis connection authentication. + * + * @property password the password to use for authentication + * @property username the username to use for authentication, defaults to null + */ @ConfigurationDSL data class AuthConfiguration( var password: String, var username: String? = null, ) +/** + * Configuration for redis connection socket options. + * + * @property timeout the timeout in milliseconds for the redis connection, defaults to null + * @property linger SO_LINGER option for the redis connection, defaults to null, + * [see](https://api.ktor.io/ktor-network/io.ktor.network.sockets/-socket-options/-t-c-p-client-socket-options/linger-seconds.html). + * @property noDelay TCP_NODELAY option for the redis connection, defaults to true + * @property keepAlive TCP_KEEPALIVE option for the redis connection, defaults to true + */ +@ConfigurationDSL +data class SocketConfiguration( + var timeout: Long? = null, + var linger: Int? = null, + var noDelay: Boolean = true, + var keepAlive: Boolean = true, +) + +/** + * Configuration for redis connection reconnection strategy. + * + * @property doHealthCheck if true, performs health checks on the connection before adding it to the pool, defaults to true + * @property reconnectAttempts the number of times to attempt reconnecting to the redis server on failure, defaults to 3 + * @property reconnectDelay the delay in milliseconds between reconnecting, defaults to 3000L + */ +@ConfigurationDSL +data class ReconnectionStrategyConfiguration( + var doHealthCheck: Boolean = true, + var reconnectAttempts: Int = 3, + var reconnectDelay: Long = 3000L, +) + +/** + * Configuration for redis connection pool. + * + * @property poolSize the size of the connection pool, defaults to 50 + * @property dispatcher the dispatcher to use for connection pool coroutines, defaults to [Dispatchers.IO] + */ @ConfigurationDSL data class PoolConfiguration( var poolSize: Int = 50, diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index dd2fdaa8..ef341f09 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -17,7 +17,6 @@ import kotlin.contracts.contract internal class ConnectionPool( internal val client: ReThis, - private val address: SocketAddress, ) { internal val logger = KtorSimpleLogger("eu.vendeli.rethis.ConnectionPool") @@ -25,14 +24,24 @@ internal class ConnectionPool( internal val isEmpty: Boolean get() = connections.isEmpty private val job = SupervisorJob(client.rootJob) + private val poolScope = CoroutineScope( + client.cfg.poolConfiguration.dispatcher + job + CoroutineName("ReThis-ConnectionPool"), + ) private val connections = Channel(client.cfg.poolConfiguration.poolSize) - private val selector = SelectorManager(client.cfg.poolConfiguration.dispatcher + job + CoroutineName("ReThis Pool")) + private val selector = SelectorManager(poolScope.coroutineContext) internal suspend fun createConn(): Connection { - logger.trace("Creating connection to $address") + logger.trace("Creating connection to ${client.address}") val conn = aSocket(selector) .tcp() - .connect(address) + .connect(client.address.socket) { + client.cfg.socketConfiguration.run { + timeout?.let { socketTimeout = it } + linger?.let { lingerSeconds = it } + this@connect.keepAlive = keepAlive + this@connect.noDelay = noDelay + } + } .let { socket -> client.cfg.tlsConfig?.let { socket.tls(selector.coroutineContext, it) @@ -43,19 +52,21 @@ internal class ConnectionPool( var requests = 0 if (client.cfg.auth != null) client.cfg.auth?.run { - logger.debug("Authenticating to $address with $this") + logger.trace("Authenticating to ${client.address}.") reqBuffer.writeRedisValue(listOfNotNull("AUTH".toArg(), username?.toArg(), password.toArg())) requests++ } client.cfg.db?.takeIf { it > 0 }?.let { - requests++ + logger.trace("Selecting database $it to ${client.address}.") reqBuffer.writeRedisValue(listOf("SELECT".toArg(), it.toArg())) + requests++ } reqBuffer.writeRedisValue(listOf("HELLO".toArg(), client.protocol.literal.toArg())) requests++ + logger.trace("Sending connection establishment requests ($requests)") conn.sendRequest(reqBuffer) repeat(requests) { logger.trace("Connection establishment response: " + conn.input.readRedisMessage(client.cfg.charset)) @@ -73,8 +84,47 @@ internal class ConnectionPool( suspend fun acquire(): Connection = connections.receive() - suspend fun release(connection: Connection) { - connections.send(connection) + fun release(connection: Connection) { + handle(connection) + } + + private fun handle(connection: Connection) = poolScope.launch { + logger.trace("Releasing connection ${connection.socket}") + val cfg = client.cfg.reconnectionStrategy + if (cfg.doHealthCheck && connection.input.isClosedForRead) { // connection is corrupted + logger.warn("Connection ${connection.socket} is corrupted, refilling") + connection.socket.close() + refill() + } else { + connections.send(connection) + } + } + + private suspend fun refill() { + val cfg = client.cfg.reconnectionStrategy + if (cfg.reconnectAttempts <= 0) return + var attempt = 0 + var ex: Throwable? = null + + while (attempt < cfg.reconnectAttempts) { + attempt++ + logger.trace("Refilling ConnectionPool. Attempt $attempt") + runCatching { createConn() } + .onSuccess { + connections.send(it) + logger.trace("Connection refilled with $it") + return + }.onFailure { + if (ex != null) ex.addSuppressed(it) else ex = it + } + + logger.debug("Connection refill failed, remaining attempts: ${cfg.reconnectAttempts - attempt}") + delay(attempt * cfg.reconnectDelay) + } + + val logMsg = "Connection refills failed, maximum attempts reached" + if (ex == null) logger.warn(logMsg) + else logger.warn(logMsg, ex) } @OptIn(ExperimentalCoroutinesApi::class) diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt index 8ba1a506..d5fbd639 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt @@ -37,7 +37,7 @@ internal suspend inline fun ReThis.registerSubscription( messageMarker: String, handler: SubscriptionHandler, ) { - val connection = connectionPool.acquire() + val connection = connectionPool.createConn() val handlerJob = rethisCoScope.launch(CoLocalConn(connection)) { val conn = currentCoroutineContext()[CoLocalConn]!!.connection try { @@ -76,8 +76,8 @@ internal suspend inline fun ReThis.registerSubscription( subscriptions.eventHandler?.onException(target, e) } finally { conn.sendRequest(bufferValues(listOf(unRegCommand.toArg(), target.toArg()), cfg.charset)) - connectionPool.release(conn) subscriptions.unsubscribe(target) + connection.socket.close() } } diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt b/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt index 813cccc7..d601e6ad 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt @@ -74,7 +74,7 @@ internal suspend fun ByteReadChannel.readRedisMessage(charset: Charset, rawOnly: val content = readRemaining(size).readText(charset) readShort() // Skip CRLF val encoding = content.subSequence(0, 3) // First 3 bytes are encoding - val data = content.subSequence(4, size.toInt() - 4) // Skip encoding and colon (:) + val data = content.subSequence(4, size.toInt()) VerbatimString(encoding.toString(), data.toString()) } @@ -156,7 +156,7 @@ internal suspend inline fun ByteReadChannel.processRedisSimpleResponse( val content = readRemaining(size).readText(charset) readShort() // Skip CRLF val encoding = content.subSequence(0, 3) // First 3 bytes are encoding - val data = content.subSequence(4, size.toInt() - 4) // Skip encoding and colon (:) + val data = content.subSequence(4, size.toInt()) "$encoding:$data" } diff --git a/src/jvmMain/kotlin/eu/vendeli/rethis/types/core/Address.jvm.kt b/src/jvmMain/kotlin/eu/vendeli/rethis/types/core/Address.jvm.kt index ba266be0..3458c130 100644 --- a/src/jvmMain/kotlin/eu/vendeli/rethis/types/core/Address.jvm.kt +++ b/src/jvmMain/kotlin/eu/vendeli/rethis/types/core/Address.jvm.kt @@ -5,6 +5,8 @@ import io.ktor.network.sockets.* @Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") actual sealed class Address actual constructor() { internal actual abstract val socket: SocketAddress + + override fun toString() = stringify() } class UnixSocket( diff --git a/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/RTypeResponseTest.kt b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/RTypeResponseTest.kt new file mode 100644 index 00000000..e544a803 --- /dev/null +++ b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/RTypeResponseTest.kt @@ -0,0 +1,192 @@ +package eu.vendeli.rethis.tests.utils + +import com.ionspin.kotlin.bignum.integer.toBigInteger +import eu.vendeli.rethis.ReThisTestCtx +import eu.vendeli.rethis.types.core.* +import eu.vendeli.rethis.utils.readRedisMessage +import eu.vendeli.rethis.utils.safeCast +import io.kotest.matchers.shouldBe +import io.ktor.utils.io.* +import io.ktor.utils.io.core.* +import kotlinx.io.Buffer + +class RTypeResponseTest : ReThisTestCtx() { + @Test + suspend fun `test readRedisMessage with simple string`() { + val channel = ByteReadChannel { + writeFully("+Hello, World!\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe PlainString("Hello, World!") + } + + @Test + suspend fun `test readRedisMessage with verbatim string`() { + val channel = ByteReadChannel { + writeFully("=15\r\ntxt:Some string\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe VerbatimString("txt", "Some string") + } + + @Test + suspend fun `test readRedisMessage with error`() { + val channel = ByteReadChannel { + writeFully("-Error message\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + + result.safeCast()?.exception?.message shouldBe "Error message" + } + + @Test + suspend fun `test readRedisMessage with bulk error`() { + val channel = ByteReadChannel { + writeFully("!21\r\nSYNTAX invalid syntax\r\n".encodeToByteArray()) + } + + channel.readRedisMessage(charset).safeCast()?.exception?.message shouldBe "SYNTAX invalid syntax" + } + + @Test + suspend fun `test readRedisMessage with integer`() { + val channel = ByteReadChannel { + writeFully(":123\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe Int64(123) + } + + @Test + suspend fun `test readRedisMessage with bulk string`() { + val channel = ByteReadChannel { + writeFully("$5\r\nHello\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe BulkString("Hello") + } + + @Test + suspend fun `test readRedisMessage with array`() { + val channel = ByteReadChannel { + writeFully("*2\r\n+first\r\n+second\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RArray( + listOf( + PlainString("first"), PlainString("second"), + ), + ) + } + + @Test + suspend fun `test readRedisMessage with null bulk string`() { + val channel = ByteReadChannel { + writeFully("$-1\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RType.Null + } + + @Test + suspend fun `test readRedisMessage with null`() { + val channel = ByteReadChannel { + writeFully("_\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RType.Null + } + + @Test + suspend fun `test readRedisMessage with null array`() { + val channel = ByteReadChannel { + writeFully("*-1\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RType.Null + } + + @Test + suspend fun `test readRedisMessage with boolean`() { + val channel = ByteReadChannel { + writeFully("#t\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe Bool(true) + } + + @Test + suspend fun `test readRedisMessage with double`() { + val channel = ByteReadChannel { + writeFully(",3.14\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe F64(3.14) + } + + @Test + suspend fun `test readRedisMessage with big number`() { + val channel = ByteReadChannel { + writeFully("(12345678901234567890\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe BigNumber("12345678901234567890".toBigInteger()) + } + + @Test + suspend fun `test readRedisMessage with set`() { + val channel = ByteReadChannel { + writeFully("~2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RSet(setOf(BulkString("Hello"), BulkString("World"))) + } + + @Test + suspend fun `test processRedisListResponse with push`() { + val channel = ByteReadChannel { + writeFully(">2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe Push(listOf(BulkString("Hello"), BulkString("World"))) + } + + @Test + suspend fun `test processRedisListResponse with map`() { + val channel = ByteReadChannel { + writeFully("%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n".encodeToByteArray()) + } + + val result = channel.readRedisMessage(charset) + result shouldBe RMap( + mapOf( + PlainString("first") to Int64(1), + PlainString("second") to Int64(2), + ), + ) + } + + private val charset = Charsets.UTF_8 + + @Suppress("TestFunctionName") + private suspend fun ByteReadChannel(block: suspend Buffer.() -> Unit): ByteReadChannel { + val buff = Buffer() + buff.block() + + return ByteReadChannel(buff) + } +} diff --git a/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/ResponseUtilsTest.kt b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/ResponseUtilsTest.kt new file mode 100644 index 00000000..30b76ab7 --- /dev/null +++ b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/utils/ResponseUtilsTest.kt @@ -0,0 +1,181 @@ +package eu.vendeli.rethis.tests.utils + +import com.ionspin.kotlin.bignum.integer.BigInteger +import com.ionspin.kotlin.bignum.integer.toBigInteger +import eu.vendeli.rethis.ReThisTestCtx +import eu.vendeli.rethis.utils.processRedisListResponse +import eu.vendeli.rethis.utils.processRedisMapResponse +import eu.vendeli.rethis.utils.processRedisSimpleResponse +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.shouldBe +import io.ktor.utils.io.* +import io.ktor.utils.io.core.* +import kotlinx.io.Buffer + +class ResponseUtilsTest : ReThisTestCtx() { + private val charset = Charsets.UTF_8 + + @Test + suspend fun `test processRedisSimpleResponse with simple string`() { + val channel = ByteReadChannel { + writeFully("+Hello, World!\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe "Hello, World!" + } + + @Test + suspend fun `test processRedisSimpleResponse with verbatim string`() { + val channel = ByteReadChannel { + writeFully("=15\r\ntxt:Some string\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe "txt:Some string" + } + + @Test + suspend fun `test processRedisSimpleResponse with error`() { + val channel = ByteReadChannel { + writeFully("-Error message\r\n".encodeToByteArray()) + } + + shouldThrow { + channel.processRedisSimpleResponse(charset) + }.message shouldBe "Error message" + } + + @Test + suspend fun `test processRedisSimpleResponse with bulk error`() { + val channel = ByteReadChannel { + writeFully("!21\r\nSYNTAX invalid syntax\r\n".encodeToByteArray()) + } + + shouldThrow { + channel.processRedisSimpleResponse(charset) + }.message shouldBe "SYNTAX invalid syntax" + } + + @Test + suspend fun `test processRedisSimpleResponse with integer`() { + val channel = ByteReadChannel { + writeFully(":123\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe 123L + } + + @Test + suspend fun `test processRedisSimpleResponse with bulk string`() { + val channel = ByteReadChannel { + writeFully("$5\r\nHello\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe "Hello" + } + + @Test + suspend fun `test processRedisSimpleResponse with null bulk string`() { + val channel = ByteReadChannel { + writeFully("$-1\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result.shouldBeNull() + } + + @Test + suspend fun `test processRedisSimpleResponse with null`() { + val channel = ByteReadChannel { + writeFully("_\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result.shouldBeNull() + } + + @Test + suspend fun `test processRedisSimpleResponse with boolean`() { + val channel = ByteReadChannel { + writeFully("#t\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe true + } + + @Test + suspend fun `test processRedisSimpleResponse with double`() { + val channel = ByteReadChannel { + writeFully(",3.14\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe 3.14 + } + + @Test + suspend fun `test processRedisSimpleResponse with bignumber`() { + val channel = ByteReadChannel { + writeFully("(12345678901234567890\r\n".encodeToByteArray()) + } + + val result = channel.processRedisSimpleResponse(charset) + result shouldBe "12345678901234567890".toBigInteger() + } + + @Test + suspend fun `test processRedisListResponse with array`() { + val channel = ByteReadChannel { + writeFully("*2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.processRedisListResponse(charset) + result shouldBe listOf("Hello", "World") + } + + @Test + suspend fun `test processRedisListResponse with set`() { + val channel = ByteReadChannel { + writeFully("~2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.processRedisListResponse(charset) + result shouldBe listOf("Hello", "World") + } + + @Test + suspend fun `test processRedisListResponse with push`() { + val channel = ByteReadChannel { + writeFully(">2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n".encodeToByteArray()) + } + + val result = channel.processRedisListResponse(charset) + result shouldBe listOf("Hello", "World") + } + + @Test + suspend fun `test processRedisListResponse with map`() { + val channel = ByteReadChannel { + writeFully("%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n".encodeToByteArray()) + } + + val result = channel.processRedisMapResponse(charset) + result shouldBe mapOf( + "first" to 1L, + "second" to 2L, + ) + } + + @Suppress("TestFunctionName") + private suspend fun ByteReadChannel(block: suspend Buffer.() -> Unit): ByteReadChannel { + val buff = Buffer() + buff.block() + + return ByteReadChannel(buff) + } +} diff --git a/src/nativeMain/kotlin/eu/vendeli/rethis/types/core/Address.native.kt b/src/nativeMain/kotlin/eu/vendeli/rethis/types/core/Address.native.kt index ba266be0..3458c130 100644 --- a/src/nativeMain/kotlin/eu/vendeli/rethis/types/core/Address.native.kt +++ b/src/nativeMain/kotlin/eu/vendeli/rethis/types/core/Address.native.kt @@ -5,6 +5,8 @@ import io.ktor.network.sockets.* @Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") actual sealed class Address actual constructor() { internal actual abstract val socket: SocketAddress + + override fun toString() = stringify() } class UnixSocket(