Skip to content

Commit

Permalink
Support journald -ojson output (#30)
Browse files Browse the repository at this point in the history
This change introduces processors for specific output and the processor
for journald.
The journald processor will parse __REALTIME_TIMESTAMP and PRIORITY make
the log entries compatible with normal jl output.
  • Loading branch information
koenbollen authored Jul 8, 2023
1 parent ffd8717 commit 83211ab
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 4 deletions.
23 changes: 23 additions & 0 deletions examples/journald.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# journald

Journald is a system service for collecting and storing log data, introduced with systemd. It tries to make it easier for system administrators to find interesting and relevant information among an ever-increasing amount of log messages. It also provides structured logging output, which `jl` can parse.

This is an short, slightly redacted, example of journald output:

$ journald -xe -ojson
{"_HOSTNAME":"example.org","_SYSTEMD_CGROUP":"/system.slice/sshd.service","_EXE":"/usr/sbin/sshd","__MONOTONIC_TIMESTAMP":"4231192657117","_CMDLINE":"sshd: unknown [priv]","_SYSTEMD_UNIT":"sshd.service","_MACHINE_ID":"be3292bb238d21a8de53f89d25ec97c4","_TRANSPORT":"stdout","PRIORITY":"5","__REALTIME_TIMESTAMP":"1686919896987169","_GID":"0","_CAP_EFFECTIVE":"1ffffffffff","__CURSOR":"s=11054c7dc82b4645a45da01c6bf62842","MESSAGE":"Invalid user hacker from 127.106.119.170 port 54520","SYSLOG_IDENTIFIER":"sshd","_UID":"0","_COMM":"sshd","SYSLOG_FACILITY":"3","_SYSTEMD_SLICE":"system.slice","_STREAM_ID":"08acce59fe1b44648b1d054f9a35156f","_PID":"1977203","_SYSTEMD_INVOCATION_ID":"9b199c04cfbe43afb339f73299c02a20","_BOOT_ID":"4cef257cf46b4818a75a0f463024e90d"}
{"_UID":"0","__REALTIME_TIMESTAMP":"1686919897133605","_EXE":"/usr/sbin/sshd","_SYSTEMD_SLICE":"system.slice","_HOSTNAME":"example.org","_PID":"1977203","_STREAM_ID":"08acce59fe1b44648b1d054f9a35156f","_BOOT_ID":"4cef257cf46b4818a75a0f463024e90d","_CMDLINE":"sshd: unknown [priv]","_COMM":"sshd","PRIORITY":"6","_MACHINE_ID":"be3292bb238d21a8de53f89d25ec97c4","MESSAGE":"Disconnected from invalid user hacker 127.106.119.170 port 54520 [preauth]","_CAP_EFFECTIVE":"1ffffffffff","_SYSTEMD_CGROUP":"/system.slice/sshd.service","_SYSTEMD_UNIT":"sshd.service","__MONOTONIC_TIMESTAMP":"4231192803553","_TRANSPORT":"stdout","__CURSOR":"s=11054c7dc82b4645a45da01c6bf62842","_SYSTEMD_INVOCATION_ID":"9b199c04cfbe43afb339f73299c02a20","SYSLOG_IDENTIFIER":"sshd","_GID":"0","SYSLOG_FACILITY":"3"}

Passing this through `jl` will make it more readable:

$ journald -xe -ojson | jl
[2023-06-16 12:51:36] NOTICE: Invalid user hacker from 127.106.119.170 port 54520 [SYSLOG_FACILITY=3 SYSLOG_IDENTIFIER=sshd _CAP_EFFECTIVE=1ffffffffff _CMDLINE=sshd: unknown [priv] _COMM=sshd _EXE=/usr/sbin/sshd _GID=0 _HOSTNAME=example.org _PID=1977203 _SYSTEMD_SLICE=system.slice _SYSTEMD_UNIT=sshd.service _TRANSPORT=stdout _UID=0]
[2023-06-16 12:51:37] INFO: Disconnected from invalid user hacker 127.106.119.170 port 54520 [preauth] [SYSLOG_FACILITY=3 SYSLOG_IDENTIFIER=sshd _CAP_EFFECTIVE=1ffffffffff _CMDLINE=sshd: unknown [priv] _COMM=sshd _EXE=/usr/sbin/sshd _GID=0 _HOSTNAME=example.org _PID=1977203 _SYSTEMD_SLICE=system.slice _SYSTEMD_UNIT=sshd.service _TRANSPORT=stdout _UID=0]

### Skipping fields:

`jl` will not output fields it used to parse the message, like __REALTIME_TIMESTAMP or PRIOIRTY. You can force `jl` to output these fields by explicitly including them:

$ journald -xe -ojson | jl --include-field PRIORITY
[2023-06-16 12:51:36] NOTICE: Invalid user hacker from 127.106.119.170 port 54520 [PRIORITY=5 SYSLOG_FACILITY=3 SYSLOG_IDENTIFIER=sshd _CAP_EFFECTIVE=1ffffffffff _CMDLINE=sshd: unknown [priv] _COMM=sshd _EXE=/usr/sbin/sshd _GID=0 _HOSTNAME=example.org _PID=1977203 _SYSTEMD_SLICE=system.slice _SYSTEMD_UNIT=sshd.service _TRANSPORT=stdout _UID=0]
[2023-06-16 12:51:37] INFO: Disconnected from invalid user hacker 127.106.119.170 port 54520 [preauth] [PRIORITY=6 SYSLOG_FACILITY=3 SYSLOG_IDENTIFIER=sshd _CAP_EFFECTIVE=1ffffffffff _CMDLINE=sshd: unknown [priv] _COMM=sshd _EXE=/usr/sbin/sshd _GID=0 _HOSTNAME=example.org _PID=1977203 _SYSTEMD_SLICE=system.slice _SYSTEMD_UNIT=sshd.service _TRANSPORT=stdout _UID=0]
4 changes: 4 additions & 0 deletions examples/mocks/journald
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh

echo '{"_HOSTNAME":"example.org","_SYSTEMD_CGROUP":"/system.slice/sshd.service","_EXE":"/usr/sbin/sshd","__MONOTONIC_TIMESTAMP":"4231192657117","_CMDLINE":"sshd: unknown [priv]","_SYSTEMD_UNIT":"sshd.service","_MACHINE_ID":"be3292bb238d21a8de53f89d25ec97c4","_TRANSPORT":"stdout","PRIORITY":"5","__REALTIME_TIMESTAMP":"1686919896987169","_GID":"0","_CAP_EFFECTIVE":"1ffffffffff","__CURSOR":"s=11054c7dc82b4645a45da01c6bf62842","MESSAGE":"Invalid user hacker from 127.106.119.170 port 54520","SYSLOG_IDENTIFIER":"sshd","_UID":"0","_COMM":"sshd","SYSLOG_FACILITY":"3","_SYSTEMD_SLICE":"system.slice","_STREAM_ID":"08acce59fe1b44648b1d054f9a35156f","_PID":"1977203","_SYSTEMD_INVOCATION_ID":"9b199c04cfbe43afb339f73299c02a20","_BOOT_ID":"4cef257cf46b4818a75a0f463024e90d"}'
echo '{"_UID":"0","__REALTIME_TIMESTAMP":"1686919897133605","_EXE":"/usr/sbin/sshd","_SYSTEMD_SLICE":"system.slice","_HOSTNAME":"example.org","_PID":"1977203","_STREAM_ID":"08acce59fe1b44648b1d054f9a35156f","_BOOT_ID":"4cef257cf46b4818a75a0f463024e90d","_CMDLINE":"sshd: unknown [priv]","_COMM":"sshd","PRIORITY":"6","_MACHINE_ID":"be3292bb238d21a8de53f89d25ec97c4","MESSAGE":"Disconnected from invalid user hacker 127.106.119.170 port 54520 [preauth]","_CAP_EFFECTIVE":"1ffffffffff","_SYSTEMD_CGROUP":"/system.slice/sshd.service","_SYSTEMD_UNIT":"sshd.service","__MONOTONIC_TIMESTAMP":"4231192803553","_TRANSPORT":"stdout","__CURSOR":"s=11054c7dc82b4645a45da01c6bf62842","_SYSTEMD_INVOCATION_ID":"9b199c04cfbe43afb339f73299c02a20","SYSLOG_IDENTIFIER":"sshd","_GID":"0","SYSLOG_FACILITY":"3"}'
14 changes: 13 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/koenbollen/jl/djson"
"github.com/koenbollen/jl/processors"
"github.com/koenbollen/jl/stream"
"github.com/koenbollen/jl/structure"

Expand Down Expand Up @@ -41,7 +42,9 @@ func main() {
s := stream.New(r)
for line := range s.Lines() {
var err error
entry := &structure.Entry{}
entry := &structure.Entry{
SkipFields: make(map[string]bool),
}
if line.JSON != nil && len(line.JSON) > 0 {
var unused interface{}
err = json.Unmarshal(line.JSON, &unused)
Expand All @@ -61,6 +64,15 @@ func main() {
entry.Timestamp = &t
}

for _, processor := range processors.All {
if processor.Detect(line, entry) {
if err := processor.Process(line, entry); err != nil {
fmt.Fprintf(os.Stderr, "failed to process message: %v\n", err)
os.Exit(1)
}
}
}

// Passing entry to formatter to output:
prefix, suffix := split(line.Raw, line.JSON)
err = formatter.Format(entry, line.JSON, prefix, suffix)
Expand Down
68 changes: 68 additions & 0 deletions processors/journald.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package processors

import (
"strconv"
"strings"
"time"

"github.com/koenbollen/jl/stream"
"github.com/koenbollen/jl/structure"
"github.com/tidwall/gjson"
)

var priorityMapping = map[int64]string{
0: "EMERGENCY",
1: "ALERT",
2: "CRITICAL",
3: "ERROR",
4: "WARNING",
5: "NOTICE",
6: "INFO",
7: "DEBUG",
}

type JournaldProcessor struct {
}

func (p *JournaldProcessor) Detect(line *stream.Line, entry *structure.Entry) bool {
return gjson.GetBytes(line.JSON, "SYSLOG_IDENTIFIER").Exists() && gjson.GetBytes(line.JSON, "__REALTIME_TIMESTAMP").Exists() && gjson.GetBytes(line.JSON, "PRIORITY").Exists()
}

func (p *JournaldProcessor) Process(line *stream.Line, entry *structure.Entry) error {
entry.Message = gjson.GetBytes(line.JSON, "MESSAGE").String()
entry.SkipFields["MESSAGE"] = true

rawTimestamp := gjson.GetBytes(line.JSON, "__REALTIME_TIMESTAMP").String()
micro, err := strconv.ParseInt(rawTimestamp, 10, 64)
if err != nil {
entry.RawTimestamp = rawTimestamp
} else {
t := time.Unix(micro/1_000_000, micro%1_000_000*1_000).UTC()
entry.Timestamp = &t
}
entry.SkipFields["__REALTIME_TIMESTAMP"] = true

prioField := gjson.GetBytes(line.JSON, "PRIORITY")
switch prioField.Type {
case gjson.Number:
priority := prioField.Int()
entry.Severity = priorityMapping[priority]
entry.SkipFields["PRIORITY"] = true
case gjson.String:
priority := prioField.String()
if i, err := strconv.ParseInt(priority, 10, 64); err == nil {
entry.Severity = priorityMapping[i]
entry.SkipFields["PRIORITY"] = true
break
} else if len(priority) < 12 {
entry.Severity = strings.ToUpper(priority)
entry.SkipFields["PRIORITY"] = true
break
}
fallthrough
default:
entry.Severity = "UNKNOWN"
}

return nil
}
185 changes: 185 additions & 0 deletions processors/journald_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package processors

import (
"bytes"
"encoding/json"
"testing"
"time"

"github.com/koenbollen/jl/djson"
"github.com/koenbollen/jl/stream"
"github.com/koenbollen/jl/structure"
)

func TestJournald_Happypath(t *testing.T) {
t.Parallel()

in := `{
"_HOSTNAME": "example.org",
"_SYSTEMD_CGROUP": "/system.slice/sshd.service",
"_EXE": "/usr/sbin/sshd",
"__MONOTONIC_TIMESTAMP": "4231192657117",
"_CMDLINE": "sshd: unknown [priv]",
"_SYSTEMD_UNIT": "sshd.service",
"_MACHINE_ID": "be3292bb238d21a8de53f89d25ec97c4",
"_TRANSPORT": "stdout",
"PRIORITY": "6",
"__REALTIME_TIMESTAMP": "1686919896987169",
"_GID": "0",
"_CAP_EFFECTIVE": "1ffffffffff",
"__CURSOR": "s=11054c7dc82b4645a45da01c6bf62842",
"MESSAGE": "Invalid user hacker from 127.106.119.170 port 54520",
"SYSLOG_IDENTIFIER": "sshd",
"_UID": "0",
"_COMM": "sshd",
"SYSLOG_FACILITY": "3",
"_SYSTEMD_SLICE": "system.slice",
"_STREAM_ID": "08acce59fe1b44648b1d054f9a35156f",
"_PID": "1977203",
"_SYSTEMD_INVOCATION_ID": "9b199c04cfbe43afb339f73299c02a20",
"_BOOT_ID": "4cef257cf46b4818a75a0f463024e90d"
}`
raw := &bytes.Buffer{}
_ = json.Compact(raw, []byte(in))

line := &stream.Line{Raw: raw.Bytes(), JSON: raw.Bytes()}
entry := &structure.Entry{SkipFields: make(map[string]bool)}
djson.Unmarshal(line.JSON, entry)

p := &JournaldProcessor{}

detected := p.Detect(line, entry)
if got, want := detected, true; got != want {
t.Errorf("Detect() = %v, want %v", got, want)
}

err := p.Process(line, entry)
if err != nil {
t.Errorf("Process() = %v, want nil", err)
}

if got, want := entry.Message, "Invalid user hacker from 127.106.119.170 port 54520"; got != want {
t.Errorf("entry.Message = %v, want %v", got, want)
}

if got, want := entry.Timestamp.String(), "2023-06-16 12:51:36.987169 +0000 UTC"; got != want {
t.Errorf("entry.Timestamp = %v, want %v", got, want)
}

if got, want := entry.Severity, "INFO"; got != want {
t.Errorf("entry.Severity = %v, want %v", got, want)
}

if got, want := entry.SkipFields["MESSAGE"], true; got != want {
t.Errorf("entry.SkipFields['MESSAGE'] = %v, want %v", got, want)
}

if got, want := entry.SkipFields["__REALTIME_TIMESTAMP"], true; got != want {
t.Errorf("entry.SkipFields['__REALTIME_TIMESTAMP'] = %v, want %v", got, want)
}

if got, want := entry.SkipFields["PRIORITY"], true; got != want {
t.Errorf("entry.SkipFields['PRIORITY'] = %v, want %v", got, want)
}
}

func TestJournald_InvalidTimestamp(t *testing.T) {
t.Parallel()

entry := process(t, `{
"__REALTIME_TIMESTAMP": "yesterday",
"PRIORITY": "6",
"MESSAGE": "Invalid timestamp in __REALTIME_TIMESTAMP",
"SYSLOG_IDENTIFIER": "sshd"
}`)

if got, want := entry.RawTimestamp, "yesterday"; got != want {
t.Errorf("entry.RawTimestamp = %v, want %v", got, want)
}

if got, want := entry.Timestamp, (*time.Time)(nil); got != want {
t.Errorf("entry.Timestamp = %v, want %v", got, want)
}
}

func TestJournald_PriorityString(t *testing.T) {
t.Parallel()

entry := process(t, `{
"__REALTIME_TIMESTAMP": "1686919896987169",
"PRIORITY": "cake",
"MESSAGE": "Invalid number in PRIORITY",
"SYSLOG_IDENTIFIER": "sshd"
}`)

if got, want := entry.Severity, "CAKE"; got != want {
t.Errorf("entry.Severity = %v, want %v", got, want)
}

if got, want := entry.SkipFields["PRIORITY"], true; got != want {
t.Errorf("entry.SkipFields['PRIORITY'] = %v, want %v", got, want)
}
}

func TestJournald_PriorityInt(t *testing.T) {
t.Parallel()

entry := process(t, `{
"__REALTIME_TIMESTAMP": "1686919896987169",
"PRIORITY": 2,
"MESSAGE": "Actual number type in PRIORITY",
"SYSLOG_IDENTIFIER": "sshd"
}`)

if got, want := entry.Severity, "CRITICAL"; got != want {
t.Errorf("entry.Severity = %v, want %v", got, want)
}

if got, want := entry.SkipFields["PRIORITY"], true; got != want {
t.Errorf("entry.SkipFields['PRIORITY'] = %v, want %v", got, want)
}
}

func TestJournald_InvalidPriority(t *testing.T) {
t.Parallel()

entry := process(t, `{
"__REALTIME_TIMESTAMP": "1686919896987169",
"PRIORITY": {},
"MESSAGE": "Object type in PRIORITY",
"SYSLOG_IDENTIFIER": "sshd"
}`)

if got, want := entry.Severity, "UNKNOWN"; got != want {
t.Errorf("entry.Severity = %v, want %v", got, want)
}

if got, want := entry.SkipFields["PRIORITY"], false; got != want {
t.Errorf("entry.SkipFields['PRIORITY'] = %v, want %v", got, want)
}
}

func process(t *testing.T, input string) *structure.Entry {
t.Helper()

raw := &bytes.Buffer{}
_ = json.Compact(raw, []byte(input))

line := &stream.Line{Raw: raw.Bytes(), JSON: raw.Bytes()}
entry := &structure.Entry{SkipFields: make(map[string]bool)}
djson.Unmarshal(line.JSON, entry)

p := &JournaldProcessor{}

detected := p.Detect(line, entry)
if got, want := detected, true; got != want {
t.Fatalf("Detect() = %v, want %v", got, want)
}

err := p.Process(line, entry)
if err != nil {
t.Fatalf("Process() = %v, want nil", err)
}

return entry
}
15 changes: 15 additions & 0 deletions processors/shared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package processors

import (
"github.com/koenbollen/jl/stream"
"github.com/koenbollen/jl/structure"
)

type Processor interface {
Detect(line *stream.Line, entry *structure.Entry) bool
Process(line *stream.Line, entry *structure.Entry) error
}

var All = []Processor{
&JournaldProcessor{},
}
3 changes: 3 additions & 0 deletions structure/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ type Entry struct {
Message string `djson:"message,msg,text"`

Name string `djson:"app,name,service.name"`

// SkipFields is used by processors to indicate which fields should be skipped
SkipFields map[string]bool
}
8 changes: 5 additions & 3 deletions structure/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (f *Formatter) outputFields(entry *Entry, raw json.RawMessage) {
if _, ok := value.([]interface{}); ok {
continue
}
if !f.shouldSkipField(key, path+"."+key, value) {
if !f.shouldSkipField(entry, key, path+"."+key, value) {
switch v := value.(type) {
case float64:
output = append(output, key+"="+strconv.FormatFloat(v, 'f', -1, 64))
Expand All @@ -180,7 +180,7 @@ func (f *Formatter) outputFields(entry *Entry, raw json.RawMessage) {
}
}

func (f *Formatter) shouldSkipField(field, path string, value interface{}) bool {
func (f *Formatter) shouldSkipField(entry *Entry, field, path string, value interface{}) bool {
if strings.Contains(f.IncludeFields, field) || strings.Contains(f.IncludeFields, path) {
return false
}
Expand All @@ -190,7 +190,9 @@ func (f *Formatter) shouldSkipField(field, path string, value interface{}) bool
if f.MaxFieldLength > 0 && len(path+fmt.Sprintf("%v", value)) >= f.MaxFieldLength {
return true
}

if _, skip := entry.SkipFields[field]; skip {
return true
}
return contains(f.ExcludeFields, field)
}

Expand Down

0 comments on commit 83211ab

Please sign in to comment.