Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of slog logs before logging initialized #707

Merged
merged 14 commits into from
May 1, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ Main (unreleased)
- Flow: Fix an issue where `faro.receiver`'s `extra_log_labels` with empty value don't
map existing value in log line. (@hainenber)

- Imported code using `slog` logging will now not panic and replay correctly when logged before the logging
config block is initialized. (@mattdurham)

### Other changes

- Update `alloy-mixin` to use more specific alert group names (for example,
Expand Down
105 changes: 105 additions & 0 deletions internal/alloy/logging/deferred_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package logging

import (
"context"
"log/slog"
"sync"
)

// deferredSlogHandler is used if you are using a slog handler before the logging config block is processed.
type deferredSlogHandler struct {
mut sync.RWMutex
group string
attrs []slog.Attr
children []*deferredSlogHandler
handle slog.Handler
l *Logger
}

func newDeferredHandler(l *Logger) *deferredSlogHandler {
return &deferredSlogHandler{
children: make([]*deferredSlogHandler, 0),
l: l,
}
}

func (d *deferredSlogHandler) Handle(ctx context.Context, r slog.Record) error {
d.mut.RLock()
defer d.mut.RUnlock()

if d.handle != nil {
return d.handle.Handle(ctx, r)
}
d.l.addRecord(r, d)
return nil
}

// Enabled reports whether the handler handles records at the given level.
// The handler ignores records whose level is lower.
func (d *deferredSlogHandler) Enabled(ctx context.Context, level slog.Level) bool {
d.mut.RLock()
defer d.mut.RUnlock()

if d.handle != nil {
return d.handle.Enabled(ctx, level)
}
return true
}

// WithAttrs returns a new [TextHandler] whose attributes consists
// of h's attributes followed by attrs.
func (d *deferredSlogHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
d.mut.RLock()
defer d.mut.RUnlock()

if d.handle != nil {
return d.handle.WithAttrs(attrs)
}

child := &deferredSlogHandler{
attrs: attrs,
children: make([]*deferredSlogHandler, 0),
l: d.l,
}
d.children = append(d.children, child)
return child
}

func (d *deferredSlogHandler) WithGroup(name string) slog.Handler {
d.mut.RLock()
defer d.mut.RUnlock()

if d.handle != nil {
return d.handle.WithGroup(name)
}

child := &deferredSlogHandler{
children: make([]*deferredSlogHandler, 0),
group: name,
l: d.l,
}
d.children = append(d.children, child)
return child
}

// buildHandlers will recursively build actual handlers, this should only be called before replaying once the logging config
// block is set.
func (d *deferredSlogHandler) buildHandlers(parent slog.Handler) {
d.mut.Lock()
defer d.mut.Unlock()

// Root node will not have attrs or groups.
if parent == nil {
d.handle = d.l.handler
} else {
if d.group != "" {
d.handle = parent.WithGroup(d.group)
} else {
d.handle = parent.WithAttrs(d.attrs)
}
}
for _, child := range d.children {
child.buildHandlers(d.handle)
}
d.children = nil
}
267 changes: 267 additions & 0 deletions internal/alloy/logging/deferred_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package logging

import (
"bytes"
"context"
"encoding/json"
"github.com/go-kit/log/level"
"github.com/stretchr/testify/require"
"log/slog"
"strings"
"testing"
"testing/slogtest"
)

func TestDefferredSlogTester(t *testing.T) {
buf := &bytes.Buffer{}
var l *Logger
results := func(t *testing.T) map[string]any {
// Nothing has been written to the byte stream, it only exists in the internal logger buffer
// We need to call l.Update to flush it to the byte stream.
// This does something a bit ugly where it DEPENDS on the var in slogtest.Run, if the behavior of slogtest.Run
// changes this may break the tests.
updateErr := l.Update(Options{
Level: "debug",
Format: "json",
WriteTo: nil,
})
require.NoError(t, updateErr)
line := buf.Bytes()
if len(line) == 0 {
return nil
}
var m map[string]any
unmarshalErr := json.Unmarshal(line, &m)
require.NoError(t, unmarshalErr)
// The tests expect time field and not ts.
if _, found := m["ts"]; found {
m[slog.TimeKey] = m["ts"]
delete(m, "ts")
}
// Need to reset the buffer and logger between each test.
l = nil
buf.Reset()
return m
}

// Had to add some custom logic to handle updated for the deferred tests.
// Also ignore anything that modifies the log line, which are two tests.
slogtest.Run(t, func(t *testing.T) slog.Handler {
var err error
l, err = NewDeferred(buf)
require.NoError(t, err)
return l.Handler()
}, results)
}

func TestDeferredHandlerWritingToBothLoggers(t *testing.T) {
bb := &bytes.Buffer{}
l, err := NewDeferred(bb)
slogger := slog.New(l.deferredSlog)
require.NoError(t, err)
l.Log("msg", "this should happen before")
slogger.Log(context.Background(), slog.LevelInfo, "this should happen after)")

err = l.Update(Options{
Level: "info",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
firstIndex := strings.Index(bb.String(), "this should happen before")
secondIndex := strings.Index(bb.String(), "this should happen after")
require.True(t, firstIndex < secondIndex)
}

func TestSlogHandle(t *testing.T) {
bb := &bytes.Buffer{}
bbSl := &bytes.Buffer{}
sl, alloy, l := newLoggers(t, bb, bbSl)
logInfo(sl, alloy, "test")
err := l.Update(Options{
Level: "debug",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
require.True(t, equal(bb, bbSl))
}

func TestSlogHandleWithDifferingLevelDeny(t *testing.T) {
bb := &bytes.Buffer{}
bbSl := &bytes.Buffer{}
sl, alloy, l := newLoggers(t, bb, bbSl)
logInfo(sl, alloy, "test")
err := l.Update(Options{
Level: "warn",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
require.True(t, bb.Len() == 0)
}

func TestSlogHandleWithDifferingLevelAllow(t *testing.T) {
bb := &bytes.Buffer{}
bbSl := &bytes.Buffer{}
sl, alloy, l := newLoggers(t, bb, bbSl)
logError(sl, alloy, "test")
err := l.Update(Options{
Level: "warn",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
require.True(t, bb.Len() > 0)
}

func TestNormalWithDifferingLevelDeny(t *testing.T) {
bb := &bytes.Buffer{}
l, err := newDeferredTest(bb)
require.NoError(t, err)
level.Debug(l).Log("msg", "this should not log")
err = l.Update(Options{
Level: "error",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
require.True(t, bb.Len() == 0)
}

func TestNormalWithDifferingLevelAllow(t *testing.T) {
bb := &bytes.Buffer{}
l, err := newDeferredTest(bb)
require.NoError(t, err)
level.Error(l).Log("msg", "this should not log")
err = l.Update(Options{
Level: "error",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
// Since we write logs at info, but change to error then our logInfo should be clean.
require.True(t, bb.Len() > 0)
}

func TestDeferredHandler(t *testing.T) {
type testCase struct {
name string
log func(bb *bytes.Buffer, slBB *bytes.Buffer)
}

var testCases = []testCase{
{
name: "Single Attr",
log: func(bb *bytes.Buffer, bbSl *bytes.Buffer) {
sl, alloy, l := newLoggers(t, bb, bbSl)

sl, alloy = withAttrs(sl, alloy, "attr1", "value1")
logInfo(sl, alloy, "test")
err := l.Update(Options{
Level: "debug",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
},
},
{
name: "Attrs Nested",
log: func(bb *bytes.Buffer, bbSl *bytes.Buffer) {
sl, alloy, l := newLoggers(t, bb, bbSl)
sl, alloy = withAttrs(sl, alloy, "attr1", "value1")
sl, alloy = withAttrs(sl, alloy, "nestedattr1", "nestedvalue1")
logInfo(sl, alloy, "test")
err := l.Update(Options{
Level: "debug",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
},
},
{
name: "Group",
log: func(bb *bytes.Buffer, bbSl *bytes.Buffer) {
sl, alloy, l := newLoggers(t, bb, bbSl)
sl, alloy = withGroup(sl, alloy, "gr1")
sl, alloy = withAttrs(sl, alloy, "nestedattr1", "nestedvalue1")
logInfo(sl, alloy, "test")
err := l.Update(Options{
Level: "debug",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
},
},
{
name: "Nested Group",
log: func(bb *bytes.Buffer, bbSl *bytes.Buffer) {
sl, alloy, l := newLoggers(t, bb, bbSl)
sl, alloy = withGroup(sl, alloy, "gr1")
sl, alloy = withGroup(sl, alloy, "gr2")
sl, alloy = withAttrs(sl, alloy, "nestedattr1", "nestedvalue1")
logInfo(sl, alloy, "test")
err := l.Update(Options{
Level: "debug",
Format: "json",
WriteTo: nil,
})
require.NoError(t, err)
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
bb := &bytes.Buffer{}
bbSl := &bytes.Buffer{}
tc.log(bb, bbSl)
require.True(t, equal(bb, bbSl))
})
}
}

func newLoggers(t *testing.T, bb, bbSl *bytes.Buffer) (*slog.Logger, *slog.Logger, *Logger) {
l, err := newDeferredTest(bb)
require.NoError(t, err)

jsonH := slog.NewJSONHandler(bbSl, &slog.HandlerOptions{
AddSource: true,
Level: nil,
ReplaceAttr: testReplace,
})
sl := slog.New(jsonH)
alloy := slog.New(l.deferredSlog)
return sl, alloy, l
}

func withAttrs(sl *slog.Logger, alloyL *slog.Logger, attrs ...string) (*slog.Logger, *slog.Logger) {
var attrAny []any
for _, a := range attrs {
attrAny = append(attrAny, a)
}
return sl.With(attrAny...), alloyL.With(attrAny...)
}

func withGroup(sl *slog.Logger, alloyL *slog.Logger, group string) (*slog.Logger, *slog.Logger) {
return sl.WithGroup(group), alloyL.WithGroup(group)
}

func logInfo(sl *slog.Logger, alloyL *slog.Logger, msg string) {
ctx := context.Background()
sl.Log(ctx, slog.LevelInfo, msg)
alloyL.Log(ctx, slog.LevelInfo, msg)
}

func logError(sl *slog.Logger, alloyL *slog.Logger, msg string) {
ctx := context.Background()
sl.Log(ctx, slog.LevelError, msg)
alloyL.Log(ctx, slog.LevelError, msg)
}

func equal(sl *bytes.Buffer, alloy *bytes.Buffer) bool {
return sl.String() == alloy.String()
}
Loading
Loading