diff --git a/src/main/PubSub.ts b/src/main/PubSub.ts index 0aa3510..808e3b3 100644 --- a/src/main/PubSub.ts +++ b/src/main/PubSub.ts @@ -1,3 +1,5 @@ +import { AbortablePromise } from './AbortablePromise'; + /** * Publish–subscribe pattern implementation that guarantees the delivery of published messages even if any of listeners * would throw an error. @@ -32,6 +34,41 @@ export class PubSub { } } + /** + * Waits for a message that satisfies the given predicate to be published and resolves with that message. + * + * @template R A subtype of T that the predicate function identifies as satisfying the condition. + * @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false. + * @returns An {@link AbortablePromise} that resolves with the published message that satisfies the predicate. + */ + waitFor(predicate: (message: T) => message is R): AbortablePromise; + + /** + * Waits for a message that satisfies the given predicate to be published and resolves with that message. + * + * @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false. + * @returns An {@link AbortablePromise} that resolves with the published message that satisfies the predicate. + */ + waitFor(predicate: (message: T) => boolean): AbortablePromise; + + /** + * Waits for a message that satisfies the given predicate to be published and resolves with that message. + * + * @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false. + * @returns An {@link AbortablePromise} that resolves with the published message that satisfies the predicate. + */ + waitFor(predicate: (message: T) => boolean) { + return new AbortablePromise((resolve, _reject, signal) => { + const unsubscribe = this.subscribe(message => { + if (predicate(message)) { + resolve(message); + unsubscribe(); + } + }); + signal.addEventListener('abort', unsubscribe); + }); + } + /** * Adds a listener that would receive all messages published via {@link publish}. * diff --git a/src/test/PubSub.test.ts b/src/test/PubSub.test.ts index c140bd1..811877a 100644 --- a/src/test/PubSub.test.ts +++ b/src/test/PubSub.test.ts @@ -54,4 +54,29 @@ describe('PubSub', () => { expect(listenerMock1).toHaveBeenCalledTimes(0); expect(listenerMock2).toHaveBeenCalledTimes(1); }); + + test('waits for a specific message and resolves with that message', async () => { + const pubSub = new PubSub(); + const promise = pubSub.waitFor((message): message is 42 => message === 42); + + pubSub.publish(10); + pubSub.publish(20); + pubSub.publish(42); + pubSub.publish(30); + + await expect(promise).resolves.toBe(42); + }); + + test('aborts the waiter of the specific message', () => { + const predicateMock = jest.fn(); + const pubSub = new PubSub(); + const promise = pubSub.waitFor(message => message === 42); + + promise.then(predicateMock, () => {}); + promise.abort(); + + pubSub.publish(42); + + expect(predicateMock).not.toHaveBeenCalled(); + }); });