-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
57 lines (53 loc) · 1.16 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
 * Async-iterable consumer attached to a channel.
 *
 * On construction the subscriber adds itself to `channel.subscribers`. The
 * writer hands it `{value, done}` records either by resolving the pending
 * `_promise` or by pushing onto `_queue`; those two fields are the contract
 * with the writer and therefore keep their names.
 */
class Subscriber {
  /**
   * @param {{subscribers: Array}} channel - Object whose `subscribers` list
   *   this instance joins.
   */
  constructor(channel) {
    this._queue = [];
    this._destroyed = false;
    // Settles when the most recent next() call settles. Chaining every read
    // onto it keeps reads strictly first-in, first-out and guarantees that at
    // most one `_promise` is pending at a time.
    this._tail = Promise.resolve();
    this.channel = channel;
    this.channel.subscribers.push(this);
  }

  /**
   * Detaches from the channel and ends a pending read, if any, with
   * `{value: null, done: true}`. Records already queued stay readable.
   */
  destroy() {
    this._destroyed = true;
    this.channel.subscribers = this.channel.subscribers.filter((subscriber) => subscriber !== this);
    if (this._promise) {
      this._promise.resolve({value: null, done: true});
      delete this._promise;
    }
  }

  /**
   * Requests the next record.
   *
   * Calls are served in order: a call made while an earlier one is still
   * waiting is answered only after that earlier one settles, so no caller's
   * promise is ever dropped.
   *
   * @returns {Promise<{value: *, done: boolean}>}
   */
  next() {
    const take = () => this._take();
    // Run `take` whether the previous read fulfilled or rejected, so one
    // rejected read cannot block every later one.
    const result = this._tail.then(take, take);
    this._tail = result;
    return result;
  }

  /**
   * Ends iteration early. `for await...of` invokes this on `break`, `return`
   * or a thrown error, which is what unsubscribes an abandoned loop.
   *
   * @param {*} [value] - Echoed back in the result.
   * @returns {Promise<{value: *, done: true}>}
   */
  return(value) {
    // The consumer is gone: discard buffered records so later next() calls
    // report completion instead of stale data.
    this._queue.length = 0;
    this.destroy();
    return Promise.resolve({value, done: true});
  }

  [Symbol.asyncIterator]() {
    return this;
  }

  /**
   * Produces one record: a buffered one if available, a completion record if
   * the subscriber has been destroyed, otherwise a promise the writer resolves
   * through `_promise`.
   */
  _take() {
    if (this._queue.length > 0) {
      return this._queue.shift();
    }
    if (this._destroyed) {
      // Nothing more can arrive; waiting here would never settle.
      return {value: null, done: true};
    }
    return new Promise((resolve, reject) => {
      this._promise = {resolve, reject};
    });
  }
}
/**
 * Broadcast point: every value written is handed to each subscriber currently
 * listed in `subscribers`.
 */
class Channel {
  constructor() {
    // Replaced (not mutated) by a subscriber's destroy(), so a loop over the
    // old array is unaffected by removals that happen mid-iteration.
    this.subscribers = [];
  }

  /**
   * Hands one `{value, done}` record to every subscriber. The same record
   * object is shared by all of them.
   *
   * @param {*} value - Payload to deliver.
   * @param {boolean} [done=false] - When true, each subscriber is destroyed
   *   right after it receives the record.
   */
  write(value, done = false) {
    const record = {value, done};

    // A subscriber with a pending `_promise` gets the record immediately;
    // otherwise the record is buffered on its `_queue`.
    const deliver = (target) => {
      const waiter = target._promise;
      if (!waiter) {
        target._queue.push(record);
        return;
      }
      waiter.resolve(record);
      delete target._promise;
    };

    for (const target of this.subscribers) {
      deliver(target);
      if (done) {
        target.destroy();
      }
    }
  }

  /**
   * Sends a final `{value: null, done: true}` record to every subscriber.
   */
  stop() {
    this.write(null, true);
  }

  /**
   * Creates a subscriber bound to this channel.
   *
   * @returns {Subscriber}
   */
  subscribe() {
    const subscriber = new Subscriber(this);
    return subscriber;
  }
}
// Export
// Assign the exports object first: flagging `__esModule` before the
// reassignment would mark the object that is about to be discarded, leaving
// the real exports without the flag that transpiled `import x from` relies on
// to pick up `default`.
module.exports = {Subscriber, Channel};
Object.defineProperty(module.exports, "__esModule", {value: true});
module.exports.default = Channel;