Skip to content

Commit

Permalink
Fix race conditions in CurrentValueRelay (#3447)
Browse files Browse the repository at this point in the history
* Test

tweak test

tweaks

* Slow fix

* Fix test compilation

* nonrecursive lock

* back to os_lock

* undo renaming

* visibility

* Feedback

---------

Co-authored-by: Stephen Celis <stephen@stephencelis.com>
  • Loading branch information
kabiroberai and stephencelis authored Oct 22, 2024
1 parent c134e5a commit 5614943
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
30 changes: 19 additions & 11 deletions Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,27 @@ extension CurrentValueRelay {
}

func receive(_ value: Output) {
guard let downstream else { return }
self.lock.lock()

guard let downstream else {
self.lock.unlock()
return
}

switch self.demand {
case .unlimited:
self.lock.unlock()
// NB: Adding to unlimited demand has no effect and can be ignored.
_ = downstream.receive(value)

case .none:
self.lock.sync {
self.receivedLastValue = false
}
self.receivedLastValue = false
self.lock.unlock()

default:
self.lock.sync {
self.receivedLastValue = true
self.demand -= 1
}
self.receivedLastValue = true
self.demand -= 1
self.lock.unlock()
let moreDemand = downstream.receive(value)
self.lock.sync {
self.demand += moreDemand
Expand All @@ -106,14 +110,18 @@ extension CurrentValueRelay {
func request(_ demand: Subscribers.Demand) {
precondition(demand > 0, "Demand must be greater than zero")

guard let downstream else { return }

self.lock.lock()

guard let downstream else {
self.lock.unlock()
return
}

self.demand += demand

guard
!self.receivedLastValue,
let value = self.upstream?.currentValue
let value = self.upstream?.value
else {
self.lock.unlock()
return
Expand Down
30 changes: 30 additions & 0 deletions Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,35 @@

_ = cancellable
}

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
func testConcurrentSendAndReceive() async {
nonisolated(unsafe) let subject = CurrentValueRelay(0)
let values = LockIsolated<Set<Int>>([])
let cancellable = subject.sink { (value: Int) in
values.withValue {
_ = $0.insert(value)
}
}

let receives = Task.detached { @Sendable in
for await _ in subject.values {}
}

await withTaskGroup(of: Void.self) { group in
for index in 1...1_000 {
group.addTask { @Sendable in
subject.send(index)
}
}
}

receives.cancel()
_ = await receives.value

XCTAssertEqual(values.value, Set(Array(0...1_000)))

_ = cancellable
}
}
#endif

0 comments on commit 5614943

Please sign in to comment.