Skip to content

Commit

Permalink
feat(websocket): implement keepalive functionality (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
knotekt authored Mar 19, 2024
2 parents 5fd54fb + 5b50930 commit cf5b7fa
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 15 deletions.
19 changes: 19 additions & 0 deletions src/commonMain/kotlin/Command.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cz.smarteon.loxone

import cz.smarteon.loxone.message.LoxoneMsg
import cz.smarteon.loxone.message.LoxoneMsgVal
import kotlin.jvm.JvmOverloads
import kotlin.reflect.KClass

interface Command<out RESPONSE : LoxoneResponse> {
Expand All @@ -13,6 +14,24 @@ interface Command<out RESPONSE : LoxoneResponse> {
val authenticated: Boolean
}

/**
* Skeleton for commands which have no response from Loxone miniserver.
*/
abstract class NoResponseCommand @JvmOverloads constructor(
override val pathSegments: List<String>,
override val authenticated: Boolean = false
) : Command<Nothing> {

@JvmOverloads
constructor(vararg pathSegments: String, authenticated: Boolean = false) : this(
pathSegments.toList(),
authenticated
)

override val responseType
get() = Nothing::class
}

interface LoxoneMsgCommand<out VALUE : LoxoneMsgVal> : Command<LoxoneMsg> {
override val responseType
get() = LoxoneMsg::class
Expand Down
18 changes: 18 additions & 0 deletions src/commonMain/kotlin/LoxoneCommands.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cz.smarteon.loxone

/**
* Central registry of Loxone commands.
*
* Please note that primarily, the functions for commands construction are accessible on the related message classes.
* @see[Command]
* @see[LoxoneMsgCommand]
* @see[CommandSupplier]
*/
object LoxoneCommands {

/**
* Keep alive command used solely in [cz.smarteon.loxone.ktor.WebsocketLoxoneClient] to ensure connection alive
* functionality.
*/
val KEEP_ALIVE = object : NoResponseCommand("keepalive") {}
}
65 changes: 54 additions & 11 deletions src/commonMain/kotlin/ktor/WebsocketLoxoneClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import cz.smarteon.loxone.Codec
import cz.smarteon.loxone.Codec.loxJson
import cz.smarteon.loxone.Command
import cz.smarteon.loxone.LoxoneClient
import cz.smarteon.loxone.LoxoneCommands
import cz.smarteon.loxone.LoxoneEndpoint
import cz.smarteon.loxone.LoxoneResponse
import cz.smarteon.loxone.LoxoneTokenAuthenticator
import cz.smarteon.loxone.message.MessageHeader
import cz.smarteon.loxone.message.MessageKind
import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
Expand All @@ -18,13 +20,18 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.jvm.JvmOverloads
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds

class WebsocketLoxoneClient internal constructor(
private val client: HttpClient,
Expand All @@ -42,9 +49,14 @@ class WebsocketLoxoneClient internal constructor(
private val sessionMutex = Mutex()
private var session: ClientWebSocketSession? = null

/**
* Scope used for all background tasks in the client. Should not be used for executing commands (call* functions).
*/
private val scope = CoroutineScope(Dispatchers.Default) // TODO think more about correct dispacthers

private val textHeader = Channel<MessageHeader>(capacity = 1)
private val keepAliveHeader = Channel<MessageHeader>(capacity = 1)
private val textMsgHeader = Channel<MessageHeader>(capacity = 1)
private val binaryMsgHeader = Channel<MessageHeader>(capacity = 1)
private val textMessages = Channel<String>(capacity = 10)

override suspend fun <RESPONSE : LoxoneResponse> call(command: Command<RESPONSE>): RESPONSE {
Expand All @@ -53,10 +65,7 @@ class WebsocketLoxoneClient internal constructor(
authenticator?.ensureAuthenticated(this)
}

// TODO is url encoding of segments needed here?
val joinedCmd = command.pathSegments.joinToString(separator = "/")
logger.trace { "Sending command: $joinedCmd" }
session.send(joinedCmd)
session.send(command)

@Suppress("UNCHECKED_CAST")
return loxJson.decodeFromString(command.responseType.deserializer, receiveTextMessage()) as RESPONSE
Expand Down Expand Up @@ -89,6 +98,7 @@ class WebsocketLoxoneClient internal constructor(
logger.debug { "WebSocketSession session created" }
session = newSession
newSession.incoming.receiveAsFlow().onEach(::processFrame).launchIn(scope)
startKeepAlive(newSession)
}
}
}
Expand All @@ -98,9 +108,19 @@ class WebsocketLoxoneClient internal constructor(
private suspend fun processFrame(frame: Frame) {
when (frame.frameType) {
FrameType.BINARY -> {
val header = Codec.readHeader(frame.data)
logger.trace { "Incoming message header: $header" }
textHeader.send(header)
val incomingBinaryMsgHeader = binaryMsgHeader.tryReceive()
if (incomingBinaryMsgHeader.isSuccess) {
logger.trace { "Incoming binary message" }
// TODO process binary message
} else {
val header = Codec.readHeader(frame.data)
logger.trace { "Incoming message header: $header" }
when (header.kind) {
MessageKind.KEEP_ALIVE -> keepAliveHeader.send(header)
MessageKind.TEXT -> textMsgHeader.send(header)
else -> binaryMsgHeader.send(header)
}
}
}
FrameType.TEXT -> {
val textData = frame.data.decodeToString()
Expand All @@ -111,13 +131,36 @@ class WebsocketLoxoneClient internal constructor(
}
}

private suspend fun receiveTextMessage() = withTimeout(RCV_TXT_MSG_TIMEOUT_MILLIS) {
textHeader.receive()
private suspend fun receiveTextMessage() = withTimeout(RCV_TXT_MSG_TIMEOUT) {
textMsgHeader.receive()
textMessages.receive()
}

private suspend fun startKeepAlive(session: ClientWebSocketSession) = scope.launch {
while (true) {
session.send(LoxoneCommands.KEEP_ALIVE)
val keepAliveResponse = withTimeoutOrNull(KEEP_ALIVE_RESPONSE_TIMEOUT) {
keepAliveHeader.receive()
}
if (keepAliveResponse == null) {
logger.info { "Keepalive response not received within timeout, closing connection" }
close()
}
delay(KEEP_ALIVE_INTERVAL)
}
}

private suspend fun ClientWebSocketSession.send(command: Command<*>) {
// TODO is url encoding of segments needed here?
val joinedCmd = command.pathSegments.joinToString(separator = "/")
logger.trace { "Sending command: $joinedCmd" }
send(joinedCmd)
}

companion object {
private const val WS_PATH = "/ws/rfc6455"
private const val RCV_TXT_MSG_TIMEOUT_MILLIS = 10000L
private val RCV_TXT_MSG_TIMEOUT = 10.seconds
private val KEEP_ALIVE_INTERVAL = 4.minutes
private val KEEP_ALIVE_RESPONSE_TIMEOUT = 30.seconds
}
}
13 changes: 13 additions & 0 deletions src/commonMain/kotlin/message/MessageHeader.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package cz.smarteon.loxone.message

/**
* Message header used to identify websocket message kind and size.
*
* @param kind message kind
* @param sizeEstimated whether message size is estimated or calculated, `false` if message size is exact
* @param messageSize number of bytes in message payload
* @see[cz.smarteon.loxone.ktor.WebsocketLoxoneClient]
*/
internal data class MessageHeader(
val kind: MessageKind,
val sizeEstimated: Boolean,
Expand All @@ -10,6 +18,11 @@ internal data class MessageHeader(
const val PAYLOAD_LENGTH = 8
const val FIRST_BYTE: Byte = 0x03
const val MSG_SIZE_POSITION = 4

/**
* Keep alive message header, received from miniserver as response to keep alive command.
*/
val KEEP_ALIVE = MessageHeader(MessageKind.KEEP_ALIVE, false, 0)
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/commonTest/kotlin/CodecTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ class CodecTest : StringSpec({
val header = MessageHeader(MessageKind.TEXT, false, 171)
Codec.writeHeader(header) shouldBe Codec.hexToBytes("03000000ab000000")
}

"should read KEEP_ALIVE header" {
val header = Codec.readHeader(Codec.hexToBytes("0306000000000000"))
header shouldBe MessageHeader.KEEP_ALIVE
}
})
19 changes: 15 additions & 4 deletions src/jvmTest/kotlin/ktor/WebsocketLoxoneClientIT.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cz.smarteon.loxone.message.MessageKind.TEXT
import cz.smarteon.loxone.message.TestingLoxValues.API_INFO_MSG_VAL
import cz.smarteon.loxone.message.TestingMessages.okMsg
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.collections.shouldContain
import io.kotest.matchers.shouldBe
import io.ktor.client.*
import io.ktor.server.application.*
Expand All @@ -21,27 +22,35 @@ class WebsocketLoxoneClientIT : StringSpec({

"should call simple command" {
testWebsocket {
val client = WebsocketLoxoneClient(this)
val client = WebsocketLoxoneClient(testedClient)

client.callRaw("jdev/cfg/api") shouldBe okMsg("dev/cfg/api", API_INFO_MSG_VAL)

val response = client.call(ApiInfo.command)
response.code shouldBe "200"
response.value shouldBe API_INFO_MSG_VAL
response.control shouldBe "dev/cfg/api"

received shouldContain "jdev/cfg/api"
received shouldContain "keepalive"

}
}

})

private suspend fun testWebsocket(test: suspend HttpClient.() -> Unit) = testApplication {
private suspend fun testWebsocket(test: suspend ClientTestContext.() -> Unit) = testApplication {
application { install(ServerWebsockets) }

val receivedMsgs = mutableListOf<String>()

routing {
webSocketRaw(path = "/ws/rfc6455") {
incoming.consumeEach { frame ->
if (frame is Frame.Text) {
when (frame.readText()) {
val payload = frame.readText()
receivedMsgs.add(payload)
when (payload) {
"jdev/cfg/api" -> {
val text = okMsg("dev/cfg/api", API_INFO_MSG_VAL).encodeToByteArray()
send(Frame.Binary(true, Codec.writeHeader(MessageHeader(TEXT, false, text.size.toLong()))))
Expand All @@ -53,5 +62,7 @@ private suspend fun testWebsocket(test: suspend HttpClient.() -> Unit) = testApp
}
}

createClient { install(ClientWebsockets) }.use { it.test() }
createClient { install(ClientWebsockets) }.use { ClientTestContext(it, receivedMsgs).test() }
}

class ClientTestContext(val testedClient: HttpClient, val received: List<String>)

0 comments on commit cf5b7fa

Please sign in to comment.