-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathadjRIBIn.go
140 lines (108 loc) · 3.16 KB
/
adjRIBIn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//
// Copyright (c) 2022 Cloudflare, Inc.
//
// Licensed under Apache 2.0 license found in the LICENSE file
// or at http://www.apache.org/licenses/LICENSE-2.0
//
package main
import (
"sync/atomic"
"github.com/cloudflare/bbmp2kafka/protos/bbmp"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
"github.com/bio-routing/bio-rd/net"
"github.com/bio-routing/bio-rd/route"
"github.com/bio-routing/bio-rd/routingtable"
"github.com/bio-routing/bio-rd/routingtable/filter"
log "github.com/sirupsen/logrus"
)
type adjRIBInFactory struct {
producer sarama.SyncProducer
kafkaTopic string
tokenBucket *tokenBucket
}
type adjRIBin struct {
sessionAttrs routingtable.SessionAttrs
producer sarama.SyncProducer
kafkaTopic string
tokenBucket *tokenBucket
}
func (a *adjRIBInFactory) New(exportFilterChain filter.Chain, contributingASNs *routingtable.ContributingASNs, sessionAttrs routingtable.SessionAttrs) routingtable.AdjRIBIn {
return &adjRIBin{
sessionAttrs: sessionAttrs,
producer: a.producer,
kafkaTopic: a.kafkaTopic,
tokenBucket: a.tokenBucket,
}
}
func (a *adjRIBin) createBBMPUnicastMonitoringMessage(pfx *net.Prefix, path *route.Path, announcement bool) []byte {
bbmpMsg := bbmp.BBMPUnicastMonitoringMessage{
RouterIp: a.sessionAttrs.RouterIP.ToProto(),
LocalBpgIp: a.sessionAttrs.LocalIP.ToProto(),
NeighborBgpIp: a.sessionAttrs.PeerIP.ToProto(),
LocalAs: a.sessionAttrs.LocalASN,
RemoteAs: a.sessionAttrs.PeerASN,
Announcement: announcement,
BgpPath: path.BGPPath.ToProto(),
Pfx: pfx.ToProto(),
Timestamp: path.LTime,
}
msg := bbmp.BBMPMessage{
MessageType: bbmp.BBMPMessage_RouteMonitoringMessage,
BbmpUnicastMonitoringMessage: &bbmpMsg,
}
msgBytes, err := proto.Marshal(&msg)
if err != nil {
messagesMarshalFailed.Inc()
if a.tokenBucket.getToken() {
log.Errorf("failed to marshal BBMPMessage: %v", err)
}
return nil
}
return msgBytes
}
func (a *adjRIBin) sendMessage(msg []byte) {
_, _, err := a.producer.SendMessage(&sarama.ProducerMessage{
Topic: a.kafkaTopic,
Value: sarama.ByteEncoder(msg),
})
if err != nil {
atomic.StoreInt32(&healthy, 0)
messagesSendFailed.Inc()
if a.tokenBucket.getToken() {
log.Errorf("could not send message: %v", err)
}
return
}
atomic.StoreInt32(&healthy, 1)
}
func (a *adjRIBin) AddPath(pfx *net.Prefix, path *route.Path) error {
messagesProcessed.Inc()
msg := a.createBBMPUnicastMonitoringMessage(pfx, path, true)
_ = msg
a.sendMessage(msg)
return nil
}
func (a *adjRIBin) RemovePath(pfx *net.Prefix, path *route.Path) bool {
messagesProcessed.Inc()
msg := a.createBBMPUnicastMonitoringMessage(pfx, path, false)
_ = msg
a.sendMessage(msg)
return true
}
/*
* Only here to fulfill the Interface
*/
func (a *adjRIBin) ReplaceFilterChain(filter.Chain) {}
func (a *adjRIBin) Dump() []*route.Route {
return nil
}
func (a *adjRIBin) Register(client routingtable.RouteTableClient) {}
func (a *adjRIBin) Unregister(client routingtable.RouteTableClient) {}
func (a *adjRIBin) Flush() {}
func (a *adjRIBin) RouteCount() int64 {
return 0
}
func (a *adjRIBin) ClientCount() uint64 {
return 0
}