From 0650ff6cdc4c4cbb99457a5cdbbed25cd7fadc71 Mon Sep 17 00:00:00 2001 From: Thomas Mellenthin Date: Wed, 7 Sep 2022 12:00:18 +0200 Subject: [PATCH] Amb: cancel the publisher that looses the race immediately --- Sources/Operators/Amb.swift | 2 ++ Tests/AmbTests.swift | 52 ++++++++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/Sources/Operators/Amb.swift b/Sources/Operators/Amb.swift index 5fe34cc..71e692d 100644 --- a/Sources/Operators/Amb.swift +++ b/Sources/Operators/Amb.swift @@ -103,8 +103,10 @@ private extension Publishers.Amb { guard let decision = decision else { return } switch decision { case .first: + secondSink?.cancelUpstream() secondSink = nil case .second: + firstSink?.cancelUpstream() firstSink = nil } diff --git a/Tests/AmbTests.swift b/Tests/AmbTests.swift index 917cf54..e0519fb 100644 --- a/Tests/AmbTests.swift +++ b/Tests/AmbTests.swift @@ -48,34 +48,62 @@ class AmbTests: XCTestCase { } func testAmbCancelPreSubscription() { - enum CancelError: Swift.Error { - case cancelled - } - var ambPublisher: AnyCancellable? + let ambPublisher: AnyCancellable? - var firstCompletion: Subscribers.Completion? - let subject1 = PassthroughSubject() + var subject1Cancelled = expectation(description: "first publisher cancelled") + let subject1 = PassthroughSubject() let subject1Publisher = subject1 .handleEvents(receiveCancel: { - firstCompletion = .failure(CancelError.cancelled) + subject1Cancelled.fulfill() }) .eraseToAnyPublisher() - var secondCompletion: Subscribers.Completion? - let subject2 = PassthroughSubject() + var subject2Cancelled = expectation(description: "second publisher cancelled") + let subject2 = PassthroughSubject() let subject2Publisher = subject2 .handleEvents(receiveCancel: { - secondCompletion = .failure(CancelError.cancelled) + subject2Cancelled.fulfill() }) .eraseToAnyPublisher() ambPublisher = Publishers.Amb(first: subject1Publisher, second: subject2Publisher) .sink(receiveCompletion: { _ in }, receiveValue: { _ in }) + + // cancelling amb should cancel the inner publishers ambPublisher?.cancel() - XCTAssertEqual(firstCompletion, .failure(CancelError.cancelled)) - XCTAssertEqual(secondCompletion, .failure(CancelError.cancelled)) + waitForExpectations(timeout: 0.01) + } + + func testAmbCancelPostSubscription() { + let subject1 = PassthroughSubject() + var subject1cancelCounter = 0 + let subject1Publisher = subject1 + .handleEvents(receiveCancel: { + subject1cancelCounter += 1 + }) + .eraseToAnyPublisher() + + let subject2 = PassthroughSubject() + var subject2cancelCounter = 0 + let subject2Publisher = subject2 + .handleEvents(receiveCancel: { + subject2cancelCounter += 1 + }) + .eraseToAnyPublisher() + + Publishers.Amb(first: subject1Publisher, second: subject2Publisher) + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + // subject1 wins the race, so 2 has to be cancelled + subject1.send(1) + + // At dealloc both publishes are cancelled, so we cannot use expectations here and count the cancel events instead + XCTAssertEqual(subject1cancelCounter, 0) + XCTAssertEqual(subject2cancelCounter, 1) } func testAmbLimitedPreDemand() {