Skip to content

Commit

Permalink
database_observability: report health of component and collectors
Browse files Browse the repository at this point in the history
Report unhealthy in case of errors when starting up the collectors or
if any collector is stopped during operation.
  • Loading branch information
cristiangreco committed Jan 13, 2025
1 parent 3a4be44 commit 6e00e77
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"regexp"
"strings"
"sync/atomic"

"github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -21,6 +22,8 @@ type ConnectionInfo struct {
DSN string
Registry *prometheus.Registry
InfoMetric *prometheus.GaugeVec

running *atomic.Bool
}

func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
Expand All @@ -35,15 +38,22 @@ func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
DSN: args.DSN,
Registry: args.Registry,
InfoMetric: infoMetric,
running: &atomic.Bool{},
}, nil
}

// Name returns the fixed, human-readable identifier of this collector.
func (c *ConnectionInfo) Name() string {
	const collectorName = "ConnectionInfo"
	return collectorName
}

func (c *ConnectionInfo) Start(ctx context.Context) error {
cfg, err := mysql.ParseDSN(c.DSN)
if err != nil {
return err
}

c.running.Store(true)

var (
providerName = "unknown"
providerRegion = "unknown"
Expand All @@ -66,6 +76,11 @@ func (c *ConnectionInfo) Start(ctx context.Context) error {
return nil
}

// Stopped reports whether the collector is currently not running, i.e. the
// negation of the running flag.
func (c *ConnectionInfo) Stopped() bool {
	isRunning := c.running.Load()
	return !isRunning
}

// Stop marks the collector as no longer running and then removes its info
// metric from the registry it was given.
func (c *ConnectionInfo) Stop() {
	// Flip the flag first so Stopped() already reports true by the time the
	// metric is unregistered.
	c.running.Store(false)

	registry, metric := c.Registry, c.InfoMetric
	registry.Unregister(metric)
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
dbConnection: args.DB,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
logger: args.Logger,
logger: log.With(args.Logger, "collector", "QuerySample"),
running: &atomic.Bool{},
}, nil
}

// Name returns the fixed, human-readable identifier of this collector.
func (c *QuerySample) Name() string {
	const collectorName = "QuerySample"
	return collectorName
}

func (c *QuerySample) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "QuerySample collector started")

