diff --git a/CHANGELOG.md b/CHANGELOG.md
index 79631948dd..c63b720ca7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -118,6 +118,8 @@ v1.6.0
 
 - Reduced memory allocation in discovery components by up to 30% (@thampiotr)
 
+- Add `tenant` label to remaining `loki_write_.+` metrics (@towolf)
+
 ### Bugfixes
 
 - Fix issue where `alloy_prometheus_relabel_metrics_processed` was not being incremented. (@mattdurham)
diff --git a/internal/component/common/loki/client/client.go b/internal/component/common/loki/client/client.go
index 254806a61a..0ff02b82ab 100644
--- a/internal/component/common/loki/client/client.go
+++ b/internal/component/common/loki/client/client.go
@@ -60,7 +60,6 @@ type Metrics struct {
 	mutatedBytes                 *prometheus.CounterVec
 	requestDuration              *prometheus.HistogramVec
 	batchRetries                 *prometheus.CounterVec
-	countersWithHost             []*prometheus.CounterVec
 	countersWithHostTenant       []*prometheus.CounterVec
 	countersWithHostTenantReason []*prometheus.CounterVec
 }
@@ -71,11 +70,11 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
 	m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "loki_write_encoded_bytes_total",
 		Help: "Number of bytes encoded and ready to send.",
-	}, []string{HostLabel})
+	}, []string{HostLabel, TenantLabel})
 	m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "loki_write_sent_bytes_total",
 		Help: "Number of bytes sent.",
-	}, []string{HostLabel})
+	}, []string{HostLabel, TenantLabel})
 	m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "loki_write_dropped_bytes_total",
 		Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
@@ -83,7 +82,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
 	m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "loki_write_sent_entries_total",
 		Help: "Number of log entries sent to the ingester.",
-	}, []string{HostLabel})
+	}, []string{HostLabel, TenantLabel})
 	m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "loki_write_dropped_entries_total",
 		Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
@@ -99,18 +98,14 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
 	m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Name: "loki_write_request_duration_seconds",
 		Help: "Duration of send requests.",
-	}, []string{"status_code", HostLabel})
+	}, []string{"status_code", HostLabel, TenantLabel})
 	m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "loki_write_batch_retries_total",
 		Help: "Number of times batches has had to be retried.",
 	}, []string{HostLabel, TenantLabel})
 
-	m.countersWithHost = []*prometheus.CounterVec{
-		m.encodedBytes, m.sentBytes, m.sentEntries,
-	}
-
 	m.countersWithHostTenant = []*prometheus.CounterVec{
-		m.batchRetries,
+		m.batchRetries, m.encodedBytes, m.sentBytes, m.sentEntries,
 	}
 
 	m.countersWithHostTenantReason = []*prometheus.CounterVec{
@@ -210,12 +205,6 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLin
 	c.client.Timeout = cfg.Timeout
 
-	// Initialize counters to 0 so the metrics are exported before the first
-	// occurrence of incrementing to avoid missing metrics.
-	for _, counter := range c.metrics.countersWithHost {
-		counter.WithLabelValues(c.cfg.URL.Host).Add(0)
-	}
-
 	c.wg.Add(1)
 	go c.run()
 	return c, nil
@@ -357,7 +346,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
 		return
 	}
 	bufBytes := float64(len(buf))
-	c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
+	c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)
 
 	backoff := backoff.New(c.ctx, c.cfg.BackoffConfig)
 	var status int
@@ -366,7 +355,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
 		// send uses `timeout` internally, so `context.Background` is good enough.
 		status, err = c.send(context.Background(), tenantID, buf)
-		c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
+		c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host, tenantID).Observe(time.Since(start).Seconds())
 
 		// Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling
 		if c.cfg.DropRateLimitedBatches && batchIsRateLimited(status) {
@@ -377,8 +366,8 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
 		}
 
 		if err == nil {
-			c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
-			c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
+			c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)
+			c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(entriesCount))
 			return
 		}
diff --git a/internal/component/common/loki/client/client_test.go b/internal/component/common/loki/client/client_test.go
index a461f1e4d4..4a183ccf86 100644
--- a/internal/component/common/loki/client/client_test.go
+++ b/internal/component/common/loki/client/client_test.go
@@ -83,7 +83,7 @@ func TestClient_Handle(t *testing.T) {
 			expectedMetrics: `
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 3.0
+				loki_write_sent_entries_total{host="__HOST__",tenant=""} 3.0
 				# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
 				# TYPE loki_write_dropped_entries_total counter
 				loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
@@ -121,7 +121,7 @@ func TestClient_Handle(t *testing.T) {
 			expectedMetrics: `
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 2.0
+				loki_write_sent_entries_total{host="__HOST__",tenant=""} 2.0
 				# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
 				# TYPE loki_write_dropped_entries_total counter
 				loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
@@ -166,7 +166,7 @@ func TestClient_Handle(t *testing.T) {
 			expectedMetrics: `
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 3.0
+				loki_write_sent_entries_total{host="__HOST__",tenant=""} 3.0
 				# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
 				# TYPE loki_write_dropped_entries_total counter
 				loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
@@ -208,7 +208,7 @@ func TestClient_Handle(t *testing.T) {
 			expectedMetrics: `
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 2.0
+				loki_write_sent_entries_total{host="__HOST__",tenant=""} 2.0
 				# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
 				# TYPE loki_write_dropped_entries_total counter
 				loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
@@ -270,7 +270,7 @@ func TestClient_Handle(t *testing.T) {
 				loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 0
+				loki_write_sent_entries_total{host="__HOST__",tenant=""} 0
 			`,
 		},
 		"do not retry send a batch in case the server responds with a 4xx": {
@@ -306,7 +306,7 @@ func TestClient_Handle(t *testing.T) {
 				loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 0
+				loki_write_sent_entries_total{host="__HOST__",tenant=""} 0
 			`,
 		},
 		"do retry sending a batch in case the server responds with a 429": {
@@ -350,7 +350,7 @@ func TestClient_Handle(t *testing.T) {
 				loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 0
+				loki_write_sent_entries_total{host="__HOST__",tenant=""} 0
 			`,
 		},
 		"do not retry in case of 429 when client is configured to drop rate limited batches": {
@@ -387,7 +387,7 @@ func TestClient_Handle(t *testing.T) {
 				loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 0
+				loki_write_sent_entries_total{host="__HOST__",tenant=""} 0
 			`,
 		},
 		"batch log entries together honoring the client tenant ID": {
@@ -406,7 +406,7 @@ func TestClient_Handle(t *testing.T) {
 			expectedMetrics: `
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 2.0
+				loki_write_sent_entries_total{host="__HOST__",tenant="tenant-default"} 2.0
 				# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
 				# TYPE loki_write_dropped_entries_total counter
 				loki_write_dropped_entries_total{host="__HOST__", reason="ingester_error", tenant="tenant-default"} 0
@@ -451,7 +451,9 @@ func TestClient_Handle(t *testing.T) {
 			expectedMetrics: `
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 4.0
+				loki_write_sent_entries_total{host="__HOST__",tenant="tenant-1"} 2.0
+				loki_write_sent_entries_total{host="__HOST__",tenant="tenant-2"} 1.0
+				loki_write_sent_entries_total{host="__HOST__",tenant="tenant-default"} 1.0
 				# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
 				# TYPE loki_write_dropped_entries_total counter
 				loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0
@@ -604,7 +606,7 @@ func TestClient_StopNow(t *testing.T) {
 			expectedMetrics: `
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 3.0
+				loki_write_sent_entries_total{host="__HOST__",tenant=""} 3.0
 				# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
 				# TYPE loki_write_dropped_entries_total counter
 				loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
@@ -635,7 +637,7 @@ func TestClient_StopNow(t *testing.T) {
 				loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
 				# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
 				# TYPE loki_write_sent_entries_total counter
-				loki_write_sent_entries_total{host="__HOST__"} 0
+				loki_write_sent_entries_total{host="__HOST__", tenant=""} 0
 			`,
 		},
 	}
diff --git a/internal/component/common/loki/client/queue_client.go b/internal/component/common/loki/client/queue_client.go
index a23f0bfce1..f676b734b6 100644
--- a/internal/component/common/loki/client/queue_client.go
+++ b/internal/component/common/loki/client/queue_client.go
@@ -233,12 +233,6 @@ func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config,
 	c.client.Timeout = cfg.Timeout
 
-	// Initialize counters to 0 so the metrics are exported before the first
-	// occurrence of incrementing to avoid missing metrics.
-	for _, counter := range c.metrics.countersWithHost {
-		counter.WithLabelValues(c.cfg.URL.Host).Add(0)
-	}
-
 	c.wg.Add(1)
 	go c.runSendOldBatches()
 	return c, nil
@@ -456,7 +450,7 @@ func (c *queueClient) sendBatch(ctx context.Context, tenantID string, batch *bat
 		return
 	}
 	bufBytes := float64(len(buf))
-	c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
+	c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)
 
 	backoff := backoff.New(c.ctx, c.cfg.BackoffConfig)
 	var status int
@@ -465,7 +459,7 @@ func (c *queueClient) sendBatch(ctx context.Context, tenantID string, batch *bat
 		// send uses `timeout` internally, so `context.Background` is good enough.
 		status, err = c.send(ctx, tenantID, buf)
-		c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
+		c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host, tenantID).Observe(time.Since(start).Seconds())
 
 		// Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling
 		if c.cfg.DropRateLimitedBatches && batchIsRateLimited(status) {
@@ -476,8 +470,8 @@ func (c *queueClient) sendBatch(ctx context.Context, tenantID string, batch *bat
 		}
 
 		if err == nil {
-			c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
-			c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
+			c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)
+			c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(entriesCount))
 			return
 		}
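
Below is a minimal standalone Go sketch of the labelling scheme this diff introduces: the loki_write_* counters now carry a tenant label alongside host, and because the zero-initialization loop over countersWithHost was removed, a per-tenant series only appears after its first increment. This is an illustration, not code taken from Alloy; the HostLabel/TenantLabel constants and the host value below are assumed stand-ins.

// labels_sketch.go — assumes prometheus/client_golang. HostLabel, TenantLabel and
// the host value are illustrative stand-ins, not the Alloy definitions.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

const (
	HostLabel   = "host"
	TenantLabel = "tenant"
)

func main() {
	reg := prometheus.NewRegistry()

	// Mirrors the pattern in NewMetrics: one counter vector keyed by host and tenant.
	sentEntries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "loki_write_sent_entries_total",
		Help: "Number of log entries sent to the ingester.",
	}, []string{HostLabel, TenantLabel})
	reg.MustRegister(sentEntries)

	// Each tenant gets its own series per host. A series only exists after its
	// first increment, since nothing pre-initializes these counters to zero any more.
	sentEntries.WithLabelValues("loki.example:3100", "tenant-1").Add(2)
	sentEntries.WithLabelValues("loki.example:3100", "tenant-2").Add(1)

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			labels := ""
			for _, lp := range m.GetLabel() {
				labels += fmt.Sprintf("%s=%q,", lp.GetName(), lp.GetValue())
			}
			fmt.Printf("%s{%s} %v\n", mf.GetName(), labels, m.GetCounter().GetValue())
		}
	}
}

Gathering the registry yields one sample per (host, tenant) pair, which is the shape the updated test expectations assert, for example loki_write_sent_entries_total{host="__HOST__",tenant="tenant-1"} 2.0.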