Skip to content

Commit

Permalink
feat(loki/src/k8s): not restart tailers in loki.source.kubernetes c…
Browse files Browse the repository at this point in the history
…omponent by above-average time deltas if K8s version is > 1.29.0 (#6263)

Signed-off-by: hainenber <dotronghai96@gmail.com>
  • Loading branch information
hainenber authored and tpaschalis committed Apr 18, 2024
1 parent 41c2df1 commit b88edd5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 32 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ Main (unreleased)
- In `prometheus.exporter.kafka`, the interpolation table used to compute estimated lag metrics is now pruned
on `metadata_refresh_interval` instead of `prune_interval_seconds`. (@wildum)

- Don't restart tailers in `loki.source.kubernetes` component by above-average
time deltas if K8s version is >= 1.29.1 (@hainenber)

### Bugfixes

- Fixed issue with defaults for Beyla component not being applied correctly. (marctc)
Expand Down
81 changes: 49 additions & 32 deletions internal/component/loki/source/kubernetes/kubetail/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
"github.com/grafana/alloy/internal/alloy/logging/level"
"github.com/grafana/alloy/internal/component/common/loki"
Expand Down Expand Up @@ -170,6 +171,15 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
return err
}

k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion()
if err != nil {
return err
}
k8sComparableServerVersion, err := semver.ParseTolerant(k8sServerVersion.GitVersion)
if err != nil {
return err
}

// Create a new rolling average calculator to determine the average delta
// time between log entries.
//
Expand All @@ -180,41 +190,48 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
// The computed average will never be less than the minimum of 2s.
calc := newRollingAverageCalculator(10000, 100, 2*time.Second, maxTailerLifetime)

go func() {
rolledFileTicker := time.NewTicker(1 * time.Second)
defer func() {
rolledFileTicker.Stop()
_ = stream.Close()
}()
for {
select {
case <-ctx.Done():
return
case <-rolledFileTicker.C:
// Versions of Kubernetes which do not contain
// kubernetes/kubernetes#115702 will fail to detect rolled log files
// and stop sending logs to us.
//
// To work around this, we use a rolling average to determine how
// frequent we usually expect to see entries. If 3x the normal delta has
// elapsed, we'll restart the tailer.
//
// False positives here are acceptable, but false negatives mean that
// we'll have a larger spike of missing logs until we detect a rolled
// file.
avg := calc.GetAverage()
last := calc.GetLast()
if last.IsZero() {
continue
}
s := time.Since(last)
if s > avg*3 {
level.Info(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s)
// Versions of Kubernetes which do not contain
// kubernetes/kubernetes#115702 (<= v1.29.1) will fail to detect rotated log files
// and stop sending logs to us.
//
// To work around this, we use a rolling average to determine how
// frequent we usually expect to see entries. If 3x the normal delta has
// elapsed, we'll restart the tailer.
//
// False positives here are acceptable, but false negatives mean that
// we'll have a larger spike of missing logs until we detect a rolled
// file.
if k8sComparableServerVersion.LT(semver.Version{Major: 1, Minor: 29, Patch: 0}) {
go func() {
rolledFileTicker := time.NewTicker(1 * time.Second)
defer func() {
rolledFileTicker.Stop()
_ = stream.Close()
}()
for {
select {
case <-ctx.Done():
return
case <-rolledFileTicker.C:
avg := calc.GetAverage()
last := calc.GetLast()
if last.IsZero() {
continue
}
s := time.Since(last)
if s > avg*3 {
level.Debug(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s)
return
}
}
}
}
}()
}()
} else {
go func() {
<-ctx.Done()
_ = stream.Close()
}()
}

level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)

Expand Down

0 comments on commit b88edd5

Please sign in to comment.