Skip to content

Commit

Permalink
Publish status for RPC subscription services.
Browse files Browse the repository at this point in the history
  • Loading branch information
kukabi committed Jun 24, 2022
1 parent a7ef46e commit afc1ee4
Showing 1 changed file with 24 additions and 24 deletions.
48 changes: 24 additions & 24 deletions Sources/SubVTData/Service/WSPRC/RPCSubscriptionService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@ public enum RPCError: Error {
case error(error: Error?)
}

public enum RPCSubscriptionServiceStatus {
case connected
case disconnected(code: UInt16, reason: String)
case error(error: Error?)
case idle
case subscribed(subscriptionId: UInt64)
case unsubscribed
}

/**
Base class for RPC pub/sub services.
*/
public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {
private enum State {
case connected
case disconnected(code: UInt16, reason: String)
case error(error: Error?)
case idle
case subscribed(subscriptionId: UInt64)
case unsubscribed
}

public class RPCSubscriptionService<T: Codable>: ObservableObject, WebSocketDelegate {
private let host: String
private let port: UInt16
private let subscribeMethod: String
private let unsubscribeMethod: String
private let socket: WebSocket

private var state = State.idle
@Published public private(set) var status = RPCSubscriptionServiceStatus.idle
private var rpcId: UInt64 = 0
private var subscriptionParameter: String? = nil

Expand Down Expand Up @@ -70,13 +70,13 @@ public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {
case .binary(_):
break
case .cancelled:
state = .idle
status = .idle
eventBus.send(completion: .finished)
case .connected(_):
state = .connected
status = .connected
sendSubscriptionRequest()
case .disconnected(let reason, let code):
switch state {
switch status {
case .error(let error):
eventBus.send(
completion: .failure(
Expand All @@ -99,12 +99,12 @@ public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {
default:
break
}
state = .disconnected(
status = .disconnected(
code: code,
reason: reason
)
case .error(let error):
state = .error(error: error)
status = .error(error: error)
eventBus.send(
completion: .failure(
RPCError.error(
Expand Down Expand Up @@ -132,7 +132,7 @@ public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {

public func subscribe(parameter: String? = nil) -> AnyPublisher<RPCEvent<T>, RPCError> {
subscriptionParameter = parameter
switch state {
switch status {
case .disconnected(_, _):
fallthrough
case .idle:
Expand All @@ -144,7 +144,7 @@ public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {
}

public func unsubscribe() {
guard case .subscribed(let subscriptionId) = state else {
guard case .subscribed(let subscriptionId) = status else {
return
}
let request = RPCRequest<UInt64>(
Expand Down Expand Up @@ -183,13 +183,13 @@ public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {
private func processText(text: String) {
if let data = text.data(using: .utf8) {
do {
switch state {
switch status {
case .connected:
let response = try jsonDecoder.decode(
RPCSubscribeResponse.self,
from: data
)
state = .subscribed(
status = .subscribed(
subscriptionId: response.subscriptionId
)
eventBus.send(
Expand All @@ -215,7 +215,7 @@ public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {
from: data
)
eventBus.send(RPCEvent.unsubscribed)
state = .unsubscribed
status = .unsubscribed
socket.disconnect()
}
default:
Expand All @@ -235,7 +235,7 @@ public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {
)
)
)
state = .error(error: nil)
status = .error(error: nil)
} catch {
eventBus.send(
completion: .failure(
Expand All @@ -244,7 +244,7 @@ public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {
)
)
)
state = .error(error: error)
status = .error(error: error)
}
socket.disconnect()
}
Expand All @@ -254,7 +254,7 @@ public class RPCSubscriptionService<T: Codable>: WebSocketDelegate {
RPCError.dataReadError
)
)
state = .error(error: nil)
status = .error(error: nil)
socket.disconnect()
}
}
Expand Down

0 comments on commit afc1ee4

Please sign in to comment.