Skip to content

Commit

Permalink
fix(websocket): ensure the websocket session is created only once
Browse files Browse the repository at this point in the history
This can't be achieved by AtomicReference since the session has been created always before passing to it.
  • Loading branch information
jimirocks committed Mar 10, 2024
1 parent 781ecff commit c3b62f2
Showing 1 changed file with 20 additions and 16 deletions.
36 changes: 20 additions & 16 deletions src/commonMain/kotlin/ktor/WebsocketLoxoneClient.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package cz.smarteon.loxone.ktor

import co.touchlab.stately.concurrency.AtomicReference
import cz.smarteon.loxone.Codec
import cz.smarteon.loxone.Codec.loxJson
import cz.smarteon.loxone.Command
Expand All @@ -22,6 +21,8 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeout
import kotlin.jvm.JvmOverloads

Expand All @@ -38,7 +39,8 @@ class WebsocketLoxoneClient internal constructor(

private val logger = KotlinLogging.logger {}

private val webSocketSession = AtomicReference<ClientWebSocketSession?>(null)
private val sessionMutex = Mutex()
private var session: ClientWebSocketSession? = null

private val scope = CoroutineScope(Dispatchers.Default) // TODO think more about correct dispacthers

Expand Down Expand Up @@ -70,25 +72,27 @@ class WebsocketLoxoneClient internal constructor(

override suspend fun close() {
scope.cancel("Closing the connection")
webSocketSession.get()?.close(CloseReason(NORMAL, "LoxoneKotlin finished"))
session?.close(CloseReason(NORMAL, "LoxoneKotlin finished"))
}

private suspend fun ensureSession(): ClientWebSocketSession {
webSocketSession.compareAndSet(
null,
client.webSocketSession(
host = endpoint?.host,
port = endpoint?.port,
path = if (endpoint != null) endpoint.path + WS_PATH else WS_PATH,
block = {
url.protocol = if (endpoint?.useSsl == true) URLProtocol.WSS else URLProtocol.WS
sessionMutex.withLock {
if (session == null) {
client.webSocketSession(
host = endpoint?.host,
port = endpoint?.port,
path = if (endpoint != null) endpoint.path + WS_PATH else WS_PATH,
block = {
url.protocol = if (endpoint?.useSsl == true) URLProtocol.WSS else URLProtocol.WS
}
).let { newSession ->
logger.debug { "WebSocketSession session created"}
session = newSession
newSession.incoming.receiveAsFlow().onEach(::processFrame).launchIn(scope)
}
)
)
return checkNotNull(webSocketSession.get()) { "WebSocketSession should not be null right after init" }
.also { session ->
session.incoming.receiveAsFlow().onEach(::processFrame).launchIn(scope)
}
}
return checkNotNull(session) { "WebSocketSession should not be null after init" }
}

private suspend fun processFrame(frame: Frame) {
Expand Down

0 comments on commit c3b62f2

Please sign in to comment.