Skip to content

Commit

Permalink
fix(asynciterable): use more yield
Browse files Browse the repository at this point in the history
  • Loading branch information
jeengbe committed Jan 22, 2025
1 parent 7221be2 commit 0883092
Show file tree
Hide file tree
Showing 110 changed files with 788 additions and 786 deletions.
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/finalize-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { hasNext, hasErr, noNext } from '../asynciterablehelpers.js';
import { range, throwError } from 'ix/asynciterable/index.js';
import { flatMap, finalize, tap } from 'ix/asynciterable/operators/index.js';
import { finalize, tap, concatMap } from 'ix/asynciterable/operators/index.js';

test('AsyncIterable#finalize defers behavior', async () => {
let done = false;
Expand Down Expand Up @@ -79,7 +79,7 @@ test('AsyncIterable#finalize calls with downstream error from flattening', async
finalize(async () => {
done = true;
}),
flatMap(async (x) => {
concatMap(async (x) => {
// srcValues.push(x);
if (x === 1) {
return throwError(err);
Expand Down
2 changes: 1 addition & 1 deletion src/add/asynciterable-operators/mergeall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export function mergeAllProto<T>(
this: AsyncIterableX<AsyncIterable<T>>,
concurrent = Infinity
): AsyncIterableX<T> {
return mergeAll(concurrent)(this);
return mergeAll<T>(concurrent)(this);
}

AsyncIterableX.prototype.mergeAll = mergeAllProto;
Expand Down
40 changes: 21 additions & 19 deletions src/asynciterable/_extremaby.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,31 @@ export async function extremaBy<TSource, TKey>(
): Promise<TSource[]> {
throwIfAborted(signal);

let result = [];
const it = wrapWithAbort(source, signal)[Symbol.asyncIterator]();
const { value, done } = await it.next();
if (done) {
throw new Error('Sequence contains no elements');
}

let resKey = await selector(value, signal);
result.push(value);
let hasValue = false;
let key: TKey | undefined;
let result: TSource[] = [];

let next: IteratorResult<TSource>;
while (!(next = await it.next()).done) {
const current = next.value;
const key = await selector(current, signal);
const cmp = await comparer(key, resKey, signal);
for await (const item of wrapWithAbort(source, signal)) {
if (!hasValue) {
key = await selector(item, signal);
result.push(item);
hasValue = true;
} else {
const currentKey = await selector(item, signal);
const cmp = await comparer(currentKey, key as TKey, signal);

if (cmp === 0) {
result.push(current);
} else if (cmp > 0) {
result = [current];
resKey = key;
if (cmp === 0) {
result.push(item);
} else if (cmp > 0) {
result = [item];
key = currentKey;
}
}
}

if (!hasValue) {
throw new Error('Sequence contains no elements');
}

return result;
}
3 changes: 1 addition & 2 deletions src/asynciterable/asynciterablex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,7 @@ export class FromPromiseIterable<TSource, TResult = TSource> extends AsyncIterab
}

async *[Symbol.asyncIterator]() {
const item = await this._source;
yield await this._selector(item, 0);
yield await this._selector(await this._source, 0);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/asynciterable/average.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ export async function average(
['signal']: signal,
['thisArg']: thisArg,
} = options || {};

throwIfAborted(signal);

let sum = 0;
let count = 0;

for await (const item of wrapWithAbort(source, signal)) {
sum += await selector.call(thisArg, item, signal);
count++;
Expand Down
30 changes: 7 additions & 23 deletions src/asynciterable/catcherror.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { AsyncIterableX } from './asynciterablex.js';
import { returnAsyncIterator } from '../util/returniterator.js';
import { wrapWithAbort } from './operators/withabort.js';
import { throwIfAborted } from '../aborterror.js';

Expand All @@ -19,29 +18,14 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
let hasError = false;

for (const source of this._source) {
const it = wrapWithAbort(source, signal)[Symbol.asyncIterator]();

error = null;
hasError = false;

while (1) {
let c = <TSource>{};

try {
const { done, value } = await it.next();
if (done) {
await returnAsyncIterator(it);
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
await returnAsyncIterator(it);
break;
}

yield c;
try {
yield* wrapWithAbort(source, signal);
} catch (e) {
error = e;
hasError = true;
}

if (!hasError) {
Expand All @@ -64,7 +48,7 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
* sequences until a source sequence terminates successfully.
*/
export function catchAll<T>(source: Iterable<AsyncIterable<T>>): AsyncIterableX<T> {
return new CatchAllAsyncIterable<T>(source);
return new CatchAllAsyncIterable(source);
}

/**
Expand All @@ -76,5 +60,5 @@ export function catchAll<T>(source: Iterable<AsyncIterable<T>>): AsyncIterableX<
* sequences until a source sequence terminates successfully.
*/
export function catchError<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T> {
return new CatchAllAsyncIterable<T>(args);
return new CatchAllAsyncIterable(args);
}
50 changes: 27 additions & 23 deletions src/asynciterable/combinelatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import { identity } from '../util/identity.js';
import { wrapWithAbort } from './operators/withabort.js';
import { throwIfAborted } from '../aborterror.js';
import { safeRace } from '../util/safeRace.js';
import { returnAsyncIterators } from '../util/returniterator.js';

// eslint-disable-next-line @typescript-eslint/no-empty-function
const NEVER_PROMISE = new Promise(() => {});
const NEVER_PROMISE = new Promise<never>(() => {});

type MergeResult<T> = { value: T; index: number };

Expand All @@ -28,39 +29,42 @@ export class CombineLatestAsyncIterable<TSource> extends AsyncIterableX<TSource[
const length = this._sources.length;
const iterators = new Array<AsyncIterator<TSource>>(length);
const nexts = new Array<Promise<MergeResult<IteratorResult<TSource>>>>(length);
let hasValueAll = false;
const values = new Array<TSource>(length);
const hasValues = new Array<boolean>(length);
let active = length;

hasValues.fill(false);
let active = length;
let allValuesAvailable = false;
const values = new Array<TSource>(length);
const hasValues = new Array<boolean>(length).fill(false);

for (let i = 0; i < length; i++) {
const iterator = wrapWithAbort(this._sources[i], signal)[Symbol.asyncIterator]();
iterators[i] = iterator;
nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
}

while (active > 0) {
const next = safeRace(nexts);
const {
value: { value: value$, done: done$ },
index,
} = await next;
if (done$) {
nexts[index] = <Promise<MergeResult<IteratorResult<TSource>>>>NEVER_PROMISE;
active--;
} else {
values[index] = value$;
hasValues[index] = true;
try {
while (active > 0) {
const {
value: { value, done },
index,
} = await safeRace(nexts);

if (done) {
nexts[index] = NEVER_PROMISE;
active--;
} else {
values[index] = value;
hasValues[index] = true;
allValuesAvailable = allValuesAvailable || hasValues.every(identity);

const iterator$ = iterators[index];
nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);

if (hasValueAll || (hasValueAll = hasValues.every(identity))) {
yield values;
if (allValuesAvailable) {
yield values;
}
}
}
} finally {
await returnAsyncIterators(iterators);
}
}
}
Expand Down Expand Up @@ -176,5 +180,5 @@ export function combineLatest<T, T2, T3, T4, T5, T6>(
*/
export function combineLatest<T>(...sources: AsyncIterable<T>[]): AsyncIterableX<T[]>;
export function combineLatest<T>(...sources: any[]): AsyncIterableX<T[]> {
return new CombineLatestAsyncIterable<T>(sources);
return new CombineLatestAsyncIterable(sources);
}
9 changes: 4 additions & 5 deletions src/asynciterable/concat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@ export class ConcatAsyncIterable<TSource> extends AsyncIterableX<TSource> {

async *[Symbol.asyncIterator](signal?: AbortSignal) {
throwIfAborted(signal);

for (const outer of this._source) {
for await (const item of wrapWithAbort(outer, signal)) {
yield item;
}
yield* wrapWithAbort(outer, signal);
}
}
}

export function _concatAll<TSource>(
source: Iterable<AsyncIterable<TSource>>
): AsyncIterableX<TSource> {
return new ConcatAsyncIterable<TSource>(source);
return new ConcatAsyncIterable(source);
}

/**
Expand Down Expand Up @@ -136,5 +135,5 @@ export function concat<T, T2, T3, T4, T5, T6>(
* @returns {AsyncIterableX<T>} An async-iterable sequence that contains the elements of each given sequence, in sequential order.
*/
export function concat<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T> {
return new ConcatAsyncIterable<T>(args);
return new ConcatAsyncIterable(args);
}
3 changes: 2 additions & 1 deletion src/asynciterable/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ export async function count<T>(
): Promise<number> {
const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate = async () => true } =
options || {};

throwIfAborted(signal);
let i = 0;

let i = 0;
for await (const item of wrapWithAbort(source, signal)) {
if (await predicate.call(thisArg, item, i, signal)) {
i++;
Expand Down
9 changes: 5 additions & 4 deletions src/asynciterable/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ class AnonymousAsyncIterable<T> extends AsyncIterableX<T> {

async *[Symbol.asyncIterator](signal?: AbortSignal) {
throwIfAborted(signal);

const it = await this._fn(signal);
let next: IteratorResult<T> | undefined;
while (!(next = await it.next()).done) {
yield next.value;
}

yield* {
[Symbol.asyncIterator]: () => it,
};
}
}

Expand Down
8 changes: 3 additions & 5 deletions src/asynciterable/defer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ class DeferAsyncIterable<TSource> extends AsyncIterableX<TSource> {

async *[Symbol.asyncIterator](signal?: AbortSignal) {
throwIfAborted(signal);
const items = await this._fn(signal);
for await (const item of wrapWithAbort(items, signal)) {
yield item;
}

yield* wrapWithAbort(await this._fn(signal), signal);
}
}

Expand All @@ -32,5 +30,5 @@ class DeferAsyncIterable<TSource> extends AsyncIterableX<TSource> {
export function defer<TSource>(
factory: (signal?: AbortSignal) => AsyncIterable<TSource> | Promise<AsyncIterable<TSource>>
): AsyncIterableX<TSource> {
return new DeferAsyncIterable<TSource>(factory);
return new DeferAsyncIterable(factory);
}
2 changes: 2 additions & 0 deletions src/asynciterable/elementat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ export async function elementAt<T>(
signal?: AbortSignal
): Promise<T | undefined> {
throwIfAborted(signal);

let i = index;
for await (const item of wrapWithAbort(source, signal)) {
if (i === 0) {
return item;
}
i--;
}

return undefined;
}
3 changes: 3 additions & 0 deletions src/asynciterable/every.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ export async function every<T>(
options: FindOptions<T>
): Promise<boolean> {
const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate } = options;

throwIfAborted(signal);

let i = 0;
for await (const item of wrapWithAbort(source, signal)) {
if (!(await predicate.call(thisArg, item, i++, signal))) {
return false;
}
}

return true;
}
4 changes: 3 additions & 1 deletion src/asynciterable/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ export async function find<T>(
options: FindOptions<T>
): Promise<T | undefined> {
const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate } = options;

throwIfAborted(signal);
let i = 0;

let i = 0;
for await (const item of wrapWithAbort(source, signal)) {
if (await predicate.call(thisArg, item, i++, signal)) {
return item;
}
}

return undefined;
}
4 changes: 3 additions & 1 deletion src/asynciterable/findindex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ export async function findIndex<T>(
options: FindOptions<T>
): Promise<number> {
const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate } = options;

throwIfAborted(signal);
let i = 0;

let i = 0;
for await (const item of wrapWithAbort(source, signal)) {
if (await predicate.call(thisArg, item, i++, signal)) {
return i;
}
}

return -1;
}
4 changes: 3 additions & 1 deletion src/asynciterable/first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ export async function first<T>(
): Promise<T | undefined> {
const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate = async () => true } =
options || {};

throwIfAborted(signal);

let i = 0;
for await (const item of wrapWithAbort(source, signal)) {
if (await predicate!.call(thisArg, item, i++, signal)) {
if (await predicate.call(thisArg, item, i++, signal)) {
return item;
}
}
Expand Down
Loading

0 comments on commit 0883092

Please sign in to comment.