From 8787ff9557574be5355b9110caf6f54ab3a7fcb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Mikul=C3=A1=C5=A1ek?= Date: Sun, 17 Mar 2024 13:09:12 +0100 Subject: [PATCH 1/2] feat(websocket): implement keepalive functionality --- src/commonMain/kotlin/Command.kt | 19 ++++++ src/commonMain/kotlin/LoxoneCommands.kt | 18 +++++ .../kotlin/ktor/WebsocketLoxoneClient.kt | 65 +++++++++++++++---- .../kotlin/message/MessageHeader.kt | 5 ++ src/commonTest/kotlin/CodecTest.kt | 5 ++ .../kotlin/ktor/WebsocketLoxoneClientIT.kt | 19 ++++-- 6 files changed, 116 insertions(+), 15 deletions(-) create mode 100644 src/commonMain/kotlin/LoxoneCommands.kt diff --git a/src/commonMain/kotlin/Command.kt b/src/commonMain/kotlin/Command.kt index 4f0c870..a1cffed 100644 --- a/src/commonMain/kotlin/Command.kt +++ b/src/commonMain/kotlin/Command.kt @@ -2,6 +2,7 @@ package cz.smarteon.loxone import cz.smarteon.loxone.message.LoxoneMsg import cz.smarteon.loxone.message.LoxoneMsgVal +import kotlin.jvm.JvmOverloads import kotlin.reflect.KClass interface Command { @@ -13,6 +14,24 @@ interface Command { val authenticated: Boolean } +/** + * Skeleton for commands which have no response from Loxone miniserver. + */ +abstract class NoResponseCommand @JvmOverloads constructor( + override val pathSegments: List, + override val authenticated: Boolean = false +) : Command { + + @JvmOverloads + constructor(vararg pathSegments: String, authenticated: Boolean = false) : this( + pathSegments.toList(), + authenticated + ) + + override val responseType + get() = Nothing::class +} + interface LoxoneMsgCommand : Command { override val responseType get() = LoxoneMsg::class diff --git a/src/commonMain/kotlin/LoxoneCommands.kt b/src/commonMain/kotlin/LoxoneCommands.kt new file mode 100644 index 0000000..314c1c8 --- /dev/null +++ b/src/commonMain/kotlin/LoxoneCommands.kt @@ -0,0 +1,18 @@ +package cz.smarteon.loxone + +/** + * Central registry of Loxone commands. + * + * Please note that primarily, the functions for commands construction are accessible on the related message classes. + * @see[Command] + * @see[LoxoneMsgCommand] + * @see[CommandSupplier] + */ +object LoxoneCommands { + + /** + * Keep alive command used solely in [cz.smarteon.loxone.ktor.WebsocketLoxoneClient] to ensure connection alive + * functionality. + */ + val KEEP_ALIVE = object : NoResponseCommand("keepalive") {} +} diff --git a/src/commonMain/kotlin/ktor/WebsocketLoxoneClient.kt b/src/commonMain/kotlin/ktor/WebsocketLoxoneClient.kt index 0e10576..4e2bb6b 100644 --- a/src/commonMain/kotlin/ktor/WebsocketLoxoneClient.kt +++ b/src/commonMain/kotlin/ktor/WebsocketLoxoneClient.kt @@ -4,10 +4,12 @@ import cz.smarteon.loxone.Codec import cz.smarteon.loxone.Codec.loxJson import cz.smarteon.loxone.Command import cz.smarteon.loxone.LoxoneClient +import cz.smarteon.loxone.LoxoneCommands import cz.smarteon.loxone.LoxoneEndpoint import cz.smarteon.loxone.LoxoneResponse import cz.smarteon.loxone.LoxoneTokenAuthenticator import cz.smarteon.loxone.message.MessageHeader +import cz.smarteon.loxone.message.MessageKind import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.client.* import io.ktor.client.plugins.websocket.* @@ -18,13 +20,18 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withTimeout +import kotlinx.coroutines.withTimeoutOrNull import kotlin.jvm.JvmOverloads +import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.seconds class WebsocketLoxoneClient internal constructor( private val client: HttpClient, @@ -42,9 +49,14 @@ class WebsocketLoxoneClient internal constructor( private val sessionMutex = Mutex() private var session: ClientWebSocketSession? = null + /** + * Scope used for all background tasks in the client. Should not be used for executing commands (call* functions). + */ private val scope = CoroutineScope(Dispatchers.Default) // TODO think more about correct dispacthers - private val textHeader = Channel(capacity = 1) + private val keepAliveHeader = Channel(capacity = 1) + private val textMsgHeader = Channel(capacity = 1) + private val binaryMsgHeader = Channel(capacity = 1) private val textMessages = Channel(capacity = 10) override suspend fun call(command: Command): RESPONSE { @@ -53,10 +65,7 @@ class WebsocketLoxoneClient internal constructor( authenticator?.ensureAuthenticated(this) } - // TODO is url encoding of segments needed here? - val joinedCmd = command.pathSegments.joinToString(separator = "/") - logger.trace { "Sending command: $joinedCmd" } - session.send(joinedCmd) + session.send(command) @Suppress("UNCHECKED_CAST") return loxJson.decodeFromString(command.responseType.deserializer, receiveTextMessage()) as RESPONSE @@ -89,6 +98,7 @@ class WebsocketLoxoneClient internal constructor( logger.debug { "WebSocketSession session created" } session = newSession newSession.incoming.receiveAsFlow().onEach(::processFrame).launchIn(scope) + startKeepAlive(newSession) } } } @@ -98,9 +108,19 @@ class WebsocketLoxoneClient internal constructor( private suspend fun processFrame(frame: Frame) { when (frame.frameType) { FrameType.BINARY -> { - val header = Codec.readHeader(frame.data) - logger.trace { "Incoming message header: $header" } - textHeader.send(header) + val incomingBinaryMsgHeader = binaryMsgHeader.tryReceive() + if (incomingBinaryMsgHeader.isSuccess) { + logger.trace { "Incoming binary message" } + // TODO process binary message + } else { + val header = Codec.readHeader(frame.data) + logger.trace { "Incoming message header: $header" } + when (header.kind) { + MessageKind.KEEP_ALIVE -> keepAliveHeader.send(header) + MessageKind.TEXT -> textMsgHeader.send(header) + else -> binaryMsgHeader.send(header) + } + } } FrameType.TEXT -> { val textData = frame.data.decodeToString() @@ -111,13 +131,36 @@ class WebsocketLoxoneClient internal constructor( } } - private suspend fun receiveTextMessage() = withTimeout(RCV_TXT_MSG_TIMEOUT_MILLIS) { - textHeader.receive() + private suspend fun receiveTextMessage() = withTimeout(RCV_TXT_MSG_TIMEOUT) { + textMsgHeader.receive() textMessages.receive() } + private suspend fun startKeepAlive(session: ClientWebSocketSession) = scope.launch { + while (true) { + session.send(LoxoneCommands.KEEP_ALIVE) + val keepAliveResponse = withTimeoutOrNull(KEEP_ALIVE_RESPONSE_TIMEOUT) { + keepAliveHeader.receive() + } + if (keepAliveResponse == null) { + logger.info { "Keepalive response not received within timeout, closing connection" } + close() + } + delay(KEEP_ALIVE_INTERVAL) + } + } + + private suspend fun ClientWebSocketSession.send(command: Command<*>) { + // TODO is url encoding of segments needed here? + val joinedCmd = command.pathSegments.joinToString(separator = "/") + logger.trace { "Sending command: $joinedCmd" } + send(joinedCmd) + } + companion object { private const val WS_PATH = "/ws/rfc6455" - private const val RCV_TXT_MSG_TIMEOUT_MILLIS = 10000L + private val RCV_TXT_MSG_TIMEOUT = 10.seconds + private val KEEP_ALIVE_INTERVAL = 4.minutes + private val KEEP_ALIVE_RESPONSE_TIMEOUT = 30.seconds } } diff --git a/src/commonMain/kotlin/message/MessageHeader.kt b/src/commonMain/kotlin/message/MessageHeader.kt index fd957d9..97545d2 100644 --- a/src/commonMain/kotlin/message/MessageHeader.kt +++ b/src/commonMain/kotlin/message/MessageHeader.kt @@ -10,6 +10,11 @@ internal data class MessageHeader( const val PAYLOAD_LENGTH = 8 const val FIRST_BYTE: Byte = 0x03 const val MSG_SIZE_POSITION = 4 + + /** + * Keep alive message header, received from miniserver as response to keep alive command. + */ + val KEEP_ALIVE = MessageHeader(MessageKind.KEEP_ALIVE, false, 0) } } diff --git a/src/commonTest/kotlin/CodecTest.kt b/src/commonTest/kotlin/CodecTest.kt index 48dbf64..8069da2 100644 --- a/src/commonTest/kotlin/CodecTest.kt +++ b/src/commonTest/kotlin/CodecTest.kt @@ -25,4 +25,9 @@ class CodecTest : StringSpec({ val header = MessageHeader(MessageKind.TEXT, false, 171) Codec.writeHeader(header) shouldBe Codec.hexToBytes("03000000ab000000") } + + "should read KEEP_ALIVE header" { + val header = Codec.readHeader(Codec.hexToBytes("0306000000000000")) + header shouldBe MessageHeader.KEEP_ALIVE + } }) diff --git a/src/jvmTest/kotlin/ktor/WebsocketLoxoneClientIT.kt b/src/jvmTest/kotlin/ktor/WebsocketLoxoneClientIT.kt index 70bf5c8..0907944 100644 --- a/src/jvmTest/kotlin/ktor/WebsocketLoxoneClientIT.kt +++ b/src/jvmTest/kotlin/ktor/WebsocketLoxoneClientIT.kt @@ -7,6 +7,7 @@ import cz.smarteon.loxone.message.MessageKind.TEXT import cz.smarteon.loxone.message.TestingLoxValues.API_INFO_MSG_VAL import cz.smarteon.loxone.message.TestingMessages.okMsg import io.kotest.core.spec.style.StringSpec +import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.shouldBe import io.ktor.client.* import io.ktor.server.application.* @@ -21,7 +22,7 @@ class WebsocketLoxoneClientIT : StringSpec({ "should call simple command" { testWebsocket { - val client = WebsocketLoxoneClient(this) + val client = WebsocketLoxoneClient(testedClient) client.callRaw("jdev/cfg/api") shouldBe okMsg("dev/cfg/api", API_INFO_MSG_VAL) @@ -29,19 +30,27 @@ class WebsocketLoxoneClientIT : StringSpec({ response.code shouldBe "200" response.value shouldBe API_INFO_MSG_VAL response.control shouldBe "dev/cfg/api" + + received shouldContain "jdev/cfg/api" + received shouldContain "keepalive" + } } }) -private suspend fun testWebsocket(test: suspend HttpClient.() -> Unit) = testApplication { +private suspend fun testWebsocket(test: suspend ClientTestContext.() -> Unit) = testApplication { application { install(ServerWebsockets) } + val receivedMsgs = mutableListOf() + routing { webSocketRaw(path = "/ws/rfc6455") { incoming.consumeEach { frame -> if (frame is Frame.Text) { - when (frame.readText()) { + val payload = frame.readText() + receivedMsgs.add(payload) + when (payload) { "jdev/cfg/api" -> { val text = okMsg("dev/cfg/api", API_INFO_MSG_VAL).encodeToByteArray() send(Frame.Binary(true, Codec.writeHeader(MessageHeader(TEXT, false, text.size.toLong())))) @@ -53,5 +62,7 @@ private suspend fun testWebsocket(test: suspend HttpClient.() -> Unit) = testApp } } - createClient { install(ClientWebsockets) }.use { it.test() } + createClient { install(ClientWebsockets) }.use { ClientTestContext(it, receivedMsgs).test() } } + +class ClientTestContext(val testedClient: HttpClient, val received: List) From 5b50930920afef925149552bb051d8f4e10254f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Mikul=C3=A1=C5=A1ek?= Date: Sun, 17 Mar 2024 13:12:00 +0100 Subject: [PATCH 2/2] doc: add MessageHeader doc --- src/commonMain/kotlin/message/MessageHeader.kt | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/commonMain/kotlin/message/MessageHeader.kt b/src/commonMain/kotlin/message/MessageHeader.kt index 97545d2..55d8012 100644 --- a/src/commonMain/kotlin/message/MessageHeader.kt +++ b/src/commonMain/kotlin/message/MessageHeader.kt @@ -1,5 +1,13 @@ package cz.smarteon.loxone.message +/** + * Message header used to identify websocket message kind and size. + * + * @param kind message kind + * @param sizeEstimated whether message size is estimated or calculated, `false` if message size is exact + * @param messageSize number of bytes in message payload + * @see[cz.smarteon.loxone.ktor.WebsocketLoxoneClient] + */ internal data class MessageHeader( val kind: MessageKind, val sizeEstimated: Boolean,