-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlogstash-dissect.conf
102 lines (91 loc) · 2.57 KB
/
logstash-dissect.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# Ingest every exported JSON-lines file exactly once and then stop.
input {
  file {
    # Which files to read; EXPORT_PATH is resolved from the environment.
    path => "${EXPORT_PATH}/*.jsonl"

    # Treat each file as complete content and exit once everything is read.
    mode => "read"
    exit_after_read => true
    # NOTE(review): the file input docs state start_position is ignored in
    # read mode (files are always read from the beginning) - confirm before
    # relying on this line.
    start_position => "beginning"

    # Read in chunks of 131072 (128 * 1024).
    file_chunk_size => 131072

    # Point the sincedb at /dev/null so no read-position state is kept.
    sincedb_path => "/dev/null"

    # On completion, log the file name, and send that log to /dev/null too.
    file_completed_action => "log"
    file_completed_log_path => "/dev/null"

    # Each line is one JSON document; ECS field naming is turned off.
    codec => json {
      ecs_compatibility => "disabled"
    }
  }
}
# Normalise Dissect records into the field layout Timesketch expects.
filter {
  # Strip the fields added by Logstash and stamp the event with the
  # timeline identifiers taken from the environment.
  mutate {
    remove_field => [
      "[event][original]",
      "[log][file][path]",
      "@timestamp",
      "@version",
      "[host][name]",
      "[event]",
      "[log]",
      "[host]"
    ]
    add_field => {
      "__ts_timeline_filter_id" => "${TIMELINE_FILTER_ID}"
      "__ts_timeline_id" => "${TIMELINE_ID}"
    }
  }

  # Turn the timeline id into an integer.
  # NOTE(review): kept as its own mutate, presumably because add_field is
  # only applied after a mutate's own operations have run - confirm.
  mutate {
    convert => { "__ts_timeline_id" => "integer" }
  }

  # Lowercase the names of all top-level fields; nested keys are not touched.
  # Background: https://github.com/elastic/logstash/issues/2526
  ruby {
    code => "
      event.to_hash.keys.each { |field_name|
        event.set(field_name.downcase, event.remove(field_name))
      }
    "
  }

  ##############################################################################
  # Dissect records + Timesketch fields
  ##############################################################################

  # Recorddescriptor entries carry both _type and _data; discard them.
  if [_type] and [_data] {
    drop {}
  }

  # Move aside or delete record fields whose names Timesketch uses itself.
  mutate {
    rename => { "[data_type]" => "dissect_data_type" }
    remove_field => ["datetime", "timestamp_desc"]
  }

  # Same treatment for a record-level tag field, when one is present.
  if [tag] {
    mutate {
      rename => { "[tag]" => "dissect_tag" }
    }
  }

  # Records with a ts value: promote ts / ts_description to the Timesketch
  # time fields. Records without one: fall back to a zero datetime and an
  # empty description.
  if [ts] {
    mutate {
      rename => {
        "ts" => "datetime"
        "ts_description" => "timestamp_desc"
      }
    }
  } else {
    mutate {
      remove_field => ["[ts]", "[ts_description]"]
      add_field => {
        "datetime" => "1970-01-01T00:00:00"
        "timestamp_desc" => ""
      }
    }
  }

  # data_type becomes the first element of _recorddescriptor, with every
  # "/" replaced by ":".
  mutate {
    replace => { "data_type" => "%{[_recorddescriptor][0]}" }
    gsub => [
      "data_type", "/", ":"
    ]
  }

  # Finally discard the Dissect bookkeeping fields.
  mutate {
    remove_field => [
      "_classification",
      "_generated",
      "_version",
      "_type",
      "_recorddescriptor",
      "_source"
    ]
  }
}
# Ship every surviving event to Elasticsearch.
output {
  elasticsearch {
    # Connection: host comes from the environment, SSL is enabled.
    hosts => "${ELASTIC_HOST}"
    ssl_enabled => true

    # Credentials: fixed user name, password from the environment.
    user => "logstash"
    password => "${LOGSTASH_PASSWORD}"

    # Destination: index documents into the index named by INDEX_NAME, with
    # data streams and Logstash-managed templates switched off.
    action => "index"
    index => "${INDEX_NAME}"
    data_stream => "false"
    manage_template => false

    # Keep ECS field naming off, matching the input codec setting.
    ecs_compatibility => "disabled"
  }
}