From ed1bfea3597032da000b0aacb991b3121ffb5960 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 30 Apr 2024 12:13:06 -0400 Subject: [PATCH] prometheus.remote_write: notify remote_write of data upon WAL commit This commit fixes an issue where the remote_write queue only checked for new WAL data every 15 seconds. remote_write can be notified of new data immediately by calling `Notify` on the remote storage, but we weren't doing this, which caused us to fall back to the 15 second interval. --- CHANGELOG.md | 8 ++++++-- .../prometheus/remotewrite/remote_write.go | 2 ++ internal/static/metrics/wal/wal.go | 12 ++++++++++++ internal/static/metrics/wal/wal_test.go | 16 ++++++++++++++++ 4 files changed, 36 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13dbe33bcd..3129801205 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,8 +47,12 @@ Main (unreleased) - In `mimir.rules.kubernetes`, fix an issue where unrecoverable errors from the Mimir API were retried. (@56quarters) -- Flow: Fix an issue where `faro.receiver`'s `extra_log_labels` with empty value don't - map existing value in log line. (@hainenber) +- Fix an issue where `faro.receiver`'s `extra_log_labels` with empty value + don't map existing value in log line. (@hainenber) + +- Fix an issue where `prometheus.remote_write` only queued data for sending + every 15 seconds instead of as soon as data was written to the WAL. + (@rfratto) ### Other changes diff --git a/internal/component/prometheus/remotewrite/remote_write.go b/internal/component/prometheus/remotewrite/remote_write.go index 3e64328ae5..8bfc905b90 100644 --- a/internal/component/prometheus/remotewrite/remote_write.go +++ b/internal/component/prometheus/remotewrite/remote_write.go @@ -84,6 +84,8 @@ func New(o component.Options, c Arguments) (*Component, error) { remoteLogger := log.With(o.Logger, "subcomponent", "rw") remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, o.DataPath, remoteFlushDeadline, nil) + walStorage.SetNotifier(remoteStore) + service, err := o.GetServiceData(labelstore.ServiceName) if err != nil { return nil, err diff --git a/internal/static/metrics/wal/wal.go b/internal/static/metrics/wal/wal.go index bab0fd4033..eb0441d8e6 100644 --- a/internal/static/metrics/wal/wal.go +++ b/internal/static/metrics/wal/wal.go @@ -141,6 +141,8 @@ type Storage struct { deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until. metrics *storageMetrics + + notifier wlog.WriteNotified } // NewStorage makes a new Storage. @@ -457,6 +459,12 @@ func (w *Storage) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chun } } +// SetNotifier sets the notifier for the WAL storage. SetNotifier must only be +// called before data is written to the WAL. +func (w *Storage) SetNotifier(n wlog.WriteNotified) { + w.notifier = n +} + // Directory returns the path where the WAL storage is held. func (w *Storage) Directory() string { return w.path @@ -859,6 +867,10 @@ func (a *appender) Commit() error { return err } + if a.w.notifier != nil { + a.w.notifier.Notify() + } + a.clearData() a.w.appenderPool.Put(a) return nil diff --git a/internal/static/metrics/wal/wal_test.go b/internal/static/metrics/wal/wal_test.go index 8563b0b400..350461415d 100644 --- a/internal/static/metrics/wal/wal_test.go +++ b/internal/static/metrics/wal/wal_test.go @@ -64,11 +64,15 @@ func TestStorage_InvalidSeries(t *testing.T) { func TestStorage(t *testing.T) { walDir := t.TempDir() + onNotify := util.NewWaitTrigger() + notifier := &fakeNotifier{NotitfyFunc: onNotify.Trigger} + s, err := NewStorage(log.NewNopLogger(), nil, walDir) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() + s.SetNotifier(notifier) app := s.Appender(context.Background()) @@ -99,6 +103,8 @@ func TestStorage(t *testing.T) { actualExemplars := collector.exemplars sort.Sort(byRefExemplar(actualExemplars)) require.Equal(t, expectedExemplars, actualExemplars) + + require.NoError(t, onNotify.Wait(time.Minute), "Expected Notify to be called") } func TestStorage_Rollback(t *testing.T) { @@ -664,3 +670,13 @@ func (b byRefExemplar) Less(i, j int) bool { } return b[i].Ref < b[j].Ref } + +type fakeNotifier struct { + NotitfyFunc func() +} + +func (fn *fakeNotifier) Notify() { + if fn.NotitfyFunc != nil { + fn.NotitfyFunc() + } +}