Skip to content

Commit

Permalink
Added PubSub.waitFor (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
dPaskhin authored May 6, 2024
1 parent 0c32715 commit 6b49026
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
37 changes: 37 additions & 0 deletions src/main/PubSub.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { AbortablePromise } from './AbortablePromise';

/**
* Publish–subscribe pattern implementation that guarantees the delivery of published messages even if any of listeners
* would throw an error.
Expand Down Expand Up @@ -32,6 +34,41 @@ export class PubSub<T = void> {
}
}

/**
* Waits for a message that satisfies the given predicate to be published and resolves with that message.
*
* @template R A subtype of T that the predicate function identifies as satisfying the condition.
* @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false.
* @returns An {@link AbortablePromise} that resolves with the published message that satisfies the predicate.
*/
waitFor<R extends T>(predicate: (message: T) => message is R): AbortablePromise<R>;

/**
* Waits for a message that satisfies the given predicate to be published and resolves with that message.
*
* @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false.
* @returns An {@link AbortablePromise} that resolves with the published message that satisfies the predicate.
*/
waitFor(predicate: (message: T) => boolean): AbortablePromise<T>;

/**
* Waits for a message that satisfies the given predicate to be published and resolves with that message.
*
* @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false.
* @returns An {@link AbortablePromise} that resolves with the published message that satisfies the predicate.
*/
waitFor(predicate: (message: T) => boolean) {
return new AbortablePromise((resolve, _reject, signal) => {
const unsubscribe = this.subscribe(message => {
if (predicate(message)) {
resolve(message);
unsubscribe();
}
});
signal.addEventListener('abort', unsubscribe);
});
}

/**
* Adds a listener that would receive all messages published via {@link publish}.
*
Expand Down
25 changes: 25 additions & 0 deletions src/test/PubSub.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,29 @@ describe('PubSub', () => {
expect(listenerMock1).toHaveBeenCalledTimes(0);
expect(listenerMock2).toHaveBeenCalledTimes(1);
});

test('waits for a specific message and resolves with that message', async () => {
const pubSub = new PubSub<number>();
const promise = pubSub.waitFor((message): message is 42 => message === 42);

pubSub.publish(10);
pubSub.publish(20);
pubSub.publish(42);
pubSub.publish(30);

await expect(promise).resolves.toBe(42);
});

test('aborts the waiter of the specific message', () => {
const predicateMock = jest.fn();
const pubSub = new PubSub<number>();
const promise = pubSub.waitFor(message => message === 42);

promise.then(predicateMock, () => {});
promise.abort();

pubSub.publish(42);

expect(predicateMock).not.toHaveBeenCalled();
});
});

0 comments on commit 6b49026

Please sign in to comment.