-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathwebsocket.js
167 lines (154 loc) · 5.59 KB
/
websocket.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
var logger = new (require("./logger"))(__filename);
var url = require("url");
var WebSocket = require('./operatorServer');
var wss;
var eventHandlers = {
pong: () => null,
noop: () => null,
ping: function (operator_id, data, cb) {
cb("pong", data, 'self');
},
//debug
echo: function (operator_id, data, cb) {
cb(data.event, data.data, 'self');
},
};
exports.connectedOperators = (...args) => {
if (wss && wss.connectedOperators) return wss.connectedOperators(...args);
else logger.warning("Operator Server not initialized"); //temp
};
exports.unicastEvent = (...args) => {
if (wss && wss.unicastEvent) return wss.unicastEvent(...args);
else logger.warning("Operator Server not initialized"); //temp
};
exports.broadcastEvent = (...args) => {
if (wss && wss.broadcastEvent) return wss.broadcastEvent(...args);
else logger.warning("Operator Server not initialized"); //temp
};
/*additional options: {
allowOrigin: (string), // allow connections only from this origin
}*/
exports.init = (options, callback) => {
if (wss) { //temp
logger.warning("Operator Server already initialized");
callback("Operator Server already initialized");
return;
}
/*
// executed during handshake, before upgrade to websocket
// on error, code and message not visible to browser js (only generic 1006)
// on success, 'operator_id' is added to the request object (info.req)
function verifyClient (info, cb) { // cb(verified, code, message)
//logger.debug('verifyClient');
let logTitle = "Verify dispatcher client";
let remoteIp = info.req.headers['x-forwarded-for'] || info.req.headers["x-real-ip"] || info.req.connection.remoteAddress;
if (this.allowOrigin && this.allowOrigin !== info.origin) {
let err = "unknown origin";
cb(false, 406, err);
logger.warning(logTitle, err, remoteIp, info.req.url, info.req.headers["user-agent"]);
} else {
let location = url.parse(info.req.url, true);
if (!location.query.operator_id) {
cb(true);
} else {
let operator_id = parseInt(location.query.operator_id);
// disallow < 1 (rezerved for system use)
if (isNaN(operator_id) || operator_id < 1) {
let err = "invalid operator_id";
cb(false, 400, err);
logger.error(logTitle, err, remoteIp, location.pathname, location.query, info.req.headers["user-agent"]);
} else if (!eventHandlers.access.checkOperator(operator_id)) {
let err = "unauthorized";
cb(false, 401, err);
logger.error(logTitle, err, remoteIp, location.pathname, location.query, info.req.headers["user-agent"]);
} else {
info.req.operator_id = operator_id;
cb(true);
}
}
}
}
*/
// load eventHandlers after ws server is instantiated
function cb() {
callback();
setTimeout(() => Object.assign(eventHandlers, require("./eventHandlers")), 0);
}
wss = new WebSocket.OperatorServer(Object.assign({
pongTimeout: 1500,
//verifyClient,
}, options), cb);
wss.on('client:init', function login(ws, operator_id, options) {
let operator = operator_id ? "Operator " + operator_id : "Anonymous";
if (operator_id && this.options.validOperators) {
operator += ": " + this.options.validOperators.get(operator_id);
}
logger.notice(operator, "connected, total:", this.clients.size);
let eventCallback = (name, data, target) => {
switch (target) {
case 'self':
if (ws.readyState === WebSocket.OPEN) {
//let silentEvents = ["pong", "calls:list", "contact:info", "initialize", "inboxMessages"];
let silentEvents = ["pong", "contact:info"];
if (!silentEvents.includes(name)) {
let mutedEvents = ["calls:list", "inboxMessages"];
logger.debug("ws-out ==>>>>", operator_id, name, !mutedEvents.includes(name) ? data : '~');
}
ws.sendEvent(name, data);
}
break;
case 'others':
default: // if target not specified, broadcast to all
// if operator_id is not set, use WebSocket as client reference
this.broadcastEvent(name, data, operator_id ? operator_id : ws, target);
}
};
eventHandlers.initialize(operator_id, options, eventCallback);
logger.json(options.clientInfo, ["telegraf", "debug"], ["websocket"], { websocket: {
count: wss.clients.length,
value: 1
}}, "connected");
ws.on('close', function(code, reason) {
logger.notice(operator, "disconnected, total:", wss.clients.size, code, reason);
logger.json(options.clientInfo, ["telegraf", "debug"], ["websocket"], { websocket: {
count: wss.clients.length,
value: 0
}}, "disconnected");
if (operator_id) {
//eventHandlers['client:exit'](operator_id, null, eventCallback);
}
});
ws.on('event', function (name, data) {
/*var eventCallback = function (event, data, target) {
if (target === 'error') {
target = 'self';
data = Object.assign(message, {msg: data});
}
eventDispatcher(event, data, target);
};*/
try { // temp
if (typeof eventHandlers[name] === 'function') {
eventHandlers[name](operator_id, data, eventCallback, name);
} else {
let module, action;
[module, action] = name.split(':', 2);
if (action && eventHandlers[module] && typeof eventHandlers[module][action] === 'function') {
eventHandlers[module][action](operator_id, data, eventCallback, action);
} else {
let err = "unknown event";
logger.debug("websocket-in", operator_id, err, name, data);
ws.sendEvent("client:error", {
event: name,
data: data,
msg: err,
});
}
}
} catch (e) {
logger.error("websocket-in", "unhandled eventHandler exception", name, data, e);
logger.email("unhandled eventHandler exception", name, data, e);
}
});
});
return module.exports;
};