Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

database_observability: report health of component and collectors #2392

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Main (unreleased)
### Enhancements

- Update `prometheus.write.queue` to support v2 for cpu performance. (@mattdurham)
- (_Experimental_) Add health reporting to `database_observability.mysql` component (@cristiangreco)

v1.6.0-rc.0
-----------------
Expand Down Expand Up @@ -132,26 +133,26 @@ v1.6.0-rc.0
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36071
- `otelcol.exporter.datadog`: Stop prefixing `http_server_duration`, `http_server_request_size` and `http_server_response_size` with `otelcol`.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36265
These metrics can be from SDKs rather than collector. Stop prefixing them to be consistent with
These metrics can be from SDKs rather than collector. Stop prefixing them to be consistent with
https://opentelemetry.io/docs/collector/internal-telemetry/#lists-of-internal-metrics
- `otelcol.receiver.datadog`: Add json handling for the `api/v2/series` endpoint in the datadogreceiver.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36218
- `otelcol.processor.span`: Add a new `keep_original_name` configuration argument
- `otelcol.processor.span`: Add a new `keep_original_name` configuration argument
to keep the original span name when extracting attributes from the span name.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36397
- `pkg/ottl`: Respect the `depth` option when flattening slices using `flatten`.
The `depth` option is also now required to be at least `1`.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36198
- `otelcol.exporter.loadbalancing`: Shutdown exporters during collector shutdown. This fixes a memory leak.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36024
- `otelcol.processor.k8sattributes`: New `wait_for_metadata` and `wait_for_metadata_timeout` configuration arguments,
- `otelcol.processor.k8sattributes`: New `wait_for_metadata` and `wait_for_metadata_timeout` configuration arguments,
which block the processor startup until metadata is received from Kubernetes.
https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32556
- `otelcol.processor.k8sattributes`: Enable the `k8sattr.fieldExtractConfigRegex.disallow` for all Alloy instances,
- `otelcol.processor.k8sattributes`: Enable the `k8sattr.fieldExtractConfigRegex.disallow` for all Alloy instances,
to retain the behavior of `regex` argument in the `annotation` and `label` blocks.
When the feature gate is "deprecated" in the upstream Collector, Alloy users will need to use the transform processor instead.
https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/25128
- `otelcol.receiver.vcenter`: The existing code did not honor TLS settings beyond 'insecure'.
- `otelcol.receiver.vcenter`: The existing code did not honor TLS settings beyond 'insecure'.
All TLS client config should now be honored.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36482
- `otelcol.receiver.opencensus`: Do not report error message when OpenCensus receiver is shutdown cleanly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)

var rdsRegex = regexp.MustCompile(`(?P<identifier>[^\.]+)\.([^\.]+)\.(?P<region>[^\.]+)\.rds\.amazonaws\.com`)
Expand All @@ -21,6 +22,8 @@ type ConnectionInfo struct {
DSN string
Registry *prometheus.Registry
InfoMetric *prometheus.GaugeVec

running *atomic.Bool
}

func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
Expand All @@ -35,15 +38,22 @@ func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
DSN: args.DSN,
Registry: args.Registry,
InfoMetric: infoMetric,
running: &atomic.Bool{},
}, nil
}

// Name returns the collector's identifier, used by the parent component
// for logging and health reporting.
func (c *ConnectionInfo) Name() string {
	return "ConnectionInfo"
}

