diff --git a/spec/asynciterable-operators/finalize-spec.ts b/spec/asynciterable-operators/finalize-spec.ts index a136d7a3..9f4ab48f 100644 --- a/spec/asynciterable-operators/finalize-spec.ts +++ b/spec/asynciterable-operators/finalize-spec.ts @@ -1,6 +1,6 @@ import { hasNext, hasErr, noNext } from '../asynciterablehelpers.js'; import { range, throwError } from 'ix/asynciterable/index.js'; -import { flatMap, finalize, tap } from 'ix/asynciterable/operators/index.js'; +import { finalize, tap, concatMap } from 'ix/asynciterable/operators/index.js'; test('AsyncIterable#finalize defers behavior', async () => { let done = false; @@ -79,7 +79,7 @@ test('AsyncIterable#finalize calls with downstream error from flattening', async finalize(async () => { done = true; }), - flatMap(async (x) => { + concatMap(async (x) => { // srcValues.push(x); if (x === 1) { return throwError(err); diff --git a/src/add/asynciterable-operators/mergeall.ts b/src/add/asynciterable-operators/mergeall.ts index 46984a23..06ed2e40 100644 --- a/src/add/asynciterable-operators/mergeall.ts +++ b/src/add/asynciterable-operators/mergeall.ts @@ -8,7 +8,7 @@ export function mergeAllProto( this: AsyncIterableX>, concurrent = Infinity ): AsyncIterableX { - return mergeAll(concurrent)(this); + return mergeAll(concurrent)(this); } AsyncIterableX.prototype.mergeAll = mergeAllProto; diff --git a/src/asynciterable/_extremaby.ts b/src/asynciterable/_extremaby.ts index 3f1dc45d..1f6edba0 100644 --- a/src/asynciterable/_extremaby.ts +++ b/src/asynciterable/_extremaby.ts @@ -9,29 +9,31 @@ export async function extremaBy( ): Promise { throwIfAborted(signal); - let result = []; - const it = wrapWithAbort(source, signal)[Symbol.asyncIterator](); - const { value, done } = await it.next(); - if (done) { - throw new Error('Sequence contains no elements'); - } - - let resKey = await selector(value, signal); - result.push(value); + let hasValue = false; + let key: TKey | undefined; + let result: TSource[] = []; - let next: IteratorResult; - while (!(next = await it.next()).done) { - const current = next.value; - const key = await selector(current, signal); - const cmp = await comparer(key, resKey, signal); + for await (const item of wrapWithAbort(source, signal)) { + if (!hasValue) { + key = await selector(item, signal); + result.push(item); + hasValue = true; + } else { + const currentKey = await selector(item, signal); + const cmp = await comparer(currentKey, key as TKey, signal); - if (cmp === 0) { - result.push(current); - } else if (cmp > 0) { - result = [current]; - resKey = key; + if (cmp === 0) { + result.push(item); + } else if (cmp > 0) { + result = [item]; + key = currentKey; + } } } + if (!hasValue) { + throw new Error('Sequence contains no elements'); + } + return result; } diff --git a/src/asynciterable/asynciterablex.ts b/src/asynciterable/asynciterablex.ts index 6e3a3420..c1cb09ed 100644 --- a/src/asynciterable/asynciterablex.ts +++ b/src/asynciterable/asynciterablex.ts @@ -290,8 +290,7 @@ export class FromPromiseIterable extends AsyncIterab } async *[Symbol.asyncIterator]() { - const item = await this._source; - yield await this._selector(item, 0); + yield await this._selector(await this._source, 0); } } diff --git a/src/asynciterable/average.ts b/src/asynciterable/average.ts index b920a11d..5391b57d 100644 --- a/src/asynciterable/average.ts +++ b/src/asynciterable/average.ts @@ -42,9 +42,12 @@ export async function average( ['signal']: signal, ['thisArg']: thisArg, } = options || {}; + throwIfAborted(signal); + let sum = 0; let count = 0; + for await (const item of wrapWithAbort(source, signal)) { sum += await selector.call(thisArg, item, signal); count++; diff --git a/src/asynciterable/catcherror.ts b/src/asynciterable/catcherror.ts index 90de1cc8..b497b108 100644 --- a/src/asynciterable/catcherror.ts +++ b/src/asynciterable/catcherror.ts @@ -1,5 +1,4 @@ import { AsyncIterableX } from './asynciterablex.js'; -import { returnAsyncIterator } from '../util/returniterator.js'; import { wrapWithAbort } from './operators/withabort.js'; import { throwIfAborted } from '../aborterror.js'; @@ -19,29 +18,14 @@ export class CatchAllAsyncIterable extends AsyncIterableX { let hasError = false; for (const source of this._source) { - const it = wrapWithAbort(source, signal)[Symbol.asyncIterator](); - error = null; hasError = false; - while (1) { - let c = {}; - - try { - const { done, value } = await it.next(); - if (done) { - await returnAsyncIterator(it); - break; - } - c = value; - } catch (e) { - error = e; - hasError = true; - await returnAsyncIterator(it); - break; - } - - yield c; + try { + yield* wrapWithAbort(source, signal); + } catch (e) { + error = e; + hasError = true; } if (!hasError) { @@ -64,7 +48,7 @@ export class CatchAllAsyncIterable extends AsyncIterableX { * sequences until a source sequence terminates successfully. */ export function catchAll(source: Iterable>): AsyncIterableX { - return new CatchAllAsyncIterable(source); + return new CatchAllAsyncIterable(source); } /** @@ -76,5 +60,5 @@ export function catchAll(source: Iterable>): AsyncIterableX< * sequences until a source sequence terminates successfully. */ export function catchError(...args: AsyncIterable[]): AsyncIterableX { - return new CatchAllAsyncIterable(args); + return new CatchAllAsyncIterable(args); } diff --git a/src/asynciterable/combinelatest.ts b/src/asynciterable/combinelatest.ts index 8e911818..44fe81e2 100644 --- a/src/asynciterable/combinelatest.ts +++ b/src/asynciterable/combinelatest.ts @@ -3,9 +3,10 @@ import { identity } from '../util/identity.js'; import { wrapWithAbort } from './operators/withabort.js'; import { throwIfAborted } from '../aborterror.js'; import { safeRace } from '../util/safeRace.js'; +import { returnAsyncIterators } from '../util/returniterator.js'; // eslint-disable-next-line @typescript-eslint/no-empty-function -const NEVER_PROMISE = new Promise(() => {}); +const NEVER_PROMISE = new Promise(() => {}); type MergeResult = { value: T; index: number }; @@ -28,12 +29,11 @@ export class CombineLatestAsyncIterable extends AsyncIterableX>(length); const nexts = new Array>>>(length); - let hasValueAll = false; - const values = new Array(length); - const hasValues = new Array(length); - let active = length; - hasValues.fill(false); + let active = length; + let allValuesAvailable = false; + const values = new Array(length); + const hasValues = new Array(length).fill(false); for (let i = 0; i < length; i++) { const iterator = wrapWithAbort(this._sources[i], signal)[Symbol.asyncIterator](); @@ -41,26 +41,30 @@ export class CombineLatestAsyncIterable extends AsyncIterableX 0) { - const next = safeRace(nexts); - const { - value: { value: value$, done: done$ }, - index, - } = await next; - if (done$) { - nexts[index] = >>>NEVER_PROMISE; - active--; - } else { - values[index] = value$; - hasValues[index] = true; + try { + while (active > 0) { + const { + value: { value, done }, + index, + } = await safeRace(nexts); + + if (done) { + nexts[index] = NEVER_PROMISE; + active--; + } else { + values[index] = value; + hasValues[index] = true; + allValuesAvailable = allValuesAvailable || hasValues.every(identity); - const iterator$ = iterators[index]; - nexts[index] = wrapPromiseWithIndex(iterator$.next(), index); + nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index); - if (hasValueAll || (hasValueAll = hasValues.every(identity))) { - yield values; + if (allValuesAvailable) { + yield values; + } } } + } finally { + await returnAsyncIterators(iterators); } } } @@ -176,5 +180,5 @@ export function combineLatest( */ export function combineLatest(...sources: AsyncIterable[]): AsyncIterableX; export function combineLatest(...sources: any[]): AsyncIterableX { - return new CombineLatestAsyncIterable(sources); + return new CombineLatestAsyncIterable(sources); } diff --git a/src/asynciterable/concat.ts b/src/asynciterable/concat.ts index 70753065..a7b11cf9 100644 --- a/src/asynciterable/concat.ts +++ b/src/asynciterable/concat.ts @@ -13,10 +13,9 @@ export class ConcatAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + for (const outer of this._source) { - for await (const item of wrapWithAbort(outer, signal)) { - yield item; - } + yield* wrapWithAbort(outer, signal); } } } @@ -24,7 +23,7 @@ export class ConcatAsyncIterable extends AsyncIterableX { export function _concatAll( source: Iterable> ): AsyncIterableX { - return new ConcatAsyncIterable(source); + return new ConcatAsyncIterable(source); } /** @@ -136,5 +135,5 @@ export function concat( * @returns {AsyncIterableX} An async-iterable sequence that contains the elements of each given sequence, in sequential order. */ export function concat(...args: AsyncIterable[]): AsyncIterableX { - return new ConcatAsyncIterable(args); + return new ConcatAsyncIterable(args); } diff --git a/src/asynciterable/count.ts b/src/asynciterable/count.ts index e806adb7..32315d78 100644 --- a/src/asynciterable/count.ts +++ b/src/asynciterable/count.ts @@ -18,9 +18,10 @@ export async function count( ): Promise { const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate = async () => true } = options || {}; + throwIfAborted(signal); - let i = 0; + let i = 0; for await (const item of wrapWithAbort(source, signal)) { if (await predicate.call(thisArg, item, i, signal)) { i++; diff --git a/src/asynciterable/create.ts b/src/asynciterable/create.ts index 709b3f17..bde29ed8 100644 --- a/src/asynciterable/create.ts +++ b/src/asynciterable/create.ts @@ -11,11 +11,12 @@ class AnonymousAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + const it = await this._fn(signal); - let next: IteratorResult | undefined; - while (!(next = await it.next()).done) { - yield next.value; - } + + yield* { + [Symbol.asyncIterator]: () => it, + }; } } diff --git a/src/asynciterable/defer.ts b/src/asynciterable/defer.ts index c1b8d482..320d6b9d 100644 --- a/src/asynciterable/defer.ts +++ b/src/asynciterable/defer.ts @@ -14,10 +14,8 @@ class DeferAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const items = await this._fn(signal); - for await (const item of wrapWithAbort(items, signal)) { - yield item; - } + + yield* wrapWithAbort(await this._fn(signal), signal); } } @@ -32,5 +30,5 @@ class DeferAsyncIterable extends AsyncIterableX { export function defer( factory: (signal?: AbortSignal) => AsyncIterable | Promise> ): AsyncIterableX { - return new DeferAsyncIterable(factory); + return new DeferAsyncIterable(factory); } diff --git a/src/asynciterable/elementat.ts b/src/asynciterable/elementat.ts index fe93591c..a3a9782c 100644 --- a/src/asynciterable/elementat.ts +++ b/src/asynciterable/elementat.ts @@ -17,6 +17,7 @@ export async function elementAt( signal?: AbortSignal ): Promise { throwIfAborted(signal); + let i = index; for await (const item of wrapWithAbort(source, signal)) { if (i === 0) { @@ -24,5 +25,6 @@ export async function elementAt( } i--; } + return undefined; } diff --git a/src/asynciterable/every.ts b/src/asynciterable/every.ts index 5032d1df..d4dedcc0 100644 --- a/src/asynciterable/every.ts +++ b/src/asynciterable/every.ts @@ -16,12 +16,15 @@ export async function every( options: FindOptions ): Promise { const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate } = options; + throwIfAborted(signal); + let i = 0; for await (const item of wrapWithAbort(source, signal)) { if (!(await predicate.call(thisArg, item, i++, signal))) { return false; } } + return true; } diff --git a/src/asynciterable/find.ts b/src/asynciterable/find.ts index 2e52e674..20c2d1a1 100644 --- a/src/asynciterable/find.ts +++ b/src/asynciterable/find.ts @@ -15,13 +15,15 @@ export async function find( options: FindOptions ): Promise { const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate } = options; + throwIfAborted(signal); - let i = 0; + let i = 0; for await (const item of wrapWithAbort(source, signal)) { if (await predicate.call(thisArg, item, i++, signal)) { return item; } } + return undefined; } diff --git a/src/asynciterable/findindex.ts b/src/asynciterable/findindex.ts index 113455e7..c4df7bcc 100644 --- a/src/asynciterable/findindex.ts +++ b/src/asynciterable/findindex.ts @@ -16,13 +16,15 @@ export async function findIndex( options: FindOptions ): Promise { const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate } = options; + throwIfAborted(signal); - let i = 0; + let i = 0; for await (const item of wrapWithAbort(source, signal)) { if (await predicate.call(thisArg, item, i++, signal)) { return i; } } + return -1; } diff --git a/src/asynciterable/first.ts b/src/asynciterable/first.ts index 1ebb2bc4..d6b0d084 100644 --- a/src/asynciterable/first.ts +++ b/src/asynciterable/first.ts @@ -16,10 +16,12 @@ export async function first( ): Promise { const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate = async () => true } = options || {}; + throwIfAborted(signal); + let i = 0; for await (const item of wrapWithAbort(source, signal)) { - if (await predicate!.call(thisArg, item, i++, signal)) { + if (await predicate.call(thisArg, item, i++, signal)) { return item; } } diff --git a/src/asynciterable/forkjoin.ts b/src/asynciterable/forkjoin.ts index 81dee635..897d8c83 100644 --- a/src/asynciterable/forkjoin.ts +++ b/src/asynciterable/forkjoin.ts @@ -1,9 +1,10 @@ import { identity } from '../util/identity.js'; import { wrapWithAbort } from './operators/withabort.js'; import { safeRace } from '../util/safeRace.js'; +import { returnAsyncIterators } from '../util/returniterator.js'; // eslint-disable-next-line @typescript-eslint/no-empty-function -const NEVER_PROMISE = new Promise(() => {}); +const NEVER_PROMISE = new Promise(() => {}); type MergeResult = { value: T; index: number }; @@ -254,9 +255,11 @@ export function forkJoin( * @param {...any[]} sources Async-iterable sequence to collect the last elements for. * @returns {(Promise)} An async-iterable sequence with an array of all the last elements of all sequences. */ -export async function forkJoin(...sources: any[]): Promise { - let signal = sources.shift() as AbortSignal | undefined; - if (!(signal instanceof AbortSignal)) { +export async function forkJoin( + ...sources: (AbortSignal | AsyncIterable)[] +): Promise { + let signal = sources.shift(); + if (signal && !(signal instanceof AbortSignal)) { sources.unshift(signal); signal = undefined; } @@ -267,27 +270,34 @@ export async function forkJoin(...sources: any[]): Promise { let active = length; const values = new Array(length); - const hasValues = new Array(length); - hasValues.fill(false); + const hasValues = new Array(length).fill(false); for (let i = 0; i < length; i++) { - const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator](); + const iterator = wrapWithAbort(sources[i] as AsyncIterable, signal)[ + Symbol.asyncIterator + ](); iterators[i] = iterator; nexts[i] = wrapPromiseWithIndex(iterator.next(), i); } - while (active > 0) { - const next = safeRace(nexts); - const { value: next$, index } = await next; - if (next$.done) { - nexts[index] = >>>NEVER_PROMISE; - active--; - } else { - const iterator$ = iterators[index]; - nexts[index] = wrapPromiseWithIndex(iterator$.next(), index); - hasValues[index] = true; - values[index] = next$.value; + try { + while (active > 0) { + const { + value: { value, done }, + index, + } = await safeRace(nexts); + + if (done) { + nexts[index] = NEVER_PROMISE; + active--; + } else { + nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index); + hasValues[index] = true; + values[index] = value; + } } + } finally { + await returnAsyncIterators(iterators); } if (hasValues.length > 0 && hasValues.every(identity)) { diff --git a/src/asynciterable/generate.ts b/src/asynciterable/generate.ts index 34526d60..71516864 100644 --- a/src/asynciterable/generate.ts +++ b/src/asynciterable/generate.ts @@ -22,6 +22,7 @@ class GenerateAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + for ( let i = this._initialState; await this._condition(i, signal); @@ -50,10 +51,5 @@ export function generate( iterate: (value: TState, signal?: AbortSignal) => TState | Promise, resultSelector: (value: TState, signal?: AbortSignal) => TResult | Promise ): AsyncIterableX { - return new GenerateAsyncIterable( - initialState, - condition, - iterate, - resultSelector - ); + return new GenerateAsyncIterable(initialState, condition, iterate, resultSelector); } diff --git a/src/asynciterable/generatetime.ts b/src/asynciterable/generatetime.ts index 245330cc..798dbd24 100644 --- a/src/asynciterable/generatetime.ts +++ b/src/asynciterable/generatetime.ts @@ -26,6 +26,7 @@ class GenerateTimeAsyncIterable extends AsyncIterableX async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + for ( let i = this._initialState; await this._condition(i, signal); @@ -58,7 +59,7 @@ export function generateTime( resultSelector: (value: TState, signal?: AbortSignal) => TResult | Promise, timeSelector: (value: TState, signal?: AbortSignal) => number | Promise ): AsyncIterableX { - return new GenerateTimeAsyncIterable( + return new GenerateTimeAsyncIterable( initialState, condition, iterate, diff --git a/src/asynciterable/includes.ts b/src/asynciterable/includes.ts index 9162998f..58d925bb 100644 --- a/src/asynciterable/includes.ts +++ b/src/asynciterable/includes.ts @@ -19,11 +19,13 @@ export async function includes( signal?: AbortSignal ): Promise { throwIfAborted(signal); + let fromIdx = fromIndex; let i = 0; if (Math.abs(fromIdx)) { fromIdx = 0; } + for await (const item of wrapWithAbort(source, signal)) { if (i++ > fromIdx && comparer(item, valueToFind)) { return true; diff --git a/src/asynciterable/interval.ts b/src/asynciterable/interval.ts index 4938b62e..51238480 100644 --- a/src/asynciterable/interval.ts +++ b/src/asynciterable/interval.ts @@ -12,6 +12,7 @@ class IntervalAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + let i = 0; while (1) { await sleep(this._dueTime, signal); diff --git a/src/asynciterable/isempty.ts b/src/asynciterable/isempty.ts index aedc1b70..05fa33d3 100644 --- a/src/asynciterable/isempty.ts +++ b/src/asynciterable/isempty.ts @@ -11,8 +11,10 @@ import { throwIfAborted } from '../aborterror.js'; */ export async function isEmpty(source: AsyncIterable, signal?: AbortSignal): Promise { throwIfAborted(signal); + for await (const _ of wrapWithAbort(source, signal)) { return false; } + return true; } diff --git a/src/asynciterable/last.ts b/src/asynciterable/last.ts index cbee6cbb..ead37cf0 100644 --- a/src/asynciterable/last.ts +++ b/src/asynciterable/last.ts @@ -18,11 +18,13 @@ export async function last( ): Promise { const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate = async () => true } = options || {}; + throwIfAborted(signal); + let i = 0; let result: T | undefined; for await (const item of wrapWithAbort(source, signal)) { - if (await predicate!.call(thisArg, item, i++, signal)) { + if (await predicate.call(thisArg, item, i++, signal)) { result = item; } } diff --git a/src/asynciterable/max.ts b/src/asynciterable/max.ts index b32b8620..36b40c21 100644 --- a/src/asynciterable/max.ts +++ b/src/asynciterable/max.ts @@ -1,15 +1,14 @@ import { equalityComparerAsync } from '../util/comparer.js'; import { identityAsync } from '../util/identity.js'; import { ExtremaOptions } from './extremaoptions.js'; -import { wrapWithAbort } from './operators/withabort.js'; -import { throwIfAborted } from '../aborterror.js'; +import { reduce } from './reduce.js'; /** * Returns the maximum element with the optional selector. * * @template TSource The type of the elements in the source sequence. * @param {AsyncIterable} source An async-iterable sequence to determine the maximum element of. - * @param {ExtremaByOptions} [options] The options which include an optional comparer and abort signal. + * @param {ExtremaOptions} [options] The options which include an optional comparer and abort signal. * @returns {Promise} The maximum element. */ export async function max( @@ -22,23 +21,12 @@ export async function max( ['selector']: selector = identityAsync, } = options || {}; - throwIfAborted(signal); + return reduce(source, { + async ['callback'](maxValue, item) { + const value = await selector(item); - const it = wrapWithAbort(source, signal)[Symbol.asyncIterator](); - let next = await it.next(); - - if (next.done) { - throw new Error('Sequence contains no elements'); - } - - let maxValue = await selector(next.value); - - while (!(next = await it.next()).done) { - const current = await selector(next.value); - if ((await comparer(current, maxValue)) > 0) { - maxValue = current; - } - } - - return maxValue; + return (await comparer(value, maxValue)) > 0 ? value : maxValue; + }, + ['signal']: signal, + }); } diff --git a/src/asynciterable/maxby.ts b/src/asynciterable/maxby.ts index 6516e549..832bb4fb 100644 --- a/src/asynciterable/maxby.ts +++ b/src/asynciterable/maxby.ts @@ -20,5 +20,6 @@ export function maxBy( ['selector']: selector, ['signal']: signal, } = options || {}; + return extremaBy(source, selector!, comparer, signal); } diff --git a/src/asynciterable/merge.ts b/src/asynciterable/merge.ts index 4eca293a..8eff3887 100644 --- a/src/asynciterable/merge.ts +++ b/src/asynciterable/merge.ts @@ -2,9 +2,10 @@ import { AsyncIterableX } from './asynciterablex.js'; import { wrapWithAbort } from './operators/withabort.js'; import { throwIfAborted } from '../aborterror.js'; import { safeRace } from '../util/safeRace.js'; +import { returnAsyncIterators } from '../util/returniterator.js'; // eslint-disable-next-line @typescript-eslint/no-empty-function -const NEVER_PROMISE = new Promise(() => {}); +const NEVER_PROMISE = new Promise(() => {}); type MergeResult = { value: T; index: number; done?: boolean; error?: any }; @@ -25,28 +26,36 @@ export class MergeAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator { throwIfAborted(signal); + const length = this._source.length; const iterators = new Array>(length); const nexts = new Array>>(length); + let active = length; + for (let i = 0; i < length; i++) { const iterator = wrapWithAbort(this._source[i], signal)[Symbol.asyncIterator](); iterators[i] = iterator; nexts[i] = wrapPromiseWithIndex(iterator.next(), i); } - while (active > 0) { - const next = await safeRace(nexts); - if (next.hasOwnProperty('error')) { - throw next.error; - } else if (next.done) { - nexts[next.index] = >>NEVER_PROMISE; - active--; - } else { - const iterator$ = iterators[next.index]; - nexts[next.index] = wrapPromiseWithIndex(iterator$.next(), next.index); - yield next.value; + try { + while (active > 0) { + const next = await safeRace(nexts); + + if (next.hasOwnProperty('error')) { + throw next.error; + } else if (next.done) { + nexts[next.index] = NEVER_PROMISE; + active--; + } else { + nexts[next.index] = wrapPromiseWithIndex(iterators[next.index].next(), next.index); + + yield next.value; + } } + } finally { + await returnAsyncIterators(iterators); } } } @@ -161,5 +170,5 @@ export function merge( export function merge(source: AsyncIterable, ...args: AsyncIterable[]): AsyncIterableX; export function merge(source: AsyncIterable, ...args: AsyncIterable[]): AsyncIterableX { - return new MergeAsyncIterable([source, ...args]); + return new MergeAsyncIterable([source, ...args]); } diff --git a/src/asynciterable/min.ts b/src/asynciterable/min.ts index d283da3f..1cf9db4a 100644 --- a/src/asynciterable/min.ts +++ b/src/asynciterable/min.ts @@ -1,16 +1,15 @@ import { equalityComparerAsync } from '../util/comparer.js'; import { identityAsync } from '../util/identity.js'; import { ExtremaOptions } from './extremaoptions.js'; -import { wrapWithAbort } from './operators/withabort.js'; -import { throwIfAborted } from '../aborterror.js'; +import { reduce } from './reduce.js'; /** - * * Returns the minimum element with the optional selector. + * Returns the minimum element with the optional selector. * * @template TSource The type of the elements in the source sequence. * @param {AsyncIterable} source An async-iterable sequence to determine the minimum element of. - * @param {ExtremaOptions} [options] The options which include an optional comparer and abort signal. - * @returns {Promise} A promise containing the minimum element. + * @param {ExtremaOptions} [options] The options which include an optional comparer and abort signal. + * @returns {Promise} A promise containing the minimum element. */ export async function min( source: AsyncIterable, @@ -22,23 +21,12 @@ export async function min( ['selector']: selector = identityAsync, } = options || {}; - throwIfAborted(signal); + return reduce(source, { + async ['callback'](minValue, item) { + const value = await selector(item); - const it = wrapWithAbort(source, signal)[Symbol.asyncIterator](); - let next = await it.next(); - - if (next.done) { - throw new Error('Sequence contains no elements'); - } - - let minValue = await selector(next.value); - - while (!(next = await it.next()).done) { - const current = await selector(next.value); - if ((await comparer(current, minValue)) < 0) { - minValue = current; - } - } - - return minValue; + return (await comparer(value, minValue)) < 0 ? value : minValue; + }, + ['signal']: signal, + }); } diff --git a/src/asynciterable/minby.ts b/src/asynciterable/minby.ts index 44d56614..ab1bccc9 100644 --- a/src/asynciterable/minby.ts +++ b/src/asynciterable/minby.ts @@ -21,5 +21,6 @@ export function minBy( ['signal']: signal, } = options || {}; const newComparer = async (key: TKey, minValue: TKey) => -(await comparer(key, minValue)); + return extremaBy(source, selector!, newComparer, signal); } diff --git a/src/asynciterable/never.ts b/src/asynciterable/never.ts index 18bb6f12..d258eb57 100644 --- a/src/asynciterable/never.ts +++ b/src/asynciterable/never.ts @@ -9,10 +9,9 @@ export class NeverAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + await new Promise((_, reject) => { - if (signal) { - signal.addEventListener('abort', () => reject(new AbortError()), { once: true }); - } + signal?.addEventListener('abort', () => reject(new AbortError()), { once: true }); }); } } diff --git a/src/asynciterable/of.ts b/src/asynciterable/of.ts index 12e49cf0..5c5b4523 100644 --- a/src/asynciterable/of.ts +++ b/src/asynciterable/of.ts @@ -3,18 +3,17 @@ import { throwIfAborted } from '../aborterror.js'; /** @ignore */ export class OfAsyncIterable extends AsyncIterableX { - private _args: TSource[]; + private _args: Iterable; - constructor(args: TSource[]) { + constructor(args: Iterable) { super(); this._args = args; } async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - for (const item of this._args) { - yield item; - } + + yield* this._args; } } @@ -28,5 +27,5 @@ export class OfAsyncIterable extends AsyncIterableX { export function of( ...args: TSource ): AsyncIterableX { - return new OfAsyncIterable(args); + return new OfAsyncIterable(args); } diff --git a/src/asynciterable/onerrorresumenext.ts b/src/asynciterable/onerrorresumenext.ts index c0b0b125..bd6d2df0 100644 --- a/src/asynciterable/onerrorresumenext.ts +++ b/src/asynciterable/onerrorresumenext.ts @@ -13,20 +13,12 @@ export class OnErrorResumeNextAsyncIterable extends AsyncIterableX extends AsyncIterableX} An async-iterable sequence that concatenates the source sequences, even if a sequence terminates exceptionally. */ export function onErrorResumeNext(...args: AsyncIterable[]): AsyncIterableX { - return new OnErrorResumeNextAsyncIterable(args); + return new OnErrorResumeNextAsyncIterable(args); } diff --git a/src/asynciterable/operators/_flatten.ts b/src/asynciterable/operators/_flatten.ts index e808a466..997542ec 100644 --- a/src/asynciterable/operators/_flatten.ts +++ b/src/asynciterable/operators/_flatten.ts @@ -3,7 +3,7 @@ import { wrapWithAbort } from '../operators/withabort.js'; import { AbortError, throwIfAborted } from '../../aborterror.js'; import { safeRace } from '../../util/safeRace.js'; import { isPromise } from '../../util/isiterable.js'; -import { returnAsyncIterator } from '../../util/returniterator.js'; +import { returnAsyncIterators } from '../../util/returniterator.js'; export type FlattenConcurrentSelector = ( value: TSource, @@ -139,7 +139,8 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera } while (!outerComplete || active + outerValues.length > 0); } finally { controllers.forEach((controller) => controller?.abort()); - await Promise.all([outer as AsyncIterator, ...inners].map(returnAsyncIterator)); + // The array can contain empty values, so we filter out those. + await returnAsyncIterators([outer, ...inners].filter((x) => !!x)); } function pullNextOuter(outerValue: TSource) { diff --git a/src/asynciterable/operators/_grouping.ts b/src/asynciterable/operators/_grouping.ts index c2153cad..b4191d67 100644 --- a/src/asynciterable/operators/_grouping.ts +++ b/src/asynciterable/operators/_grouping.ts @@ -10,15 +10,19 @@ export async function createGrouping( signal?: AbortSignal ): Promise> { const map = new Map(); + for await (const item of wrapWithAbort(source, signal)) { const key = await keySelector(item, signal); + let grouping = map.get(key); - if (!map.has(key)) { + const element = await elementSelector(item, signal); + + if (grouping === undefined) { grouping = []; map.set(key, grouping); } - const element = await elementSelector(item, signal); - grouping!.push(element); + + grouping.push(element); } return map; diff --git a/src/asynciterable/operators/batch.ts b/src/asynciterable/operators/batch.ts index f0aed6c4..8bddd3f8 100644 --- a/src/asynciterable/operators/batch.ts +++ b/src/asynciterable/operators/batch.ts @@ -8,24 +8,6 @@ interface AsyncResolver { reject: (reason?: any) => void; } -const WAITING_TYPE = 'waiting'; -const BATCHING_TYPE = 'batching'; - -interface WaitingState { - type: 'waiting'; - resolver: AsyncResolver>; -} -interface BatchingState { - type: 'batching'; - values: T[]; -} - -type State = WaitingState | BatchingState; - -function assertNever(value: never): never { - throw new Error(`Unhandled discriminated union member ${value}`); -} - /** @ignore */ export class BatchAsyncIterable extends AsyncIterableX { private _source: AsyncIterable; @@ -35,87 +17,79 @@ export class BatchAsyncIterable extends AsyncIterableX { this._source = source; } - [Symbol.asyncIterator](signal?: AbortSignal) { + [Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator { throwIfAborted(signal); - const it = wrapWithAbort(this._source, signal)[Symbol.asyncIterator](); - let state: State = { type: BATCHING_TYPE, values: [] }; - let ended: null | Promise> = null; - let error: any = null; + let waitingResolver: AsyncResolver | null = null; + let batch: TSource[] = []; + let it: AsyncIterator | undefined; + + let sourceError: any; + let sourceDone = false; - function consumeNext() { - it.next().then( - (res) => { - if (res.done) { - ended = Promise.resolve({ done: true } as IteratorResult); + const run = async () => { + it = wrapWithAbort(this._source, signal)[Symbol.asyncIterator](); - if (state.type === WAITING_TYPE) { - state.resolver.resolve(ended); - } + try { + for await (const item of { + [Symbol.asyncIterator]: () => it!, + }) { + if (waitingResolver) { + waitingResolver.resolve([item]); + waitingResolver = null; } else { - if (state.type === WAITING_TYPE) { - const { resolve } = state.resolver; - state = { type: BATCHING_TYPE, values: [] }; - resolve({ done: res.done, value: [res.value] }); - } else if (state.type === BATCHING_TYPE) { - state.values.push(res.value); - } else { - assertNever(state); - } - - consumeNext(); - } - }, - (err) => { - error = err; - if (state.type === WAITING_TYPE) { - state.resolver.reject(err); + batch.push(item); } } - ); - } - consumeNext(); + sourceDone = true; + waitingResolver?.resolve(null); + } catch (e) { + sourceError = e; + waitingResolver?.reject(e); + } + }; - return { - next() { - if (error) { - return Promise.reject(error); - } + run(); - if (state.type === BATCHING_TYPE && state.values.length > 0) { - const { values } = state; - state.values = []; - return Promise.resolve({ done: false, value: values }); + return { + async next() { + if (batch.length) { + const value = batch; + batch = []; + return { value, done: false }; } - if (ended) { - return ended; + if (sourceError) { + throw sourceError; } - if (state.type === WAITING_TYPE) { - throw new Error('Previous `next()` is still in progress'); + if (sourceDone) { + return { done: true, value: undefined }; } - return new Promise>((resolve, reject) => { - state = { - type: WAITING_TYPE, - resolver: { resolve, reject }, - }; + // There were no queued items, so we need to wait + const value = await new Promise((resolve, reject) => { + waitingResolver = { resolve, reject }; }); + + if (value) { + return { done: false, value }; + } else { + return { done: true, value: undefined }; + } }, + async return(): Promise> { + await it?.return?.(); - return(value: any) { - return it.return - ? it.return(value).then(() => ({ done: true } as IteratorResult)) - : Promise.resolve({ done: true } as IteratorResult); + return { done: true, value: undefined }; }, }; } } /** -Returns an async iterable sequence of batches that are collected from the source sequence between + * Returns an async iterable sequence of batches that are collected from the source sequence between * subsequent `next()` calls. * * @template TSource The type of elements in the source sequence. @@ -123,7 +97,7 @@ Returns an async iterable sequence of batches that are collected from the source * source sequence between subsequent `next()` calls. */ export function batch(): OperatorAsyncFunction { - return function batchOperator(source: AsyncIterable): AsyncIterableX { + return function batchOperator(source) { return new BatchAsyncIterable(source); }; } diff --git a/src/asynciterable/operators/buffer.ts b/src/asynciterable/operators/buffer.ts index 2d3a362a..7c419d3b 100644 --- a/src/asynciterable/operators/buffer.ts +++ b/src/asynciterable/operators/buffer.ts @@ -18,7 +18,9 @@ export class BufferAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + const buffers: TSource[][] = []; + let i = 0; for await (const item of wrapWithAbort(this._source, signal)) { if (i % this._skip === 0) { @@ -56,20 +58,7 @@ export function buffer( count: number, skip?: number ): OperatorAsyncFunction { - let s = skip; - if (s == null) { - s = count; - } - return function bufferOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new BufferAsyncIterable(source, count, s!); + return function bufferOperatorFunction(source) { + return new BufferAsyncIterable(source, count, skip ?? count); }; } - -/** - * Projects each element of an async-iterable sequence into consecutive non-overlapping - * buffers which are produced based on element count information. - * @param count Length of each buffer. - * @param skip Number of elements to skip between creation of consecutive buffers. - */ diff --git a/src/asynciterable/operators/buffercountortime.ts b/src/asynciterable/operators/buffercountortime.ts index 083ddb28..8a360ac5 100644 --- a/src/asynciterable/operators/buffercountortime.ts +++ b/src/asynciterable/operators/buffercountortime.ts @@ -18,6 +18,7 @@ class BufferCountOrTime extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { const buffer: TSource[] = []; + const timer = interval(this.maxWaitTime).pipe(map(() => timerEvent)); const source = concat(this.source, of(ended)); const merged = merge(source, timer); @@ -26,9 +27,11 @@ class BufferCountOrTime extends AsyncIterableX { if (item === ended) { break; } + if (item !== timerEvent) { buffer.push(item as TSource); } + if (buffer.length >= this.bufferSize || (buffer.length && item === timerEvent)) { yield buffer.slice(); buffer.length = 0; @@ -55,9 +58,7 @@ export function bufferCountOrTime( count: number, time: number ): OperatorAsyncFunction { - return function bufferOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new BufferCountOrTime(source, count, time); + return function bufferOperatorFunction(source) { + return new BufferCountOrTime(source, count, time); }; } diff --git a/src/asynciterable/operators/catcherror.ts b/src/asynciterable/operators/catcherror.ts index 943cc83c..f7bcc355 100644 --- a/src/asynciterable/operators/catcherror.ts +++ b/src/asynciterable/operators/catcherror.ts @@ -1,6 +1,5 @@ import { AsyncIterableX } from '../asynciterablex.js'; import { OperatorAsyncFunction } from '../../interfaces.js'; -import { returnAsyncIterator } from '../../util/returniterator.js'; import { wrapWithAbort } from './withabort.js'; import { throwIfAborted } from '../../aborterror.js'; @@ -26,33 +25,19 @@ export class CatchWithAsyncIterable extends AsyncIterableX | undefined; let hasError = false; - const source = wrapWithAbort(this._source, signal); - const it = source[Symbol.asyncIterator](); - while (1) { - let c = >{}; - - try { - c = await it.next(); - if (c.done) { - await returnAsyncIterator(it); - break; - } - } catch (e) { - err = await this._handler(e, signal); - hasError = true; - await returnAsyncIterator(it); - break; - } - yield c.value; + try { + yield* wrapWithAbort(this._source, signal); + } catch (e) { + err = await this._handler(e, signal); + hasError = true; } if (hasError) { - for await (const item of wrapWithAbort(err!, signal)) { - yield item; - } + yield* wrapWithAbort(err!, signal); } } } @@ -76,9 +61,7 @@ export function catchError( signal?: AbortSignal ) => AsyncIterable | Promise> ): OperatorAsyncFunction { - return function catchWithOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new CatchWithAsyncIterable(source, handler); + return function catchWithOperatorFunction(source) { + return new CatchWithAsyncIterable(source, handler); }; } diff --git a/src/asynciterable/operators/combinelatestwith.ts b/src/asynciterable/operators/combinelatestwith.ts index 98584474..9d73a1b2 100644 --- a/src/asynciterable/operators/combinelatestwith.ts +++ b/src/asynciterable/operators/combinelatestwith.ts @@ -14,6 +14,7 @@ import { CombineLatestAsyncIterable } from '../combinelatest.js'; export function combineLatestWith( source2: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. @@ -29,6 +30,7 @@ export function combineLatestWith( source2: AsyncIterable, source3: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. @@ -47,6 +49,7 @@ export function combineLatestWith( source3: AsyncIterable, source4: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. @@ -68,6 +71,7 @@ export function combineLatestWith( source4: AsyncIterable, source5: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence as an array whenever * one of the async-iterable sequences produces an element. @@ -102,8 +106,10 @@ export function combineLatestWith( * @returns {OperatorAsyncFunction} An async-iterable sequence containing an array of all sources. */ export function combineLatestWith(...sources: AsyncIterable[]): OperatorAsyncFunction; -export function combineLatestWith(...sources: any[]): OperatorAsyncFunction { - return function combineLatestOperatorFunction(source: AsyncIterable) { - return new CombineLatestAsyncIterable([source, ...sources]); +export function combineLatestWith( + ...sources: AsyncIterable[] +): OperatorAsyncFunction { + return function combineLatestOperatorFunction(source) { + return new CombineLatestAsyncIterable([source, ...sources]); }; } diff --git a/src/asynciterable/operators/concatall.ts b/src/asynciterable/operators/concatall.ts index 2d2ff5a3..4a2604c1 100644 --- a/src/asynciterable/operators/concatall.ts +++ b/src/asynciterable/operators/concatall.ts @@ -14,10 +14,9 @@ export class ConcatAllAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - for await (const outer of wrapWithAbort(this._source, signal)) { - for await (const item of wrapWithAbort(outer, signal)) { - yield item; - } + + for await (const inner of wrapWithAbort(this._source, signal)) { + yield* wrapWithAbort(inner, signal); } } } @@ -30,9 +29,7 @@ export class ConcatAllAsyncIterable extends AsyncIterableX { * @returns {OperatorAsyncFunction, T>} An operator which concatenates all inner async-iterable sources. */ export function concatAll(): OperatorAsyncFunction, T> { - return function concatAllOperatorFunction( - source: AsyncIterable> - ): AsyncIterableX { - return new ConcatAllAsyncIterable(source); + return function concatAllOperatorFunction(source) { + return new ConcatAllAsyncIterable(source); }; } diff --git a/src/asynciterable/operators/concatmap.ts b/src/asynciterable/operators/concatmap.ts index 902ead01..c0c0fb4c 100644 --- a/src/asynciterable/operators/concatmap.ts +++ b/src/asynciterable/operators/concatmap.ts @@ -1,9 +1,8 @@ -import { AsyncIterableInput, AsyncIterableX } from '../asynciterablex.js'; +import { AsyncIterableX } from '../asynciterablex.js'; import { OperatorAsyncFunction } from '../../interfaces.js'; import { wrapWithAbort } from './withabort.js'; import { throwIfAborted } from '../../aborterror.js'; import { FlattenConcurrentSelector } from './_flatten.js'; -import { isPromise } from '../../util/isiterable.js'; class ConcatMapAsyncIterable extends AsyncIterableX { constructor( @@ -16,14 +15,12 @@ class ConcatMapAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + let outerIndex = 0; - const { _thisArg: thisArg, _selector: selector } = this; for await (const outer of wrapWithAbort(this._source, signal)) { - const result = selector.call(thisArg, outer, outerIndex++, signal); - const values = (isPromise(result) ? await result : result) as AsyncIterableInput; - for await (const inner of wrapWithAbort(AsyncIterableX.as(values), signal)) { - yield inner; - } + const values = await this._selector.call(this._thisArg, outer, outerIndex++, signal); + + yield* wrapWithAbort(AsyncIterableX.as(values), signal); } } } diff --git a/src/asynciterable/operators/concatwith.ts b/src/asynciterable/operators/concatwith.ts index 7b957304..3fbadce5 100644 --- a/src/asynciterable/operators/concatwith.ts +++ b/src/asynciterable/operators/concatwith.ts @@ -12,6 +12,7 @@ import { ConcatAsyncIterable } from '../concat.js'; * followed by those of the second the sequence. */ export function concatWith(v2: AsyncIterable): OperatorAsyncFunction; + /** * Concatenates all async-iterable sequences in the given sequences, as long as the previous async-iterable * sequence terminated successfully. @@ -29,6 +30,7 @@ export function concatWith( v2: AsyncIterable, v3: AsyncIterable ): OperatorAsyncFunction; + /** * Concatenates all async-iterable sequences in the given sequences, as long as the previous async-iterable * sequence terminated successfully. @@ -48,6 +50,7 @@ export function concatWith( v3: AsyncIterable, v4: AsyncIterable ): OperatorAsyncFunction; + /** * Concatenates all async-iterable sequences in the given sequences, as long as the previous async-iterable * sequence terminated successfully. @@ -70,6 +73,7 @@ export function concatWith( v4: AsyncIterable, v5: AsyncIterable ): OperatorAsyncFunction; + /** * Concatenates all async-iterable sequences in the given sequences, as long as the previous async-iterable * sequence terminated successfully. @@ -105,7 +109,7 @@ export function concatWith( * @returns {AsyncIterableX} An async-iterable sequence that contains the elements of each given sequence, in sequential order. */ export function concatWith(...args: AsyncIterable[]): OperatorAsyncFunction { - return function concatWithOperatorFunction(source: AsyncIterable) { - return new ConcatAsyncIterable([source, ...args]); + return function concatWithOperatorFunction(source) { + return new ConcatAsyncIterable([source, ...args]); }; } diff --git a/src/asynciterable/operators/debounce.ts b/src/asynciterable/operators/debounce.ts index cd21da78..6adb4b71 100644 --- a/src/asynciterable/operators/debounce.ts +++ b/src/asynciterable/operators/debounce.ts @@ -87,9 +87,7 @@ export class DebounceAsyncIterable extends AsyncIterableX { * @returns {MonoTypeOperatorAsyncFunction} An operator function which debounces by the given timeout. */ export function debounce(time: number): MonoTypeOperatorAsyncFunction { - return function debounceOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new DebounceAsyncIterable(source, time); + return function debounceOperatorFunction(source) { + return new DebounceAsyncIterable(source, time); }; } diff --git a/src/asynciterable/operators/defaultifempty.ts b/src/asynciterable/operators/defaultifempty.ts index 2deb652c..4e48f7f5 100644 --- a/src/asynciterable/operators/defaultifempty.ts +++ b/src/asynciterable/operators/defaultifempty.ts @@ -16,12 +16,14 @@ export class DefaultIfEmptyAsyncIterable extends AsyncIterableX extends AsyncIterableX} An operator which returns the elements of the source sequence or the default value as a singleton. */ export function defaultIfEmpty(defaultValue: T): MonoTypeOperatorAsyncFunction { - return function defaultIfEmptyOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new DefaultIfEmptyAsyncIterable(source, defaultValue); + return function defaultIfEmptyOperatorFunction(source) { + return new DefaultIfEmptyAsyncIterable(source, defaultValue); }; } diff --git a/src/asynciterable/operators/delay.ts b/src/asynciterable/operators/delay.ts index be90c513..c83777e3 100644 --- a/src/asynciterable/operators/delay.ts +++ b/src/asynciterable/operators/delay.ts @@ -17,10 +17,10 @@ export class DelayAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + await sleep(this._dueTime, signal); - for await (const item of wrapWithAbort(this._source, signal)) { - yield item; - } + + yield* wrapWithAbort(this._source, signal); } } @@ -32,7 +32,7 @@ export class DelayAsyncIterable extends AsyncIterableX { * @returns {MonoTypeOperatorAsyncFunction} An operator which delays the before the iteration begins. */ export function delay(dueTime: number): MonoTypeOperatorAsyncFunction { - return function delayOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new DelayAsyncIterable(source, dueTime); + return function delayOperatorFunction(source) { + return new DelayAsyncIterable(source, dueTime); }; } diff --git a/src/asynciterable/operators/delayeach.ts b/src/asynciterable/operators/delayeach.ts index 27d48988..ea3b6215 100644 --- a/src/asynciterable/operators/delayeach.ts +++ b/src/asynciterable/operators/delayeach.ts @@ -17,8 +17,10 @@ export class DelayEachAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + for await (const item of wrapWithAbort(this._source, signal)) { await sleep(this._dueTime, signal); + yield item; } } @@ -32,9 +34,7 @@ export class DelayEachAsyncIterable extends AsyncIterableX { * @returns {MonoTypeOperatorAsyncFunction} An operator which takes an async-iterable and delays each item in the sequence by the given time. */ export function delayEach(dueTime: number): MonoTypeOperatorAsyncFunction { - return function delayEachOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new DelayEachAsyncIterable(source, dueTime); + return function delayEachOperatorFunction(source) { + return new DelayEachAsyncIterable(source, dueTime); }; } diff --git a/src/asynciterable/operators/distinct.ts b/src/asynciterable/operators/distinct.ts index cddeb0e2..9520c67c 100644 --- a/src/asynciterable/operators/distinct.ts +++ b/src/asynciterable/operators/distinct.ts @@ -26,12 +26,14 @@ export class DistinctAsyncIterable extends AsyncIterabl async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const set = [] as TKey[]; + const set: TKey[] = []; for await (const item of wrapWithAbort(this._source, signal)) { const key = await this._keySelector(item, signal); + if ((await arrayIndexOfAsync(set, key, this._comparer)) === -1) { set.push(key); + yield item; } } @@ -49,11 +51,10 @@ export class DistinctAsyncIterable extends AsyncIterabl export function distinct( options?: DistinctOptions ): MonoTypeOperatorAsyncFunction { - return function distinctOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { + return function distinctOperatorFunction(source) { const { ['keySelector']: keySelector = identityAsync, ['comparer']: comparer = comparerAsync } = options || {}; - return new DistinctAsyncIterable(source, keySelector, comparer); + + return new DistinctAsyncIterable(source, keySelector, comparer); }; } diff --git a/src/asynciterable/operators/distinctuntilchanged.ts b/src/asynciterable/operators/distinctuntilchanged.ts index b648ce3d..de31e3ef 100644 --- a/src/asynciterable/operators/distinctuntilchanged.ts +++ b/src/asynciterable/operators/distinctuntilchanged.ts @@ -27,17 +27,17 @@ export class DistinctUntilChangedAsyncIterable extends async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + let currentKey: TKey | undefined; let hasCurrentKey = false; + for await (const item of wrapWithAbort(this._source, signal)) { const key = await this._keySelector(item, signal); - let comparerEquals = false; - if (hasCurrentKey) { - comparerEquals = await this._comparer(currentKey!, key); - } - if (!hasCurrentKey || !comparerEquals) { + + if (!hasCurrentKey || !(await this._comparer(currentKey as TKey, key))) { hasCurrentKey = true; currentKey = key; + yield item; } } @@ -55,11 +55,10 @@ export class DistinctUntilChangedAsyncIterable extends export function distinctUntilChanged( options?: DistinctOptions ): MonoTypeOperatorAsyncFunction { - return function distinctUntilChangedOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { + return function distinctUntilChangedOperatorFunction(source) { const { ['keySelector']: keySelector = identityAsync, ['comparer']: comparer = comparerAsync } = options || {}; - return new DistinctUntilChangedAsyncIterable(source, keySelector, comparer); + + return new DistinctUntilChangedAsyncIterable(source, keySelector, comparer); }; } diff --git a/src/asynciterable/operators/dowhile.ts b/src/asynciterable/operators/dowhile.ts index 765cc603..8524c2a8 100644 --- a/src/asynciterable/operators/dowhile.ts +++ b/src/asynciterable/operators/dowhile.ts @@ -1,4 +1,3 @@ -import { AsyncIterableX } from '../asynciterablex.js'; import { concat } from '../concat.js'; import { whileDo } from '../whiledo.js'; import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js'; @@ -14,7 +13,7 @@ import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js'; export function doWhile( condition: (signal?: AbortSignal) => boolean | Promise ): MonoTypeOperatorAsyncFunction { - return function doWhileOperatorFunction(source: AsyncIterable): AsyncIterableX { + return function doWhileOperatorFunction(source) { return concat(source, whileDo(source, condition)); }; } diff --git a/src/asynciterable/operators/endwith.ts b/src/asynciterable/operators/endwith.ts index 42a9f368..22bcf6ba 100644 --- a/src/asynciterable/operators/endwith.ts +++ b/src/asynciterable/operators/endwith.ts @@ -16,12 +16,9 @@ export class EndWithAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - for await (const item of wrapWithAbort(this._source, signal)) { - yield item; - } - for (const x of this._args) { - yield x; - } + + yield* wrapWithAbort(this._source, signal); + yield* this._args; } } @@ -33,9 +30,7 @@ export class EndWithAsyncIterable extends AsyncIterableX { * @returns {MonoTypeOperatorAsyncFunction} An operator which appends values to the end of the sequence. */ export function endWith(...args: TSource[]): MonoTypeOperatorAsyncFunction { - return function endsWithOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new EndWithAsyncIterable(source, args); + return function endsWithOperatorFunction(source) { + return new EndWithAsyncIterable(source, args); }; } diff --git a/src/asynciterable/operators/except.ts b/src/asynciterable/operators/except.ts index 85cb2950..e2a6f770 100644 --- a/src/asynciterable/operators/except.ts +++ b/src/asynciterable/operators/except.ts @@ -24,7 +24,8 @@ export class ExceptAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const map = [] as TSource[]; + + const map: TSource[] = []; for await (const secondItem of wrapWithAbort(this._second, signal)) { map.push(secondItem); } @@ -32,6 +33,7 @@ export class ExceptAsyncIterable extends AsyncIterableX { for await (const firstItem of wrapWithAbort(this._first, signal)) { if ((await arrayIndexOfAsync(map, firstItem, this._comparer)) === -1) { map.push(firstItem); + yield firstItem; } } @@ -52,7 +54,7 @@ export function except( second: AsyncIterable, comparer: (x: TSource, y: TSource) => boolean | Promise = comparerAsync ): MonoTypeOperatorAsyncFunction { - return function exceptOperatorFunction(first: AsyncIterable): AsyncIterableX { - return new ExceptAsyncIterable(first, second, comparer); + return function exceptOperatorFunction(first) { + return new ExceptAsyncIterable(first, second, comparer); }; } diff --git a/src/asynciterable/operators/expand.ts b/src/asynciterable/operators/expand.ts index 3c472fc1..1f4ac3d7 100644 --- a/src/asynciterable/operators/expand.ts +++ b/src/asynciterable/operators/expand.ts @@ -25,9 +25,11 @@ export class ExpandAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + const q = [this._source]; while (q.length > 0) { const src = q.shift(); + for await (const item of wrapWithAbort(src!, signal)) { const items = await this._selector(item, signal); q.push(items); @@ -54,7 +56,7 @@ export function expand( signal?: AbortSignal ) => AsyncIterable | Promise> ): MonoTypeOperatorAsyncFunction { - return function expandOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new ExpandAsyncIterable(source, selector); + return function expandOperatorFunction(source) { + return new ExpandAsyncIterable(source, selector); }; } diff --git a/src/asynciterable/operators/filter.ts b/src/asynciterable/operators/filter.ts index 549278b4..97ea53c3 100644 --- a/src/asynciterable/operators/filter.ts +++ b/src/asynciterable/operators/filter.ts @@ -22,6 +22,7 @@ export class FilterAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + let i = 0; for await (const item of wrapWithAbort(this._source, signal)) { if (await this._predicate.call(this._thisArg, item, i++)) { @@ -54,7 +55,7 @@ export function filter( predicate: (value: TSource, index: number, signal?: AbortSignal) => boolean | Promise, thisArg?: any ): OperatorAsyncFunction { - return function filterOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new FilterAsyncIterable(source, predicate, thisArg); + return function filterOperatorFunction(source) { + return new FilterAsyncIterable(source, predicate, thisArg); }; } diff --git a/src/asynciterable/operators/finalize.ts b/src/asynciterable/operators/finalize.ts index 9a5dca45..53778258 100644 --- a/src/asynciterable/operators/finalize.ts +++ b/src/asynciterable/operators/finalize.ts @@ -16,10 +16,9 @@ export class FinallyAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + try { - for await (const item of wrapWithAbort(this._source, signal)) { - yield item; - } + yield* wrapWithAbort(this._source, signal); } finally { await this._action(); } @@ -27,7 +26,7 @@ export class FinallyAsyncIterable extends AsyncIterableX { } /** - * Invokes a specified asynchronous action after the source async-iterable sequence terminates gracefully or exceptionally. + * Invokes a specified asynchronous action after the source async-iterable sequence terminates gracefully or exceptionally. * * @template TSource The type of the elements in the source sequence. * @param {(() => void | Promise)} action Action to invoke and await asynchronously after the source async-iterable sequence terminates @@ -37,9 +36,7 @@ export class FinallyAsyncIterable extends AsyncIterableX { export function finalize( action: () => void | Promise ): MonoTypeOperatorAsyncFunction { - return function finalizeOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new FinallyAsyncIterable(source, action); + return function finalizeOperatorFunction(source) { + return new FinallyAsyncIterable(source, action); }; } diff --git a/src/asynciterable/operators/groupby.ts b/src/asynciterable/operators/groupby.ts index e52fe607..575298f0 100644 --- a/src/asynciterable/operators/groupby.ts +++ b/src/asynciterable/operators/groupby.ts @@ -18,9 +18,8 @@ export class GroupedAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - for (const item of this._source) { - yield item; - } + + yield* this._source; } } @@ -45,14 +44,16 @@ export class GroupByAsyncIterable extends AsyncIterableX< async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + const map = await createGrouping( this._source, this._keySelector, this._elementSelector, signal ); + for (const [key, values] of map) { - yield new GroupedAsyncIterable(key, values); + yield new GroupedAsyncIterable(key, values); } } } @@ -85,9 +86,7 @@ export function groupBy( signal?: AbortSignal ) => TValue | Promise = identityAsync ): OperatorAsyncFunction> { - return function groupByOperatorFunction( - source: AsyncIterable - ): AsyncIterableX> { - return new GroupByAsyncIterable(source, keySelector, elementSelector); + return function groupByOperatorFunction(source) { + return new GroupByAsyncIterable(source, keySelector, elementSelector); }; } diff --git a/src/asynciterable/operators/groupjoin.ts b/src/asynciterable/operators/groupjoin.ts index f29a36a7..6ddcda01 100644 --- a/src/asynciterable/operators/groupjoin.ts +++ b/src/asynciterable/operators/groupjoin.ts @@ -39,10 +39,16 @@ export class GroupJoinAsyncIterable extends Async async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const map = await createGrouping(this._inner, this._innerSelector, identity, signal); + + const map = (await createGrouping(this._inner, this._innerSelector, identity, signal)) as Map< + TKey, + Iterable + >; + for await (const outerElement of wrapWithAbort(this._outer, signal)) { const outerKey = await this._outerSelector(outerElement, signal); - const innerElements = map.has(outerKey) ? >map.get(outerKey) : empty(); + const innerElements = map.has(outerKey) ? map.get(outerKey)! : empty(); + yield await this._resultSelector(outerElement, AsyncIterableX.as(innerElements), signal); } } @@ -79,13 +85,7 @@ export function groupJoin( signal?: AbortSignal ) => TResult | Promise ): OperatorAsyncFunction { - return function groupJoinOperatorFunction(outer: AsyncIterable): AsyncIterableX { - return new GroupJoinAsyncIterable( - outer, - inner, - outerSelector, - innerSelector, - resultSelector - ); + return function groupJoinOperatorFunction(outer) { + return new GroupJoinAsyncIterable(outer, inner, outerSelector, innerSelector, resultSelector); }; } diff --git a/src/asynciterable/operators/ignoreelements.ts b/src/asynciterable/operators/ignoreelements.ts index d961eb25..b8601d58 100644 --- a/src/asynciterable/operators/ignoreelements.ts +++ b/src/asynciterable/operators/ignoreelements.ts @@ -28,9 +28,7 @@ export class IgnoreElementsAsyncIterable extends AsyncIterableX(): MonoTypeOperatorAsyncFunction { - return function ignoreElementsOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new IgnoreElementsAsyncIterable(source); + return function ignoreElementsOperatorFunction(source) { + return new IgnoreElementsAsyncIterable(source); }; } diff --git a/src/asynciterable/operators/innerjoin.ts b/src/asynciterable/operators/innerjoin.ts index c326b4ef..88908c5e 100644 --- a/src/asynciterable/operators/innerjoin.ts +++ b/src/asynciterable/operators/innerjoin.ts @@ -38,9 +38,12 @@ export class JoinAsyncIterable extends AsyncItera async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + const map = await createGrouping(this._inner, this._innerSelector, identity, signal); + for await (const outerElement of wrapWithAbort(this._outer, signal)) { const outerKey = await this._outerSelector(outerElement, signal); + if (map.has(outerKey)) { for (const innerElement of map.get(outerKey)!) { yield await this._resultSelector(outerElement, innerElement, signal); @@ -73,13 +76,7 @@ export function innerJoin( innerSelector: (value: TInner, signal?: AbortSignal) => TKey | Promise, resultSelector: (outer: TOuter, inner: TInner, signal?: AbortSignal) => TResult | Promise ): OperatorAsyncFunction { - return function innerJoinOperatorFunction(outer: AsyncIterable): AsyncIterableX { - return new JoinAsyncIterable( - outer, - inner, - outerSelector, - innerSelector, - resultSelector - ); + return function innerJoinOperatorFunction(outer) { + return new JoinAsyncIterable(outer, inner, outerSelector, innerSelector, resultSelector); }; } diff --git a/src/asynciterable/operators/intersect.ts b/src/asynciterable/operators/intersect.ts index 2e9fbf9d..4bdf2341 100644 --- a/src/asynciterable/operators/intersect.ts +++ b/src/asynciterable/operators/intersect.ts @@ -12,11 +12,13 @@ async function arrayRemove( signal?: AbortSignal ): Promise { throwIfAborted(signal); + const idx = await arrayIndexOfAsync(array, item, comparer); if (idx === -1) { return false; } array.splice(idx, 1); + return true; } @@ -38,7 +40,8 @@ export class IntersectAsyncIterable extends AsyncIterableX { } async *[Symbol.asyncIterator](signal?: AbortSignal) { - const map = [] as TSource[]; + const map: TSource[] = []; + for await (const secondItem of wrapWithAbort(this._second, signal)) { map.push(secondItem); } @@ -65,9 +68,7 @@ export function intersect( second: AsyncIterable, comparer: (x: TSource, y: TSource) => boolean | Promise = comparerAsync ): MonoTypeOperatorAsyncFunction { - return function intersectOperatorFunction( - first: AsyncIterable - ): AsyncIterableX { - return new IntersectAsyncIterable(first, second, comparer); + return function intersectOperatorFunction(first) { + return new IntersectAsyncIterable(first, second, comparer); }; } diff --git a/src/asynciterable/operators/map.ts b/src/asynciterable/operators/map.ts index 8c69336d..c0f02558 100644 --- a/src/asynciterable/operators/map.ts +++ b/src/asynciterable/operators/map.ts @@ -26,10 +26,10 @@ export class MapAsyncIterable extends AsyncIterableX async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + let i = 0; for await (const item of wrapWithAbort(this._source, signal)) { - const result = await this._selector.call(this._thisArg, item, i++, signal); - yield result; + yield await this._selector.call(this._thisArg, item, i++, signal); } } } @@ -50,7 +50,7 @@ export function map( selector: (value: TSource, index: number, signal?: AbortSignal) => Promise | TResult, thisArg?: any ): OperatorAsyncFunction { - return function mapOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new MapAsyncIterable(source, selector, thisArg); + return function mapOperatorFunction(source) { + return new MapAsyncIterable(source, selector, thisArg); }; } diff --git a/src/asynciterable/operators/memoize.ts b/src/asynciterable/operators/memoize.ts index 160cc70a..8b01cf2b 100644 --- a/src/asynciterable/operators/memoize.ts +++ b/src/asynciterable/operators/memoize.ts @@ -82,6 +82,7 @@ export class MemoizeAsyncBuffer extends AsyncIterableX { * elements from the shared source sequence, without duplicating source iteration side-effects. */ export function memoize(readerCount?: number): OperatorAsyncFunction; + /** * Memoizes the source sequence within a selector function where a specified number of iterators can get access * to all of the sequence's elements without causing multiple iterations over the source. @@ -99,6 +100,7 @@ export function memoize( readerCount?: number, selector?: (value: AsyncIterable) => AsyncIterable ): OperatorAsyncFunction; + /** * Memoizes the source sequence within a selector function where a specified number of iterators can get access * to all of the sequence's elements without causing multiple iterations over the source. @@ -116,22 +118,13 @@ export function memoize( readerCount = -1, selector?: (value: AsyncIterable) => AsyncIterable ): OperatorAsyncFunction { - return function memoizeOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { + return function memoizeOperatorFunction(source) { if (!selector) { return readerCount === -1 - ? new MemoizeAsyncBuffer( - source[Symbol.asyncIterator](), - new MaxRefCountList() - ) - : new MemoizeAsyncBuffer( - source[Symbol.asyncIterator](), - new RefCountList(readerCount) - ); + ? new MemoizeAsyncBuffer(source[Symbol.asyncIterator](), new MaxRefCountList()) + : new MemoizeAsyncBuffer(source[Symbol.asyncIterator](), new RefCountList(readerCount)); } - return create(() => - selector!(memoize(readerCount)(source))[Symbol.asyncIterator]() - ); + + return create(() => selector!(memoize(readerCount)(source))[Symbol.asyncIterator]()); }; } diff --git a/src/asynciterable/operators/mergeall.ts b/src/asynciterable/operators/mergeall.ts index bde370aa..391ee40c 100644 --- a/src/asynciterable/operators/mergeall.ts +++ b/src/asynciterable/operators/mergeall.ts @@ -1,3 +1,4 @@ +import { OperatorAsyncFunction } from '../../interfaces.js'; import { FlattenConcurrentAsyncIterable } from './_flatten.js'; /** @@ -6,8 +7,10 @@ import { FlattenConcurrentAsyncIterable } from './_flatten.js'; * @template TSource The type of the elements in the source sequences. * @returns {OperatorAsyncFunction, TSource>} The async-iterable sequence that merges the elements of the inner sequences. */ -export function mergeAll(concurrent = Infinity) { - return function mergeAllOperatorFunction(source: AsyncIterable>) { +export function mergeAll( + concurrent = Infinity +): OperatorAsyncFunction, TSource> { + return function mergeAllOperatorFunction(source) { return new FlattenConcurrentAsyncIterable(source, (s) => s, concurrent, false); }; } diff --git a/src/asynciterable/operators/mergewith.ts b/src/asynciterable/operators/mergewith.ts index e4764611..96d08b9c 100644 --- a/src/asynciterable/operators/mergewith.ts +++ b/src/asynciterable/operators/mergewith.ts @@ -12,6 +12,7 @@ import { MergeAsyncIterable } from '../merge.js'; * into a single async-iterable sequence. */ export function mergeWith(v2: AsyncIterable): OperatorAsyncFunction; + /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * @@ -27,6 +28,7 @@ export function mergeWith( v2: AsyncIterable, v3: AsyncIterable ): OperatorAsyncFunction; + /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * @@ -46,6 +48,7 @@ export function mergeWith( v3: AsyncIterable, v4: AsyncIterable ): OperatorAsyncFunction; + /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * @@ -67,6 +70,7 @@ export function mergeWith( v4: AsyncIterable, v5: AsyncIterable ): OperatorAsyncFunction; + /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * @@ -91,6 +95,7 @@ export function mergeWith( v5: AsyncIterable, v6: AsyncIterable ): OperatorAsyncFunction; + /** * Merges elements from all of the specified async-iterable sequences into a single async-iterable sequence. * @@ -104,7 +109,7 @@ export function mergeWith( ): OperatorAsyncFunction; export function mergeWith(...args: AsyncIterable[]): OperatorAsyncFunction { - return function mergeWithOperatorFunction(source: AsyncIterable) { - return new MergeAsyncIterable([source, ...args]); + return function mergeWithOperatorFunction(source) { + return new MergeAsyncIterable([source, ...args]); }; } diff --git a/src/asynciterable/operators/orderby.ts b/src/asynciterable/operators/orderby.ts index b7aae68a..0863ea22 100644 --- a/src/asynciterable/operators/orderby.ts +++ b/src/asynciterable/operators/orderby.ts @@ -15,6 +15,7 @@ export abstract class OrderedAsyncIterableBaseX extends AsyncIterableX< async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + const array = await toArray(this._source, signal); const len = array.length; const indices = new Array(len); @@ -182,14 +183,8 @@ export function thenByDescending( keySelector: (item: TSource) => TKey, comparer: (fst: TKey, snd: TKey) => number = defaultSorter ): UnaryFunction, OrderedAsyncIterableX> { - return function thenByDescendingOperatorFunction(source: AsyncIterable) { - const orderSource = >source; - return new OrderedAsyncIterableX( - orderSource._source, - keySelector, - comparer, - true, - orderSource - ); + return function thenByDescendingOperatorFunction(source) { + const orderSource = source as OrderedAsyncIterableBaseX; + return new OrderedAsyncIterableX(orderSource._source, keySelector, comparer, true, orderSource); }; } diff --git a/src/asynciterable/operators/pairwise.ts b/src/asynciterable/operators/pairwise.ts index 90ceb385..1b7b68e9 100644 --- a/src/asynciterable/operators/pairwise.ts +++ b/src/asynciterable/operators/pairwise.ts @@ -14,14 +14,17 @@ export class PairwiseAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + let value: TSource | undefined; let hasValue = false; + for await (const item of wrapWithAbort(this._source, signal)) { if (!hasValue) { hasValue = true; } else { yield [value!, item]; } + value = item; } } @@ -35,9 +38,7 @@ export class PairwiseAsyncIterable extends AsyncIterableX { * @returns {OperatorAsyncFunction} The result sequence. */ export function pairwise(): OperatorAsyncFunction { - return function pairwiseOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new PairwiseAsyncIterable(source); + return function pairwiseOperatorFunction(source) { + return new PairwiseAsyncIterable(source); }; } diff --git a/src/asynciterable/operators/pluck.ts b/src/asynciterable/operators/pluck.ts index 1918e5be..0fc99d35 100644 --- a/src/asynciterable/operators/pluck.ts +++ b/src/asynciterable/operators/pluck.ts @@ -1,4 +1,3 @@ -import { AsyncIterableX } from '../asynciterablex.js'; import { map } from './map.js'; import { OperatorAsyncFunction } from '../../interfaces.js'; @@ -30,9 +29,7 @@ function plucker(props: string[], length: number): (x: any) => any { export function pluck( ...args: string[] ): OperatorAsyncFunction { - return function pluckOperatorFunction(source: AsyncIterable): AsyncIterableX { - return map( - (plucker(args, args.length) as any) as (value: TSource) => TResult - )(source); + return function pluckOperatorFunction(source) { + return map(plucker(args, args.length) as (value: TSource) => TResult)(source); }; } diff --git a/src/asynciterable/operators/publish.ts b/src/asynciterable/operators/publish.ts index 55eadea3..80869b01 100644 --- a/src/asynciterable/operators/publish.ts +++ b/src/asynciterable/operators/publish.ts @@ -1,4 +1,3 @@ -import { AsyncIterableX } from '../asynciterablex.js'; import { RefCountList } from '../../iterable/operators/_refcountlist.js'; import { create } from '../create.js'; import { OperatorAsyncFunction } from '../../interfaces.js'; @@ -14,6 +13,7 @@ class PublishedAsyncBuffer extends MemoizeAsyncBuffer { [Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + this._buffer.readerCount++; return this._getIterable(this._buffer.count)[Symbol.asyncIterator](); } @@ -28,6 +28,7 @@ class PublishedAsyncBuffer extends MemoizeAsyncBuffer { * the shared source sequence, starting from the index at the point of obtaining the enumerator. */ export function publish(): OperatorAsyncFunction; + /** * Buffer enabling each iterator to retrieve elements from the shared source sequence, starting from the * index at the point of obtaining the iterator. @@ -42,6 +43,7 @@ export function publish(): OperatorAsyncFunction; export function publish( selector?: (value: AsyncIterable) => AsyncIterable ): OperatorAsyncFunction; + /** * Buffer enabling each iterator to retrieve elements from the shared source sequence, starting from the * index at the point of obtaining the iterator. @@ -56,11 +58,9 @@ export function publish( export function publish( selector?: (value: AsyncIterable) => AsyncIterable ): OperatorAsyncFunction { - return function publishOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { + return function publishOperatorFunction(source) { return selector ? create(async () => selector(publish()(source))[Symbol.asyncIterator]()) - : new PublishedAsyncBuffer(source[Symbol.asyncIterator]()); + : new PublishedAsyncBuffer(source[Symbol.asyncIterator]()); }; } diff --git a/src/asynciterable/operators/racewith.ts b/src/asynciterable/operators/racewith.ts index 8de7e685..6f62cdf7 100644 --- a/src/asynciterable/operators/racewith.ts +++ b/src/asynciterable/operators/racewith.ts @@ -10,7 +10,7 @@ import { RaceAsyncIterable } from '../race.js'; export function raceWith( ...sources: AsyncIterable[] ): MonoTypeOperatorAsyncFunction { - return function raceWithOperatorFunction(source: AsyncIterable) { - return new RaceAsyncIterable([source, ...sources]); + return function raceWithOperatorFunction(source) { + return new RaceAsyncIterable([source, ...sources]); }; } diff --git a/src/asynciterable/operators/repeat.ts b/src/asynciterable/operators/repeat.ts index 79be2a92..8be38e3a 100644 --- a/src/asynciterable/operators/repeat.ts +++ b/src/asynciterable/operators/repeat.ts @@ -16,17 +16,14 @@ export class RepeatAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + if (this._count === -1) { while (1) { - for await (const item of wrapWithAbort(this._source, signal)) { - yield item; - } + yield* wrapWithAbort(this._source, signal); } } else { for (let i = 0; i < this._count; i++) { - for await (const item of wrapWithAbort(this._source, signal)) { - yield item; - } + yield* wrapWithAbort(this._source, signal); } } } @@ -40,7 +37,7 @@ export class RepeatAsyncIterable extends AsyncIterableX { * @returns {MonoTypeOperatorAsyncFunction} The async-iterable sequence producing the elements of the given sequence repeatedly. */ export function repeat(count = -1): MonoTypeOperatorAsyncFunction { - return function repeatOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new RepeatAsyncIterable(source, count); + return function repeatOperatorFunction(source) { + return new RepeatAsyncIterable(source, count); }; } diff --git a/src/asynciterable/operators/retry.ts b/src/asynciterable/operators/retry.ts index 1b9156f8..9c9121ea 100644 --- a/src/asynciterable/operators/retry.ts +++ b/src/asynciterable/operators/retry.ts @@ -1,4 +1,3 @@ -import { AsyncIterableX } from '../asynciterablex.js'; import { repeatValue } from '../../iterable/repeatvalue.js'; import { CatchAllAsyncIterable } from '../catcherror.js'; import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js'; @@ -12,7 +11,7 @@ import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js'; * given sequence repeatedly until it terminates successfully. */ export function retry(count = -1): MonoTypeOperatorAsyncFunction { - return function retryOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new CatchAllAsyncIterable(repeatValue>(source, count)); + return function retryOperatorFunction(source) { + return new CatchAllAsyncIterable(repeatValue(source, count)); }; } diff --git a/src/asynciterable/operators/reverse.ts b/src/asynciterable/operators/reverse.ts index d51a3a3e..f3cc4b26 100644 --- a/src/asynciterable/operators/reverse.ts +++ b/src/asynciterable/operators/reverse.ts @@ -14,10 +14,13 @@ export class ReverseAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const results = [] as TSource[]; + + const results: TSource[] = []; + for await (const item of wrapWithAbort(this._source, signal)) { results.unshift(item); } + yield* results; } } @@ -29,7 +32,7 @@ export class ReverseAsyncIterable extends AsyncIterableX { * @returns {MonoTypeOperatorAsyncFunction} The async-iterable in reversed sequence. */ export function reverse(): MonoTypeOperatorAsyncFunction { - return function reverseOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new ReverseAsyncIterable(source); + return function reverseOperatorFunction(source) { + return new ReverseAsyncIterable(source); }; } diff --git a/src/asynciterable/operators/scan.ts b/src/asynciterable/operators/scan.ts index dad951a1..ad37d05c 100644 --- a/src/asynciterable/operators/scan.ts +++ b/src/asynciterable/operators/scan.ts @@ -21,12 +21,14 @@ export class ScanAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + let i = 0; - let hasValue = false; + let hasValue = this._hasSeed; let acc = this._seed; + for await (const item of wrapWithAbort(this._source, signal)) { - if (hasValue || (hasValue = this._hasSeed)) { - acc = await this._fn(acc, item, i++, signal); + if (hasValue) { + acc = await this._fn(acc as R, item, i++, signal); yield acc; } else { acc = item; @@ -34,6 +36,7 @@ export class ScanAsyncIterable extends AsyncIterableX { i++; } } + if (i === 1 && !this._hasSeed) { yield acc as R; } @@ -50,7 +53,7 @@ export class ScanAsyncIterable extends AsyncIterableX { * @returns {OperatorAsyncFunction} An async-enumerable sequence containing the accumulated values. */ export function scan(options: ScanOptions): OperatorAsyncFunction { - return function scanOperatorFunction(source: AsyncIterable): AsyncIterableX { + return function scanOperatorFunction(source) { return new ScanAsyncIterable(source, options); }; } diff --git a/src/asynciterable/operators/scanoptions.ts b/src/asynciterable/operators/scanoptions.ts index 13ac00a6..e96012a0 100644 --- a/src/asynciterable/operators/scanoptions.ts +++ b/src/asynciterable/operators/scanoptions.ts @@ -2,22 +2,28 @@ * The options for performing a scan operation, including the callback and the optional seed. * * @interface ScanOptions - * @template T The type of the elements in the source sequence. - * @template R The type of the result for the reducer callback. + * @template TSource The type of the elements in the source sequence. + * @template TResult The type of the result for the reducer callback. */ -export interface ScanOptions { +export interface ScanOptions { /** * The optional seed used for the scan operation. * - * @type {R} The type of the result + * @type {TResult} The type of the result * @memberof ScanOptions */ - seed?: R; + seed?: TResult; + /** * The callback used for the scan operation, which passes the accumulator, current value, the * current index, and an Abort Signal. This returns a result or a Promise containing a result. * * @memberof ScanOptions */ - callback: (accumulator: R, current: T, index: number, signal?: AbortSignal) => R | Promise; + callback: ( + accumulator: TResult, + current: TSource, + index: number, + signal?: AbortSignal + ) => TResult | Promise; } diff --git a/src/asynciterable/operators/scanright.ts b/src/asynciterable/operators/scanright.ts index f02c2e3f..6ad2b533 100644 --- a/src/asynciterable/operators/scanright.ts +++ b/src/asynciterable/operators/scanright.ts @@ -21,13 +21,17 @@ export class ScanRightAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - let hasValue = false; + + let hasValue = this._hasSeed; let acc = this._seed; + const source = await toArray(this._source, signal); + for (let offset = source.length - 1; offset >= 0; offset--) { const item = source[offset]; - if (hasValue || (hasValue = this._hasSeed)) { - acc = await this._fn(acc, item, offset, signal); + + if (hasValue) { + acc = await this._fn(acc as R, item, offset, signal); yield acc; } else { acc = item; @@ -47,7 +51,7 @@ export class ScanRightAsyncIterable extends AsyncIterableX { * @returns {OperatorAsyncFunction} An async-enumerable sequence containing the accumulated values from the right. */ export function scanRight(options: ScanOptions): OperatorAsyncFunction { - return function scanRightOperatorFunction(source: AsyncIterable): AsyncIterableX { + return function scanRightOperatorFunction(source) { return new ScanRightAsyncIterable(source, options); }; } diff --git a/src/asynciterable/operators/share.ts b/src/asynciterable/operators/share.ts index b3247044..28ab3696 100644 --- a/src/asynciterable/operators/share.ts +++ b/src/asynciterable/operators/share.ts @@ -8,6 +8,7 @@ class SharedAsyncIterable extends AsyncIterableX { constructor(it: AsyncIterator) { super(); + this._it = { next(value) { return it.next(value); @@ -17,6 +18,7 @@ class SharedAsyncIterable extends AsyncIterableX { [Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + return this._it; } } @@ -29,6 +31,7 @@ class SharedAsyncIterable extends AsyncIterableX { * @returns {OperatorAsyncFunction} Buffer enabling each enumerator to retrieve elements from the shared source sequence. */ export function share(): OperatorAsyncFunction; + /** * Shares the source sequence within a selector function where each iterator can fetch the next element from the * source sequence. @@ -49,6 +52,7 @@ export function share( signal?: AbortSignal ) => AsyncIterable | Promise> ): OperatorAsyncFunction; + /** * Shares the source sequence within a selector function where each iterator can fetch the next element from the * source sequence. @@ -73,13 +77,14 @@ export function share( source: AsyncIterable ): AsyncIterableX { return selector - ? create(async (signal) => { + ? create(async (signal) => { const it = await selector( new SharedAsyncIterable(source[Symbol.asyncIterator](signal)), signal ); + return it[Symbol.asyncIterator](signal); }) - : new SharedAsyncIterable(source[Symbol.asyncIterator]()); + : new SharedAsyncIterable(source[Symbol.asyncIterator]()); }; } diff --git a/src/asynciterable/operators/skip.ts b/src/asynciterable/operators/skip.ts index ac4f6943..bf160937 100644 --- a/src/asynciterable/operators/skip.ts +++ b/src/asynciterable/operators/skip.ts @@ -1,6 +1,5 @@ import { AsyncIterableX } from '../asynciterablex.js'; import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js'; -import { wrapWithAbort } from './withabort.js'; import { throwIfAborted } from '../../aborterror.js'; /** @ignore */ @@ -16,17 +15,14 @@ export class SkipAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const source = wrapWithAbort(this._source, signal); - const it = source[Symbol.asyncIterator](); - let count = this._count; - let next; - while (count > 0 && !(next = await it.next()).done) { - count--; - } - if (count <= 0) { - while (!(next = await it.next()).done) { - yield next.value; + + let countLeft = this._count; + for await (const value of this._source) { + if (countLeft <= 0) { + yield value; } + + countLeft--; } } } @@ -40,7 +36,7 @@ export class SkipAsyncIterable extends AsyncIterableX { * occur after the specified index in the input sequence. */ export function skip(count: number): MonoTypeOperatorAsyncFunction { - return function skipOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new SkipAsyncIterable(source, count); + return function skipOperatorFunction(source) { + return new SkipAsyncIterable(source, count); }; } diff --git a/src/asynciterable/operators/skiplast.ts b/src/asynciterable/operators/skiplast.ts index 20f54df7..b982cba9 100644 --- a/src/asynciterable/operators/skiplast.ts +++ b/src/asynciterable/operators/skiplast.ts @@ -16,9 +16,11 @@ export class SkipLastAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const q = [] as TSource[]; + + const q: TSource[] = []; for await (const item of wrapWithAbort(this._source, signal)) { q.push(item); + if (q.length > this._count) { yield q.shift()!; } @@ -35,9 +37,7 @@ export class SkipLastAsyncIterable extends AsyncIterableX { * source sequence elements except for the bypassed ones at the end. */ export function skipLast(count: number): MonoTypeOperatorAsyncFunction { - return function skipLastOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new SkipLastAsyncIterable(source, count); + return function skipLastOperatorFunction(source) { + return new SkipLastAsyncIterable(source, count); }; } diff --git a/src/asynciterable/operators/skipuntil.ts b/src/asynciterable/operators/skipuntil.ts index 0c0b1461..378a7b7e 100644 --- a/src/asynciterable/operators/skipuntil.ts +++ b/src/asynciterable/operators/skipuntil.ts @@ -16,8 +16,10 @@ export class SkipUntilAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + let otherDone = false; this._other(signal).then(() => (otherDone = true)); + for await (const item of wrapWithAbort(this._source, signal)) { if (otherDone) { yield item; @@ -38,9 +40,7 @@ export class SkipUntilAsyncIterable extends AsyncIterableX { export function skipUntil( other: (signal?: AbortSignal) => Promise ): MonoTypeOperatorAsyncFunction { - return function skipUntilOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new SkipUntilAsyncIterable(source, other); + return function skipUntilOperatorFunction(source) { + return new SkipUntilAsyncIterable(source, other); }; } diff --git a/src/asynciterable/operators/skipwhile.ts b/src/asynciterable/operators/skipwhile.ts index 151040aa..5dd3218e 100644 --- a/src/asynciterable/operators/skipwhile.ts +++ b/src/asynciterable/operators/skipwhile.ts @@ -23,13 +23,13 @@ export class SkipWhileAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - let yielding = false; + + let metCondition = false; let i = 0; for await (const element of wrapWithAbort(this._source, signal)) { - if (!yielding && !(await this._predicate(element, i++, signal))) { - yielding = true; - } - if (yielding) { + metCondition = metCondition || !(await this._predicate(element, i++, signal)); + + if (metCondition) { yield element; } } @@ -49,6 +49,7 @@ export class SkipWhileAsyncIterable extends AsyncIterableX { export function skipWhile( predicate: (value: T, index: number, signal?: AbortSignal) => value is S ): OperatorAsyncFunction; + /** * Bypasses elements in an async-iterale sequence as long as a specified condition is true * and then returns the remaining elements. @@ -61,6 +62,7 @@ export function skipWhile( export function skipWhile( predicate: (value: T, index: number, signal?: AbortSignal) => boolean | Promise ): OperatorAsyncFunction; + /** * Bypasses elements in an async-iterale sequence as long as a specified condition is true * and then returns the remaining elements. @@ -73,7 +75,7 @@ export function skipWhile( export function skipWhile( predicate: (value: T, index: number) => boolean | Promise ): OperatorAsyncFunction { - return function skipWhileOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new SkipWhileAsyncIterable(source, predicate); + return function skipWhileOperatorFunction(source) { + return new SkipWhileAsyncIterable(source, predicate); }; } diff --git a/src/asynciterable/operators/slice.ts b/src/asynciterable/operators/slice.ts index 282f2c42..9b5bdb2c 100644 --- a/src/asynciterable/operators/slice.ts +++ b/src/asynciterable/operators/slice.ts @@ -7,33 +7,34 @@ import { throwIfAborted } from '../../aborterror.js'; export class SliceAsyncIterable extends AsyncIterableX { private _source: AsyncIterable; private _begin: number; - private _end: number; + private _count: number; - constructor(source: AsyncIterable, begin: number, end: number) { + constructor(source: AsyncIterable, begin: number, count: number) { super(); this._source = source; this._begin = begin; - this._end = end; + this._count = count; } async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + const source = wrapWithAbort(this._source, signal); - const it = source[Symbol.asyncIterator](); - let begin = this._begin; - let next; - while (begin > 0 && !(next = await it.next()).done) { - begin--; - } - let end = this._end; - if (end > 0) { - while (!(next = await it.next()).done) { - yield next.value; - if (--end === 0) { - break; - } + let i = 0; + for await (const item of source) { + if (i < this._begin) { + i++; + continue; + } + + if (i >= this._begin + this._count) { + break; } + + yield item; + + i++; } } } @@ -42,15 +43,15 @@ export class SliceAsyncIterable extends AsyncIterableX { * Returns the elements from the source async-iterable sequence only after the function that returns a promise produces an element. * * @template TSource The type of elements in the source sequence. - * @param {number} begin Zero-based index at which to begin extraction. - * @param {number} [end=Infinity] Zero-based index before which to end extraction. + * @param {number} begin Zero-based index at which to begin extraction (inclusive). + * @param {number} [count=Infinity] The number of items to extract. * @returns {MonoTypeOperatorAsyncFunction} An async-iterable containing the extracted elements. */ export function slice( begin: number, - end = Infinity + count = Infinity ): MonoTypeOperatorAsyncFunction { - return function sliceOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new SliceAsyncIterable(source, begin, end); + return function sliceOperatorFunction(source) { + return new SliceAsyncIterable(source, begin, count); }; } diff --git a/src/asynciterable/operators/startwith.ts b/src/asynciterable/operators/startwith.ts index a987a2b4..cbf961c5 100644 --- a/src/asynciterable/operators/startwith.ts +++ b/src/asynciterable/operators/startwith.ts @@ -15,12 +15,9 @@ export class StartWithAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - for (const x of this._args) { - yield x; - } - for await (const item of wrapWithAbort(this._source, signal)) { - yield item; - } + + yield* this._args; + yield* wrapWithAbort(this._source, signal); } } @@ -31,10 +28,12 @@ export class StartWithAsyncIterable extends AsyncIterableX { * @param {...TSource[]} args Elements to prepend to the specified sequence. * @returns The source sequence prepended with the specified values. */ -export function startWith(...args: TSource) { - return function startWithOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new StartWithAsyncIterable(source, args); +export function startWith( + ...args: TSource +): ( + source: AsyncIterable +) => AsyncIterableX { + return function startWithOperatorFunction(source) { + return new StartWithAsyncIterable(source, args); }; } diff --git a/src/asynciterable/operators/switchall.ts b/src/asynciterable/operators/switchall.ts index 8a78cbb0..40381ba4 100644 --- a/src/asynciterable/operators/switchall.ts +++ b/src/asynciterable/operators/switchall.ts @@ -1,3 +1,4 @@ +import { OperatorAsyncFunction } from '../../interfaces.js'; import { FlattenConcurrentAsyncIterable } from './_flatten.js'; /** @@ -6,10 +7,8 @@ import { FlattenConcurrentAsyncIterable } from './_flatten.js'; * @template TSource The type of the elements in the source sequences. * @returns {OperatorAsyncFunction, TSource>} The async-iterable sequence that merges the elements of the inner sequences. */ -export function switchAll() { - return function switchAllOperatorFunction( - source: AsyncIterable> - ) { +export function switchAll(): OperatorAsyncFunction, TSource> { + return function switchAllOperatorFunction(source) { return new FlattenConcurrentAsyncIterable(source, (s) => s, 1, true); }; } diff --git a/src/asynciterable/operators/take.ts b/src/asynciterable/operators/take.ts index 79ddb014..e5e6b821 100644 --- a/src/asynciterable/operators/take.ts +++ b/src/asynciterable/operators/take.ts @@ -16,13 +16,17 @@ export class TakeAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - let i = this._count; - if (i > 0) { - for await (const item of wrapWithAbort(this._source, signal)) { - yield item; - if (--i === 0) { - break; - } + + if (this._count <= 0) { + return; + } + + let left = this._count; + for await (const item of wrapWithAbort(this._source, signal)) { + yield item; + + if (--left === 0) { + break; } } } @@ -37,7 +41,7 @@ export class TakeAsyncIterable extends AsyncIterableX { * number of elements from the start of the input sequence. */ export function take(count: number): MonoTypeOperatorAsyncFunction { - return function takeOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new TakeAsyncIterable(source, count); + return function takeOperatorFunction(source) { + return new TakeAsyncIterable(source, count); }; } diff --git a/src/asynciterable/operators/takelast.ts b/src/asynciterable/operators/takelast.ts index 111e4730..1348799b 100644 --- a/src/asynciterable/operators/takelast.ts +++ b/src/asynciterable/operators/takelast.ts @@ -16,12 +16,15 @@ export class TakeLastAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + if (this._count > 0) { - const q = [] as TSource[]; + const q: TSource[] = []; + for await (const item of wrapWithAbort(this._source, signal)) { if (q.length >= this._count) { q.shift(); } + q.push(item); } @@ -41,9 +44,7 @@ export class TakeLastAsyncIterable extends AsyncIterableX { * number of elements from the end of the source sequence. */ export function takeLast(count: number): MonoTypeOperatorAsyncFunction { - return function takeLastOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new TakeLastAsyncIterable(source, count); + return function takeLastOperatorFunction(source) { + return new TakeLastAsyncIterable(source, count); }; } diff --git a/src/asynciterable/operators/takeuntil.ts b/src/asynciterable/operators/takeuntil.ts index 2e6e3c68..59469952 100644 --- a/src/asynciterable/operators/takeuntil.ts +++ b/src/asynciterable/operators/takeuntil.ts @@ -2,9 +2,9 @@ import { AsyncIterableX } from '../asynciterablex.js'; import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js'; import { wrapWithAbort } from './withabort.js'; import { throwIfAborted } from '../../aborterror.js'; -import { safeRace } from '../../util/safeRace.js'; +import { merge } from '../merge.js'; -const DONE_PROMISE_VALUE = undefined; +const doneEvent = {}; /** @ignore */ export class TakeUntilAsyncIterable extends AsyncIterableX { @@ -19,15 +19,17 @@ export class TakeUntilAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const donePromise = this._other(signal).then(() => DONE_PROMISE_VALUE); - const itemsAsyncIterator = wrapWithAbort(this._source, signal)[Symbol.asyncIterator](); - for (;;) { - const itemPromise = itemsAsyncIterator.next(); - const result = await safeRace([donePromise, itemPromise]); - if (result === DONE_PROMISE_VALUE || result.done) { + + const done = AsyncIterableX.as(this._other(signal).then(() => doneEvent)); + const source = wrapWithAbort(this._source, signal); + const merged = merge(source, done); + + for await (const item of merged) { + if (item === doneEvent) { break; } - yield result.value; + + yield item as TSource; } } } @@ -45,9 +47,7 @@ export class TakeUntilAsyncIterable extends AsyncIterableX { export function takeUntil( other: (signal?: AbortSignal) => Promise ): MonoTypeOperatorAsyncFunction { - return function takeUntilOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new TakeUntilAsyncIterable(source, other); + return function takeUntilOperatorFunction(source) { + return new TakeUntilAsyncIterable(source, other); }; } diff --git a/src/asynciterable/operators/takewhile.ts b/src/asynciterable/operators/takewhile.ts index 1b594bf9..21decb60 100644 --- a/src/asynciterable/operators/takewhile.ts +++ b/src/asynciterable/operators/takewhile.ts @@ -23,11 +23,13 @@ export class TakeWhileAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + let i = 0; for await (const item of wrapWithAbort(this._source, signal)) { if (!(await this._predicate(item, i++, signal))) { break; } + yield item; } } @@ -45,6 +47,7 @@ export class TakeWhileAsyncIterable extends AsyncIterableX { export function takeWhile( predicate: (value: T, index: number, signal?: AbortSignal) => value is S ): OperatorAsyncFunction; + /** * Returns elements from an async-iterable sequence as long as a specified condition is true. * @@ -56,6 +59,7 @@ export function takeWhile( export function takeWhile( predicate: (value: T, index: number, signal?: AbortSignal) => boolean | Promise ): OperatorAsyncFunction; + /** * Returns elements from an async-iterable sequence as long as a specified condition is true. * @@ -67,7 +71,7 @@ export function takeWhile( export function takeWhile( predicate: (value: T, index: number, signal?: AbortSignal) => boolean | Promise ): OperatorAsyncFunction { - return function takeWhileOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new TakeWhileAsyncIterable(source, predicate); + return function takeWhileOperatorFunction(source) { + return new TakeWhileAsyncIterable(source, predicate); }; } diff --git a/src/asynciterable/operators/tap.ts b/src/asynciterable/operators/tap.ts index e7a24356..d645fe9c 100644 --- a/src/asynciterable/operators/tap.ts +++ b/src/asynciterable/operators/tap.ts @@ -3,7 +3,7 @@ import { PartialAsyncObserver } from '../../observer.js'; import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js'; import { toObserver } from '../../util/toobserver.js'; import { AbortError, throwIfAborted } from '../../aborterror.js'; -import { returnAsyncIterator } from '../../util/returniterator.js'; +import { wrapWithAbort } from './withabort.js'; /** @ignore */ export class TapAsyncIterable extends AsyncIterableX { @@ -18,25 +18,21 @@ export class TapAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const obs = this._observer; - const it = this._source[Symbol.asyncIterator](signal); + try { - for (let res: IteratorResult; !(res = await it.next()).done; ) { - if (obs.next) { - await obs.next(res.value); - } - yield res.value; - } - if (obs.complete) { - await obs.complete(); + for await (const item of wrapWithAbort(this._source, signal)) { + await this._observer.next?.(item); + + yield item; } + + await this._observer.complete?.(); } catch (e) { - if (!(e instanceof AbortError) && obs.error) { - await obs.error(e); + if (!(e instanceof AbortError)) { + await this._observer.error?.(e); } + throw e; - } finally { - await returnAsyncIterator(it); } } } @@ -88,7 +84,7 @@ export function tap( error?: ((err: any) => any) | null, complete?: (() => any) | null ): MonoTypeOperatorAsyncFunction { - return function tapOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new TapAsyncIterable(source, toObserver(observerOrNext, error, complete)); + return function tapOperatorFunction(source) { + return new TapAsyncIterable(source, toObserver(observerOrNext, error, complete)); }; } diff --git a/src/asynciterable/operators/throttle.ts b/src/asynciterable/operators/throttle.ts index f98e10d2..38564914 100644 --- a/src/asynciterable/operators/throttle.ts +++ b/src/asynciterable/operators/throttle.ts @@ -16,12 +16,14 @@ export class ThrottleAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - let currentTime; - let previousTime; + + let last = -Infinity; + for await (const item of wrapWithAbort(this._source, signal)) { - currentTime = Date.now(); - if (!previousTime || currentTime - previousTime > this._time) { - previousTime = currentTime; + const now = Date.now(); + + if (now - last > this._time) { + last = now; yield item; } } @@ -36,9 +38,7 @@ export class ThrottleAsyncIterable extends AsyncIterableX { * @returns {MonoTypeOperatorAsyncFunction} The source sequence throttled by the given timeframe. */ export function throttle(time: number): MonoTypeOperatorAsyncFunction { - return function throttleOperatorFunction( - source: AsyncIterable - ): AsyncIterableX { - return new ThrottleAsyncIterable(source, time); + return function throttleOperatorFunction(source) { + return new ThrottleAsyncIterable(source, time); }; } diff --git a/src/asynciterable/operators/timeinterval.ts b/src/asynciterable/operators/timeinterval.ts index 79d1dd60..2f733291 100644 --- a/src/asynciterable/operators/timeinterval.ts +++ b/src/asynciterable/operators/timeinterval.ts @@ -20,11 +20,14 @@ export class TimeIntervalAsyncIterable extends AsyncIterableX extends AsyncIterableX(): OperatorAsyncFunction> { - return function timeIntervalOperatorFunction( - source: AsyncIterable - ): AsyncIterableX> { - return new TimeIntervalAsyncIterable(source); + return function timeIntervalOperatorFunction(source) { + return new TimeIntervalAsyncIterable(source); }; } diff --git a/src/asynciterable/operators/timeout.ts b/src/asynciterable/operators/timeout.ts index 8a23493e..48b0c575 100644 --- a/src/asynciterable/operators/timeout.ts +++ b/src/asynciterable/operators/timeout.ts @@ -5,7 +5,6 @@ import { wrapWithAbort } from './withabort.js'; import { throwIfAborted } from '../../aborterror.js'; import { isObject } from '../../util/isiterable.js'; import { safeRace } from '../../util/safeRace.js'; -import { returnAsyncIterator } from '../../util/returniterator.js'; /** @ignore */ export class TimeoutError extends Error { @@ -53,7 +52,9 @@ export class TimeoutAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + const it = wrapWithAbort(this._source, signal)[Symbol.asyncIterator](); + try { while (1) { const { type, value } = await safeRace>([ @@ -75,7 +76,7 @@ export class TimeoutAsyncIterable extends AsyncIterableX { yield value.value; } } finally { - await returnAsyncIterator(it); + await it?.return?.(); } } } @@ -89,7 +90,7 @@ export class TimeoutAsyncIterable extends AsyncIterableX { * @returns {MonoTypeOperatorAsyncFunction} The source sequence with a TimeoutError in case of a timeout. */ export function timeout(dueTime: number): MonoTypeOperatorAsyncFunction { - return function timeoutOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new TimeoutAsyncIterable(source, dueTime); + return function timeoutOperatorFunction(source) { + return new TimeoutAsyncIterable(source, dueTime); }; } diff --git a/src/asynciterable/operators/timestamp.ts b/src/asynciterable/operators/timestamp.ts index f69f436d..aeb01860 100644 --- a/src/asynciterable/operators/timestamp.ts +++ b/src/asynciterable/operators/timestamp.ts @@ -20,6 +20,7 @@ export class TimestampAsyncIterable extends AsyncIterableX extends AsyncIterableX>} An async-iterable sequence with timestamp information on elements. */ export function timestamp(): OperatorAsyncFunction> { - return function timestampOperatorFunction( - source: AsyncIterable - ): AsyncIterableX> { - return new TimestampAsyncIterable(source); + return function timestampOperatorFunction(source) { + return new TimestampAsyncIterable(source); }; } diff --git a/src/asynciterable/operators/union.ts b/src/asynciterable/operators/union.ts index 196b5882..c8206eb4 100644 --- a/src/asynciterable/operators/union.ts +++ b/src/asynciterable/operators/union.ts @@ -24,7 +24,9 @@ export class UnionAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const map = [] as TSource[]; + + const map: TSource[] = []; + for await (const lItem of wrapWithAbort(this._left, signal)) { if ((await arrayIndexOfAsync(map, lItem, this._comparer)) === -1) { map.push(lItem); @@ -54,7 +56,7 @@ export function union( right: AsyncIterable, comparer: (x: TSource, y: TSource) => boolean | Promise = comparerAsync ): MonoTypeOperatorAsyncFunction { - return function unionOperatorFunction(left: AsyncIterable): AsyncIterableX { - return new UnionAsyncIterable(left, right, comparer); + return function unionOperatorFunction(left) { + return new UnionAsyncIterable(left, right, comparer); }; } diff --git a/src/asynciterable/operators/withlatestfrom.ts b/src/asynciterable/operators/withlatestfrom.ts index 87183e24..2816576c 100644 --- a/src/asynciterable/operators/withlatestfrom.ts +++ b/src/asynciterable/operators/withlatestfrom.ts @@ -4,9 +4,10 @@ import { wrapWithAbort } from './withabort.js'; import { throwIfAborted } from '../../aborterror.js'; import { identity } from '../../util/identity.js'; import { safeRace } from '../../util/safeRace.js'; +import { returnAsyncIterators } from '../../util/returniterator.js'; // eslint-disable-next-line @typescript-eslint/no-empty-function -const NEVER_PROMISE = new Promise(() => {}); +const NEVER_PROMISE = new Promise(() => {}); type MergeResult = { value: T; index: number }; @@ -28,54 +29,56 @@ export class WithLatestFromAsyncIterable extends AsyncIterableX>(newLength); - const nexts = new Array>>>(newLength); + const othersLength = this._others.length; + // The last iterator is the source + const iterators = new Array>(othersLength + 1); + const nexts = new Array>>>(othersLength + 1); let hasValueAll = false; - const hasValue = new Array(length); - const values = new Array(length); - hasValue.fill(false); + const values = new Array(othersLength); + const hasValue = new Array(othersLength).fill(false); - for (let i = 0; i < length; i++) { + for (let i = 0; i < othersLength; i++) { const iterator = wrapWithAbort(this._others[i], signal)[Symbol.asyncIterator](); iterators[i] = iterator; nexts[i] = wrapPromiseWithIndex(iterator.next(), i); } const it = wrapWithAbort(this._source, signal)[Symbol.asyncIterator](); - iterators[length] = it; - nexts[length] = wrapPromiseWithIndex(it.next(), length); - - for (;;) { - const next = safeRace(nexts); - const { - value: { value: value$, done: done$ }, - index, - } = await next; - - if (index === length) { - if (done$) { - break; + iterators[othersLength] = it; + nexts[othersLength] = wrapPromiseWithIndex(it.next(), othersLength); + + try { + while (1) { + const { + value: { value, done }, + index, + } = await safeRace(nexts); + + if (index === othersLength) { + if (done) { + break; + } + + nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index); + + if (hasValueAll) { + yield [value, ...values]; + } + } else { + if (done) { + nexts[index] = NEVER_PROMISE; + } else { + values[index] = value; + hasValue[index] = true; + hasValueAll = hasValueAll || hasValue.every(identity); + + nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index); + } } - - const iterator$ = iterators[index]; - nexts[index] = wrapPromiseWithIndex(iterator$.next(), index); - - if (hasValueAll) { - yield [value$, ...values]; - } - } else if (done$) { - nexts[index] = >>>NEVER_PROMISE; - } else { - values[index] = value$; - hasValue[index] = true; - hasValueAll = hasValue.every(identity); - - const iterator$ = iterators[index]; - nexts[index] = wrapPromiseWithIndex(iterator$.next(), index); } + } finally { + await returnAsyncIterators(iterators); } } } @@ -93,6 +96,7 @@ export class WithLatestFromAsyncIterable extends AsyncIterableX( source2: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence by combining each element * from the first source with the latest element from the other sources, if any. @@ -109,6 +113,7 @@ export function withLatestFrom( source2: AsyncIterable, source3: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence by combining each element * from the first source with the latest element from the other sources, if any. @@ -128,6 +133,7 @@ export function withLatestFrom( source3: AsyncIterable, source4: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence by combining each element * from the first source with the latest element from the other sources, if any. @@ -150,6 +156,7 @@ export function withLatestFrom( source4: AsyncIterable, source5: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence by combining each element * from the first source with the latest element from the other sources, if any. @@ -187,8 +194,8 @@ export function withLatestFrom( */ export function withLatestFrom(...sources: AsyncIterable[]): OperatorAsyncFunction; -export function withLatestFrom(...sources: any[]): OperatorAsyncFunction { - return function withLatestFromOperatorFunction(source: AsyncIterable) { - return new WithLatestFromAsyncIterable(source, sources); +export function withLatestFrom(...sources: AsyncIterable[]): OperatorAsyncFunction { + return function withLatestFromOperatorFunction(source) { + return new WithLatestFromAsyncIterable(source, sources); }; } diff --git a/src/asynciterable/operators/zipwith.ts b/src/asynciterable/operators/zipwith.ts index 8f48da32..38fbefc1 100644 --- a/src/asynciterable/operators/zipwith.ts +++ b/src/asynciterable/operators/zipwith.ts @@ -10,6 +10,7 @@ import { ZipAsyncIterable } from '../zip.js'; * @returns {OperatorAsyncFunction} Async iterable with an array of each element from the source sequences in a pairwise fashion. */ export function zipWith(source2: AsyncIterable): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence by combining their elements in a pairwise fashion. * @@ -24,6 +25,7 @@ export function zipWith( source2: AsyncIterable, source3: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence by combining their elements in a pairwise fashion. * @@ -41,6 +43,7 @@ export function zipWith( source3: AsyncIterable, source4: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence by combining their elements in a pairwise fashion. * @@ -62,6 +65,7 @@ export function zipWith( source4: AsyncIterable, source5: AsyncIterable ): OperatorAsyncFunction; + /** * Merges multiple async-iterable sequences into one async-iterable sequence by combining their elements in a pairwise fashion. * @@ -95,8 +99,8 @@ export function zipWith( * @returns {AsyncIterableX} Async iterable with an array of each element from the source sequences in a pairwise fashion. */ export function zipWith(...sources: AsyncIterable[]): OperatorAsyncFunction; -export function zipWith(...sources: any[]): OperatorAsyncFunction { - return function zipWithOperatorFunction(source: AsyncIterable) { - return new ZipAsyncIterable([source, ...sources]); +export function zipWith(...sources: AsyncIterable[]): OperatorAsyncFunction { + return function zipWithOperatorFunction(source) { + return new ZipAsyncIterable([source, ...sources]); }; } diff --git a/src/asynciterable/race.ts b/src/asynciterable/race.ts index 80137a24..160e13ed 100644 --- a/src/asynciterable/race.ts +++ b/src/asynciterable/race.ts @@ -2,11 +2,12 @@ import { AsyncIterableX } from './asynciterablex.js'; import { wrapWithAbort } from './operators/withabort.js'; import { throwIfAborted } from '../aborterror.js'; import { safeRace } from '../util/safeRace.js'; +import { returnAsyncIterators } from '../util/returniterator.js'; type MergeResult = { value: T; index: number }; -function wrapPromiseWithIndex(promise: Promise, index: number) { - return promise.then((value) => ({ value, index })) as Promise>; +function wrapPromiseWithIndex(promise: Promise, index: number): Promise> { + return promise.then((value) => ({ value, index })); } /** @ignore */ @@ -20,43 +21,31 @@ export class RaceAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const sources = this._sources; - const length = sources.length; + const length = this._sources.length; const iterators = new Array>(length); const nexts = new Array>>>(length); for (let i = 0; i < length; i++) { - const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator](); + const iterator = wrapWithAbort(this._sources[i], signal)[Symbol.asyncIterator](); iterators[i] = iterator; nexts[i] = wrapPromiseWithIndex(iterator.next(), i); } - const next = safeRace(nexts); - const { value: next$, index } = await next; + const { + value: { value, done }, + index, + } = await safeRace(nexts); - if (!next$.done) { - yield next$.value; + if (!done) { + yield value; } - const iterator$ = iterators[index]; + await returnAsyncIterators(iterators.filter((_, i) => i !== index)); - // Cancel/finish other iterators - for (let i = 0; i < length; i++) { - if (i === index) { - continue; - } - - const otherIterator = iterators[i]; - if (otherIterator.return) { - otherIterator.return(); - } - } - - let nextItem; - while (!(nextItem = await iterator$.next()).done) { - yield nextItem.value; - } + yield* { + [Symbol.asyncIterator]: () => iterators[index], + }; } } @@ -67,5 +56,5 @@ export class RaceAsyncIterable extends AsyncIterableX { * @return {AsyncIterable} An async sequence that surfaces either of the given sequences, whichever reacted first. */ export function race(...sources: AsyncIterable[]): AsyncIterableX { - return new RaceAsyncIterable(sources); + return new RaceAsyncIterable(sources); } diff --git a/src/asynciterable/range.ts b/src/asynciterable/range.ts index ee44332b..c3f42c2b 100644 --- a/src/asynciterable/range.ts +++ b/src/asynciterable/range.ts @@ -13,6 +13,7 @@ class RangeAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + for (let current = this._start, end = this._start + this._count; current < end; current++) { yield current; } diff --git a/src/asynciterable/reduce.ts b/src/asynciterable/reduce.ts index e628df66..ac1305ea 100644 --- a/src/asynciterable/reduce.ts +++ b/src/asynciterable/reduce.ts @@ -7,35 +7,38 @@ import { throwIfAborted } from '../aborterror.js'; * single element in the result sequence. The seed value, if specified, is used as the initial accumulator value. * For aggregation behavior with incremental intermediate results, scan. * - * @template T The type of the elements in the source sequence. - * @template R The type of the result of the aggregation. - * @param {AsyncIterable} source An async-iterable sequence to aggregate over. - * @param {ReduceOptions} options The options which contains a callback, with optional seedn and an optional abort signal for cancellation. - * @returns {Promise} A promise with the final accumulator value. + * @template TSource The type of the elements in the source sequence. + * @template TResult The type of the result of the aggregation. + * @param {AsyncIterable} source An async-iterable sequence to aggregate over. + * @param {ReduceOptions} options The options which contains a callback, with optional seed and an optional abort signal for cancellation. + * @returns {Promise} A promise with the final accumulator value. */ -export async function reduce( - source: AsyncIterable, - options: ReduceOptions -): Promise { +export async function reduce( + source: AsyncIterable, + options: ReduceOptions +): Promise { const { ['seed']: seed, ['signal']: signal, ['callback']: callback } = options; - const hasSeed = options.hasOwnProperty('seed'); + throwIfAborted(signal); + + let hasValue = options.hasOwnProperty('seed'); + let acc = seed; + let i = 0; - let hasValue = false; - let acc = seed as T | R; for await (const item of wrapWithAbort(source, signal)) { - if (hasValue || (hasValue = hasSeed)) { - acc = await callback(acc, item, i++, signal); - } else { - acc = item; + if (!hasValue) { + acc = (item as unknown) as TResult; hasValue = true; - i++; + } else { + acc = await callback(acc as TResult, item, i, signal); } + + i++; } - if (!(hasSeed || hasValue)) { + if (!hasValue) { throw new Error('Sequence contains no elements'); } - return acc as R; + return acc as TResult; } diff --git a/src/asynciterable/reduceright.ts b/src/asynciterable/reduceright.ts index fbb46198..c0219d24 100644 --- a/src/asynciterable/reduceright.ts +++ b/src/asynciterable/reduceright.ts @@ -7,35 +7,39 @@ import { throwIfAborted } from '../aborterror.js'; * single element in the result sequence. The seed value, if specified, is used as the initial accumulator value. * For aggregation behavior with incremental intermediate results, scan. * - * @template T The type of the elements in the source sequence. - * @template R The type of the result of the aggregation. - * @param {AsyncIterable} source An async-iterable sequence to aggregate over from the right. - * @param {ReduceOptions} options The options which contains a callback, with optional seed and an optional abort signal for cancellation. - * @returns {Promise} A promise with the final accumulator value. + * @template TSource The type of the elements in the source sequence. + * @template TResult The type of the result of the aggregation. + * @param {AsyncIterable} source An async-iterable sequence to aggregate over from the right. + * @param {ReduceOptions} options The options which contains a callback, with optional seed and an optional abort signal for cancellation. + * @returns {Promise} A promise with the final accumulator value. */ -export async function reduceRight( - source: AsyncIterable, - options: ReduceOptions -): Promise { +export async function reduceRight( + source: AsyncIterable, + options: ReduceOptions +): Promise { const { ['seed']: seed, ['signal']: signal, ['callback']: callback } = options; - const hasSeed = options.hasOwnProperty('seed'); + throwIfAborted(signal); + const array = await toArray(source, signal); - let hasValue = false; - let acc = seed as T | R; + + let hasValue = options.hasOwnProperty('seed'); + let acc = seed; + for (let offset = array.length - 1; offset >= 0; offset--) { const item = array[offset]; - if (hasValue || (hasValue = hasSeed)) { - acc = await callback(acc, item, offset, signal); - } else { - acc = item; + + if (!hasValue) { + acc = (item as unknown) as TResult; hasValue = true; + } else { + acc = await callback(acc as TResult, item, offset, signal); } } - if (!(hasSeed || hasValue)) { + if (!hasValue) { throw new Error('Sequence contains no elements'); } - return acc as R; + return acc as TResult; } diff --git a/src/asynciterable/repeatvalue.ts b/src/asynciterable/repeatvalue.ts index 882c1fa8..a52aecff 100644 --- a/src/asynciterable/repeatvalue.ts +++ b/src/asynciterable/repeatvalue.ts @@ -36,5 +36,5 @@ export class RepeatValueAsyncIterable extends AsyncIterableX { * @returns {AsyncIterableX} An async-iterable with a single item that is repeated over the specified times. */ export function repeatValue(value: TSource, count = -1): AsyncIterableX { - return new RepeatValueAsyncIterable(value, count); + return new RepeatValueAsyncIterable(value, count); } diff --git a/src/asynciterable/sequenceequal.ts b/src/asynciterable/sequenceequal.ts index a19ba286..7c26a1b6 100644 --- a/src/asynciterable/sequenceequal.ts +++ b/src/asynciterable/sequenceequal.ts @@ -1,6 +1,7 @@ import { comparerAsync } from '../util/comparer.js'; import { wrapWithAbort } from './operators/withabort.js'; import { throwIfAborted } from '../aborterror.js'; +import { returnAsyncIterators } from '../util/returniterator.js'; /** * The options for sequence equal operations including a comparer and abort signal @@ -16,6 +17,7 @@ export interface SequencEqualOptions { * @memberof SequencEqualOptions */ comparer?: (first: T, second: T) => boolean | Promise; + /** * An optional abort signal to cancel the operation at any time. * @@ -41,16 +43,24 @@ export async function sequenceEqual( options?: SequencEqualOptions ): Promise { const { ['comparer']: comparer = comparerAsync, ['signal']: signal } = options || {}; + throwIfAborted(signal); + const it1 = wrapWithAbort(source, signal)[Symbol.asyncIterator](); const it2 = wrapWithAbort(other, signal)[Symbol.asyncIterator](); let next1: IteratorResult; let next2: IteratorResult; - while (!(next1 = await it1.next()).done) { - if (!(!(next2 = await it2.next()).done && (await comparer(next1.value, next2.value)))) { - return false; + + try { + while (!(next1 = await it1.next()).done) { + if (!(!(next2 = await it2.next()).done && (await comparer(next1.value, next2.value)))) { + return false; + } } - } - return !!(await it2.next()).done; + // Place return inside try block to ensure iterators are returned after final .next() call + return !!(await it2.next()).done; + } finally { + await returnAsyncIterators([it1, it2]); + } } diff --git a/src/asynciterable/single.ts b/src/asynciterable/single.ts index 146d5b2c..056abb59 100644 --- a/src/asynciterable/single.ts +++ b/src/asynciterable/single.ts @@ -20,15 +20,19 @@ export async function single( ): Promise { const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate = async () => true } = options || {}; + throwIfAborted(signal); + let result: T | undefined; let hasResult = false; let i = 0; + for await (const item of wrapWithAbort(source, signal)) { - if (hasResult && (await predicate!.call(thisArg, item, i++, signal))) { + if (hasResult && (await predicate.call(thisArg, item, i, signal))) { throw new Error('More than one element was found'); } - if (await predicate!.call(thisArg, item, i++, signal)) { + + if (await predicate.call(thisArg, item, i++, signal)) { result = item; hasResult = true; } diff --git a/src/asynciterable/some.ts b/src/asynciterable/some.ts index 4be6c5df..8439d3c9 100644 --- a/src/asynciterable/some.ts +++ b/src/asynciterable/some.ts @@ -14,12 +14,15 @@ import { FindOptions } from './findoptions.js'; */ export async function some(source: AsyncIterable, options: FindOptions): Promise { const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate } = options; + throwIfAborted(signal); + let i = 0; for await (const item of wrapWithAbort(source, signal)) { if (await predicate.call(thisArg, item, i++, signal)) { return true; } } + return false; } diff --git a/src/asynciterable/sum.ts b/src/asynciterable/sum.ts index b906b8e6..065ab890 100644 --- a/src/asynciterable/sum.ts +++ b/src/asynciterable/sum.ts @@ -36,7 +36,9 @@ export async function sum(source: AsyncIterable, options?: MathOptions ['signal']: signal, ['thisArg']: thisArg, } = options || {}; + throwIfAborted(signal); + let value = 0; for await (const item of wrapWithAbort(source, signal)) { value += await selector.call(thisArg, item, signal); diff --git a/src/asynciterable/throwerror.ts b/src/asynciterable/throwerror.ts index 5bc6981a..ca3377f5 100644 --- a/src/asynciterable/throwerror.ts +++ b/src/asynciterable/throwerror.ts @@ -11,6 +11,7 @@ class ThrowAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator { throwIfAborted(signal); + throw this._error; } } diff --git a/src/asynciterable/toarray.ts b/src/asynciterable/toarray.ts index c336ef47..4f47245f 100644 --- a/src/asynciterable/toarray.ts +++ b/src/asynciterable/toarray.ts @@ -14,9 +14,12 @@ export async function toArray( signal?: AbortSignal ): Promise { throwIfAborted(signal); + const results = [] as TSource[]; + for await (const item of wrapWithAbort(source, signal)) { results.push(item); } + return results; } diff --git a/src/asynciterable/tomap.ts b/src/asynciterable/tomap.ts index 1d2cd65d..90ada472 100644 --- a/src/asynciterable/tomap.ts +++ b/src/asynciterable/tomap.ts @@ -17,12 +17,14 @@ export interface ToMapOptions { * @memberof ToMapOptions */ keySelector: (item: TSource, signal?: AbortSignal) => TKey | Promise; + /** * The selector used to get the element for the Map. * * @memberof ToMapOptions */ elementSelector?: (item: TSource, signal?: AbortSignal) => TElement | Promise; + /** * An optional abort signal to cancel the operation at any time. * @@ -48,15 +50,18 @@ export async function toMap( ): Promise> { const { ['signal']: signal, - ['elementSelector']: elementSelector = identityAsync as any, - ['keySelector']: keySelector = identityAsync as any, + ['elementSelector']: elementSelector = identityAsync, + ['keySelector']: keySelector = identityAsync, } = options || {}; + throwIfAborted(signal); + const map = new Map(); for await (const item of wrapWithAbort(source, signal)) { - const value = await elementSelector!(item, signal); + const value = await elementSelector(item, signal); const key = await keySelector(item, signal); map.set(key, value); } + return map; } diff --git a/src/asynciterable/toobservable.ts b/src/asynciterable/toobservable.ts index 24126fa0..d8375b81 100644 --- a/src/asynciterable/toobservable.ts +++ b/src/asynciterable/toobservable.ts @@ -65,5 +65,5 @@ class AsyncIterableObservable implements Observable { * @returns {Observable} The observable containing the elements from the async-iterable. */ export function toObservable(source: AsyncIterable): Observable { - return new AsyncIterableObservable(source); + return new AsyncIterableObservable(source); } diff --git a/src/asynciterable/toset.ts b/src/asynciterable/toset.ts index e08d9bcd..3c4b1255 100644 --- a/src/asynciterable/toset.ts +++ b/src/asynciterable/toset.ts @@ -14,9 +14,11 @@ export async function toSet( signal?: AbortSignal ): Promise> { throwIfAborted(signal); + const set = new Set(); for await (const item of wrapWithAbort(source, signal)) { set.add(item); } + return set; } diff --git a/src/asynciterable/whiledo.ts b/src/asynciterable/whiledo.ts index 9cb90f64..4ec5c417 100644 --- a/src/asynciterable/whiledo.ts +++ b/src/asynciterable/whiledo.ts @@ -17,10 +17,9 @@ class WhileAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); + while (await this._condition(signal)) { - for await (const item of wrapWithAbort(this._source, signal)) { - yield item; - } + yield* wrapWithAbort(this._source, signal); } } } @@ -38,5 +37,5 @@ export function whileDo( source: AsyncIterable, condition: (signal?: AbortSignal) => boolean | Promise ): AsyncIterableX { - return new WhileAsyncIterable(condition, source); + return new WhileAsyncIterable(condition, source); } diff --git a/src/asynciterable/zip.ts b/src/asynciterable/zip.ts index 7271aaf5..425b8af5 100644 --- a/src/asynciterable/zip.ts +++ b/src/asynciterable/zip.ts @@ -1,7 +1,7 @@ import { wrapWithAbort } from './operators/withabort.js'; import { AsyncIterableX } from './asynciterablex.js'; -import { returnAsyncIterator } from '../util/returniterator.js'; import { throwIfAborted } from '../aborterror.js'; +import { returnAsyncIterators } from '../util/returniterator.js'; /** @ignore */ export class ZipAsyncIterable extends AsyncIterableX { @@ -15,19 +15,25 @@ export class ZipAsyncIterable extends AsyncIterableX { // eslint-disable-next-line consistent-return async *[Symbol.asyncIterator](signal?: AbortSignal): AsyncIterableIterator { throwIfAborted(signal); - const sourcesLength = this._sources.length; - const its = this._sources.map((x) => wrapWithAbort(x, signal)[Symbol.asyncIterator]()); - while (sourcesLength > 0) { - const values = new Array(sourcesLength); - for (let i = -1; ++i < sourcesLength; ) { - const { value, done } = await its[i].next(); - if (done) { - await Promise.all(its.map(returnAsyncIterator)); - return undefined; + + if (this._sources.length === 0) { + return; + } + + const iterators = this._sources.map((x) => wrapWithAbort(x, signal)[Symbol.asyncIterator]()); + + try { + while (1) { + const results = await Promise.all(iterators.map((x) => x.next())); + + if (results.some(({ done }) => done)) { + return; } - values[i] = value; + + yield results.map(({ value }) => value); } - yield values; + } finally { + await returnAsyncIterators(iterators); } } } @@ -137,5 +143,5 @@ export function zip( */ export function zip(...sources: AsyncIterable[]): AsyncIterableX; export function zip(...sources: any[]): AsyncIterableX { - return new ZipAsyncIterable(sources); + return new ZipAsyncIterable(sources); } diff --git a/src/util/returniterator.ts b/src/util/returniterator.ts index 6f34b9a5..6b8f965f 100644 --- a/src/util/returniterator.ts +++ b/src/util/returniterator.ts @@ -10,8 +10,11 @@ export function returnIterator(it: Iterator) { /** * @ignore */ -export async function returnAsyncIterator(it: AsyncIterator): Promise { - if (typeof it?.return === 'function') { - await it.return(); +export async function returnAsyncIterators(iterators: AsyncIterator[]): Promise { + for (const iterator of iterators) { + // The other generators may not be suspended (executing but stuck in an await instead), so awaiting + // a return call may not do anything. Instead, we need to cancel the + // TODO: Send a signal to the other iterators to stop + void iterator.return?.(); } }