-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdist.go
392 lines (345 loc) · 8.64 KB
/
dist.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
package dist
import (
"bytes"
"crypto/md5"
"encoding/binary"
"errors"
"flag"
"fmt"
"github.com/goerlang/etf"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"strconv"
"strings"
"time"
)
// dTrace reports whether protocol tracing is enabled; when true, dLog
// forwards its arguments to the standard logger.
var dTrace bool

// init registers the -erlang.dist.trace command-line flag, which controls
// dTrace and defaults to false.
func init() {
	const (
		flagName  = "erlang.dist.trace"
		flagUsage = "trace erlang distribution protocol"
	)
	flag.BoolVar(&dTrace, flagName, false, flagUsage)
}
// dLog writes a trace line through log.Printf, prefixing the format string
// with "d# ". It does nothing unless dTrace is set.
func dLog(f string, a ...interface{}) {
	if !dTrace {
		return
	}
	log.Printf("d# "+f, a...)
}
// flagId identifies a single distribution capability bit.
type flagId uint32

// Distribution capability bits, one bit per flag.
//
// Only PUBLISHED is declared with the flagId type; the remaining names are
// untyped integer constants that convert to flagId where one is expected.
const (
	PUBLISHED           flagId = 1 << 0
	ATOM_CACHE                 = 1 << 1
	EXTENDED_REFERENCES        = 1 << 2
	DIST_MONITOR               = 1 << 3
	FUN_TAGS                   = 1 << 4
	DIST_MONITOR_NAME          = 1 << 5
	HIDDEN_ATOM_CACHE          = 1 << 6
	NEW_FUN_TAGS               = 1 << 7
	EXTENDED_PIDS_PORTS        = 1 << 8
	EXPORT_PTR_TAG             = 1 << 9
	BIT_BINARIES               = 1 << 10
	NEW_FLOATS                 = 1 << 11
	UNICODE_IO                 = 1 << 12
	DIST_HDR_ATOM_CACHE        = 1 << 13
	SMALL_ATOM_TAGS            = 1 << 14
)
// nodeFlag is a set of flagId bits OR-ed together.
type nodeFlag flagId

// toUint32 returns the flag set as a plain uint32, the form written into
// the handshake messages.
func (nf nodeFlag) toUint32() uint32 {
	return uint32(nf)
}
// isSet reports whether nf and f have at least one bit in common.
func (nf nodeFlag) isSet(f flagId) bool {
	masked := uint32(nf) & uint32(f)
	return masked != 0
}
// toNodeFlag ORs the given flag bits together into a nodeFlag. Calling it
// with no arguments yields the empty set.
func toNodeFlag(f ...flagId) (nf nodeFlag) {
	for i := range f {
		nf |= nodeFlag(f[i])
	}
	return nf
}
// nodeState is the connection phase of a NodeDesc; ReadMessage switches on
// it to decide how to parse the next packet.
type nodeState uint8

