Skip to content

Commit

Permalink
0.2.1 (#33)
Browse files Browse the repository at this point in the history
* Added connection retrying.
* Added socket configuration.
* Upgraded JVM target version to 17
  • Loading branch information
vendelieu authored Jan 15, 2025
1 parent 978e5f7 commit 00fc309
Show file tree
Hide file tree
Showing 13 changed files with 552 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Re.this Changelog

## 0.2.1

* Added connection retrying.
* Added socket configuration.
* Upgraded JVM target version to 17

## 0.2.0

* Implemented Bitmap, Stream commands.
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/ConfigureKotlin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fun Project.configureKotlin(block: KotlinMultiplatformExtension.() -> Unit) {
freeCompilerArgs = listOf("-opt-in=eu.vendeli.rethis.annotations.ReThisInternal")
}

val jvmTargetVer = 11
val jvmTargetVer = 17
jvm {
withJava()
compilations.all {
Expand Down
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ kotlinter = "5.0.1"
deteKT = "1.23.7"
publisher = "0.30.0"
binvalid = "0.17.0"
kover = "0.9.0"
kover = "0.9.1"

kotest = "5.9.1"
logback = "1.5.15"
logback = "1.5.16"

[libraries]
kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "io" }
Expand Down
16 changes: 12 additions & 4 deletions src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import kotlin.jvm.JvmName

@ReThisDSL
class ReThis(
address: Address = Host(DEFAULT_HOST, DEFAULT_PORT),
internal val address: Address = Host(DEFAULT_HOST, DEFAULT_PORT),
val protocol: RespVer = RespVer.V3,
configurator: ClientConfiguration.() -> Unit = {},
) {
Expand All @@ -32,11 +32,9 @@ class ReThis(
internal val cfg: ClientConfiguration = ClientConfiguration().apply(configurator)
internal val rootJob = SupervisorJob()
internal val rethisCoScope = CoroutineScope(rootJob + cfg.poolConfiguration.dispatcher + CoroutineName("ReThis"))
internal val connectionPool by lazy { ConnectionPool(this, address.socket).also { it.prepare() } }
internal val connectionPool by lazy { ConnectionPool(this).also { it.prepare() } }

init {
logger.info("Created client (RESP $protocol)")

if (address is Url) {
cfg.db = address.db
if (address.credentials.isNotEmpty()) {
Expand All @@ -46,6 +44,16 @@ class ReThis(
)
}
}

buildString {
append("Created ReThis client.\n")
append("Address: $address\n")
append("DB: ${cfg.db ?: 0}\n")
append("Auth: ${cfg.auth != null}\n")
append("TLS: ${cfg.tlsConfig != null}\n")
append("Pool size: ${cfg.poolConfiguration.poolSize}\n")
append("Protocol: ${protocol}\n")
}.let { logger.info(it) }
}

val subscriptions = ActiveSubscriptions()
Expand Down
7 changes: 7 additions & 0 deletions src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ expect sealed class Address() {
internal abstract val socket: SocketAddress
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Address.stringify() = when (val thiz = socket) {
is InetSocketAddress -> "InetSocket(${thiz.hostname}:${thiz.port})"
is UnixSocketAddress -> "UnixSocket(${thiz.path})"
else -> socket.toString()
}

class Host(
host: String,
port: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,113 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO

/**
* A configuration class for the client.
*
* @property db The database index to switch to after connecting.
* @property charset The character set to use for communication.
* @property tlsConfig The TLS configuration to use when connecting.
* @property auth The authentication options.
* @property poolConfiguration The pool configuration.
* @property reconnectionStrategy The reconnection strategy to use.
*/
@ConfigurationDSL
data class ClientConfiguration(
var db: Int? = null,
var charset: Charset = Charsets.UTF_8,
var tlsConfig: TLSConfig? = null,
internal var auth: AuthConfiguration? = null,
internal var poolConfiguration: PoolConfiguration = PoolConfiguration(),
internal val poolConfiguration: PoolConfiguration = PoolConfiguration(),
internal val reconnectionStrategy: ReconnectionStrategyConfiguration = ReconnectionStrategyConfiguration(),
internal val socketConfiguration: SocketConfiguration = SocketConfiguration()
) {
/**
* Configures authentication for the client.
*
* @param password The password to use for authentication.
* @param username The username to use for authentication, defaults to null.
*/
fun auth(password: String, username: String? = null) {
auth = AuthConfiguration(password, username)
}

/**
* Configures the connection pool settings.
*
* @param block A lambda to configure the pool settings.
*/
fun pool(block: PoolConfiguration.() -> Unit) {
poolConfiguration.block()
}

/**
* Configures the reconnection strategy.
*
* @param block A lambda to configure the reconnection strategy.
*/
fun reconnectionStrategy(block: ReconnectionStrategyConfiguration.() -> Unit) {
reconnectionStrategy.block()
}

/**
* Configures the socket options.
*
* @param block A lambda to configure the socket settings.
*/
fun socket(block: SocketConfiguration.() -> Unit) {
socketConfiguration.block()
}
}

/**
* Configuration for redis connection authentication.
*
* @property password the password to use for authentication
* @property username the username to use for authentication, defaults to null
*/
@ConfigurationDSL
data class AuthConfiguration(
var password: String,
var username: String? = null,
)

/**
* Configuration for redis connection socket options.
*
* @property timeout the timeout in milliseconds for the redis connection, defaults to null
* @property linger SO_LINGER option for the redis connection, defaults to null,
* [see](https://api.ktor.io/ktor-network/io.ktor.network.sockets/-socket-options/-t-c-p-client-socket-options/linger-seconds.html).
* @property noDelay TCP_NODELAY option for the redis connection, defaults to true
* @property keepAlive TCP_KEEPALIVE option for the redis connection, defaults to true
*/
@ConfigurationDSL
data class SocketConfiguration(
var timeout: Long? = null,
var linger: Int? = null,
var noDelay: Boolean = true,
var keepAlive: Boolean = true,
)

/**
* Configuration for redis connection reconnection strategy.
*
* @property doHealthCheck if true, performs health checks on the connection before adding it to the pool, defaults to true
* @property reconnectAttempts the number of times to attempt reconnecting to the redis server on failure, defaults to 3
* @property reconnectDelay the delay in milliseconds between reconnecting, defaults to 3000L
*/
@ConfigurationDSL
data class ReconnectionStrategyConfiguration(
var doHealthCheck: Boolean = true,
var reconnectAttempts: Int = 3,
var reconnectDelay: Long = 3000L,
)

/**
* Configuration for redis connection pool.
*
* @property poolSize the size of the connection pool, defaults to 50
* @property dispatcher the dispatcher to use for connection pool coroutines, defaults to [Dispatchers.IO]
*/
@ConfigurationDSL
data class PoolConfiguration(
var poolSize: Int = 50,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,31 @@ import kotlin.contracts.contract

internal class ConnectionPool(
internal val client: ReThis,
private val address: SocketAddress,
) {
internal val logger = KtorSimpleLogger("eu.vendeli.rethis.ConnectionPool")

@OptIn(ExperimentalCoroutinesApi::class)
internal val isEmpty: Boolean get() = connections.isEmpty

private val job = SupervisorJob(client.rootJob)
private val poolScope = CoroutineScope(
client.cfg.poolConfiguration.dispatcher + job + CoroutineName("ReThis-ConnectionPool"),
)
private val connections = Channel<Connection>(client.cfg.poolConfiguration.poolSize)
private val selector = SelectorManager(client.cfg.poolConfiguration.dispatcher + job + CoroutineName("ReThis Pool"))
private val selector = SelectorManager(poolScope.coroutineContext)

internal suspend fun createConn(): Connection {
logger.trace("Creating connection to $address")
logger.trace("Creating connection to ${client.address}")
val conn = aSocket(selector)
.tcp()
.connect(address)
.connect(client.address.socket) {
client.cfg.socketConfiguration.run {
timeout?.let { socketTimeout = it }
linger?.let { lingerSeconds = it }
this@connect.keepAlive = keepAlive
this@connect.noDelay = noDelay
}
}
.let { socket ->
client.cfg.tlsConfig?.let {
socket.tls(selector.coroutineContext, it)
Expand All @@ -43,19 +52,21 @@ internal class ConnectionPool(
var requests = 0

if (client.cfg.auth != null) client.cfg.auth?.run {
logger.debug("Authenticating to $address with $this")
logger.trace("Authenticating to ${client.address}.")
reqBuffer.writeRedisValue(listOfNotNull("AUTH".toArg(), username?.toArg(), password.toArg()))
requests++
}

client.cfg.db?.takeIf { it > 0 }?.let {
requests++
logger.trace("Selecting database $it to ${client.address}.")
reqBuffer.writeRedisValue(listOf("SELECT".toArg(), it.toArg()))
requests++
}

reqBuffer.writeRedisValue(listOf("HELLO".toArg(), client.protocol.literal.toArg()))
requests++

logger.trace("Sending connection establishment requests ($requests)")
conn.sendRequest(reqBuffer)
repeat(requests) {
logger.trace("Connection establishment response: " + conn.input.readRedisMessage(client.cfg.charset))
Expand All @@ -73,8 +84,47 @@ internal class ConnectionPool(

suspend fun acquire(): Connection = connections.receive()

suspend fun release(connection: Connection) {
connections.send(connection)
fun release(connection: Connection) {
handle(connection)
}

private fun handle(connection: Connection) = poolScope.launch {
logger.trace("Releasing connection ${connection.socket}")
val cfg = client.cfg.reconnectionStrategy
if (cfg.doHealthCheck && connection.input.isClosedForRead) { // connection is corrupted
logger.warn("Connection ${connection.socket} is corrupted, refilling")
connection.socket.close()
refill()
} else {
connections.send(connection)
}
}

private suspend fun refill() {
val cfg = client.cfg.reconnectionStrategy
if (cfg.reconnectAttempts <= 0) return
var attempt = 0
var ex: Throwable? = null

while (attempt < cfg.reconnectAttempts) {
attempt++
logger.trace("Refilling ConnectionPool. Attempt $attempt")
runCatching { createConn() }
.onSuccess {
connections.send(it)
logger.trace("Connection refilled with $it")
return
}.onFailure {
if (ex != null) ex.addSuppressed(it) else ex = it
}

logger.debug("Connection refill failed, remaining attempts: ${cfg.reconnectAttempts - attempt}")
delay(attempt * cfg.reconnectDelay)
}

val logMsg = "Connection refills failed, maximum attempts reached"
if (ex == null) logger.warn(logMsg)
else logger.warn(logMsg, ex)
}

@OptIn(ExperimentalCoroutinesApi::class)
Expand Down
4 changes: 2 additions & 2 deletions src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ internal suspend inline fun ReThis.registerSubscription(
messageMarker: String,
handler: SubscriptionHandler,
) {
val connection = connectionPool.acquire()
val connection = connectionPool.createConn()
val handlerJob = rethisCoScope.launch(CoLocalConn(connection)) {
val conn = currentCoroutineContext()[CoLocalConn]!!.connection
try {
Expand Down Expand Up @@ -76,8 +76,8 @@ internal suspend inline fun ReThis.registerSubscription(
subscriptions.eventHandler?.onException(target, e)
} finally {
conn.sendRequest(bufferValues(listOf(unRegCommand.toArg(), target.toArg()), cfg.charset))
connectionPool.release(conn)
subscriptions.unsubscribe(target)
connection.socket.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ internal suspend fun ByteReadChannel.readRedisMessage(charset: Charset, rawOnly:
val content = readRemaining(size).readText(charset)
readShort() // Skip CRLF
val encoding = content.subSequence(0, 3) // First 3 bytes are encoding
val data = content.subSequence(4, size.toInt() - 4) // Skip encoding and colon (:)
val data = content.subSequence(4, size.toInt())
VerbatimString(encoding.toString(), data.toString())
}

Expand Down Expand Up @@ -156,7 +156,7 @@ internal suspend inline fun <T> ByteReadChannel.processRedisSimpleResponse(
val content = readRemaining(size).readText(charset)
readShort() // Skip CRLF
val encoding = content.subSequence(0, 3) // First 3 bytes are encoding
val data = content.subSequence(4, size.toInt() - 4) // Skip encoding and colon (:)
val data = content.subSequence(4, size.toInt())
"$encoding:$data"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import io.ktor.network.sockets.*
@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
actual sealed class Address actual constructor() {
internal actual abstract val socket: SocketAddress

override fun toString() = stringify()
}

class UnixSocket(
Expand Down
Loading

0 comments on commit 00fc309

Please sign in to comment.