-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.ts
85 lines (71 loc) · 2.17 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/** Callback that receives each value delivered by a stream. */
type Subscriber<TValue = any> = (value: TValue) => void;

/** Zero-argument handle returned by `subscribe`; invoking it cancels the subscription. */
type Unsubscriber = () => void;

/**
 * Processing step attached to a stream. It is handed an incoming value and a
 * `next` callback, and forwards results by calling `next` (it may also choose
 * not to call it at all, which drops the value).
 */
type Operator<TIn = any, TOut = any> = (
  value: TIn,
  next: (nextValue: TOut) => void,
) => void;

/** Projection from an input value to an output value, as accepted by `map`. */
type Map<TIn = any, TOut = any> = (value: TIn) => TOut;

/** Predicate deciding whether a value is kept, as accepted by `filter`. */
type Filter<TValue = any> = (value: TValue) => boolean;
/**
 * Push-based stream node.
 *
 * A node owns an operator, a list of subscribers and a list of derived
 * (child) streams. When a value is pushed through `process`, the node's
 * operator decides what to forward; every forwarded value is handed to the
 * node's current subscribers and then pushed into each child stream.
 *
 * Values are only delivered to subscribers that are registered at the moment
 * of dispatch; nothing is buffered for later subscribers.
 *
 * @typeParam T - Type of the values this stream delivers to its subscribers.
 */
class StreamImpl<T = unknown> {
  /** Starts the underlying source. Derived streams share their parent's callback. */
  private readonly start: () => void;
  /**
   * Step applied to each incoming value. The input side is the parent's
   * element type, which this class does not track, hence `any`; the output
   * side is what subscribers of this stream receive.
   */
  private readonly operator: Operator<any, T>;
  /** Current subscribers. Replaced (not mutated) on unsubscribe. */
  private subscribers: Subscriber<T>[];
  /** Streams derived from this one through `map` / `filter`. */
  private readonly streams: StreamImpl<any>[];

  /**
   * @param start - Invoked on every `subscribe` call to make sure the source is running.
   * @param operator - Optional processing step; defaults to forwarding each value unchanged.
   */
  constructor(start: () => void, operator?: Operator) {
    this.start = start;
    this.operator = operator ?? ((value, next) => next(value));
    this.subscribers = [];
    this.streams = [];
  }

  /**
   * Registers `subscriber` and triggers the start callback.
   *
   * @param subscriber - Called with each value this stream delivers.
   * @returns A function that cancels exactly this subscription. Calling it
   * more than once is a no-op, and it does not affect other subscriptions
   * made with the same callback.
   */
  subscribe(subscriber: Subscriber<T>): Unsubscriber {
    this.subscribers.push(subscriber);
    this.start();

    let active = true;
    return () => {
      if (!active) {
        return;
      }
      active = false;

      const index = this.subscribers.indexOf(subscriber);
      if (index === -1) {
        return;
      }
      // Build a new array instead of splicing in place: a dispatch loop that
      // is currently iterating keeps walking the array it started with.
      this.subscribers = [
        ...this.subscribers.slice(0, index),
        ...this.subscribers.slice(index + 1),
      ];
    };
  }

  /**
   * Derives a stream whose values are the results of applying `map` to each
   * value of this stream.
   *
   * @param map - Projection applied to every value.
   * @returns The derived stream.
   */
  map<TReturn>(map: Map<T, TReturn>): Stream<TReturn> {
    return this.createStream<TReturn>((value, next) => {
      next(map(value));
    });
  }

  /**
   * Derives a stream that only forwards the values for which `filter`
   * returns `true`.
   *
   * @param filter - Predicate evaluated for every value.
   * @returns The derived stream. `TReturn` only affects the static type; no
   * runtime conversion takes place.
   */
  filter<TReturn = T>(filter: Filter<T>): Stream<TReturn> {
    return this.createStream<TReturn>((value, next) => {
      if (filter(value)) {
        next(value);
      }
    });
  }

  /** Creates a child stream that shares this stream's start callback and registers it for dispatch. */
  private createStream<TOut>(operator: Operator): StreamImpl<TOut> {
    const stream = new StreamImpl<TOut>(this.start, operator);
    this.streams.push(stream);
    return stream;
  }

  /**
   * Pushes `value` through `stream`'s operator. Each value the operator
   * forwards is delivered to `stream`'s subscribers first and then processed
   * recursively by every child of `stream`.
   *
   * @param stream - Node whose operator, subscribers and children are used.
   * @param value - Incoming value handed to the operator.
   */
  protected process(stream: StreamImpl<any>, value: unknown): void {
    stream.operator(value, (newValue) => {
      for (const subscriber of stream.subscribers) {
        subscriber(newValue);
      }
      for (const child of stream.streams) {
        this.process(child, newValue);
      }
    });
  }
}
/**
 * Wraps an async iterable in a stream.
 *
 * The iterable is not touched until the start callback runs, which happens
 * when something subscribes. It is then consumed exactly once: every item is
 * pushed through the stream graph in arrival order, and further start
 * requests are ignored.
 *
 * NOTE(review): the promise produced by the read loop is returned from the
 * start callback, whose type is `() => void`; a rejection from `source`
 * therefore looks unobserved from here — confirm how callers expect source
 * errors to surface.
 *
 * @param source - Async iterable supplying the values.
 * @returns The root stream fed by `source`.
 */
export function createStream<T>(source: AsyncIterable<T>): Stream<T> {
  class SourceStream extends StreamImpl<T> {
    /** Set once the read loop has been kicked off, so the source is only iterated once. */
    private started = false;

    constructor(input: AsyncIterable<T>) {
      super(() => this.pump(input));
    }

    /** Iterates `input` and dispatches each item from this root node. */
    private async pump(input: AsyncIterable<T>) {
      if (!this.started) {
        this.started = true;
        for await (const item of input) {
          this.process(this, item);
        }
      }
    }
  }

  return new SourceStream(source);
}
/**
 * Exported stream type. It adds no members of its own: it extends
 * `StreamImpl<T>`, so it exposes that class's public methods (`subscribe`,
 * `map`, `filter`) under an exported name while the class itself is not
 * exported.
 *
 * @typeParam T - Type of the values delivered to subscribers.
 */
export interface Stream<T> extends StreamImpl<T> {}