-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathuse-async-queue.ts
123 lines (111 loc) · 2.93 KB
/
use-async-queue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import { useState, useRef, useCallback, useEffect } from "react";
import nextTick from "next-tick";
/** Counters describing the queue, as held in the hook's React state. */
interface QueueStats {
/** Tasks accepted by `add` that have not been started yet. */
numPending: number;
/** Tasks that have been started and whose promise has not settled yet. */
numInFlight: number;
/** Tasks whose promise has settled; both fulfilment and rejection count. */
numDone: number;
}
/**
 * A unit of work handed to the queue. The same shape, with `result` and/or
 * `stats` filled in, is what the `inflight` and `done` callbacks receive.
 */
export interface QueueTaskResult {
/** Identity of the task; compared with `===` to reject duplicates in `add`. */
id: unknown;
/** Called with no arguments when the queue starts the task. */
task(): Promise<unknown>;
/** The promise returned by `task()`; set on the object passed to `done`. */
result?: Promise<unknown>;
/**
 * Stats attached by the hook when it invokes `inflight` / `done`. This is the
 * value captured by the render that started the task, so it may be stale.
 */
stats?: QueueStats;
}
/** The value returned by `useAsyncQueue`. */
interface Queue {
/**
 * Enqueues a task. The task is ignored when another task with the same `id`
 * is currently pending or in flight.
 */
add: (task: QueueTaskResult) => void;
/** Current counters from the hook's state. */
stats: QueueStats;
}
/** Options accepted by `useAsyncQueue`; every field is optional. */
interface QueueOpts {
/**
 * Maximum number of tasks in flight at once. A missing, falsy or
 * smaller-than-1 value is treated as `Infinity` (no limit).
 */
concurrency?: number;
/** Called after a task's promise settles, whether it fulfilled or rejected. */
done?: (result: QueueTaskResult) => void;
/**
 * Scheduled through `nextTick` once at least one task is done and nothing is
 * pending or in flight any more.
 */
drain?: () => void;
/** Called when a task is moved from pending to in flight, just before it runs. */
inflight?: (task: QueueTaskResult) => void;
}
/**
 * React hook that runs async tasks through a queue with bounded concurrency.
 *
 * Tasks are handed in with the returned `add` function and started from an
 * effect, at most `concurrency` at a time. Pending and in-flight tasks are kept
 * in refs so `add` can stay referentially stable; the counters live in React
 * state so that every change re-renders the component and re-runs the effect.
 *
 * @param opts - Optional settings and callbacks. May be omitted entirely.
 *   - `concurrency`: maximum tasks in flight; missing, falsy or `< 1` means
 *     unlimited.
 *   - `inflight`: called when a task is started.
 *   - `done`: called when a task's promise settles (fulfilled or rejected).
 *   - `drain`: scheduled via `nextTick` when the queue becomes empty after at
 *     least one task has finished.
 * @returns `add` to enqueue a task and `stats` with the current counters.
 *
 * Note: the `stats` object spread into the `inflight` / `done` callback
 * arguments is the value captured by the render whose effect started the task,
 * so it can lag behind the counters returned from the hook.
 */
function useAsyncQueue(opts: QueueOpts = {}): Queue {
  const { done, drain, inflight } = opts;

  // A missing, zero, NaN or sub-1 concurrency all mean "no limit".
  const requested = opts.concurrency || Infinity;
  const concurrency = requested < 1 ? Infinity : requested;

  const [stats, setStats] = useState({
    numPending: 0,
    numInFlight: 0,
    numDone: 0,
  });
  // True until the first task is started and again after `drain` was scheduled,
  // so `drain` is scheduled only once per burst of work.
  const drained = useRef(true);
  const inFlight = useRef<QueueTaskResult[]>([]);
  const pending = useRef<QueueTaskResult[]>([]);

  useEffect(() => {
    const isIdle =
      inFlight.current.length === 0 && pending.current.length === 0;
    if (stats.numDone > 0 && drain && isIdle && !drained.current) {
      drained.current = true;
      nextTick(drain);
      return;
    }

    while (
      inFlight.current.length < concurrency &&
      pending.current.length > 0
    ) {
      drained.current = false;
      const task = pending.current.shift();
      if (!task) continue;

      inFlight.current.push(task);
      setStats((prev) => ({
        ...prev,
        numPending: prev.numPending - 1,
        numInFlight: prev.numInFlight + 1,
      }));
      if (inflight) inflight({ ...task, stats });

      const result = task.task();
      const settle = (): void => {
        // Remove exactly the task that finished. Tasks may settle in any
        // order, so popping the last element would drop the wrong entry and
        // corrupt the duplicate-id check performed by `add`.
        const index = inFlight.current.indexOf(task);
        if (index !== -1) inFlight.current.splice(index, 1);
        setStats((prev) => ({
          ...prev,
          numInFlight: prev.numInFlight - 1,
          numDone: prev.numDone + 1,
        }));
        if (done) done({ ...task, result, stats });
      };
      // One handler for both outcomes, passed as the two arguments of `then`:
      // chaining `.then(h).catch(h)` would run the bookkeeping a second time
      // whenever `done` throws inside the fulfilment handler.
      result.then(settle, settle);
    }
  }, [concurrency, done, drain, inflight, stats]);

  const add = useCallback((task: QueueTaskResult) => {
    const hasSameId = (queued: QueueTaskResult): boolean =>
      queued.id === task.id;
    // A task whose id is already pending or running is silently ignored.
    if (pending.current.some(hasSameId) || inFlight.current.some(hasSameId)) {
      return;
    }
    pending.current.push(task);
    setStats((prev) => ({
      ...prev,
      numPending: prev.numPending + 1,
    }));
  }, []);

  return { add, stats };
}
// Expose the hook as this module's default export.
export default useAsyncQueue;