From 5935fa327870072ed7441878734f1fca848a224a Mon Sep 17 00:00:00 2001 From: Dmitrii Paskhin Date: Sat, 4 May 2024 01:02:00 +0400 Subject: [PATCH 1/2] Added waitFor method in PubSub --- src/main/PubSub.ts | 17 +++++++++++++++++ src/test/PubSub.test.ts | 40 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/src/main/PubSub.ts b/src/main/PubSub.ts index 0aa3510..2185cab 100644 --- a/src/main/PubSub.ts +++ b/src/main/PubSub.ts @@ -32,6 +32,23 @@ export class PubSub { } } + /** + * Waits for a message that satisfies the given predicate to be published and resolves with that message. + * + * @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false. + * @returns A promise that resolves with the published message that satisfies the predicate. + */ + waitFor(predicate: (message: T) => boolean): Promise { + return new Promise(resolve => { + const unsubscribe = this.subscribe(message => { + if (predicate(message)) { + resolve(message); + unsubscribe(); + } + }); + }); + } + /** * Adds a listener that would receive all messages published via {@link publish}. * diff --git a/src/test/PubSub.test.ts b/src/test/PubSub.test.ts index c140bd1..7b37fab 100644 --- a/src/test/PubSub.test.ts +++ b/src/test/PubSub.test.ts @@ -1,4 +1,4 @@ -import { PubSub } from '../main'; +import { PubSub, delay } from '../main'; describe('PubSub', () => { test('publishes a message', () => { @@ -54,4 +54,42 @@ describe('PubSub', () => { expect(listenerMock1).toHaveBeenCalledTimes(0); expect(listenerMock2).toHaveBeenCalledTimes(1); }); + + test('waits for a specific message and resolves with that message', () => { + const pubSub = new PubSub(); + + pubSub + .waitFor(message => message === 42) + .then(message => { + expect(message).toBe(42); + }); + + pubSub.publish(10); + pubSub.publish(20); + pubSub.publish(42); + pubSub.publish(30); + }); + + test('does not resolve if no message matches the predicate', () => { + const listenerMock = jest.fn(); + const pubSub = new PubSub(); + + let result: number; + + pubSub + .waitFor(message => message === 42) + .then(message => { + result = message; + listenerMock(); + }); + + pubSub.publish(10); + pubSub.publish(20); + pubSub.publish(30); + + delay(100).then(() => { + expect(result).toBeUndefined(); + expect(listenerMock).toHaveBeenCalledTimes(0); + }); + }); }); From fea3fe1722019a5ecc62910e99a2b1ea25ea470b Mon Sep 17 00:00:00 2001 From: Dmitrii Paskhin Date: Mon, 6 May 2024 14:09:18 +0400 Subject: [PATCH 2/2] Review fixes --- src/main/PubSub.ts | 26 +++++++++++++++++++++++--- src/test/PubSub.test.ts | 37 ++++++++++++------------------------- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/src/main/PubSub.ts b/src/main/PubSub.ts index 2185cab..808e3b3 100644 --- a/src/main/PubSub.ts +++ b/src/main/PubSub.ts @@ -1,3 +1,5 @@ +import { AbortablePromise } from './AbortablePromise'; + /** * Publish–subscribe pattern implementation that guarantees the delivery of published messages even if any of listeners * would throw an error. @@ -32,20 +34,38 @@ export class PubSub { } } + /** + * Waits for a message that satisfies the given predicate to be published and resolves with that message. + * + * @template R A subtype of T that the predicate function identifies as satisfying the condition. + * @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false. + * @returns An {@link AbortablePromise} that resolves with the published message that satisfies the predicate. + */ + waitFor(predicate: (message: T) => message is R): AbortablePromise; + + /** + * Waits for a message that satisfies the given predicate to be published and resolves with that message. + * + * @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false. + * @returns An {@link AbortablePromise} that resolves with the published message that satisfies the predicate. + */ + waitFor(predicate: (message: T) => boolean): AbortablePromise; + /** * Waits for a message that satisfies the given predicate to be published and resolves with that message. * * @param predicate A function that takes the message as a parameter and returns true if the message satisfies the condition, otherwise false. - * @returns A promise that resolves with the published message that satisfies the predicate. + * @returns An {@link AbortablePromise} that resolves with the published message that satisfies the predicate. */ - waitFor(predicate: (message: T) => boolean): Promise { - return new Promise(resolve => { + waitFor(predicate: (message: T) => boolean) { + return new AbortablePromise((resolve, _reject, signal) => { const unsubscribe = this.subscribe(message => { if (predicate(message)) { resolve(message); unsubscribe(); } }); + signal.addEventListener('abort', unsubscribe); }); } diff --git a/src/test/PubSub.test.ts b/src/test/PubSub.test.ts index 7b37fab..811877a 100644 --- a/src/test/PubSub.test.ts +++ b/src/test/PubSub.test.ts @@ -1,4 +1,4 @@ -import { PubSub, delay } from '../main'; +import { PubSub } from '../main'; describe('PubSub', () => { test('publishes a message', () => { @@ -55,41 +55,28 @@ describe('PubSub', () => { expect(listenerMock2).toHaveBeenCalledTimes(1); }); - test('waits for a specific message and resolves with that message', () => { + test('waits for a specific message and resolves with that message', async () => { const pubSub = new PubSub(); - - pubSub - .waitFor(message => message === 42) - .then(message => { - expect(message).toBe(42); - }); + const promise = pubSub.waitFor((message): message is 42 => message === 42); pubSub.publish(10); pubSub.publish(20); pubSub.publish(42); pubSub.publish(30); + + await expect(promise).resolves.toBe(42); }); - test('does not resolve if no message matches the predicate', () => { - const listenerMock = jest.fn(); + test('aborts the waiter of the specific message', () => { + const predicateMock = jest.fn(); const pubSub = new PubSub(); + const promise = pubSub.waitFor(message => message === 42); - let result: number; - - pubSub - .waitFor(message => message === 42) - .then(message => { - result = message; - listenerMock(); - }); + promise.then(predicateMock, () => {}); + promise.abort(); - pubSub.publish(10); - pubSub.publish(20); - pubSub.publish(30); + pubSub.publish(42); - delay(100).then(() => { - expect(result).toBeUndefined(); - expect(listenerMock).toHaveBeenCalledTimes(0); - }); + expect(predicateMock).not.toHaveBeenCalled(); }); });