Skip to content

Commit

Permalink
Executor events
Browse files Browse the repository at this point in the history
  • Loading branch information
smikhalevski committed Mar 19, 2024
1 parent d237f00 commit 3b65805
Showing 1 changed file with 40 additions and 36 deletions.
76 changes: 40 additions & 36 deletions src/main/Executor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { AbortablePromise } from './AbortablePromise';
import { PubSub } from './PubSub';
import { AbortableCallback, Awaitable } from './types';
import { isEqual, isPromiseLike } from './utils';
import { isEqual, isPromiseLike, withSignal } from './utils';

type ExecutorEvent =
| { type: 'pending'; target: Executor }
| { type: 'cleared'; target: Executor }
| { type: 'aborted'; target: Executor }
| { type: 'invalidated'; target: Executor }
| { type: 'fulfilled'; target: Executor }
| { type: 'rejected'; target: Executor };

/**
* Manages async callback execution process and provides ways to access execution results, abort or replace an
Expand Down Expand Up @@ -54,12 +62,12 @@ export class Executor<T = any> {
return this.promise !== null;
}

private _pubSub = new PubSub();

/**
* The last callback passed to {@link execute}.
*/
private _executee: AbortableCallback<T> | null = null;
private _lastCallback: AbortableCallback<T> | null = null;

private _pubSub = new PubSub<ExecutorEvent>();

/**
* Instantly aborts pending execution (if any), marks executor as pending and invokes the callback.
Expand All @@ -71,27 +79,28 @@ export class Executor<T = any> {
* @returns The promise that is resolved with the result of the callback execution.
*/
execute(cb: AbortableCallback<T>): AbortablePromise<T> {
this._executee = cb;
this._lastCallback = cb;

const nextPromise = new AbortablePromise<T>((resolve, reject, signal) => {
const promise = new AbortablePromise<T>((resolve, reject, signal) => {
signal.addEventListener('abort', () => {
if (this.promise === nextPromise) {
this.abort();
if (this.promise === promise) {
this.promise = null;
this._pubSub.publish({ type: 'aborted', target: this });
}
});

new Promise<T>(resolve => {
resolve(cb(signal));
resolve(withSignal(cb(signal), signal));
}).then(
value => {
if (this.promise === nextPromise) {
if (this.promise === promise) {
this.promise = null;
this.resolve(value);
}
resolve(value);
},
reason => {
if (this.promise === nextPromise) {
if (this.promise === promise) {
this.promise = null;
this.reject(reason);
}
Expand All @@ -102,15 +111,25 @@ export class Executor<T = any> {

const prevPromise = this.promise;

this.promise = nextPromise;
this.promise = promise;

if (prevPromise !== null) {
prevPromise.abort();
}

this._pubSub.publish();
this._pubSub.publish({ type: 'pending', target: this });

return promise;
}

return nextPromise;
/**
* If
*/
retry(): this {
if (this._lastCallback !== null) {
this.execute(this._lastCallback);
}
return this;
}

/**
Expand All @@ -129,7 +148,7 @@ export class Executor<T = any> {
if (this.isSettled) {
this.isFulfilled = this.isRejected = this.isInvalidated = false;
this.value = this.reason = undefined;
this._pubSub.publish();
this._pubSub.publish({ type: 'cleared', target: this });
}
return this;
}
Expand All @@ -141,13 +160,7 @@ export class Executor<T = any> {
* @param reason The abort reason passed to the pending promise.
*/
abort(reason?: unknown): this {
const promise = this.promise;

if (promise !== null) {
this.promise = null;
promise.abort(reason);
this._pubSub.publish();
}
this.promise?.abort(reason);
return this;
}

Expand All @@ -156,14 +169,8 @@ export class Executor<T = any> {
* to {@link execute} is executed again and pending execution is aborted.
*/
invalidate(): this {
if (!this.isSettled || this.isInvalidated) {
return this;
}

this.isInvalidated = true;

if (this._executee !== null) {
this.execute(this._executee);
if (this.isInvalidated !== (this.isInvalidated = this.isSettled)) {
this._pubSub.publish({ type: 'invalidated', target: this });
}
return this;
}
Expand All @@ -175,12 +182,9 @@ export class Executor<T = any> {
const promise = this.promise;

if (isPromiseLike(value)) {
const prevExecutee = this._executee;
this.execute(() => value);
this._executee = prevExecutee;
return this;
}

if (
(promise !== null && (promise.abort(), !(this.promise = null))) ||
this.isInvalidated ||
Expand All @@ -191,7 +195,7 @@ export class Executor<T = any> {
this.isRejected = this.isInvalidated = false;
this.value = value;
this.reason = undefined;
this._pubSub.publish();
this._pubSub.publish({ type: 'fulfilled', target: this });
}
return this;
}
Expand All @@ -212,7 +216,7 @@ export class Executor<T = any> {
this.isRejected = true;
this.value = undefined;
this.reason = reason;
this._pubSub.publish();
this._pubSub.publish({ type: 'rejected', target: this });
}
return this;
}
Expand All @@ -223,7 +227,7 @@ export class Executor<T = any> {
* @param listener The listener that would be notified.
* @returns The callback to unsubscribe the listener.
*/
subscribe(listener: () => void): () => void {
subscribe(listener: (event: ExecutorEvent) => void): () => void {
return this._pubSub.subscribe(listener);
}
}

0 comments on commit 3b65805

Please sign in to comment.