Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

operators: add compactScan() and tryCompactScan() #100

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CombineExt.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/* Begin PBXBuildFile section */
1970A8AA25246FBD00799AB6 /* FilterMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1970A8A925246FBD00799AB6 /* FilterMany.swift */; };
1970A8B42524730500799AB6 /* FilterManyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1970A8B32524730400799AB6 /* FilterManyTests.swift */; };
1A2FF9A826E3D9770098C2D1 /* CompactScan.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A2FF9A726E3D9770098C2D1 /* CompactScan.swift */; };
1A2FF9AB26E3D9FC0098C2D1 /* CompactScanTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A2FF9A926E3D9800098C2D1 /* CompactScanTests.swift */; };
BF330EF624F1FFFE001281FC /* CombineSchedulers in Frameworks */ = {isa = PBXBuildFile; productRef = BF330EF524F1FFFE001281FC /* CombineSchedulers */; };
BF330EF924F20032001281FC /* Timer.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EF824F20032001281FC /* Timer.swift */; };
BF330EFB24F20080001281FC /* Lock.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EFA24F20080001281FC /* Lock.swift */; };
Expand Down Expand Up @@ -108,6 +110,8 @@
/* Begin PBXFileReference section */
1970A8A925246FBD00799AB6 /* FilterMany.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FilterMany.swift; sourceTree = "<group>"; };
1970A8B32524730400799AB6 /* FilterManyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FilterManyTests.swift; sourceTree = "<group>"; };
1A2FF9A726E3D9770098C2D1 /* CompactScan.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CompactScan.swift; sourceTree = "<group>"; };
1A2FF9A926E3D9800098C2D1 /* CompactScanTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CompactScanTests.swift; sourceTree = "<group>"; };
BF330EF824F20032001281FC /* Timer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Timer.swift; sourceTree = "<group>"; };
BF330EFA24F20080001281FC /* Lock.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Lock.swift; sourceTree = "<group>"; };
BF3D3B5C253B83F300D830ED /* IgnoreFailure.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IgnoreFailure.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -242,6 +246,7 @@
OBJ_18 /* AssignOwnership.swift */,
OBJ_19 /* AssignToMany.swift */,
OBJ_20 /* CombineLatestMany.swift */,
1A2FF9A726E3D9770098C2D1 /* CompactScan.swift */,
OBJ_21 /* Create.swift */,
OBJ_22 /* Dematerialize.swift */,
OBJ_23 /* FlatMapLatest.swift */,
Expand Down Expand Up @@ -290,6 +295,7 @@
OBJ_42 /* AssignOwnershipTests.swift */,
OBJ_43 /* AssignToManyTests.swift */,
OBJ_44 /* CombineLatestManyTests.swift */,
1A2FF9A926E3D9800098C2D1 /* CompactScanTests.swift */,
OBJ_45 /* CreateTests.swift */,
OBJ_46 /* CurrentValueRelayTests.swift */,
OBJ_47 /* DematerializeTests.swift */,
Expand Down Expand Up @@ -550,6 +556,7 @@
OBJ_123 /* AssignOwnershipTests.swift in Sources */,
OBJ_124 /* AssignToManyTests.swift in Sources */,
1970A8B42524730500799AB6 /* FilterManyTests.swift in Sources */,
1A2FF9AB26E3D9FC0098C2D1 /* CompactScanTests.swift in Sources */,
D836234A24EA9888002353AC /* MergeManyTests.swift in Sources */,
OBJ_125 /* CombineLatestManyTests.swift in Sources */,
BF3D3B67253B88E500D830ED /* IgnoreFailureTests.swift in Sources */,
Expand Down Expand Up @@ -581,6 +588,7 @@
buildActionMask = 0;
files = (
OBJ_79 /* DemandBuffer.swift in Sources */,
1A2FF9A826E3D9770098C2D1 /* CompactScan.swift in Sources */,
OBJ_80 /* Sink.swift in Sources */,
OBJ_81 /* Optional.swift in Sources */,
C387777C24E6BBE900FAD2D8 /* Nwise.swift in Sources */,
Expand Down
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ All operators, utilities and helpers respect Combine's publisher contract, inclu
* [ignoreFailure](#ignoreFailure)
* [mapToResult](#mapToResult)
* [flatMapBatches(of:)](#flatMapBatchesof)
* [compactScan()](#compactScan)

### Publishers
* [AnyPublisher.create](#AnypublisherCreate)
Expand Down Expand Up @@ -755,6 +756,30 @@ subscription = ints
.finished
```

------

### compactScan()

Transforms elements from the upstream publisher by providing the current element to a closure along with the last value returned by the closure. If the closure returns a nil value, then the accumulator won't change until the next non-nil upstream publisher value.

```swift
let cancellable = (0...5)
.publisher
.compactScan(0) {
guard $1.isMultiple(of: 2) else { return nil }
return $0 + $1
}
.sink { print ("\($0)") }
```

#### Output

```none
0 2 6
```

The `tryCompactScan()` version behaves the same but with a throwing closure.

## Publishers

This section outlines some of the custom Combine publishers CombineExt provides
Expand Down
92 changes: 92 additions & 0 deletions Sources/Operators/CompactScan.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//
// CompactScan.swift
// CombineExt
//
// Created by Thibault Wittemberg on 04/09/2021.
// Copyright © 2021 Combine Community. All rights reserved.
//

#if canImport(Combine)
import Combine

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {
/// Transforms elements from the upstream publisher by providing the current
/// element to a closure along with the last value returned by the closure.
///
/// The ``nextPartialResult`` closure might return nil values. In that case the accumulator won't change until the next non-nil upstream publisher value.
///
/// Use ``Publisher/compactScan(_:_:)`` to accumulate all previously-published values into a single
/// value, which you then combine with each newly-published value.
///
/// The following example logs a running total of all values received
/// from the sequence publisher.
///
/// let range = (0...5)
/// let cancellable = range.publisher
/// .compactScan(0) {
/// guard $1.isMultiple(of: 2) else { return nil }
/// return $0 + $1
/// }
/// .sink { print ("\($0)", terminator: " ") }
/// // Prints: "0 2 6 ".
///
/// - Parameters:
/// - initialResult: The previous result returned by the `nextPartialResult` closure.
/// - nextPartialResult: A closure that takes as its arguments the previous value returned by the closure and the next element emitted from the upstream publisher.
/// - Returns: A publisher that transforms elements by applying a closure that receives its previous return value and the next element from the upstream publisher.
func compactScan<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) -> T?) -> AnyPublisher<T, Failure> {
self.scan((initialResult, initialResult)) { accumulator, value -> (T, T?) in
let lastNonNilAccumulator = accumulator.0
let newAccumulator = nextPartialResult(lastNonNilAccumulator, value)
return (newAccumulator ?? lastNonNilAccumulator, newAccumulator)
}
.compactMap { $0.1 }
.eraseToAnyPublisher()
}

/// Transforms elements from the upstream publisher by providing the current element to an error-throwing closure along with the last value returned by the closure.
///
/// The ``nextPartialResult`` closure might return nil values. In that case the accumulator won't change until the next non-nil upstream publisher value.
///
/// Use ``Publisher/tryCompactScan(_:_:)`` to accumulate all previously-published values into a single value, which you then combine with each newly-published value.
/// If your accumulator closure throws an error, the publisher terminates with the error.
///
/// In the example below, ``Publisher/tryCompactScan(_:_:)`` calls a division function on elements of a collection publisher. The resulting publisher publishes each result until the function encounters a `DivisionByZeroError`, which terminates the publisher.
///
/// struct DivisionByZeroError: Error {}
///
/// /// A function that throws a DivisionByZeroError if `current` provided by the TryScan publisher is zero.
/// func myThrowingFunction(_ lastValue: Int, _ currentValue: Int) throws -> Int? {
/// guard currentValue.isMultiple(of: 2) else { return nil }
/// guard currentValue != 0 else { throw DivisionByZeroError() }
/// return lastValue / currentValue
/// }
///
/// let numbers = [1, 2, 3, 4, 5, 0, 6, 7, 8, 9]
/// let cancellable = numbers.publisher
/// .tryCompactScan(10) { try myThrowingFunction($0, $1) }
/// .sink(
/// receiveCompletion: { print ("\($0)") },
/// receiveValue: { print ("\($0)", terminator: " ") }
/// )
///
/// // Prints: "6 2 failure(DivisionByZeroError())".
///
/// If the closure throws an error, the publisher fails with the error.
///
/// - Parameters:
/// - initialResult: The previous result returned by the `nextPartialResult` closure.
/// - nextPartialResult: An error-throwing closure that takes as its arguments the previous value returned by the closure and the next element emitted from the upstream publisher.
/// - Returns: A publisher that transforms elements by applying a closure that receives its previous return value and the next element from the upstream publisher.
func tryCompactScan<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) throws -> T?) -> AnyPublisher<T, Error> {
self.tryScan((initialResult, initialResult)) { accumulator, value -> (T, T?) in
let lastNonNilAccumulator = accumulator.0
let newAccumulator = try nextPartialResult(lastNonNilAccumulator, value)
return (newAccumulator ?? lastNonNilAccumulator, newAccumulator)
}
.compactMap { $0.1 }
.eraseToAnyPublisher()
}
}
#endif
98 changes: 98 additions & 0 deletions Tests/CompactScanTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//
// CompactScanTests.swift
// CombineExtTests
//
// Created by Thibault Wittemberg on 04/09/2021.
// Copyright © 2021 Combine Community. All rights reserved.
//

#if !os(watchOS)
import XCTest
import Combine
import CombineExt

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final class CompactScanTests: XCTestCase {
func testCompactScan_drops_nil_values() {
let expectedValues = [0, 2, 6]
var receivedValues = [Int]()

// Given: a stream of integers from 0 to 5
let sut = (0...5).publisher

// When: using a compactScan operator using a closure that returns nil when the value from the upstream publisher is odd
let cancellable = sut
.compactScan(0) {
guard $1.isMultiple(of: 2) else { return nil }
return $0 + $1
}
.assertNoFailure()
.sink { receivedValues.append($0) }

// Then: the nil results have been discarded
XCTAssertEqual(receivedValues, expectedValues)

cancellable.cancel()
}

func testTryCompactScan_drops_nil_values() {
let expectedValues = [0, 2, 6]
var receivedValues = [Int]()

// Given: a stream of integers from 0 to 5
let sut = (0...5).publisher

// When: using a tryCompactScan operator using a closure that returns nil when the value from the upstream publisher is odd
let cancellable = sut
.tryCompactScan(0) {
guard $1.isMultiple(of: 2) else { return nil }
return $0 + $1
}
.assertNoFailure()
.sink { receivedValues.append($0) }

// Then: the nil results have been discarded
XCTAssertEqual(receivedValues, expectedValues)

cancellable.cancel()
}

func testTryCompactScan_drops_nil_values_and_throws_error() {
struct DivisionByZeroError: Error, Equatable {}

let expectedValues = [6, 2]
var receivedValues = [Int]()

let expectedError = DivisionByZeroError()
var receivedCompletion: Subscribers.Completion<Error>?

// Given: a sequence a integers containing a 0
let sut = [1, 2, 3, 4, 5, 0, 6, 7, 8, 9].publisher

// When: using a tryCompactScan operator using a closure that returns nil when the value from the upstream publisher is odd
// and throws when the value is 0
let cancellable = sut
.tryCompactScan(10) {
guard $1.isMultiple(of: 2) else { return nil }
guard $1 != 0 else { throw expectedError }
return ($0 + $1) / $1
}
.sink {
receivedCompletion = $0
} receiveValue: {
receivedValues.append($0)
}

cancellable.cancel()

// Then: the nil results have been discarded
XCTAssertEqual(receivedValues, expectedValues)

// Then: the thrown error provoqued a failure
switch receivedCompletion {
case let .failure(receivedError): XCTAssertEqual(receivedError as? DivisionByZeroError, expectedError)
default: XCTFail()
}
}
}
#endif