Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.2.1 #33

Merged
merged 14 commits into from
Jan 15, 2025
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Re.this Changelog

## 0.2.1

* Added connection retrying.
* Added socket configuration.
* Upgraded JVM target version to 17

## 0.2.0

* Implemented Bitmap, Stream commands.
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/ConfigureKotlin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fun Project.configureKotlin(block: KotlinMultiplatformExtension.() -> Unit) {
freeCompilerArgs = listOf("-opt-in=eu.vendeli.rethis.annotations.ReThisInternal")
}

val jvmTargetVer = 11
val jvmTargetVer = 17
jvm {
withJava()
compilations.all {
Expand Down
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ kotlinter = "5.0.1"
deteKT = "1.23.7"
publisher = "0.30.0"
binvalid = "0.17.0"
kover = "0.9.0"
kover = "0.9.1"

kotest = "5.9.1"
logback = "1.5.15"
logback = "1.5.16"

[libraries]
kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "io" }
Expand Down
16 changes: 12 additions & 4 deletions src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import kotlin.jvm.JvmName

@ReThisDSL
class ReThis(
address: Address = Host(DEFAULT_HOST, DEFAULT_PORT),
internal val address: Address = Host(DEFAULT_HOST, DEFAULT_PORT),
val protocol: RespVer = RespVer.V3,
configurator: ClientConfiguration.() -> Unit = {},
) {
Expand All @@ -32,11 +32,9 @@ class ReThis(
internal val cfg: ClientConfiguration = ClientConfiguration().apply(configurator)
internal val rootJob = SupervisorJob()
internal val rethisCoScope = CoroutineScope(rootJob + cfg.poolConfiguration.dispatcher + CoroutineName("ReThis"))
internal val connectionPool by lazy { ConnectionPool(this, address.socket).also { it.prepare() } }
internal val connectionPool by lazy { ConnectionPool(this).also { it.prepare() } }

init {
logger.info("Created client (RESP $protocol)")

if (address is Url) {
cfg.db = address.db
if (address.credentials.isNotEmpty()) {
Expand All @@ -46,6 +44,16 @@ class ReThis(
)
}
}

buildString {
append("Created ReThis client.\n")
append("Address: $address\n")
append("DB: ${cfg.db ?: 0}\n")
append("Auth: ${cfg.auth != null}\n")
append("TLS: ${cfg.tlsConfig != null}\n")
append("Pool size: ${cfg.poolConfiguration.poolSize}\n")
append("Protocol: ${protocol}\n")
}.let { logger.info(it) }
}

val subscriptions = ActiveSubscriptions()
Expand Down
7 changes: 7 additions & 0 deletions src/commonMain/kotlin/eu/vendeli/rethis/types/core/Address.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ expect sealed class Address() {
internal abstract val socket: SocketAddress
}

@Suppress("NOTHING_TO_INLINE")
internal inline fun Address.stringify() = when (val thiz = socket) {
is InetSocketAddress -> "InetSocket(${thiz.hostname}:${thiz.port})"
is UnixSocketAddress -> "UnixSocket(${thiz.path})"
else -> socket.toString()
}

class Host(
host: String,
port: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,113 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO

/**
* A configuration class for the client.
*
* @property db The database index to switch to after connecting.
* @property charset The character set to use for communication.
* @property tlsConfig The TLS configuration to use when connecting.
* @property auth The authentication options.
* @property poolConfiguration The pool configuration.
* @property reconnectionStrategy The reconnection strategy to use.
*/
@ConfigurationDSL
data class ClientConfiguration(
var db: Int? = null,
var charset: Charset = Charsets.UTF_8,
var tlsConfig: TLSConfig? = null,
internal var auth: AuthConfiguration? = null,
internal var poolConfiguration: PoolConfiguration = PoolConfiguration(),
internal val poolConfiguration: PoolConfiguration = PoolConfiguration(),
internal val reconnectionStrategy: ReconnectionStrategyConfiguration = ReconnectionStrategyConfiguration(),
internal val socketConfiguration: SocketConfiguration = SocketConfiguration()
) {
/**
* Configures authentication for the client.
*
* @param password The password to use for authentication.
* @param username The username to use for authentication, defaults to null.
*/
fun auth(password: String, username: String? = null) {
auth = AuthConfiguration(password, username)
}

/**
* Configures the connection pool settings.
*
* @param block A lambda to configure the pool settings.
*/
fun pool(block: PoolConfiguration.() -> Unit) {
poolConfiguration.block()
}

/**
* Configures the reconnection strategy.
*
* @param block A lambda to configure the reconnection strategy.
*/
fun reconnectionStrategy(block: ReconnectionStrategyConfiguration.() -> Unit) {
reconnectionStrategy.block()
}

/**
* Configures the socket options.
*
* @param block A lambda to configure the socket settings.
*/
fun socket(block: SocketConfiguration.() -> Unit) {
socketConfiguration.block()
}
}

/**
* Configuration for redis connection authentication.
*
* @property password the password to use for authentication
* @property username the username to use for authentication, defaults to null
*/
@ConfigurationDSL
data class AuthConfiguration(
var password: String,
var username: String? = null,
)

/**
* Configuration for redis connection socket options.
*
* @property timeout the timeout in milliseconds for the redis connection, defaults to null
* @property linger SO_LINGER option for the redis connection, defaults to null,
* [see](https://api.ktor.io/ktor-network/io.ktor.network.sockets/-socket-options/-t-c-p-client-socket-options/linger-seconds.html).
* @property noDelay TCP_NODELAY option for the redis connection, defaults to true
* @property keepAlive TCP_KEEPALIVE option for the redis connection, defaults to true
*/
@ConfigurationDSL
data class SocketConfiguration(
var timeout: Long? = null,
var linger: Int? = null,
var noDelay: Boolean = true,
var keepAlive: Boolean = true,
)

/**
* Configuration for redis connection reconnection strategy.
*
* @property doHealthCheck if true, performs health checks on the connection before adding it to the pool, defaults to true
* @property reconnectAttempts the number of times to attempt reconnecting to the redis server on failure, defaults to 3
* @property reconnectDelay the delay in milliseconds between reconnecting, defaults to 3000L
*/
@ConfigurationDSL
data class ReconnectionStrategyConfiguration(
var doHealthCheck: Boolean = true,
var reconnectAttempts: Int = 3,
var reconnectDelay: Long = 3000L,
)

/**
* Configuration for redis connection pool.
*
* @property poolSize the size of the connection pool, defaults to 50
* @property dispatcher the dispatcher to use for connection pool coroutines, defaults to [Dispatchers.IO]
*/
@ConfigurationDSL
data class PoolConfiguration(
var poolSize: Int = 50,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,31 @@ import kotlin.contracts.contract

internal class ConnectionPool(
internal val client: ReThis,
private val address: SocketAddress,
) {
internal val logger = KtorSimpleLogger("eu.vendeli.rethis.ConnectionPool")

@OptIn(ExperimentalCoroutinesApi::class)
internal val isEmpty: Boolean get() = connections.isEmpty

private val job = SupervisorJob(client.rootJob)
private val poolScope = CoroutineScope(
client.cfg.poolConfiguration.dispatcher + job + CoroutineName("ReThis-ConnectionPool"),
)
private val connections = Channel<Connection>(client.cfg.poolConfiguration.poolSize)
private val selector = SelectorManager(client.cfg.poolConfiguration.dispatcher + job + CoroutineName("ReThis Pool"))
private val selector = SelectorManager(poolScope.coroutineContext)

internal suspend fun createConn(): Connection {
logger.trace("Creating connection to $address")
logger.trace("Creating connection to ${client.address}")
val conn = aSocket(selector)
.tcp()
.connect(address)
.connect(client.address.socket) {
client.cfg.socketConfiguration.run {
timeout?.let { socketTimeout = it }
linger?.let { lingerSeconds = it }
this@connect.keepAlive = keepAlive
this@connect.noDelay = noDelay
}
}
.let { socket ->
client.cfg.tlsConfig?.let {
socket.tls(selector.coroutineContext, it)
Expand All @@ -43,19 +52,21 @@ internal class ConnectionPool(
var requests = 0

if (client.cfg.auth != null) client.cfg.auth?.run {
logger.debug("Authenticating to $address with $this")
logger.trace("Authenticating to ${client.address}.")
reqBuffer.writeRedisValue(listOfNotNull("AUTH".toArg(), username?.toArg(), password.toArg()))
requests++
}

client.cfg.db?.takeIf { it > 0 }?.let {
requests++
logger.trace("Selecting database $it to ${client.address}.")
reqBuffer.writeRedisValue(listOf("SELECT".toArg(), it.toArg()))
requests++
}

reqBuffer.writeRedisValue(listOf("HELLO".toArg(), client.protocol.literal.toArg()))
requests++

logger.trace("Sending connection establishment requests ($requests)")
conn.sendRequest(reqBuffer)
repeat(requests) {
logger.trace("Connection establishment response: " + conn.input.readRedisMessage(client.cfg.charset))
Expand All @@ -73,8 +84,47 @@ internal class ConnectionPool(

suspend fun acquire(): Connection = connections.receive()

suspend fun release(connection: Connection) {
connections.send(connection)
fun release(connection: Connection) {
handle(connection)
}

private fun handle(connection: Connection) = poolScope.launch {
logger.trace("Releasing connection ${connection.socket}")
val cfg = client.cfg.reconnectionStrategy
if (cfg.doHealthCheck && connection.input.isClosedForRead) { // connection is corrupted
logger.warn("Connection ${connection.socket} is corrupted, refilling")
connection.socket.close()
refill()
} else {
connections.send(connection)
}
}

private suspend fun refill() {
val cfg = client.cfg.reconnectionStrategy
if (cfg.reconnectAttempts <= 0) return
var attempt = 0
var ex: Throwable? = null

while (attempt < cfg.reconnectAttempts) {
attempt++
logger.trace("Refilling ConnectionPool. Attempt $attempt")
runCatching { createConn() }
.onSuccess {
connections.send(it)
logger.trace("Connection refilled with $it")
return
}.onFailure {
if (ex != null) ex.addSuppressed(it) else ex = it
}

logger.debug("Connection refill failed, remaining attempts: ${cfg.reconnectAttempts - attempt}")
delay(attempt * cfg.reconnectDelay)
}

val logMsg = "Connection refills failed, maximum attempts reached"
if (ex == null) logger.warn(logMsg)
else logger.warn(logMsg, ex)
}

@OptIn(ExperimentalCoroutinesApi::class)
Expand Down
4 changes: 2 additions & 2 deletions src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ internal suspend inline fun ReThis.registerSubscription(
messageMarker: String,
handler: SubscriptionHandler,
) {
val connection = connectionPool.acquire()
val connection = connectionPool.createConn()
val handlerJob = rethisCoScope.launch(CoLocalConn(connection)) {
val conn = currentCoroutineContext()[CoLocalConn]!!.connection
try {
Expand Down Expand Up @@ -76,8 +76,8 @@ internal suspend inline fun ReThis.registerSubscription(
subscriptions.eventHandler?.onException(target, e)
} finally {
conn.sendRequest(bufferValues(listOf(unRegCommand.toArg(), target.toArg()), cfg.charset))
connectionPool.release(conn)
subscriptions.unsubscribe(target)
connection.socket.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ internal suspend fun ByteReadChannel.readRedisMessage(charset: Charset, rawOnly:
val content = readRemaining(size).readText(charset)
readShort() // Skip CRLF
val encoding = content.subSequence(0, 3) // First 3 bytes are encoding
val data = content.subSequence(4, size.toInt() - 4) // Skip encoding and colon (:)
val data = content.subSequence(4, size.toInt())
VerbatimString(encoding.toString(), data.toString())
}

Expand Down Expand Up @@ -156,7 +156,7 @@ internal suspend inline fun <T> ByteReadChannel.processRedisSimpleResponse(
val content = readRemaining(size).readText(charset)
readShort() // Skip CRLF
val encoding = content.subSequence(0, 3) // First 3 bytes are encoding
val data = content.subSequence(4, size.toInt() - 4) // Skip encoding and colon (:)
val data = content.subSequence(4, size.toInt())
"$encoding:$data"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import io.ktor.network.sockets.*
@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
actual sealed class Address actual constructor() {
internal actual abstract val socket: SocketAddress

override fun toString() = stringify()
}

class UnixSocket(
Expand Down
Loading
Loading