Skip to content

Commit

Permalink
Communication: Streamline socket connection handling (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
anian03 authored Nov 15, 2024
1 parent 561bee3 commit 3d22541
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 61 deletions.
105 changes: 105 additions & 0 deletions ArtemisKit/Sources/Messages/Networking/SocketConnectionHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//
// SocketConnectionHandler.swift
// ArtemisKit
//
// Created by Anian Schleyer on 15.11.24.
//

import APIClient
import Combine
import Foundation

class SocketConnectionHandler {
private let stompClient = ArtemisStompClient.shared
let messagePublisher = PassthroughSubject<MessageWebsocketDTO, Never>()
let conversationPublisher = PassthroughSubject<ConversationWebsocketDTO, Never>()

private var channelSubscription: Task<(), Never>?
private var conversationSubscription: Task<(), Never>?
private var membershipSubscription: Task<(), Never>?

static let shared = SocketConnectionHandler()

private init() {}

func cancelSubscriptions() {
channelSubscription?.cancel()
conversationSubscription?.cancel()
membershipSubscription?.cancel()

channelSubscription = nil
conversationSubscription = nil
membershipSubscription = nil
}

func subscribeToChannelNotifications(courseId: Int) {
guard channelSubscription == nil else {
return
}

let topic = WebSocketTopic.makeChannelNotifications(courseId: courseId)

channelSubscription = Task { [weak self] in
guard let self else {
return
}

let stream = stompClient.subscribe(to: topic)

for await message in stream {
guard let messageWebsocketDTO = JSONDecoder.getTypeFromSocketMessage(type: MessageWebsocketDTO.self, message: message) else {
continue
}
print("Stomp channel")

messagePublisher.send(messageWebsocketDTO)
}
}
}

func subscribeToConversationNotifications(userId: Int64) {
guard conversationSubscription == nil else {
return
}

let topic = WebSocketTopic.makeConversationNotifications(userId: userId)

conversationSubscription = Task { [weak self] in
guard let self else {
return
}

let stream = stompClient.subscribe(to: topic)

for await message in stream {
guard let messageWebsocketDTO = JSONDecoder.getTypeFromSocketMessage(type: MessageWebsocketDTO.self, message: message) else {
continue
}
print("Stomp convo")
messagePublisher.send(messageWebsocketDTO)
}
}
}

func subscribeToMembershipNotifications(courseId: Int, userId: Int64) {
guard membershipSubscription == nil else {
return
}

let topic = WebSocketTopic.makeConversationMembershipNotifications(courseId: courseId, userId: userId)
membershipSubscription = Task { [weak self] in
guard let self else {
return
}

let stream = stompClient.subscribe(to: topic)

for await message in stream {
guard let conversationWebsocketDTO = JSONDecoder.getTypeFromSocketMessage(type: ConversationWebsocketDTO.self, message: message) else {
continue
}
conversationPublisher.send(conversationWebsocketDTO)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import APIClient
import Foundation
import Common
import Combine
import Extensions
import SharedModels
import SharedServices
Expand Down Expand Up @@ -45,27 +46,24 @@ class ConversationViewModel: BaseViewModel {
}

var shouldScrollToId: String?
var subscription: Task<(), Never>?
var subscription: AnyCancellable?

fileprivate let messagesRepository: MessagesRepository
private let messagesService: MessagesService
private let stompClient: ArtemisStompClient
private let userSession: UserSession

init(
course: Course,
conversation: Conversation,
messagesRepository: MessagesRepository? = nil,
messagesService: MessagesService = MessagesServiceFactory.shared,
stompClient: ArtemisStompClient = .shared,
userSession: UserSession = UserSessionFactory.shared
) {
self.course = course
self.conversation = conversation

self.messagesRepository = messagesRepository ?? .shared
self.messagesService = messagesService
self.stompClient = stompClient
self.userSession = userSession

super.init()
Expand Down Expand Up @@ -320,49 +318,28 @@ private extension ConversationViewModel {
// MARK: Initializer

func subscribeToConversationTopic() {
let topic: String
let socketConnection = SocketConnectionHandler.shared
subscription = socketConnection
.messagePublisher
.sink { [weak self] messageWebsocketDTO in
guard let self else {
return
}
onMessageReceived(messageWebsocketDTO: messageWebsocketDTO)
}

if conversation.baseConversation.type == .channel,
let channel = conversation.baseConversation as? Channel,
channel.isCourseWide == true {
topic = WebSocketTopic.makeChannelNotifications(courseId: course.id)
socketConnection.subscribeToChannelNotifications(courseId: course.id)
} else if let id = userSession.user?.id {
topic = WebSocketTopic.makeConversationNotifications(userId: id)
} else {
return
socketConnection.subscribeToConversationNotifications(userId: id)
}

NotificationCenter.default.addObserver(self,
selector: #selector(onOwnMessageSent(notification:)),
name: .newMessageSent,
object: nil)

if stompClient.didSubscribeTopic(topic) {
/// These web socket topics are the same across multiple channels.
/// We might need to wait until a previously open conversation has unsubscribed
/// before we can subscribe again
Timer.scheduledTimer(withTimeInterval: 5, repeats: false) { [weak self] _ in
DispatchQueue.main.async { [weak self] in
self?.subscribeToConversationTopic()
}
}
return
}
subscription = Task { [weak self] in
guard let stream = self?.stompClient.subscribe(to: topic) else {
return
}

for await message in stream {
guard let messageWebsocketDTO = JSONDecoder.getTypeFromSocketMessage(type: MessageWebsocketDTO.self, message: message) else {
continue
}

guard let self else {
return
}
onMessageReceived(messageWebsocketDTO: messageWebsocketDTO)
}
}
}

func fetchOfflineMessages() {
Expand All @@ -386,15 +363,17 @@ private extension ConversationViewModel {
guard messageWebsocketDTO.post.conversation?.id == conversation.id else {
return
}
switch messageWebsocketDTO.action {
case .create:
handle(new: messageWebsocketDTO.post)
case .update:
handle(update: messageWebsocketDTO.post)
case .delete:
handle(delete: messageWebsocketDTO.post)
default:
return
DispatchQueue.main.async {
switch messageWebsocketDTO.action {
case .create:
self.handle(new: messageWebsocketDTO.post)
case .update:
self.handle(update: messageWebsocketDTO.post)
case .delete:
self.handle(delete: messageWebsocketDTO.post)
default:
return
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//

import APIClient
import Combine
import Common
import DesignLibrary
import Foundation
Expand Down Expand Up @@ -49,20 +50,19 @@ class MessagesAvailableViewModel: BaseViewModel {
let courseId: Int

private let messagesService: MessagesService
private let stompClient: ArtemisStompClient
private let userSession: UserSession

private var subscription: AnyCancellable?

init(
course: Course,
messagesService: MessagesService = MessagesServiceFactory.shared,
stompClient: ArtemisStompClient = ArtemisStompClient.shared,
userSession: UserSession = UserSessionFactory.shared
) {
self.course = course
self.courseId = course.id

self.messagesService = messagesService
self.stompClient = stompClient
self.userSession = userSession

super.init()
Expand All @@ -73,21 +73,29 @@ class MessagesAvailableViewModel: BaseViewModel {
object: nil)
}

deinit {
SocketConnectionHandler.shared.cancelSubscriptions()
subscription?.cancel()
}

func subscribeToConversationMembershipTopic() async {
guard let userId = userSession.user?.id else {
log.debug("User could not be found. Subscribe to Conversation not possible")
return
}

let topic = WebSocketTopic.makeConversationMembershipNotifications(courseId: courseId, userId: userId)
let stream = stompClient.subscribe(to: topic)
let socketConnection = SocketConnectionHandler.shared

for await message in stream {
guard let conversationWebsocketDTO = JSONDecoder.getTypeFromSocketMessage(type: ConversationWebsocketDTO.self, message: message) else {
continue
subscription = socketConnection
.conversationPublisher
.sink { [weak self] conversationWebsocketDTO in
guard let self else {
return
}
onConversationMembershipMessageReceived(conversationWebsocketDTO: conversationWebsocketDTO)
}
onConversationMembershipMessageReceived(conversationWebsocketDTO: conversationWebsocketDTO)
}

socketConnection.subscribeToMembershipNotifications(courseId: courseId, userId: userId)
}

func loadConversations() async {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,9 @@ public struct ConversationView: View {
await viewModel.loadMessages()
}
.onDisappear {
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
if navigationController.selectedCourse == nil {
// only cancel task if we navigate back
viewModel.subscription?.cancel()
}
if navigationController.courseTab != .communication && navigationController.tabPath.isEmpty {
// only cancel task if we leave communication
SocketConnectionHandler.shared.cancelSubscriptions()
}
viewModel.saveContext()
}
Expand Down

0 comments on commit 3d22541

Please sign in to comment.