prometheus.remote_write: notify remote_write of data upon WAL commit
This commit fixes an issue where the remote_write queue only checked for
new WAL data every 15 seconds. remote_write can be notified of new data
immediately by calling `Notify` on the remote storage, but we weren't
doing this, which caused us to fall back to the 15-second interval.
rfratto committed Apr 30, 2024
1 parent 07ccc05 commit ed1bfea
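
In short, the WAL storage now holds a `wlog.WriteNotified` and pokes it after every successful commit, and the component registers the remote storage as that notifier. Below is a minimal, self-contained Go sketch of the idea; the names used here (walStorage, fakeRemote, WriteNotified) are simplified stand-ins for illustration, not the agent's or Prometheus's actual implementations.

package main

import "fmt"

// WriteNotified mirrors the wlog.WriteNotified interface from Prometheus:
// anything that wants to hear about new WAL data implements Notify().
type WriteNotified interface {
	Notify()
}

// walStorage is a simplified stand-in for the agent's WAL Storage type.
type walStorage struct {
	notifier WriteNotified
}

// SetNotifier installs the notifier; in the real code this must happen
// before any data is written to the WAL.
func (w *walStorage) SetNotifier(n WriteNotified) { w.notifier = n }

// Commit stands in for appender.Commit(): once the WAL write succeeds, the
// notifier is poked immediately instead of waiting for the remote-write
// queue's periodic 15-second check for new data.
func (w *walStorage) Commit() {
	// ... append samples to the WAL ...
	if w.notifier != nil {
		w.notifier.Notify()
	}
}

// fakeRemote stands in for the Prometheus remote storage, which wakes its
// queue when notified.
type fakeRemote struct{}

func (fakeRemote) Notify() { fmt.Println("remote_write notified: new WAL data") }

func main() {
	wal := &walStorage{}
	wal.SetNotifier(fakeRemote{}) // the wiring added in remote_write.go below
	wal.Commit()                  // notification fires immediately, no 15s delay
}

Pushing a notification at commit time leaves the queue's periodic scan in place as a fallback while removing the up-to-15-second delay between a WAL write and the data being queued for sending.
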
Showing 4 changed files with 36 additions and 2 deletions.
CHANGELOG.md (8 changes: 6 additions & 2 deletions)
@@ -47,8 +47,12 @@ Main (unreleased)

 - In `mimir.rules.kubernetes`, fix an issue where unrecoverable errors from the Mimir API were retried. (@56quarters)
 
-- Flow: Fix an issue where `faro.receiver`'s `extra_log_labels` with empty value don't
-  map existing value in log line. (@hainenber)
+- Fix an issue where `faro.receiver`'s `extra_log_labels` with empty value
+  don't map existing value in log line. (@hainenber)
 
+- Fix an issue where `prometheus.remote_write` only queued data for sending
+  every 15 seconds instead of as soon as data was written to the WAL.
+  (@rfratto)
 
 ### Other changes

internal/component/prometheus/remotewrite/remote_write.go (2 changes: 2 additions & 0 deletions)
@@ -84,6 +84,8 @@ func New(o component.Options, c Arguments) (*Component, error) {
 	remoteLogger := log.With(o.Logger, "subcomponent", "rw")
 	remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, o.DataPath, remoteFlushDeadline, nil)
 
+	walStorage.SetNotifier(remoteStore)
+
 	service, err := o.GetServiceData(labelstore.ServiceName)
 	if err != nil {
 		return nil, err
internal/static/metrics/wal/wal.go (12 changes: 12 additions & 0 deletions)
@@ -141,6 +141,8 @@ type Storage struct {
 	deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until.
 
 	metrics *storageMetrics
+
+	notifier wlog.WriteNotified
 }
 
 // NewStorage makes a new Storage.
@@ -457,6 +459,12 @@ func (w *Storage) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chun
 	}
 }
 
+// SetNotifier sets the notifier for the WAL storage. SetNotifier must only be
+// called before data is written to the WAL.
+func (w *Storage) SetNotifier(n wlog.WriteNotified) {
+	w.notifier = n
+}
+
 // Directory returns the path where the WAL storage is held.
 func (w *Storage) Directory() string {
 	return w.path
@@ -859,6 +867,10 @@ func (a *appender) Commit() error {
 		return err
 	}
 
+	if a.w.notifier != nil {
+		a.w.notifier.Notify()
+	}
+
 	a.clearData()
 	a.w.appenderPool.Put(a)
 	return nil
internal/static/metrics/wal/wal_test.go (16 changes: 16 additions & 0 deletions)
@@ -64,11 +64,15 @@ func TestStorage_InvalidSeries(t *testing.T) {
 func TestStorage(t *testing.T) {
 	walDir := t.TempDir()
 
+	onNotify := util.NewWaitTrigger()
+	notifier := &fakeNotifier{NotifyFunc: onNotify.Trigger}
+
 	s, err := NewStorage(log.NewNopLogger(), nil, walDir)
 	require.NoError(t, err)
 	defer func() {
 		require.NoError(t, s.Close())
 	}()
+	s.SetNotifier(notifier)
 
 	app := s.Appender(context.Background())

@@ -99,6 +103,8 @@ func TestStorage(t *testing.T) {
 	actualExemplars := collector.exemplars
 	sort.Sort(byRefExemplar(actualExemplars))
 	require.Equal(t, expectedExemplars, actualExemplars)
+
+	require.NoError(t, onNotify.Wait(time.Minute), "Expected Notify to be called")
 }
 
 func TestStorage_Rollback(t *testing.T) {
@@ -664,3 +670,13 @@ func (b byRefExemplar) Less(i, j int) bool {
 	}
 	return b[i].Ref < b[j].Ref
 }
+
+type fakeNotifier struct {
+	NotifyFunc func()
+}
+
+func (fn *fakeNotifier) Notify() {
+	if fn.NotifyFunc != nil {
+		fn.NotifyFunc()
+	}
+}
