From 2d5c5de1ef3de18a2dd11d61e4f8ffebb0822098 Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Mon, 13 Jan 2025 12:27:41 +0100 Subject: [PATCH] database_observability: report health of component and collectors Report unhealthy in case of errors when starting up the collectors or of any collector is stopped during operations. --- CHANGELOG.md | 12 +++-- .../mysql/collector/connection_info.go | 15 ++++++ .../mysql/collector/query_sample.go | 11 ++-- .../mysql/collector/schema_table.go | 31 +++++++++-- .../database_observability/mysql/component.go | 54 ++++++++++++++++++- 5 files changed, 108 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4f1a41b49..383b6e603f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ Main (unreleased) - Add support for TLS to `prometheus.write.queue`. (@mattdurham) +- (_Experimental_) Add health reporting to `database_observability.mysql` component (@cristiangreco) + v1.6.0-rc.0 ----------------- @@ -124,11 +126,11 @@ v1.6.0-rc.0 https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36071 - `otelcol.exporter.datadog`: Stop prefixing `http_server_duration`, `http_server_request_size` and `http_server_response_size` with `otelcol`. https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36265 - These metrics can be from SDKs rather than collector. Stop prefixing them to be consistent with + These metrics can be from SDKs rather than collector. Stop prefixing them to be consistent with https://opentelemetry.io/docs/collector/internal-telemetry/#lists-of-internal-metrics - `otelcol.receiver.datadog`: Add json handling for the `api/v2/series` endpoint in the datadogreceiver. https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36218 - - `otelcol.processor.span`: Add a new `keep_original_name` configuration argument + - `otelcol.processor.span`: Add a new `keep_original_name` configuration argument to keep the original span name when extracting attributes from the span name. https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36397 - `pkg/ottl`: Respect the `depth` option when flattening slices using `flatten`. @@ -136,14 +138,14 @@ v1.6.0-rc.0 https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36198 - `otelcol.exporter.loadbalancing`: Shutdown exporters during collector shutdown. This fixes a memory leak. https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36024 - - `otelcol.processor.k8sattributes`: New `wait_for_metadata` and `wait_for_metadata_timeout` configuration arguments, + - `otelcol.processor.k8sattributes`: New `wait_for_metadata` and `wait_for_metadata_timeout` configuration arguments, which block the processor startup until metadata is received from Kubernetes. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32556 - - `otelcol.processor.k8sattributes`: Enable the `k8sattr.fieldExtractConfigRegex.disallow` for all Alloy instances, + - `otelcol.processor.k8sattributes`: Enable the `k8sattr.fieldExtractConfigRegex.disallow` for all Alloy instances, to retain the behavior of `regex` argument in the `annotation` and `label` blocks. When the feature gate is "deprecated" in the upstream Collector, Alloy users will need to use the transform processor instead. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/25128 - - `otelcol.receiver.vcenter`: The existing code did not honor TLS settings beyond 'insecure'. + - `otelcol.receiver.vcenter`: The existing code did not honor TLS settings beyond 'insecure'. All TLS client config should now be honored. https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36482 - `otelcol.receiver.opencensus`: Do not report error message when OpenCensus receiver is shutdown cleanly. diff --git a/internal/component/database_observability/mysql/collector/connection_info.go b/internal/component/database_observability/mysql/collector/connection_info.go index bd890788a0..5b9aec35b2 100644 --- a/internal/component/database_observability/mysql/collector/connection_info.go +++ b/internal/component/database_observability/mysql/collector/connection_info.go @@ -8,6 +8,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" ) var rdsRegex = regexp.MustCompile(`(?P[^\.]+)\.([^\.]+)\.(?P[^\.]+)\.rds\.amazonaws\.com`) @@ -21,6 +22,8 @@ type ConnectionInfo struct { DSN string Registry *prometheus.Registry InfoMetric *prometheus.GaugeVec + + running *atomic.Bool } func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) { @@ -35,15 +38,22 @@ func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) { DSN: args.DSN, Registry: args.Registry, InfoMetric: infoMetric, + running: &atomic.Bool{}, }, nil } +func (c *ConnectionInfo) Name() string { + return "ConnectionInfo" +} + func (c *ConnectionInfo) Start(ctx context.Context) error { cfg, err := mysql.ParseDSN(c.DSN) if err != nil { return err } + c.running.Store(true) + var ( providerName = "unknown" providerRegion = "unknown" @@ -66,6 +76,11 @@ func (c *ConnectionInfo) Start(ctx context.Context) error { return nil } +func (c *ConnectionInfo) Stopped() bool { + return !c.running.Load() +} + func (c *ConnectionInfo) Stop() { c.Registry.Unregister(c.InfoMetric) + c.running.Store(false) } diff --git a/internal/component/database_observability/mysql/collector/query_sample.go b/internal/component/database_observability/mysql/collector/query_sample.go index f06ef5907c..d00f08f0e6 100644 --- a/internal/component/database_observability/mysql/collector/query_sample.go +++ b/internal/component/database_observability/mysql/collector/query_sample.go @@ -5,12 +5,12 @@ import ( "database/sql" "fmt" "strings" - "sync/atomic" "time" "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/xwb1989/sqlparser" + "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/database_observability" @@ -57,11 +57,15 @@ func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) { dbConnection: args.DB, collectInterval: args.CollectInterval, entryHandler: args.EntryHandler, - logger: args.Logger, + logger: log.With(args.Logger, "collector", "QuerySample"), running: &atomic.Bool{}, }, nil } +func (c *QuerySample) Name() string { + return "QuerySample" +} + func (c *QuerySample) Start(ctx context.Context) error { level.Debug(c.logger).Log("msg", "QuerySample collector started") @@ -81,6 +85,7 @@ func (c *QuerySample) Start(ctx context.Context) error { for { if err := c.fetchQuerySamples(c.ctx); err != nil { level.Error(c.logger).Log("msg", "collector stopping due to error", "err", err) + c.Stop() break } @@ -127,7 +132,7 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error { } if strings.HasSuffix(sampleText, "...") { - level.Info(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest) + level.Debug(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest) continue } diff --git a/internal/component/database_observability/mysql/collector/schema_table.go b/internal/component/database_observability/mysql/collector/schema_table.go index 89d50f1ba1..67dbd07f0f 100644 --- a/internal/component/database_observability/mysql/collector/schema_table.go +++ b/internal/component/database_observability/mysql/collector/schema_table.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/hashicorp/golang-lru/v2/expirable" "github.com/prometheus/common/model" + "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/database_observability" @@ -66,10 +67,10 @@ type SchemaTable struct { // TODO(cristian): allow configuring cache size (currently unlimited). cache *expirable.LRU[string, tableInfo] - logger log.Logger - - ctx context.Context - cancel context.CancelFunc + logger log.Logger + running *atomic.Bool + ctx context.Context + cancel context.CancelFunc } type tableInfo struct { @@ -86,18 +87,29 @@ func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) { collectInterval: args.CollectInterval, entryHandler: args.EntryHandler, cache: expirable.NewLRU[string, tableInfo](0, nil, args.CacheTTL), - logger: args.Logger, + logger: log.With(args.Logger, "collector", "SchemaTable"), + running: &atomic.Bool{}, }, nil } +func (c *SchemaTable) Name() string { + return "SchemaTable" +} + func (c *SchemaTable) Start(ctx context.Context) error { level.Debug(c.logger).Log("msg", "SchemaTable collector started") + c.running.Store(true) ctx, cancel := context.WithCancel(ctx) c.ctx = ctx c.cancel = cancel go func() { + defer func() { + c.Stop() + c.running.Store(false) + }() + ticker := time.NewTicker(c.collectInterval) for { @@ -119,6 +131,10 @@ func (c *SchemaTable) Start(ctx context.Context) error { return nil } +func (c *SchemaTable) Stopped() bool { + return !c.running.Load() +} + // Stop should be kept idempotent func (c *SchemaTable) Stop() { c.cancel() @@ -155,6 +171,11 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error { } } + if len(schemas) == 0 { + level.Info(c.logger).Log("msg", "no schema detected from information_schema.schemata") + return nil + } + tables := []tableInfo{} for _, schema := range schemas { diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go index a7171b81a6..e57d890f23 100644 --- a/internal/component/database_observability/mysql/component.go +++ b/internal/component/database_observability/mysql/component.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/model" + "go.uber.org/atomic" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" @@ -79,12 +80,15 @@ type Exports struct { } var ( - _ component.Component = (*Component)(nil) - _ http_service.Component = (*Component)(nil) + _ component.Component = (*Component)(nil) + _ http_service.Component = (*Component)(nil) + _ component.HealthComponent = (*Component)(nil) ) type Collector interface { + Name() string Start(context.Context) error + Stopped() bool Stop() } @@ -98,6 +102,7 @@ type Component struct { baseTarget discovery.Target collectors []Collector dbConnection *sql.DB + healthErr *atomic.String } func New(opts component.Options, args Arguments) (*Component, error) { @@ -107,6 +112,7 @@ func New(opts component.Options, args Arguments) (*Component, error) { receivers: args.ForwardTo, handler: loki.NewLogsReceiver(), registry: prometheus.NewRegistry(), + healthErr: atomic.NewString(""), } baseTarget, err := c.getBaseTarget() @@ -184,6 +190,16 @@ func (c *Component) Update(args component.Arguments) error { c.args = args.(Arguments) + if err := c.startCollectors(); err != nil { + c.healthErr.Store(err.Error()) + return err + } + + c.healthErr.Store("") + return nil +} + +func (c *Component) startCollectors() error { dbConnection, err := sql.Open("mysql", formatDSN(string(c.args.DataSourceName), "parseTime=true")) if err != nil { return err @@ -254,6 +270,40 @@ func (c *Component) Handler() http.Handler { return promhttp.HandlerFor(c.registry, promhttp.HandlerOpts{}) } +func (c *Component) CurrentHealth() component.Health { + if err := c.healthErr.Load(); err != "" { + return component.Health{ + Health: component.HealthTypeUnhealthy, + Message: err, + UpdateTime: time.Now(), + } + } + + var unhealthyCollectors []string + + c.mut.RLock() + for _, collector := range c.collectors { + if collector.Stopped() { + unhealthyCollectors = append(unhealthyCollectors, collector.Name()) + } + } + c.mut.RUnlock() + + if len(unhealthyCollectors) > 0 { + return component.Health{ + Health: component.HealthTypeUnhealthy, + Message: "One or more collectors are unhealthy: [" + strings.Join(unhealthyCollectors, ", ") + "]", + UpdateTime: time.Now(), + } + } + + return component.Health{ + Health: component.HealthTypeHealthy, + Message: "All collectors are healthy", + UpdateTime: time.Now(), + } +} + // instanceKey returns network(hostname:port)/dbname of the MySQL server. // This is the same key as used by the mysqld_exporter integration. func (c *Component) instanceKey() string {