Skip to content

Commit

Permalink
Updated README.md
Browse files Browse the repository at this point in the history
  • Loading branch information
smikhalevski committed Dec 11, 2023
1 parent 6a5a4ab commit 0f2a40a
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 119 deletions.
119 changes: 36 additions & 83 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,19 @@ npm install --save-prod parallel-universe

🚀 [API documentation is available here.](https://smikhalevski.github.io/parallel-universe/)

- [`PubSub`](#pubsub)
- [`AsyncQueue`](#asyncqueue)
- [Acknowledgements](#acknowledgements)
- [Blocking vs non-blocking acknowledgements](#blocking-vs-non-blocking-acknowledgements)
- [`WorkPool`](#workpool)
- [`Executor`](#executor)
- [`Lock`](#lock)
- [`Blocker`](#blocker)
- [`PubSub`](#pubsub)
- [`untilTruthy`](#untiltruthy)
- [`repeatUntil`](#repeatuntil)
- [`sleep`](#sleep)
- [`raceTimeout`](#racetimeout)

# `PubSub`

Publish–subscribe pattern implementation that guarantees that published messages are delivered even if some listeners
throw an error.

```ts
const pubSub = new PubSub<string>();

pubSub.subscribe(message => {
message === 'Pluto' // ⮕ true
});

pubSub.publish('Pluto');
```

If listener throws an error, it is passed to an error handler callback:

```ts
const pubSub = new PubSub<string>(error => {
console.log(error);
});

pubSub.subscribe(() => {
throw new Error('Kaput');
});

pubSub.publish('Mars');
// Prints 'Error: Kaput' to the console
```

By default, error handler is set to `PubSub.defaultErrorHandler` which logs errors to the console.

# `AsyncQueue`

Asynchronous queue decouples value providers and value consumers.
Expand All @@ -69,7 +37,7 @@ queue.add('Mars');

// Consumer takes a value
queue.take();
// ⮕ Promise<'Mars'>
// ⮕ Promise { 'Mars' }
```

`add` appends the value to the queue, while `take` removes the value from the queue as soon as it is available. If there
Expand All @@ -80,7 +48,7 @@ const queue = new AsyncQueue();

// The returned promise would be resolved after the add call
queue.take();
// ⮕ Promise<'Mars'>
// ⮕ Promise { 'Mars' }

queue.add('Mars');
```
Expand All @@ -94,10 +62,10 @@ queue.add('Mars');
queue.add('Venus');

queue.take();
// ⮕ Promise<'Mars'>
// ⮕ Promise { 'Mars' }

queue.take();
// ⮕ Promise<'Venus'>
// ⮕ Promise { 'Venus' }
```

## Acknowledgements
Expand Down Expand Up @@ -142,7 +110,7 @@ queue.takeAck(([value, ack]) => {
});

queue.take();
// ⮕ Promise<'Pluto'>
// ⮕ Promise { 'Pluto' }
```

## Blocking vs non-blocking acknowledgements
Expand Down Expand Up @@ -172,7 +140,8 @@ queue.takeBlockingAck()
});
```

Blocking acknowledgement is required if the consumer has to perform asynchronous actions _before_ processing the value.
Blocking acknowledgement should be used if the consumer has to perform asynchronous actions _before_ processing the
value.

To guarantee that consumers receive values in the same order as they were provided, blocking acknowledgements prevent
subsequent consumers from being resolved until `ack` is called. Be sure to call `ack` to prevent the queue from being
Expand Down Expand Up @@ -241,21 +210,8 @@ executor.execute(doSomething);
// ⮕ Promise<void>
```

The `execute` method returns a promise that is fulfilled when the promise returned from the callback is settled.

If there's a pending execution, it is aborted and the new execution is started.

To check that executor is currently executing a callback check
[`pending`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#pending).

After a promise returned from the executed callback is settled, the execution result and rejection reason are available
via [`result`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#result) and
[`reason`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#reason).

You can check whether the promise was
[`fulfilled`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#fulfilled),
[`rejected`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#rejected) or
[`settled`](https://smikhalevski.github.io/parallel-universe/classes/Executor.html#settled).
The `execute` method returns a promise that is fulfilled when the promise returned from the callback is settled. If
there's a pending execution, it is aborted and the new execution is started.

To abort the pending execution, you can use
an [abort signal](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal)
Expand Down Expand Up @@ -309,10 +265,10 @@ lock.acquire();
// ⮕ Promise<() => void>
```

You can check that the lock is [`locked`](https://smikhalevski.github.io/parallel-universe/classes/Lock.html#locked)
You can check that the lock is [locked](https://smikhalevski.github.io/parallel-universe/classes/Lock.html#isLocked)
before acquiring a lock.

For example, if you want to force an async callback executions to be sequential you can use ane external lock:
For example, if you want to force an async callback executions to be sequential you can use an external lock:

```ts
const lock = new Lock();
Expand Down Expand Up @@ -349,46 +305,36 @@ You can later unblock it passing a value that would fulfill the promise returned
blocker.unblock('Mars');
```

# `untilTruthy`

Returns a promise that is fulfilled when a callback returns a truthy value, or a promise that is fulfilled with a
truthy value.

```ts
untilTruthy(async () => doSomething());
// ⮕ Promise<ReturnType<typeof doSomething>>
```
# `PubSub`

If you don't want `untilTruthy` to invoke the callback too frequently, provide a delay in milliseconds:
Publish–subscribe pattern implementation:

```ts
untilTruthy(doSomething, 1_000);
```
const pubSub = new PubSub<string>();

Instead of a fixed delay you can pass a function that returns a delay:
pubSub.subscribe(message => {
// Process the message
});

```ts
untilTruthy(
doSomething,
result => result.rejected ? 1_000 : 0
);
pubSub.publish('Pluto');
```

# `repeatUntil`

Much like a [`untilTruthy`](#untiltruthy) and provides more control when the callback polling is fulfilled.
Invokes a callback until the condition is met.

```ts
repeatUntil(
// The callback that is invoked repeatedly
async () => doSomething(),

// The until clause must return a truthy value to stop the loop
result => result.fulfilled,
// The condition clause must return a truthy value to stop
// the loop
result => result.isFulfilled,

// Optional delay between callback invokations
// An optional callback that returns a delay in milliseconds
// between iterations
result => 100,
// or just pass a literal number of milliseconds
);
// ⮕ Promise<ReturnType<typeof doSomething>>
```
Expand All @@ -400,7 +346,7 @@ raceTimeout(
signal =>
repeatUntil(
() => doSomething(),
result => signal.aborted || result.fulfilled,
result => signal.aborted || result.isFulfilled,
100
),
5000
Expand All @@ -411,7 +357,7 @@ raceTimeout(
# `sleep`

Returns a promise that resolves after a timeout. If signal is aborted then the returned promise is rejected with an
`AbortError`.
error.

```ts
sleep(100, abortController.signal);
Expand All @@ -420,9 +366,16 @@ sleep(100, abortController.signal);

# `raceTimeout`

Rejects with an `Error` if execution time exceeds the timeout.
Rejects with an error if the execution time exceeds the timeout.

```ts
raceTimeout(async () => doSomething(), 100);
raceTimeout(async signal => doSomething(), 100);
// ⮕ Promise<ReturnType<typeof doSomething>>

raceTimeout(
new Promise(resolve => {
// Resolve the promise value
}),
100
);
```
4 changes: 2 additions & 2 deletions src/main/WorkPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class WorkPool {
*
* @param cb The callback to invoke.
* @template T The callback result.
* @returns The promise that is fulfilled with the `cb` result.
* @returns The promise that is fulfilled with the callback result.
*/
submit<T>(cb: AbortableCallback<T>): Promise<T> {
return new Promise((resolve, reject) => {
Expand All @@ -54,7 +54,7 @@ export class WorkPool {

/**
* Changes the size of the pool by spawning or terminating workers. When worker is terminated while processing an
* async task, its `signal` is aborted.
* async task, its signal is aborted.
*
* @param size The non-negative integer number of workers in the pool.
* @returns The promise that is fulfilled when the pool reaches the requested size: excessive workers were terminated
Expand Down
26 changes: 13 additions & 13 deletions src/main/repeatUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import { AsyncResult, Awaitable } from './types';
* met.
*
* @param cb The callback that is periodically invoked.
* @param until The callback that should return `true` to terminate the loop, or `false` to proceed to the next
* iteration. `until` is called before the next iteration is scheduled.
* @param ms The number of milliseconds between the settlement of the last promise returned by the `cb` and the
* @param condition The callback that should return `true` to terminate the loop, or `false` to proceed to the next
* iteration. The condition is checked before the next iteration is scheduled.
* @param ms The number of milliseconds between the settlement of the last promise returned by the callback and the
* next invocation. Or a callback that receives the latest result and returns the delay. If omitted then delay is 0.
* @template I The value returned by the `cb`.
* @template I The value returned by the callback.
* @template O The value that fulfills the returned promise.
* @returns The promise that is fulfilled with the `cb` result.
* @returns The promise that is fulfilled with the callback result.
*/
export function repeatUntil<I, O extends I>(
cb: () => Awaitable<I>,
until: (result: AsyncResult<I>) => result is AsyncResult<O>,
condition: (result: AsyncResult<I>) => result is AsyncResult<O>,
ms?: ((result: AsyncResult<O>) => number) | number
): Promise<O>;

Expand All @@ -24,28 +24,28 @@ export function repeatUntil<I, O extends I>(
* met.
*
* @param cb The callback that is periodically invoked.
* @param until The callback that should return truthy value to terminate the loop, or falsy to proceed to the next
* iteration. `until` is called before the next iteration is scheduled.
* @param ms The number of milliseconds between the settlement of the last promise returned by the `cb` and the next
* @param condition The callback that should return truthy value to terminate the loop, or falsy to proceed to the next
* iteration. The condition is checked before the next iteration is scheduled.
* @param ms The number of milliseconds between the settlement of the last promise returned by the callback and the next
* invocation. Or a callback that receives the latest result and returns the delay. If omitted then delay is 0.
* @template T The async result value.
* @returns The promise that is fulfilled with the `cb` result.
* @returns The promise that is fulfilled with the callback result.
*/
export function repeatUntil<T>(
cb: () => Awaitable<T>,
until: (result: AsyncResult<T>) => unknown,
condition: (result: AsyncResult<T>) => unknown,
ms?: ((result: AsyncResult<T>) => number) | number
): Promise<T>;

export function repeatUntil(
cb: () => Awaitable<unknown>,
until: (result: AsyncResult<unknown>) => unknown,
condition: (result: AsyncResult<unknown>) => unknown,
ms?: ((result: AsyncResult<unknown>) => number) | number
): Promise<unknown> {
return new Promise((resolve, reject) => {
const next = (result: AsyncResult) => {
try {
if (!until(result)) {
if (!condition(result)) {
setTimeout(execute, typeof ms === 'function' ? ms(result) : ms);
return;
}
Expand Down
Loading

0 comments on commit 0f2a40a

Please sign in to comment.