Expand All @@ -81,6 +85,7 @@ func (c *QuerySample) Start(ctx context.Context) error {
for {
if err := c.fetchQuerySamples(c.ctx); err != nil {
level.Error(c.logger).Log("msg", "collector stopping due to error", "err", err)
c.Stop()
break
}

Expand Down Expand Up @@ -127,7 +132,7 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
}

if strings.HasSuffix(sampleText, "...") {
level.Info(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
level.Debug(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"sync/atomic"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -66,10 +67,10 @@ type SchemaTable struct {
// TODO(cristian): allow configuring cache size (currently unlimited).
cache *expirable.LRU[string, tableInfo]

logger log.Logger

ctx context.Context
cancel context.CancelFunc
logger log.Logger
running *atomic.Bool
ctx context.Context
cancel context.CancelFunc
}

type tableInfo struct {
Expand All @@ -86,18 +87,29 @@ func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
cache: expirable.NewLRU[string, tableInfo](0, nil, args.CacheTTL),
logger: args.Logger,
logger: log.With(args.Logger, "collector", "SchemaTable"),
running: &atomic.Bool{},
}, nil
}

// Name returns the fixed, human-readable identifier of this collector.
func (c *SchemaTable) Name() string {
	const collectorName = "SchemaTable"
	return collectorName
}

func (c *SchemaTable) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "SchemaTable collector started")

c.running.Store(true)
ctx, cancel := context.WithCancel(ctx)
c.ctx = ctx
c.cancel = cancel

go func() {
defer func() {
c.Stop()
c.running.Store(false)
}()

ticker := time.NewTicker(c.collectInterval)

for {
Expand All @@ -119,6 +131,10 @@ func (c *SchemaTable) Start(ctx context.Context) error {
return nil
}

// Stopped reports whether the collector is currently not running, i.e. the
// negation of the running flag.
func (c *SchemaTable) Stopped() bool {
	isRunning := c.running.Load()
	return !isRunning
}

// Stop should be kept idempotent
func (c *SchemaTable) Stop() {
c.cancel()
Expand Down Expand Up @@ -155,6 +171,11 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
}
}

if len(schemas) == 0 {
level.Info(c.logger).Log("msg", "no schema detected from information_schema.schemata")
return nil
}

tables := []tableInfo{}

for _, schema := range schemas {
Expand Down
52 changes: 50 additions & 2 deletions internal/component/database_observability/mysql/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/loki"
Expand Down Expand Up @@ -79,12 +80,15 @@ type Exports struct {
}

var (
_ component.Component = (*Component)(nil)
_ http_service.Component = (*Component)(nil)
_ component.Component = (*Component)(nil)
_ http_service.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
)

// Collector is the contract the component uses to drive a single collector
// and to include it in the component's health report.
type Collector interface {
// Name returns the collector's identifier; CurrentHealth lists it when the
// collector is reported as unhealthy.
Name() string
// Start begins the collector's work. A non-nil error means the collector
// could not be started (for example, ConnectionInfo returns the DSN parse
// error).
Start(context.Context) error
// Stopped reports whether the collector is not running. CurrentHealth
// treats a stopped collector as unhealthy.
Stopped() bool
// Stop stops the collector.
Stop()
}

Expand All @@ -98,6 +102,7 @@ type Component struct {
baseTarget discovery.Target
collectors []Collector
dbConnection *sql.DB
healthErr *atomic.String
}

func New(opts component.Options, args Arguments) (*Component, error) {
Expand All @@ -107,6 +112,7 @@ func New(opts component.Options, args Arguments) (*Component, error) {
receivers: args.ForwardTo,
handler: loki.NewLogsReceiver(),
registry: prometheus.NewRegistry(),
healthErr: atomic.NewString(""),
}

baseTarget, err := c.getBaseTarget()
Expand Down Expand Up @@ -184,6 +190,16 @@ func (c *Component) Update(args component.Arguments) error {

c.args = args.(Arguments)

if err := c.startCollectors(); err != nil {
c.healthErr.Store(err.Error())
return err
}

c.healthErr.Store("")
return nil
}

func (c *Component) startCollectors() error {
dbConnection, err := sql.Open("mysql", formatDSN(string(c.args.DataSourceName), "parseTime=true"))
if err != nil {
return err
Expand Down Expand Up @@ -254,6 +270,38 @@ func (c *Component) Handler() http.Handler {
return promhttp.HandlerFor(c.registry, promhttp.HandlerOpts{})
}

// CurrentHealth implements component.HealthComponent.
//
// The component is unhealthy when a start-up error has been recorded in
// healthErr, or when at least one collector reports Stopped. Otherwise it is
// healthy. UpdateTime is always the time of the call.
func (c *Component) CurrentHealth() component.Health {
	// unhealthy builds an unhealthy report carrying the given message.
	unhealthy := func(msg string) component.Health {
		return component.Health{
			Health:     component.HealthTypeUnhealthy,
			Message:    msg,
			UpdateTime: time.Now(),
		}
	}

	// A recorded start-up error takes precedence over per-collector state.
	if startupErr := c.healthErr.Load(); startupErr != "" {
		return unhealthy(startupErr)
	}

	var stopped []string
	for _, col := range c.collectors {
		if !col.Stopped() {
			continue
		}
		stopped = append(stopped, col.Name())
	}

	if len(stopped) == 0 {
		return component.Health{
			Health:     component.HealthTypeHealthy,
			Message:    "All collectors are healthy",
			UpdateTime: time.Now(),
		}
	}

	return unhealthy("One or more collectors are unhealthy: [" + strings.Join(stopped, ", ") + "]")
}

// instanceKey returns network(hostname:port)/dbname of the MySQL server.
// This is the same key as used by the mysqld_exporter integration.
func (c *Component) instanceKey() string {
Expand Down

0 comments on commit 6e00e77

Please sign in to comment.