-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathexample_transformer_test.go
170 lines (157 loc) · 5.36 KB
/
example_transformer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package ipfix_test
import (
"bytes"
"context"
"fmt"
"log"
"os"
"github.com/zoomoid/go-ipfix"
)
// Transforms IPFIX messages containing more than one record and template set per message into
// a stream of messages that only contain one record in one typed set per message.
// Note that while this obviously includes a lot of overhead, it is helpful in scenarios where
// we want _individual records_ to end up in a hypothetical database, because queries and filters
// are much easier implemented, and the grouping of records in homogeneous sets is mostly only
// done for reducing message overhead from redundancy.
// In practice, due to the possibly complex timing of flow records, sets often contain only a single
// record anyway, and messages rarely contain more than one set.
// Software exporters such as yaf behave differently in terms of message packing when writing
// to files or sending via TCP or UDP. To ease this difference, normalization appears reasonable.
// Example_transformerNormalizeRecords reads raw IPFIX messages from a demo file,
// decodes each message, normalizes it into one-record-per-message form via
// NormalizeIPFIXMessage, and re-encodes the normalized messages to a temporary
// output file. It runs until the surrounding context is cancelled.
func Example_transformerNormalizeRecords() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in, err := os.Open("demo_flow_records.ipfix")
	if err != nil {
		log.Fatalln(fmt.Errorf("failed to open input file: %w", err))
	}
	defer in.Close()
	out, err := os.CreateTemp("", "normalized_flow_records.ipfix")
	if err != nil {
		log.Fatalln(fmt.Errorf("failed to create output file: %w", err))
	}
	defer out.Close()
	r := ipfix.NewIPFIXFileReader(in)
	go r.Start(ctx)
	// Templates observed on the stream are needed later to decode data sets,
	// so both caches are shared with the decoder.
	templateCache := ipfix.NewDefaultEphemeralCache()
	fieldCache := ipfix.NewEphemeralFieldCache(templateCache)
	decoder := ipfix.NewDecoder(templateCache, fieldCache, ipfix.DecoderOptions{OmitRFC5610Records: false})
	go func() {
		for {
			select {
			case raw := <-r.Messages():
				msg, err := decoder.Decode(ctx, bytes.NewBuffer(raw))
				if err != nil {
					log.Fatalln(fmt.Errorf("failed to decode IPFIX message: %w", err))
				}
				normalizedMessages, err := NormalizeIPFIXMessage(msg)
				if err != nil {
					log.Fatalln(fmt.Errorf("failed to normalize IPFIX message: %w", err))
				}
				for _, newMsg := range normalizedMessages {
					_, err := newMsg.Encode(out)
					if err != nil {
						log.Fatalln(fmt.Errorf("failed to encode IPFIX message: %w", err))
					}
				}
			case err := <-r.Errors():
				// Read errors are logged but do not stop the example.
				log.Println(fmt.Errorf("failed to read IPFIX message: %w", err))
			case <-ctx.Done():
				return
			}
		}
	}()
	<-ctx.Done()
}
const (
	// ipfixPacketHeaderLength is the fixed size in bytes of an IPFIX message
	// header (version, length, export time, sequence number, observation
	// domain id); used when recomputing Message.Length.
	ipfixPacketHeaderLength int = 16
	// ipfixSetHeaderLength is the fixed size in bytes of a set header
	// (set id + set length); added on top of each record's encoded length.
	ipfixSetHeaderLength int = 4
)
var (
	// sequenceNumber is the running sequence number written into normalized
	// messages. Per RFC 7011 it is incremented only for data records, not for
	// (options) template records. NOTE(review): package-level mutable state —
	// not safe for concurrent use by multiple normalizers.
	sequenceNumber uint32 = 0
)
// singleSetMessage wraps one pre-built set into a fresh IPFIX message, copying
// export time and observation domain id from the original message. n is the
// encoded length in bytes of the set's single record and is used to recompute
// the total message length. The current (not yet incremented) sequenceNumber
// is stamped onto the message.
func singleSetMessage(old *ipfix.Message, n int, set ipfix.Set) *ipfix.Message {
	return &ipfix.Message{
		Version:             10,
		ExportTime:          old.ExportTime,
		SequenceNumber:      sequenceNumber, // rewritten: per-record sequence numbering
		ObservationDomainId: old.ObservationDomainId,
		Length:              uint16(n + ipfixPacketHeaderLength + ipfixSetHeaderLength),
		Sets:                []ipfix.Set{set},
	}
}

// NormalizeIPFIXMessage splits a message that may contain multiple sets and
// multiple records per set into a slice of messages, each carrying exactly one
// record in one typed set. Message and set lengths are recomputed from the
// record's encoded size, and sequence numbers are rewritten; per RFC 7011,
// only data records increment the sequence number. On the first record that
// fails to encode, the entire packet is skipped and the error is returned.
func NormalizeIPFIXMessage(old *ipfix.Message) ([]*ipfix.Message, error) {
	msgs := make([]*ipfix.Message, 0)
	for _, fs := range old.Sets {
		switch fss := fs.Set.(type) {
		case *ipfix.TemplateSet:
			for _, rr := range fss.Records {
				// Encode into a scratch buffer solely to learn the record's length.
				n, err := rr.Encode(&bytes.Buffer{})
				if err != nil {
					return nil, err // skip entire packet
				}
				msgs = append(msgs, singleSetMessage(old, n, ipfix.Set{
					SetHeader: ipfix.SetHeader{
						Id:     fs.Id,
						Length: uint16(n + ipfixSetHeaderLength), // single record length + set header length
					},
					Set: &ipfix.TemplateSet{
						Records: []ipfix.TemplateRecord{rr},
					},
				}))
				// RFC 7011: "Template and Options Template Records do not
				// increase the Sequence Number."
			}
		case *ipfix.OptionsTemplateSet:
			for _, rr := range fss.Records {
				n, err := rr.Encode(&bytes.Buffer{})
				if err != nil {
					return nil, err // skip entire packet
				}
				msgs = append(msgs, singleSetMessage(old, n, ipfix.Set{
					SetHeader: ipfix.SetHeader{
						Id:     fs.Id,
						Length: uint16(n + ipfixSetHeaderLength), // single record length + set header length
					},
					Set: &ipfix.OptionsTemplateSet{
						Records: []ipfix.OptionsTemplateRecord{rr},
					},
				}))
				// RFC 7011: "Template and Options Template Records do not
				// increase the Sequence Number."
			}
		case *ipfix.DataSet:
			for _, rr := range fss.Records {
				n, err := rr.Encode(&bytes.Buffer{})
				if err != nil {
					return nil, err // skip entire packet
				}
				msgs = append(msgs, singleSetMessage(old, n, ipfix.Set{
					SetHeader: ipfix.SetHeader{
						Id:     fs.Id,
						Length: uint16(n + ipfixSetHeaderLength), // single record length + set header length
					},
					Set: &ipfix.DataSet{
						Records: []ipfix.DataRecord{rr},
					},
				}))
				// Only data records advance the sequence number (RFC 7011).
				sequenceNumber++
			}
		}
	}
	return msgs, nil
}