func (c *ConnectionInfo) Start(ctx context.Context) error {
cfg, err := mysql.ParseDSN(c.DSN)
if err != nil {
return err
}

c.running.Store(true)

var (
providerName = "unknown"
providerRegion = "unknown"
Expand All @@ -66,6 +76,11 @@ func (c *ConnectionInfo) Start(ctx context.Context) error {
return nil
}

// Stopped reports whether the collector is not currently running.
// NOTE(review): running starts false, so this returns true until Start is
// called — callers polling health before Start see the collector as stopped.
func (c *ConnectionInfo) Stopped() bool {
	return !c.running.Load()
}

// Stop unregisters the info metric from the registry and flags the
// collector as stopped. The flag is cleared last so Stopped() only
// reports true once the metric has been removed.
func (c *ConnectionInfo) Stop() {
	c.Registry.Unregister(c.InfoMetric)
	c.running.Store(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"database/sql"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/xwb1989/sqlparser"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/database_observability"
Expand Down Expand Up @@ -57,11 +57,15 @@ func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
dbConnection: args.DB,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
logger: args.Logger,
logger: log.With(args.Logger, "collector", "QuerySample"),
running: &atomic.Bool{},
}, nil
}

// Name returns the collector's identifier, used by the parent component
// for logging and health reporting.
func (c *QuerySample) Name() string {
	return "QuerySample"
}

func (c *QuerySample) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "QuerySample collector started")

Expand All @@ -81,6 +85,7 @@ func (c *QuerySample) Start(ctx context.Context) error {
for {
if err := c.fetchQuerySamples(c.ctx); err != nil {
level.Error(c.logger).Log("msg", "collector stopping due to error", "err", err)
c.Stop()
break
}

Expand Down Expand Up @@ -127,7 +132,7 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
}

if strings.HasSuffix(sampleText, "...") {
level.Info(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
level.Debug(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive-by remove noisy info log

continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/database_observability"
Expand Down Expand Up @@ -66,10 +67,10 @@ type SchemaTable struct {
// TODO(cristian): allow configuring cache size (currently unlimited).
cache *expirable.LRU[string, tableInfo]

logger log.Logger

ctx context.Context
cancel context.CancelFunc
logger log.Logger
running *atomic.Bool
ctx context.Context
cancel context.CancelFunc
}

type tableInfo struct {
Expand All @@ -86,18 +87,29 @@ func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
cache: expirable.NewLRU[string, tableInfo](0, nil, args.CacheTTL),
logger: args.Logger,
logger: log.With(args.Logger, "collector", "SchemaTable"),
running: &atomic.Bool{},
}, nil
}

// Name returns the collector's identifier, used by the parent component
// for logging and health reporting.
func (c *SchemaTable) Name() string {
	return "SchemaTable"
}

func (c *SchemaTable) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "SchemaTable collector started")

c.running.Store(true)
ctx, cancel := context.WithCancel(ctx)
c.ctx = ctx
c.cancel = cancel

go func() {
defer func() {
c.Stop()
c.running.Store(false)
}()

ticker := time.NewTicker(c.collectInterval)

for {
Expand All @@ -119,6 +131,10 @@ func (c *SchemaTable) Start(ctx context.Context) error {
return nil
}

// Stopped reports whether the collector is not currently running.
// NOTE(review): running starts false, so this returns true until Start is
// called — callers polling health before Start see the collector as stopped.
func (c *SchemaTable) Stopped() bool {
	return !c.running.Load()
}

// Stop should be kept idempotent
func (c *SchemaTable) Stop() {
c.cancel()
Expand Down Expand Up @@ -155,6 +171,11 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
}
}

if len(schemas) == 0 {
level.Info(c.logger).Log("msg", "no schema detected from information_schema.schemata")
return nil
}

Comment on lines +174 to +178
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive-by: log if no schema is detected

tables := []tableInfo{}

for _, schema := range schemas {
Expand Down
54 changes: 52 additions & 2 deletions internal/component/database_observability/mysql/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/loki"
Expand Down Expand Up @@ -79,12 +80,15 @@ type Exports struct {
}

var (
_ component.Component = (*Component)(nil)
_ http_service.Component = (*Component)(nil)
_ component.Component = (*Component)(nil)
_ http_service.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
)

type Collector interface {
Name() string
Start(context.Context) error
Stopped() bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, you might have collectors that can be considered unhealthy but are still running. A different approach to support this would be to have a CurrentHealth function in the collector interface that returns the health object. Then you would not need the healthErr attribute anymore, you would just call CurrentHealth on all the collectors in the CurrentHealth function of the component.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's a great point. I wanted to start simple for now, as collectors are anyway not resilient at all (they'll stop as soon as any error is hit). Agree that in the future we might want to delegate the logic to the collectors themselves.

Stop()
}

Expand All @@ -98,6 +102,7 @@ type Component struct {
baseTarget discovery.Target
collectors []Collector
dbConnection *sql.DB
healthErr *atomic.String
}

func New(opts component.Options, args Arguments) (*Component, error) {
Expand All @@ -107,6 +112,7 @@ func New(opts component.Options, args Arguments) (*Component, error) {
receivers: args.ForwardTo,
handler: loki.NewLogsReceiver(),
registry: prometheus.NewRegistry(),
healthErr: atomic.NewString(""),
}

baseTarget, err := c.getBaseTarget()
Expand Down Expand Up @@ -184,6 +190,16 @@ func (c *Component) Update(args component.Arguments) error {

c.args = args.(Arguments)

if err := c.startCollectors(); err != nil {
c.healthErr.Store(err.Error())
return err
}

c.healthErr.Store("")
return nil
}

func (c *Component) startCollectors() error {
dbConnection, err := sql.Open("mysql", formatDSN(string(c.args.DataSourceName), "parseTime=true"))
if err != nil {
return err
Expand Down Expand Up @@ -254,6 +270,40 @@ func (c *Component) Handler() http.Handler {
return promhttp.HandlerFor(c.registry, promhttp.HandlerOpts{})
}

// CurrentHealth reports the component's health. The component is
// unhealthy if the collectors failed to start (recorded in healthErr by
// Update) or if any running collector has since stopped; otherwise it is
// healthy.
func (c *Component) CurrentHealth() component.Health {
	if msg := c.healthErr.Load(); msg != "" {
		return component.Health{
			Health:     component.HealthTypeUnhealthy,
			Message:    msg,
			UpdateTime: time.Now(),
		}
	}

	// Snapshot the names of stopped collectors under the read lock.
	c.mut.RLock()
	var stopped []string
	for _, col := range c.collectors {
		if col.Stopped() {
			stopped = append(stopped, col.Name())
		}
	}
	c.mut.RUnlock()

	if len(stopped) == 0 {
		return component.Health{
			Health:     component.HealthTypeHealthy,
			Message:    "All collectors are healthy",
			UpdateTime: time.Now(),
		}
	}

	return component.Health{
		Health:     component.HealthTypeUnhealthy,
		Message:    "One or more collectors are unhealthy: [" + strings.Join(stopped, ", ") + "]",
		UpdateTime: time.Now(),
	}
}

// instanceKey returns network(hostname:port)/dbname of the MySQL server.
// This is the same key as used by the mysqld_exporter integration.
func (c *Component) instanceKey() string {
Expand Down
Loading