-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathapp.js
80 lines (69 loc) · 1.82 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
const { KafkaClient } = require('kafka-node'),
Case = require('case'),
_ = require('lodash'),
$ = require('steeltoe');
/**
 * Application container: owns the Kafka client / consumer-group options and a
 * registry of named pipelines that can be written to or restarted.
 */
class App {
  /**
   * @param {string} name - Application name; used to derive the Kafka group id.
   * @param {Object} [options] - Optional settings; only the `kafka` key is read here.
   */
  constructor(name, options) {
    this.pipelines = new Map();
    // Read through steeltoe so a missing `options` presumably yields undefined
    // rather than throwing; Object.assign below tolerates an undefined source.
    this._setKafka(name, $(options)('kafka')());
  }

  /**
   * Builds `this.kafka` from the defaults overlaid with the caller's options.
   *
   * @param {string} name - Application name forwarded to the group-id builder.
   * @param {Object} [kafkaOptions] - Overrides; keys here win over the defaults.
   */
  _setKafka(name, kafkaOptions) {
    const kafkaConf = Object.assign({}, this._kafkaDefalutConf(), kafkaOptions);
    this.kafka = {
      client: this._kafkaClient(kafkaConf),
      options: this._kafkaCGOption(name, kafkaConf),
    };
  }

  /**
   * Default Kafka configuration. The misspelled method name is kept as-is so
   * that any existing caller keeps working.
   *
   * @returns {Object} A fresh object on every call, safe for callers to mutate.
   */
  _kafkaDefalutConf() {
    return {
      kafkaHost: 'localhost:9092',
      sessionTimeout: 25000,
      protocol: ['roundrobin'],
      asyncPush: false,
      fromOffset: 'earliest',
    };
  }

  /**
   * Creates the Kafka client. Only `kafkaHost` is passed on to the client;
   * the remaining configuration keys are not used here.
   *
   * @param {Object} conf - Merged Kafka configuration.
   * @returns {KafkaClient}
   */
  _kafkaClient(conf) {
    return new KafkaClient({ kafkaHost: conf.kafkaHost });
  }

  /**
   * Returns a copy of `conf` with `groupId` set to the snake-cased form of
   * `willa_<name>` ("CG" presumably stands for consumer group).
   *
   * @param {string} name - Application name.
   * @param {Object} conf - Merged Kafka configuration; not mutated.
   * @returns {Object}
   */
  _kafkaCGOption(name, conf) {
    const groupId = Case.snake(`willa_${name}`);
    return Object.assign({}, conf, { groupId });
  }

  /**
   * Registers every `{ name, pipeline }` entry returned by `pipelines._map()`
   * and then calls `pipelines._build(this)`. Arguments without a `_map`
   * function are ignored silently.
   *
   * @param {Object} pipelines - Object exposing `_map()` and `_build(app)`.
   * @throws {Error} When an entry has no name or its name is already registered.
   */
  add(pipelines) {
    const mapFn = $(pipelines)('_map')();
    if (!_.isFunction(mapFn)) {
      return;
    }
    pipelines._map().forEach((pipe) => {
      this._setPipeline(pipe.name, pipe.pipeline);
    });
    pipelines._build(this);
  }

  /**
   * Writes `data` to the pipeline registered under the snake-cased `name`.
   *
   * @param {string} name - Pipeline name; converted with `Case.snake` before lookup.
   * @param {*} data - Payload handed to the pipeline's `_write`.
   * @throws {Error} When no pipeline is registered under that name.
   */
  writeStream(name, data) {
    const key = Case.snake(name);
    const pipeline = this.pipelines.get(key);
    if (pipeline == null) {
      // Fail with a clear message instead of a TypeError on `undefined._write`.
      throw new Error(`Unknown pipeline: ${key}`);
    }
    pipeline._write(data);
  }

  /**
   * Stores `pipeline` under `name`. The name is used verbatim as the key
   * (it is not snake-cased here, unlike the lookup in `writeStream`).
   *
   * @param {string} name - Registry key.
   * @param {Object} pipeline - Pipeline instance.
   * @throws {Error} When `name` is undefined or already registered.
   */
  _setPipeline(name, pipeline) {
    if (name === undefined) {
      throw new Error('Pipeline name is required');
    }
    if (this.pipelines.has(name)) {
      throw new Error(`Pipeline name conflict at ${name}`);
    }
    this.pipelines.set(name, pipeline);
  }

  /**
   * Shuts down and rebuilds the pipeline registered under `name`.
   * Unknown or undefined names are a no-op.
   *
   * @param {string} name - Registry key, used verbatim.
   */
  _restart(name) {
    if (name === undefined || !this.pipelines.has(name)) {
      return;
    }
    const pipeline = this.pipelines.get(name);
    pipeline._shutdown();
    pipeline._build(this);
  }
}
let app = null;
module.exports = (name, options) => {
if (app == null) {
app = new App(name, options);
}
return app
}