const (
	// HANDSHAKE is the initial state.
	HANDSHAKE nodeState = 0
	// CONNECTED is entered once the peer's challenge reply has been verified.
	CONNECTED nodeState = 1
)
// NodeDesc holds the identity and per-connection protocol state of a node.
// It is used both for the local node (built by NewNodeDesc) and for the
// peer descriptor created by read_SEND_NAME.
type NodeDesc struct {
// Name is the node name copied into the SEND_NAME and SEND_CHALLENGE messages.
Name string
// Cookie is combined with a challenge value to compute handshake digests.
Cookie string
// Hidden is stored by NewNodeDesc; nothing else in this file reads it.
Hidden bool
// remote is the peer descriptor recorded by read_SEND_NAME.
remote *NodeDesc
// state selects how ReadMessage parses the next packet.
state nodeState
// challenge is the value used for digest computation: generated randomly
// in ReadMessage for the local node, parsed from the challenge reply for
// the peer.
challenge uint32
// flag is the capability bit set sent in, or parsed from, handshake messages.
flag nodeFlag
// version is the protocol version sent in, or parsed from, handshake messages.
version uint16
// term is the etf context used to read and write terms on this connection.
term *etf.Context
}
// NewNodeDesc returns a descriptor for the local node with the given name
// and cookie. The descriptor starts in the HANDSHAKE state with protocol
// version 5, a fresh etf context, and a fixed set of capability flags.
//
// NOTE(review): PUBLISHED is included in the flags even when isHidden is
// true; confirm this is intended.
func NewNodeDesc(name, cookie string, isHidden bool) (nd *NodeDesc) {
	capabilities := toNodeFlag(
		PUBLISHED,
		UNICODE_IO,
		EXTENDED_PIDS_PORTS,
		EXTENDED_REFERENCES,
		DIST_HDR_ATOM_CACHE,
		HIDDEN_ATOM_CACHE,
		SMALL_ATOM_TAGS,
	)

	nd = new(NodeDesc)
	nd.Name = name
	nd.Cookie = cookie
	nd.Hidden = isHidden
	nd.state = HANDSHAKE
	nd.flag = capabilities
	nd.version = 5
	nd.term = new(etf.Context)
	return nd
}
// ReadMessage reads one length-prefixed packet from c and handles it
// according to the current connection state.
//
// In the HANDSHAKE state packets carry a 2-byte big-endian length:
//   - 'n' (SEND_NAME): records the peer, replies with a status message and
//     then a challenge message.
//   - 'r' (SEND_CHALLENGE_REPLY): verifies the peer's digest; on success
//     replies with a challenge ack and returns a single
//     {'$go_set_node', Name} tuple, otherwise returns a "bad handshake" error.
//   - any other tag is ignored.
//
// In the CONNECTED state packets carry a 4-byte big-endian length. A
// zero-length packet is a keepalive and is answered with an empty packet.
// Otherwise, when the local flags include DIST_HDR_ATOM_CACHE, the packet
// is decoded as distribution header, control term and message term (both
// terms are returned); without that flag a 'p' packet is decoded as a
// sequence of version-prefixed terms and any other packet is discarded.
//
// Read, write and decode errors are returned to the caller. Malformed
// handshake packets (empty, too short, or a challenge reply that arrives
// before a name) yield an error instead of a panic.
func (currNd *NodeDesc) ReadMessage(c net.Conn) (ts []etf.Term, err error) {
	// sendData prefixes data with a big-endian length of headerLen bytes
	// (2 during the handshake, 4 afterwards) and writes the frame to c.
	sendData := func(headerLen int, data []byte) (int, error) {
		reply := make([]byte, len(data)+headerLen)
		if headerLen == 2 {
			binary.BigEndian.PutUint16(reply[0:headerLen], uint16(len(data)))
		} else {
			binary.BigEndian.PutUint32(reply[0:headerLen], uint32(len(data)))
		}
		copy(reply[headerLen:], data)
		dLog("Write to enode: %v", reply)
		return c.Write(reply)
	}

	switch currNd.state {
	case HANDSHAKE:
		var length uint16
		if err = binary.Read(c, binary.BigEndian, &length); err != nil {
			return
		}
		msg := make([]byte, length)
		if _, err = io.ReadFull(c, msg); err != nil {
			return
		}
		dLog("Read from enode %d: %v", length, msg)

		// The peer controls the frame; never index into it unchecked.
		if len(msg) == 0 {
			err = errors.New("empty handshake message")
			return
		}

		switch msg[0] {
		case 'n':
			// Tag (1) + version (2) + flags (4) precede the node name.
			if len(msg) < 7 {
				err = errors.New("short SEND_NAME message")
				return
			}
			sn := currNd.read_SEND_NAME(msg)
			// Statuses: ok, nok, ok_simultaneous, alive, not_allowed
			sok := currNd.compose_SEND_STATUS(sn, true)
			if _, err = sendData(2, sok); err != nil {
				return
			}

			rand.Seed(time.Now().UTC().UnixNano())
			currNd.challenge = rand.Uint32()

			// Now send challenge
			challenge := currNd.compose_SEND_CHALLENGE(sn)
			if _, err = sendData(2, challenge); err != nil {
				return
			}

		case 'r':
			sn := currNd.remote
			if sn == nil {
				err = errors.New("challenge reply received before SEND_NAME")
				return
			}
			// Tag (1) + challenge (4) precede the digest.
			if len(msg) < 5 {
				err = errors.New("short SEND_CHALLENGE_REPLY message")
				return
			}
			if !currNd.read_SEND_CHALLENGE_REPLY(sn, msg) {
				err = errors.New("bad handshake")
				return
			}
			challengeAck := currNd.compose_SEND_CHALLENGE_ACK(sn)
			if _, err = sendData(2, challengeAck); err != nil {
				return
			}
			dLog("Remote: %#v", sn)
			ts = []etf.Term{etf.Term(etf.Tuple{etf.Atom("$go_set_node"), etf.Atom(sn.Name)})}
		}

	case CONNECTED:
		var length uint32
		if err = binary.Read(c, binary.BigEndian, &length); err != nil {
			return
		}
		if length == 0 {
			dLog("Keepalive")
			_, err = sendData(4, []byte{})
			return
		}

		// Never read past the end of this packet.
		r := &io.LimitedReader{R: c, N: int64(length)}

		if currNd.flag.isSet(DIST_HDR_ATOM_CACHE) {
			var ctl, message etf.Term
			if err = currNd.readDist(r); err != nil {
				return
			}
			if ctl, err = currNd.readCtl(r); err != nil {
				return
			}
			dLog("READ CTL: %#v", ctl)
			if message, err = currNd.readMessage(r); err != nil {
				return
			}
			dLog("READ MESSAGE: %#v", message)
			ts = append(ts, ctl, message)
			return
		}

		msg := make([]byte, 1)
		if _, err = io.ReadFull(r, msg); err != nil {
			return
		}
		dLog("Read from enode %d: %#v", length, msg)

		switch msg[0] {
		case 'p':
			ts = make([]etf.Term, 0)
			for {
				var res etf.Term
				if res, err = currNd.readTerm(r); err != nil {
					break
				}
				ts = append(ts, res)
				dLog("READ TERM: %#v", res)
			}
			// Running out of packet bytes is the normal end of the term list.
			if err == io.EOF {
				err = nil
			}
		default:
			// Unknown packet type: drain it so the stream stays framed.
			_, err = ioutil.ReadAll(r)
		}
	}
	return
}
// WriteMessage encodes ts and writes it to c as a single packet prefixed
// with a 4-byte big-endian length.
//
// When the local flags include DIST_HDR_ATOM_CACHE the payload is the etf
// version byte, the distribution header produced by WriteDist, and then
// each term. Otherwise the payload is 'p' followed by each term preceded
// by the etf version byte.
//
// The error from writing to c is returned.
func (currNd *NodeDesc) WriteMessage(c net.Conn, ts []etf.Term) (err error) {
	// sendData frames data with its 4-byte length and writes it to c.
	sendData := func(data []byte) (int, error) {
		reply := make([]byte, len(data)+4)
		binary.BigEndian.PutUint32(reply[0:4], uint32(len(data)))
		copy(reply[4:], data)
		dLog("Write to enode: %v", reply)
		return c.Write(reply)
	}

	buf := new(bytes.Buffer)
	if currNd.flag.isSet(DIST_HDR_ATOM_CACHE) {
		buf.Write([]byte{etf.EtVersion})
		currNd.term.WriteDist(buf, ts)
		for _, v := range ts {
			currNd.term.Write(buf, v)
		}
	} else {
		buf.Write([]byte{'p'})
		for _, v := range ts {
			buf.Write([]byte{etf.EtVersion})
			currNd.term.Write(buf, v)
		}
	}

	dLog("WRITE: %#v: %#v", ts, buf.Bytes())
	// Report a failed write instead of silently returning nil.
	_, err = sendData(buf.Bytes())
	return
}
// compose_SEND_NAME builds the SEND_NAME handshake message for nd:
// the tag 'n', the 2-byte big-endian version, the 4-byte big-endian flag
// set, and the node name.
func (nd *NodeDesc) compose_SEND_NAME() (msg []byte) {
	const headerLen = 7 // tag + version + flags

	msg = make([]byte, headerLen, headerLen+len(nd.Name))
	msg[0] = 'n'
	binary.BigEndian.PutUint16(msg[1:3], nd.version)
	binary.BigEndian.PutUint32(msg[3:7], nd.flag.toUint32())
	msg = append(msg, nd.Name...)
	return msg
}
// read_SEND_NAME parses a SEND_NAME message into a new peer descriptor,
// stores it as currNd.remote and returns it.
//
// Layout: msg[0] is the tag, msg[1:3] the big-endian version, msg[3:7] the
// big-endian flag set, and msg[7:] the node name. msg must hold at least
// 7 bytes; shorter input makes the slice expressions panic.
func (currNd *NodeDesc) read_SEND_NAME(msg []byte) (nd *NodeDesc) {
	peer := &NodeDesc{
		version: binary.BigEndian.Uint16(msg[1:3]),
		flag:    nodeFlag(binary.BigEndian.Uint32(msg[3:7])),
		Name:    string(msg[7:]),
	}
	currNd.remote = peer
	return peer
}
// compose_SEND_STATUS builds the handshake status message: the tag 's'
// followed by the status text. The status is "ok" when isOk is true and
// "nok" otherwise. nd is not used.
func (currNd *NodeDesc) compose_SEND_STATUS(nd *NodeDesc, isOk bool) (msg []byte) {
	status := "ok"
	if !isOk {
		// Previously isOk was ignored and "ok" was sent unconditionally.
		status = "nok"
	}
	msg = make([]byte, 1+len(status))
	msg[0] = byte('s')
	copy(msg[1:], status)
	return
}
// compose_SEND_CHALLENGE builds the challenge message sent to the peer:
// the tag 'n', the local 2-byte version, the local 4-byte flag set, the
// local 4-byte challenge (all big-endian) and the local node name.
// nd is not used.
func (currNd *NodeDesc) compose_SEND_CHALLENGE(nd *NodeDesc) (msg []byte) {
	const headerLen = 11 // tag + version + flags + challenge

	msg = make([]byte, headerLen, headerLen+len(currNd.Name))
	msg[0] = 'n'
	binary.BigEndian.PutUint16(msg[1:3], currNd.version)
	binary.BigEndian.PutUint32(msg[3:7], currNd.flag.toUint32())
	binary.BigEndian.PutUint32(msg[7:11], currNd.challenge)
	msg = append(msg, currNd.Name...)
	return msg
}
// read_SEND_CHALLENGE_REPLY validates the peer's challenge reply.
//
// msg[1:5] holds the peer's own big-endian challenge, which is stored in
// nd.challenge; msg[5:] holds the digest the peer computed. The reply is
// accepted when that digest equals genDigest of the local challenge and
// cookie, in which case the local state becomes CONNECTED and true is
// returned.
//
// A nil nd or a message shorter than 5 bytes is rejected (false) rather
// than causing a panic.
func (currNd *NodeDesc) read_SEND_CHALLENGE_REPLY(nd *NodeDesc, msg []byte) (isOk bool) {
	if nd == nil || len(msg) < 5 {
		dLog("BAD HANDSHAKE: malformed challenge reply (%d bytes)", len(msg))
		return false
	}

	nd.challenge = binary.BigEndian.Uint32(msg[1:5])
	digestB := msg[5:]
	digestA := genDigest(currNd.challenge, currNd.Cookie)

	if !bytes.Equal(digestA, digestB) {
		dLog("BAD HANDSHAKE: digestA: %+v, digestB: %+v", digestA, digestB)
		return false
	}

	currNd.state = CONNECTED
	return true
}
// compose_SEND_CHALLENGE_ACK builds the 17-byte challenge acknowledgement:
// the tag 'a' followed by the digest of the peer's challenge (nd.challenge)
// and the local cookie.
func (currNd *NodeDesc) compose_SEND_CHALLENGE_ACK(nd *NodeDesc) (msg []byte) {
	const ackLen = 17 // tag + digest

	msg = make([]byte, ackLen)
	msg[0] = 'a'
	// FIXME: use his cookie, not mine
	ackDigest := genDigest(nd.challenge, currNd.Cookie)
	copy(msg[1:], ackDigest)
	return msg
}
// genDigest returns the MD5 sum of the cookie immediately followed by the
// challenge rendered as a base-10 string.
func genDigest(challenge uint32, cookie string) (sum []byte) {
	var payload strings.Builder
	payload.WriteString(cookie)
	payload.WriteString(strconv.FormatUint(uint64(challenge), 10))

	hasher := md5.New()
	io.WriteString(hasher, payload.String())
	return hasher.Sum(nil)
}
// Flags returns the names of the capability flags set on nd, ordered by
// ascending bit value. The result is nil when no known flag is set.
//
// The names are walked in a fixed table rather than a map so that the
// output order is the same on every call.
func (nd NodeDesc) Flags() (flags []string) {
	known := []struct {
		id   flagId
		name string
	}{
		{PUBLISHED, "PUBLISHED"},
		{ATOM_CACHE, "ATOM_CACHE"},
		{EXTENDED_REFERENCES, "EXTENDED_REFERENCES"},
		{DIST_MONITOR, "DIST_MONITOR"},
		{FUN_TAGS, "FUN_TAGS"},
		{DIST_MONITOR_NAME, "DIST_MONITOR_NAME"},
		{HIDDEN_ATOM_CACHE, "HIDDEN_ATOM_CACHE"},
		{NEW_FUN_TAGS, "NEW_FUN_TAGS"},
		{EXTENDED_PIDS_PORTS, "EXTENDED_PIDS_PORTS"},
		{EXPORT_PTR_TAG, "EXPORT_PTR_TAG"},
		{BIT_BINARIES, "BIT_BINARIES"},
		{NEW_FLOATS, "NEW_FLOATS"},
		{UNICODE_IO, "UNICODE_IO"},
		{DIST_HDR_ATOM_CACHE, "DIST_HDR_ATOM_CACHE"},
		{SMALL_ATOM_TAGS, "SMALL_ATOM_TAGS"},
	}
	for _, k := range known {
		if nd.flag.isSet(k.id) {
			flags = append(flags, k.name)
		}
	}
	return
}
// readTerm reads one version-prefixed term from r: a single byte that must
// equal etf.EtVersion, followed by a term decoded with the node's etf
// context. It returns the read error if the version byte cannot be read
// and a "Not ETF" error if the byte has a different value.
func (currNd *NodeDesc) readTerm(r io.Reader) (t etf.Term, err error) {
	var version [1]byte
	if _, err = io.ReadFull(r, version[:]); err != nil {
		return
	}
	if got := version[0]; got != etf.EtVersion {
		err = fmt.Errorf("Not ETF: %d", got)
		return
	}
	return currNd.term.Read(r)
}
// readDist consumes a distribution header from r: a single byte that must
// equal etf.EtVersion, followed by whatever the etf context's ReadDist
// consumes. It returns the read error if the version byte cannot be read
// and a "Not dist header" error if the byte has a different value.
func (currNd *NodeDesc) readDist(r io.Reader) (err error) {
	var version [1]byte
	if _, err = io.ReadFull(r, version[:]); err != nil {
		return err
	}
	if got := version[0]; got != etf.EtVersion {
		return fmt.Errorf("Not dist header: %d", got)
	}
	return currNd.term.ReadDist(r)
}
// readCtl decodes the control term of a packet from r using the node's
// etf context.
func (currNd *NodeDesc) readCtl(r io.Reader) (t etf.Term, err error) {
	return currNd.term.Read(r)
}
// readMessage decodes the message term of a packet from r using the node's
// etf context.
func (currNd *NodeDesc) readMessage(r io.Reader) (t etf.Term, err error) {
	return currNd.term.Read(r)
}