diff --git a/readme.md b/readme.md index bf148e1..154a099 100644 --- a/readme.md +++ b/readme.md @@ -137,6 +137,12 @@ Default: `0` Priority of operation. Operations with greater priority will be scheduled first. +##### id + +Type `string` + +Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`. + ##### signal [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an [error](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/reason). If the operation is already running, the signal will need to be handled by the operation itself. @@ -236,6 +242,44 @@ console.log(queue.sizeBy({priority: 0})); //=> 1 ``` +#### .setPriority(id, priority) + +Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect. + +For example, this can be used to prioritize a promise function to run earlier. + +```js +import PQueue from 'p-queue'; + +const queue = new PQueue({concurrency: 1}); + +queue.add(async () => '🦄', {priority: 1}); +queue.add(async () => '🦀', {priority: 0, id: '🦀'}); +queue.add(async () => '🦄', {priority: 1}); +queue.add(async () => '🦄', {priority: 1}); + +queue.setPriority('🦀', 2); +``` + +In this case, the promise function with `id: '🦀'` runs second. + +You can also deprioritize a promise function to delay its execution: + +```js +import PQueue from 'p-queue'; + +const queue = new PQueue({concurrency: 1}); + +queue.add(async () => '🦄', {priority: 1}); +queue.add(async () => '🦀', {priority: 1, id: '🦀'}); +queue.add(async () => '🦄'); +queue.add(async () => '🦄', {priority: 0}); + +queue.setPriority('🦀', -1); +``` + +Here, the promise function with `id: '🦀'` executes last. + #### .pending Number of running items (no longer in the queue). diff --git a/source/index.ts b/source/index.ts index faf2c9e..9a16cdb 100644 --- a/source/index.ts +++ b/source/index.ts @@ -43,6 +43,9 @@ export default class PQueue '🦄', {priority: 1}); + queue.add(async () => '🦀', {priority: 0, id: '🦀'}); + queue.add(async () => '🦄', {priority: 1}); + queue.add(async () => '🦄', {priority: 1}); + + queue.setPriority('🦀', 2); + ``` + + In this case, the promise function with `id: '🦀'` runs second. + + You can also deprioritize a promise function to delay its execution: + + ```js + import PQueue from 'p-queue'; + + const queue = new PQueue({concurrency: 1}); + + queue.add(async () => '🦄', {priority: 1}); + queue.add(async () => '🦀', {priority: 1, id: '🦀'}); + queue.add(async () => '🦄'); + queue.add(async () => '🦄', {priority: 0}); + + queue.setPriority('🦀', -1); + ``` + Here, the promise function with `id: '🦀'` executes last. + */ + setPriority(id: string, priority: number) { + this.#queue.setPriority(id, priority); + } + /** Adds a sync or async task to the queue. Always returns a promise. */ async add(function_: Task, options: {throwOnTimeout: true} & Exclude): Promise; async add(function_: Task, options?: Partial): Promise; async add(function_: Task, options: Partial = {}): Promise { + // In case `id` is not defined. + options.id ??= (this.#idAssigner++).toString(); + options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, diff --git a/source/options.ts b/source/options.ts index 8e8bf7b..c1ae5a9 100644 --- a/source/options.ts +++ b/source/options.ts @@ -69,6 +69,11 @@ export type QueueAddOptions = { @default 0 */ readonly priority?: number; + + /** + Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`. + */ + id?: string; } & TaskOptions & TimeoutOptions; export type TaskOptions = { diff --git a/source/priority-queue.ts b/source/priority-queue.ts index e161610..944514a 100644 --- a/source/priority-queue.ts +++ b/source/priority-queue.ts @@ -17,10 +17,11 @@ export default class PriorityQueue implements Queue= options.priority!) { + if (this.size === 0 || this.#queue[this.size - 1]!.priority! >= options.priority!) { this.#queue.push(element); return; } @@ -32,6 +33,16 @@ export default class PriorityQueue implements Queue) => element.id === id); + if (index === -1) { + throw new ReferenceError(`No promise function with the id "${id}" exists in the queue.`); + } + + const [item] = this.#queue.splice(index, 1); + this.enqueue(item!.run, {priority, id}); + } + dequeue(): RunFunction | undefined { const item = this.#queue.shift(); return item?.run; diff --git a/source/queue.ts b/source/queue.ts index be3316c..459d29b 100644 --- a/source/queue.ts +++ b/source/queue.ts @@ -5,4 +5,5 @@ export type Queue = { filter: (options: Readonly>) => Element[]; dequeue: () => Element | undefined; enqueue: (run: Element, options?: Partial) => void; + setPriority: (id: string, priority: number) => void; }; diff --git a/test/test.ts b/test/test.ts index d511eae..748ee29 100644 --- a/test/test.ts +++ b/test/test.ts @@ -6,7 +6,7 @@ import inRange from 'in-range'; import timeSpan from 'time-span'; import randomInt from 'random-int'; import pDefer from 'p-defer'; -import PQueue, {AbortError} from '../source/index.js'; +import PQueue from '../source/index.js'; const fixture = Symbol('fixture'); @@ -1134,3 +1134,164 @@ test('aborting multiple jobs at the same time', async t => { await t.throwsAsync(task2, {instanceOf: DOMException}); t.like(queue, {size: 0, pending: 0}); }); + +test('.setPriority() - execute a promise before planned', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 1}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {id: '🐌'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {id: '🐢'}); + queue.setPriority('🐢', 1); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🐢', '🦆']); +}); + +test('.setPriority() - execute a promise after planned', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 1}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {id: '🐌'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {id: '🐢'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.setPriority('🐢', -1); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🦆', '🦆', '🦆', '🦆', '🐢']); +}); + +test('.setPriority() - execute a promise before planned - concurrency 2', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 2}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {id: '🐌'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {id: '🐢'}); + queue.add(async () => { + await delay(400); + result.push('⚡️'); + }, {id: '⚡️'}); + queue.setPriority('⚡️', 1); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🦆', '⚡️', '🐢']); +}); + +test('.setPriority() - execute a promise before planned - concurrency 3', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 3}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {id: '🐌'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {id: '🐢'}); + queue.add(async () => { + await delay(400); + result.push('⚡️'); + }, {id: '⚡️'}); + queue.add(async () => { + await delay(400); + result.push('🦀'); + }, {id: '🦀'}); + queue.setPriority('🦀', 1); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']); +}); + +test('.setPriority() - execute a multiple promise before planned, with variable priority', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 2}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {id: '🐌'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {id: '🐢'}); + queue.add(async () => { + await delay(400); + result.push('⚡️'); + }, {id: '⚡️'}); + queue.add(async () => { + await delay(400); + result.push('🦀'); + }, {id: '🦀'}); + queue.setPriority('⚡️', 1); + queue.setPriority('🦀', 2); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🦆', '🦀', '⚡️', '🐢']); +}); + +test('.setPriority() - execute a promise before planned - concurrency 3 and unspecified `id`', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 3}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }); + queue.add(async () => { + await delay(400); + result.push('⚡️'); + }); + queue.add(async () => { + await delay(400); + result.push('🦀'); + }); + queue.setPriority('5', 1); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']); +});