From d81c6fcbd2a6f6631562e26a1f9025cf28e0fedb Mon Sep 17 00:00:00 2001 From: Brian Floersch Date: Wed, 17 Jul 2024 22:22:42 -0400 Subject: [PATCH] Update readme and add example tests --- README.md | 68 +++++++++++++++++++ .../AsyncChannelTests.swift | 61 +++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/README.md b/README.md index 6de983d..44abd12 100644 --- a/README.md +++ b/README.md @@ -178,6 +178,74 @@ for _ in (0..<20) { } ``` +## Wait Group + +This library also includes a `WaitGroup` implementation. Wait groups are useful when you want to wait for multiple tasks to finish. + +### Example + +```swift +let wg = WaitGroup() +let signal = Channel() +let done = Channel() + +// Task that drains the signal channel +Task { + for await _ in signal { } + await done <- true +} + +// 100 workers that write to the signal channel +for _ in 0..<100 { + await wg.add(1) + Task { + await signal <- true + await wg.done() + } +} +// When all workers are done - signal is drained, so wg will be done. +await wg.wait() + +// Closing the signal channel means it's empty, so done is signaled. +signal.close() +await <-done +``` + +## Advanced Usage +This library also includes some extra features that are made possible by the flexibility of Swift's `resultBuilder`. + +Multiplexing `n:1` channels using select `any` +```swift +let channels = (0..<100).map { _ in Channel() } +let collected = Channel() + +// 100 tasks writing to 100 channels +for c in channels { + Task { + await c <- true + } +} + +// 1 task recieving from 100 channels and writing the results to 1 channel. +Task { + for _ in 0..<100 { + await select { + any(channels) { channel in + receive(channel) { val in + await collected <- val! + } + } + } + } + collected.close() +} + +var sum = 0 +for await _ in collected { + sum += 1 +} +``` + ## Code Samples See the [Examples](/Examples/) folder for real world usage. diff --git a/Tests/AsyncChannelsTests/AsyncChannelTests.swift b/Tests/AsyncChannelsTests/AsyncChannelTests.swift index 415f703..525455f 100644 --- a/Tests/AsyncChannelsTests/AsyncChannelTests.swift +++ b/Tests/AsyncChannelsTests/AsyncChannelTests.swift @@ -708,4 +708,65 @@ final class AsyncTest: XCTestCase { XCTAssertEqual(sum, 100) } + + func testWaitGroup() async { + + let wg = WaitGroup() + let signal = Channel() + let done = Channel() + + // Task that drains the signal channel + Task { + for await _ in signal { } + await done <- true + } + + // 100 workers that write to the signal channel + for _ in 0..<100 { + await wg.add(1) + Task { + await signal <- true + await wg.done() + } + } + // When all workers are done - signal is drained, so wg will be done. + await wg.wait() + + // Closing the signal channel means it's empty, so done is signaled. + signal.close() + await <-done + } + + func testMultiplex() async { + let channels = (0..<100).map { _ in Channel() } + let collected = Channel() + + // 100 tasks writing to 100 channels + for c in channels { + Task { + await c <- true + } + } + + // 1 task recieving from 100 channels and writing the results to 1 channel. + Task { + for _ in 0..<100 { + await select { + any(channels) { channel in + receive(channel) { val in + await collected <- val! + } + } + } + } + collected.close() + } + + var sum = 0 + for await _ in collected { + sum += 1 + } + + XCTAssertEqual(100, sum) + } }