From e4c71cd0364532d34b27e76f4ab7229abaaaefb4 Mon Sep 17 00:00:00 2001 From: Brian Floersch Date: Fri, 18 Oct 2024 08:36:54 -0400 Subject: [PATCH] Add syncSend and syncReceive (#19) --- Sources/AsyncChannels/Channel.swift | 38 ++++++++++++++++++- .../AsyncChannelTests.swift | 21 ++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncChannels/Channel.swift b/Sources/AsyncChannels/Channel.swift index 77f9f10..5148e73 100644 --- a/Sources/AsyncChannels/Channel.swift +++ b/Sources/AsyncChannels/Channel.swift @@ -101,6 +101,8 @@ public final class Channel: @unchecked Sendable { return false } + /// Sends data on the channel. This function will suspend until a reciever is ready or buffer space is avalible. + /// - Parameter value: The data to send. @inline(__always) public func send(_ value: T) async { mutex.lock() @@ -117,6 +119,19 @@ public final class Channel: @unchecked Sendable { } } + /// Sends data synchonosly. Returns true if the data was sent. + /// A fatal error will be triggered if you attpend to send on a closed channel. + /// - Parameter value: The input data. + @inline(__always) + public func syncSend(_ value: T) -> Bool { + mutex.lock() + if nonBlockingSend(value) { + return true + } + mutex.unlock() + return false + } + @inline(__always) private func nonBlockingReceive() -> T? { if buffer.isEmpty { @@ -142,7 +157,10 @@ public final class Channel: @unchecked Sendable { } return val } - + + /// Receive data from the channel. This function will suspend until a sender is ready or there is data in the buffer. + /// This functionw will return `nil` when the channel is closed after all buffered data is read. + /// - Returns: data or nil. @inline(__always) public func receive() async -> T? { mutex.lock() @@ -164,13 +182,29 @@ public final class Channel: @unchecked Sendable { } } + + /// Receive data synchronosly. Returns nil if there is no data or the channel is closed. + /// This function will never block or suspend. + /// - Returns: The data or nil + @inline(__always) + public func syncReceive() -> T? { + mutex.lock() + if let val = nonBlockingReceive() { + return val + } + mutex.unlock() + return nil + } + + + /// Closes the channel. A channel cannot be reopened. + /// Once a channel is closed, no more data can be writeen. The remaining data can be read until the buffer is empty. public func close() { mutex.lock() defer { mutex.unlock() } closed = true selectWaiter?.signal() - while let recvW = recvQueue.pop() { recvW.resume(returning: nil) } diff --git a/Tests/AsyncChannelsTests/AsyncChannelTests.swift b/Tests/AsyncChannelsTests/AsyncChannelTests.swift index 815d38e..bc9cbd7 100644 --- a/Tests/AsyncChannelsTests/AsyncChannelTests.swift +++ b/Tests/AsyncChannelsTests/AsyncChannelTests.swift @@ -806,4 +806,25 @@ final class AsyncTest: XCTestCase { await data <- "bar" await signal <- .done } + + func testSyncSendRecieve() { + let data = Channel(capacity: 3) + + XCTAssertTrue(data.syncSend("1")) + XCTAssertTrue(data.syncSend("2")) + XCTAssertTrue(data.syncSend("3")) + XCTAssertFalse(data.syncSend("4")) + + XCTAssertEqual(data.syncReceive(), "1") + XCTAssertEqual(data.syncReceive(), "2") + XCTAssertEqual(data.syncReceive(), "3") + XCTAssertNil(data.syncReceive()) + XCTAssertNil(data.syncReceive()) + + XCTAssertTrue(data.syncSend("4")) + XCTAssertEqual(data.syncReceive(), "4") + + data.close() + XCTAssertNil(data.syncReceive()) + } }