From 0f2a40aafed27908cce8e8acc9892b6026e9fe4e Mon Sep 17 00:00:00 2001 From: smikhalevski Date: Mon, 11 Dec 2023 19:03:35 +0300 Subject: [PATCH] Updated README.md --- README.md | 119 +++++++++++------------------------ src/main/WorkPool.ts | 4 +- src/main/repeatUntil.ts | 26 ++++---- src/test/repeatUntil.test.ts | 42 ++++++------- 4 files changed, 72 insertions(+), 119 deletions(-) diff --git a/README.md b/README.md index 7c98e9e..14c6b63 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,6 @@ npm install --save-prod parallel-universe 🚀 [API documentation is available here.](https://smikhalevski.github.io/parallel-universe/) -- [`PubSub`](#pubsub) - [`AsyncQueue`](#asyncqueue) - [Acknowledgements](#acknowledgements) - [Blocking vs non-blocking acknowledgements](#blocking-vs-non-blocking-acknowledgements) @@ -20,43 +19,12 @@ npm install --save-prod parallel-universe - [`Executor`](#executor) - [`Lock`](#lock) - [`Blocker`](#blocker) +- [`PubSub`](#pubsub) - [`untilTruthy`](#untiltruthy) - [`repeatUntil`](#repeatuntil) - [`sleep`](#sleep) - [`raceTimeout`](#racetimeout) -# `PubSub` - -Publish–subscribe pattern implementation that guarantees that published messages are delivered even if some listeners -throw an error. - -```ts -const pubSub = new PubSub(); - -pubSub.subscribe(message => { - message === 'Pluto' // ⮕ true -}); - -pubSub.publish('Pluto'); -``` - -If listener throws an error, it is passed to an error handler callback: - -```ts -const pubSub = new PubSub(error => { - console.log(error); -}); - -pubSub.subscribe(() => { - throw new Error('Kaput'); -}); - -pubSub.publish('Mars'); -// Prints 'Error: Kaput' to the console -``` - -By default, error handler is set to `PubSub.defaultErrorHandler` which logs errors to the console. - # `AsyncQueue` Asynchronous queue decouples value providers and value consumers. @@ -69,7 +37,7 @@ queue.add('Mars'); // Consumer takes a value queue.take(); -// ⮕ Promise<'Mars'> +// ⮕ Promise { 'Mars' } ``` `add` appends the value to the queue, while `take` removes the value from the queue as soon as it is available. If there @@ -80,7 +48,7 @@ const queue = new AsyncQueue(); // The returned promise would be resolved after the add call queue.take(); -// ⮕ Promise<'Mars'> +// ⮕ Promise { 'Mars' } queue.add('Mars'); ``` @@ -94,10 +62,10 @@ queue.add('Mars'); queue.add('Venus'); queue.take(); -// ⮕ Promise<'Mars'> +// ⮕ Promise { 'Mars' } queue.take(); -// ⮕ Promise<'Venus'> +// ⮕ Promise { 'Venus' } ``` ## Acknowledgements @@ -142,7 +110,7 @@ queue.takeAck(([value, ack]) => { }); queue.take(); -// ⮕ Promise<'Pluto'> +// ⮕ Promise { 'Pluto' } ``` ## Blocking vs non-blocking acknowledgements @@ -172,7 +140,8 @@ queue.takeBlockingAck() }); ``` -Blocking acknowledgement is required if the consumer has to perform asynchronous actions _before_ processing the value. +Blocking acknowledgement should be used if the consumer has to perform asynchronous actions _before_ processing the +value. To guarantee that consumers receive values in the same order as they were provided, blocking acknowledgements prevent subsequent consumers from being resolved until `ack` is called. Be sure to call `ack` to prevent the queue from being @@ -241,21 +210,8 @@ executor.execute(doSomething); // ⮕ Promise ``` -The `execute` method returns a promise that is fulfilled when the promise returned from the callback is settled. - -If there's a pending execution, it is aborted and the new execution is started. - -To check that executor is currently executing a callback check -[`pending`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#pending). - -After a promise returned from the executed callback is settled, the execution result and rejection reason are available -via [`result`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#result) and -[`reason`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#reason). - -You can check whether the promise was -[`fulfilled`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#fulfilled), -[`rejected`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#rejected) or -[`settled`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#settled). +The `execute` method returns a promise that is fulfilled when the promise returned from the callback is settled. If +there's a pending execution, it is aborted and the new execution is started. To abort the pending execution, you can use an [abort signal](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) @@ -309,10 +265,10 @@ lock.acquire(); // ⮕ Promise<() => void> ``` -You can check that the lock is [`locked`](https://smikhalevski.github.io/parallel-universe/classes/Lock.html#locked) +You can check that the lock is [locked](https://smikhalevski.github.io/parallel-universe/classes/Lock.html#isLocked) before acquiring a lock. -For example, if you want to force an async callback executions to be sequential you can use ane external lock: +For example, if you want to force an async callback executions to be sequential you can use an external lock: ```ts const lock = new Lock(); @@ -349,46 +305,36 @@ You can later unblock it passing a value that would fulfill the promise returned blocker.unblock('Mars'); ``` -# `untilTruthy` - -Returns a promise that is fulfilled when a callback returns a truthy value, or a promise that is fulfilled with a -truthy value. - -```ts -untilTruthy(async () => doSomething()); -// ⮕ Promise> -``` +# `PubSub` -If you don't want `untilTruthy` to invoke the callback too frequently, provide a delay in milliseconds: +Publish–subscribe pattern implementation: ```ts -untilTruthy(doSomething, 1_000); -``` +const pubSub = new PubSub(); -Instead of a fixed delay you can pass a function that returns a delay: +pubSub.subscribe(message => { + // Process the message +}); -```ts -untilTruthy( - doSomething, - result => result.rejected ? 1_000 : 0 -); +pubSub.publish('Pluto'); ``` # `repeatUntil` -Much like a [`untilTruthy`](#untiltruthy) and provides more control when the callback polling is fulfilled. +Invokes a callback until the condition is met. ```ts repeatUntil( // The callback that is invoked repeatedly async () => doSomething(), - // The until clause must return a truthy value to stop the loop - result => result.fulfilled, + // The condition clause must return a truthy value to stop + // the loop + result => result.isFulfilled, - // Optional delay between callback invokations + // An optional callback that returns a delay in milliseconds + // between iterations result => 100, - // or just pass a literal number of milliseconds ); // ⮕ Promise> ``` @@ -400,7 +346,7 @@ raceTimeout( signal => repeatUntil( () => doSomething(), - result => signal.aborted || result.fulfilled, + result => signal.aborted || result.isFulfilled, 100 ), 5000 @@ -411,7 +357,7 @@ raceTimeout( # `sleep` Returns a promise that resolves after a timeout. If signal is aborted then the returned promise is rejected with an -`AbortError`. +error. ```ts sleep(100, abortController.signal); @@ -420,9 +366,16 @@ sleep(100, abortController.signal); # `raceTimeout` -Rejects with an `Error` if execution time exceeds the timeout. +Rejects with an error if the execution time exceeds the timeout. ```ts -raceTimeout(async () => doSomething(), 100); +raceTimeout(async signal => doSomething(), 100); // ⮕ Promise> + +raceTimeout( + new Promise(resolve => { + // Resolve the promise value + }), + 100 +); ``` diff --git a/src/main/WorkPool.ts b/src/main/WorkPool.ts index 4d62abc..e11e097 100644 --- a/src/main/WorkPool.ts +++ b/src/main/WorkPool.ts @@ -44,7 +44,7 @@ export class WorkPool { * * @param cb The callback to invoke. * @template T The callback result. - * @returns The promise that is fulfilled with the `cb` result. + * @returns The promise that is fulfilled with the callback result. */ submit(cb: AbortableCallback): Promise { return new Promise((resolve, reject) => { @@ -54,7 +54,7 @@ export class WorkPool { /** * Changes the size of the pool by spawning or terminating workers. When worker is terminated while processing an - * async task, its `signal` is aborted. + * async task, its signal is aborted. * * @param size The non-negative integer number of workers in the pool. * @returns The promise that is fulfilled when the pool reaches the requested size: excessive workers were terminated diff --git a/src/main/repeatUntil.ts b/src/main/repeatUntil.ts index adb6758..5dda38c 100644 --- a/src/main/repeatUntil.ts +++ b/src/main/repeatUntil.ts @@ -5,17 +5,17 @@ import { AsyncResult, Awaitable } from './types'; * met. * * @param cb The callback that is periodically invoked. - * @param until The callback that should return `true` to terminate the loop, or `false` to proceed to the next - * iteration. `until` is called before the next iteration is scheduled. - * @param ms The number of milliseconds between the settlement of the last promise returned by the `cb` and the + * @param condition The callback that should return `true` to terminate the loop, or `false` to proceed to the next + * iteration. The condition is checked before the next iteration is scheduled. + * @param ms The number of milliseconds between the settlement of the last promise returned by the callback and the * next invocation. Or a callback that receives the latest result and returns the delay. If omitted then delay is 0. - * @template I The value returned by the `cb`. + * @template I The value returned by the callback. * @template O The value that fulfills the returned promise. - * @returns The promise that is fulfilled with the `cb` result. + * @returns The promise that is fulfilled with the callback result. */ export function repeatUntil( cb: () => Awaitable, - until: (result: AsyncResult) => result is AsyncResult, + condition: (result: AsyncResult) => result is AsyncResult, ms?: ((result: AsyncResult) => number) | number ): Promise; @@ -24,28 +24,28 @@ export function repeatUntil( * met. * * @param cb The callback that is periodically invoked. - * @param until The callback that should return truthy value to terminate the loop, or falsy to proceed to the next - * iteration. `until` is called before the next iteration is scheduled. - * @param ms The number of milliseconds between the settlement of the last promise returned by the `cb` and the next + * @param condition The callback that should return truthy value to terminate the loop, or falsy to proceed to the next + * iteration. The condition is checked before the next iteration is scheduled. + * @param ms The number of milliseconds between the settlement of the last promise returned by the callback and the next * invocation. Or a callback that receives the latest result and returns the delay. If omitted then delay is 0. * @template T The async result value. - * @returns The promise that is fulfilled with the `cb` result. + * @returns The promise that is fulfilled with the callback result. */ export function repeatUntil( cb: () => Awaitable, - until: (result: AsyncResult) => unknown, + condition: (result: AsyncResult) => unknown, ms?: ((result: AsyncResult) => number) | number ): Promise; export function repeatUntil( cb: () => Awaitable, - until: (result: AsyncResult) => unknown, + condition: (result: AsyncResult) => unknown, ms?: ((result: AsyncResult) => number) | number ): Promise { return new Promise((resolve, reject) => { const next = (result: AsyncResult) => { try { - if (!until(result)) { + if (!condition(result)) { setTimeout(execute, typeof ms === 'function' ? ms(result) : ms); return; } diff --git a/src/test/repeatUntil.test.ts b/src/test/repeatUntil.test.ts index be3f4fa..ca77a37 100644 --- a/src/test/repeatUntil.test.ts +++ b/src/test/repeatUntil.test.ts @@ -72,24 +72,24 @@ describe('repeatUntil', () => { test('resolves when until callback returns true', async () => { const cbMock = jest.fn(); - const untilMock = jest.fn(); + const conditionMock = jest.fn(); - untilMock.mockReturnValueOnce(false); - untilMock.mockReturnValueOnce(false); - untilMock.mockReturnValueOnce(true); + conditionMock.mockReturnValueOnce(false); + conditionMock.mockReturnValueOnce(false); + conditionMock.mockReturnValueOnce(true); - await repeatUntil(cbMock, untilMock); + await repeatUntil(cbMock, conditionMock); expect(cbMock).toHaveBeenCalledTimes(3); }); test('passes result to until callback on resolve', async () => { - const untilMock = jest.fn(() => true); + const conditionMock = jest.fn(() => true); - await repeatUntil(() => 111, untilMock); + await repeatUntil(() => 111, conditionMock); - expect(untilMock).toHaveBeenCalledTimes(1); - expect(untilMock).toHaveBeenCalledWith({ + expect(conditionMock).toHaveBeenCalledTimes(1); + expect(conditionMock).toHaveBeenCalledWith({ isSettled: true, isFulfilled: true, isRejected: false, @@ -99,16 +99,16 @@ describe('repeatUntil', () => { }); test('passes reason to until callback on reject', async () => { - const untilMock = jest.fn(() => true); + const conditionMock = jest.fn(() => true); await expect( repeatUntil(() => { throw 111; - }, untilMock) + }, conditionMock) ).rejects.toBe(111); - expect(untilMock).toHaveBeenCalledTimes(1); - expect(untilMock).toHaveBeenCalledWith({ + expect(conditionMock).toHaveBeenCalledTimes(1); + expect(conditionMock).toHaveBeenCalledWith({ isSettled: true, isFulfilled: false, isRejected: true, @@ -119,12 +119,12 @@ describe('repeatUntil', () => { test('passes result to ms callback on resolve', async () => { const msMock = jest.fn(); - const untilMock = jest.fn(); + const conditionMock = jest.fn(); - untilMock.mockReturnValueOnce(false); - untilMock.mockReturnValueOnce(true); + conditionMock.mockReturnValueOnce(false); + conditionMock.mockReturnValueOnce(true); - await repeatUntil(() => 111, untilMock, msMock); + await repeatUntil(() => 111, conditionMock, msMock); expect(msMock).toHaveBeenCalledTimes(1); expect(msMock).toHaveBeenCalledWith({ @@ -138,17 +138,17 @@ describe('repeatUntil', () => { test('passes reason to ms callback on reject', async () => { const msMock = jest.fn(); - const untilMock = jest.fn(); + const conditionMock = jest.fn(); - untilMock.mockReturnValueOnce(false); - untilMock.mockReturnValueOnce(true); + conditionMock.mockReturnValueOnce(false); + conditionMock.mockReturnValueOnce(true); await expect( repeatUntil( () => { throw 111; }, - untilMock, + conditionMock, msMock ) ).rejects.toBe(111);