diff --git a/Sources/Operators/WithLatestFrom.swift b/Sources/Operators/WithLatestFrom.swift index 20c364d..1245b59 100644 --- a/Sources/Operators/WithLatestFrom.swift +++ b/Sources/Operators/WithLatestFrom.swift @@ -12,111 +12,227 @@ import Combine // MARK: - Operator methods @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public extension Publisher { - /// Merges two publishers into a single publisher by combining each value - /// from self with the latest value from the second publisher, if any. - /// - /// - parameter other: A second publisher source. - /// - parameter resultSelector: Function to invoke for each value from the self combined - /// with the latest value from the second source, if any. - /// - /// - returns: A publisher containing the result of combining each value of the self - /// with the latest value from the second publisher, if any, using the - /// specified result selector function. - func withLatestFrom(_ other: Other, - resultSelector: @escaping (Output, Other.Output) -> Result) - -> AnyPublisher - where Other.Failure == Failure { - let upstream = share() - - return other - .map { second in upstream.map { resultSelector($0, second) } } - .switchToLatest() - .zip(upstream) // `zip`ping and discarding `\.1` allows for - // upstream completions to be projected down immediately. - .map(\.0) - .eraseToAnyPublisher() - } + /// Merges two publishers into a single publisher by combining each value + /// from self with the latest value from the second publisher, if any. + /// + /// - parameter other: A second publisher source. + /// - parameter resultSelector: Function to invoke for each value from the self combined + /// with the latest value from the second source, if any. + /// + /// - returns: A publisher containing the result of combining each value of the self + /// with the latest value from the second publisher, if any, using the + /// specified result selector function. + func withLatestFrom(_ other: Other, + resultSelector: @escaping (Output, Other.Output) -> Result) + -> Publishers.WithLatestFrom { + return .init(upstream: self, second: other, resultSelector: resultSelector) + } - /// Merges three publishers into a single publisher by combining each value - /// from self with the latest value from the second and third publisher, if any. - /// - /// - parameter other: A second publisher source. - /// - parameter other1: A third publisher source. - /// - parameter resultSelector: Function to invoke for each value from the self combined - /// with the latest value from the second and third source, if any. - /// - /// - returns: A publisher containing the result of combining each value of the self - /// with the latest value from the second and third publisher, if any, using the - /// specified result selector function. - func withLatestFrom(_ other: Other, - _ other1: Other1, - resultSelector: @escaping (Output, (Other.Output, Other1.Output)) -> Result) - -> AnyPublisher + /// Merges three publishers into a single publisher by combining each value + /// from self with the latest value from the second and third publisher, if any. + /// + /// - parameter other: A second publisher source. + /// - parameter other1: A third publisher source. + /// - parameter resultSelector: Function to invoke for each value from the self combined + /// with the latest value from the second and third source, if any. + /// + /// - returns: A publisher containing the result of combining each value of the self + /// with the latest value from the second and third publisher, if any, using the + /// specified result selector function. + func withLatestFrom(_ other: Other, + _ other1: Other1, + resultSelector: @escaping (Output, (Other.Output, Other1.Output)) -> Result) + -> Publishers.WithLatestFrom, Result> where Other.Failure == Failure, Other1.Failure == Failure { - withLatestFrom(other.combineLatest(other1), resultSelector: resultSelector) - } + let combined = other.combineLatest(other1) + .eraseToAnyPublisher() + return .init(upstream: self, second: combined, resultSelector: resultSelector) + } - /// Merges four publishers into a single publisher by combining each value - /// from self with the latest value from the second, third and fourth publisher, if any. - /// - /// - parameter other: A second publisher source. - /// - parameter other1: A third publisher source. - /// - parameter other2: A fourth publisher source. - /// - parameter resultSelector: Function to invoke for each value from the self combined - /// with the latest value from the second, third and fourth source, if any. - /// - /// - returns: A publisher containing the result of combining each value of the self - /// with the latest value from the second, third and fourth publisher, if any, using the - /// specified result selector function. - func withLatestFrom(_ other: Other, - _ other1: Other1, - _ other2: Other2, - resultSelector: @escaping (Output, (Other.Output, Other1.Output, Other2.Output)) -> Result) - -> AnyPublisher + /// Merges four publishers into a single publisher by combining each value + /// from self with the latest value from the second, third and fourth publisher, if any. + /// + /// - parameter other: A second publisher source. + /// - parameter other1: A third publisher source. + /// - parameter other2: A fourth publisher source. + /// - parameter resultSelector: Function to invoke for each value from the self combined + /// with the latest value from the second, third and fourth source, if any. + /// + /// - returns: A publisher containing the result of combining each value of the self + /// with the latest value from the second, third and fourth publisher, if any, using the + /// specified result selector function. + func withLatestFrom(_ other: Other, + _ other1: Other1, + _ other2: Other2, + resultSelector: @escaping (Output, (Other.Output, Other1.Output, Other2.Output)) -> Result) + -> Publishers.WithLatestFrom, Result> where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure { - withLatestFrom(other.combineLatest(other1, other2), resultSelector: resultSelector) + let combined = other.combineLatest(other1, other2) + .eraseToAnyPublisher() + return .init(upstream: self, second: combined, resultSelector: resultSelector) + } + + /// Upon an emission from self, emit the latest value from the + /// second publisher, if any exists. + /// + /// - parameter other: A second publisher source. + /// + /// - returns: A publisher containing the latest value from the second publisher, if any. + func withLatestFrom(_ other: Other) + -> Publishers.WithLatestFrom { + return .init(upstream: self, second: other) { $1 } + } + + /// Upon an emission from self, emit the latest value from the + /// second and third publisher, if any exists. + /// + /// - parameter other: A second publisher source. + /// - parameter other1: A third publisher source. + /// + /// - returns: A publisher containing the latest value from the second and third publisher, if any. + func withLatestFrom(_ other: Other, + _ other1: Other1) + -> Publishers.WithLatestFrom, (Other.Output, Other1.Output)> + where Other.Failure == Failure, Other1.Failure == Failure { + withLatestFrom(other, other1) { $1 } + } + + /// Upon an emission from self, emit the latest value from the + /// second, third and forth publisher, if any exists. + /// + /// - parameter other: A second publisher source. + /// - parameter other1: A third publisher source. + /// - parameter other2: A forth publisher source. + /// + /// - returns: A publisher containing the latest value from the second, third and forth publisher, if any. + func withLatestFrom(_ other: Other, + _ other1: Other1, + _ other2: Other2) + -> Publishers.WithLatestFrom, (Other.Output, Other1.Output, Other2.Output)> + where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure { + withLatestFrom(other, other1, other2) { $1 } + } +} + +// MARK: - Publisher +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publishers { + struct WithLatestFrom: Publisher where Upstream.Failure == Other.Failure { + public typealias Failure = Upstream.Failure + public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output + + private let upstream: Upstream + private let second: Other + private let resultSelector: ResultSelector + private var latestValue: Other.Output? + + init(upstream: Upstream, + second: Other, + resultSelector: @escaping ResultSelector) { + self.upstream = upstream + self.second = second + self.resultSelector = resultSelector } - /// Upon an emission from self, emit the latest value from the - /// second publisher, if any exists. - /// - /// - parameter other: A second publisher source. - /// - /// - returns: A publisher containing the latest value from the second publisher, if any. - func withLatestFrom(_ other: Other) - -> AnyPublisher - where Other.Failure == Failure { - withLatestFrom(other) { $1 } + public func receive(subscriber: S) where Failure == S.Failure, Output == S.Input { + subscriber.receive(subscription: Subscription(upstream: upstream, + downstream: subscriber, + second: second, + resultSelector: resultSelector)) } + } +} - /// Upon an emission from self, emit the latest value from the - /// second and third publisher, if any exists. - /// - /// - parameter other: A second publisher source. - /// - parameter other1: A third publisher source. - /// - /// - returns: A publisher containing the latest value from the second and third publisher, if any. - func withLatestFrom(_ other: Other, - _ other1: Other1) - -> AnyPublisher<(Other.Output, Other1.Output), Failure> - where Other.Failure == Failure, Other1.Failure == Failure { - withLatestFrom(other, other1) { $1 } +// MARK: - Subscription +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.WithLatestFrom { + class Subscription: Combine.Subscription, CustomStringConvertible where Downstream.Input == Output, Downstream.Failure == Failure { + private let resultSelector: ResultSelector + private var sink: Sink? + + private let upstream: Upstream + private let downstream: Downstream + private let second: Other + + // Secondary (other) publisher + private var latestValue: Other.Output? + private var otherSubscription: Cancellable? + private var preInitialDemand = Subscribers.Demand.none + + init(upstream: Upstream, + downstream: Downstream, + second: Other, + resultSelector: @escaping ResultSelector) { + self.upstream = upstream + self.second = second + self.downstream = downstream + self.resultSelector = resultSelector + + trackLatestFromSecond { [weak self] in + guard let self = self else { return } + self.request(self.preInitialDemand) + self.preInitialDemand = .none + } } - /// Upon an emission from self, emit the latest value from the - /// second, third and forth publisher, if any exists. - /// - /// - parameter other: A second publisher source. - /// - parameter other1: A third publisher source. - /// - parameter other2: A forth publisher source. - /// - /// - returns: A publisher containing the latest value from the second, third and forth publisher, if any. - func withLatestFrom(_ other: Other, - _ other1: Other1, - _ other2: Other2) - -> AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Failure> - where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure { - withLatestFrom(other, other1, other2) { $1 } + func request(_ demand: Subscribers.Demand) { + guard latestValue != nil else { + preInitialDemand += demand + return + } + + self.sink?.demand(demand) + } + + // Create an internal subscription to the `Other` publisher, + // constantly tracking its latest value + private func trackLatestFromSecond(onInitialValue: @escaping () -> Void) { + var gotInitialValue = false + + let subscriber = AnySubscriber( + receiveSubscription: { [weak self] subscription in + self?.otherSubscription = subscription + subscription.request(.unlimited) + }, + receiveValue: { [weak self] value in + guard let self = self else { return .none } + self.latestValue = value + + if !gotInitialValue { + // When getting initial value, start pulling values + // from upstream in the main sink + self.sink = Sink(upstream: self.upstream, + downstream: self.downstream, + transformOutput: { [weak self] value in + guard let self = self, + let other = self.latestValue else { return nil } + + return self.resultSelector(value, other) + }, + transformFailure: { $0 }) + + // Signal initial value to start fulfilling downstream demand + gotInitialValue = true + onInitialValue() + } + + return .unlimited + }, + receiveCompletion: nil) + + self.second.subscribe(subscriber) + } + + var description: String { + return "WithLatestFrom.Subscription<\(Output.self), \(Failure.self)>" + } + + func cancel() { + sink = nil + otherSubscription?.cancel() } + } } #endif diff --git a/Tests/WithLatestFromTests.swift b/Tests/WithLatestFromTests.swift index bff3986..47d6065 100644 --- a/Tests/WithLatestFromTests.swift +++ b/Tests/WithLatestFromTests.swift @@ -386,56 +386,5 @@ class WithLatestFromTests: XCTestCase { subject1.send(completion: .finished) XCTAssertTrue(completed) } - - func testWithLatestFromCompletion() { - let subject1 = PassthroughSubject() - let subject2 = PassthroughSubject() - var results = [String]() - var completed = false - - subscription = subject1 - .withLatestFrom(subject2) - .sink(receiveCompletion: { _ in completed = true }, - receiveValue: { results.append($0) }) - - subject1.send(completion: .finished) - XCTAssertTrue(completed) - XCTAssertTrue(results.isEmpty) - } - - func testWithLatestFrom2Completion() { - let subject1 = PassthroughSubject() - let subject2 = PassthroughSubject() - let subject3 = PassthroughSubject() - var results = [(String, String)]() - var completed = false - - subscription = subject1 - .withLatestFrom(subject2, subject3) - .sink(receiveCompletion: { _ in completed = true }, - receiveValue: { results.append($0) }) - - subject1.send(completion: .finished) - XCTAssertTrue(completed) - XCTAssertTrue(results.isEmpty) - } - - func testWithLatestFrom3Completion() { - let subject1 = PassthroughSubject() - let subject2 = PassthroughSubject() - let subject3 = PassthroughSubject() - let subject4 = PassthroughSubject() - var results = [(String, String, String)]() - var completed = false - - subscription = subject1 - .withLatestFrom(subject2, subject3, subject4) - .sink(receiveCompletion: { _ in completed = true }, - receiveValue: { results.append($0) }) - - subject1.send(completion: .finished) - XCTAssertTrue(completed) - XCTAssertTrue(results.isEmpty) - } } #endif