From 37cc19d9a86d9d3e9502a0429628273dcafefc9c Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Wed, 15 Jan 2025 17:29:24 +0100 Subject: [PATCH] database_observability: log instance key across collectors Add instance label to SchemaTable and QuerySample collectors. --- CHANGELOG.md | 2 + .../mysql/collector/query_sample.go | 11 +++-- .../mysql/collector/query_sample_test.go | 46 ++++++++++--------- .../mysql/collector/schema_table.go | 9 ++-- .../mysql/collector/schema_table_test.go | 7 +-- .../database_observability/mysql/component.go | 20 ++++++-- 6 files changed, 59 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1701360a5..5e3bf1c4ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ Main (unreleased) - Improved performance by reducing allocation in Prometheus write pipelines by ~30% (@thampiotr) +- (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco) + v1.6.0-rc.1 ----------------- diff --git a/internal/component/database_observability/mysql/collector/query_sample.go b/internal/component/database_observability/mysql/collector/query_sample.go index d00f08f0e6..28422196c2 100644 --- a/internal/component/database_observability/mysql/collector/query_sample.go +++ b/internal/component/database_observability/mysql/collector/query_sample.go @@ -35,6 +35,7 @@ const selectQuerySamples = ` type QuerySampleArguments struct { DB *sql.DB + InstanceKey string CollectInterval time.Duration EntryHandler loki.EntryHandler @@ -43,6 +44,7 @@ type QuerySampleArguments struct { type QuerySample struct { dbConnection *sql.DB + instanceKey string collectInterval time.Duration entryHandler loki.EntryHandler @@ -55,6 +57,7 @@ type QuerySample struct { func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) { return &QuerySample{ dbConnection: args.DB, + instanceKey: args.InstanceKey, collectInterval: args.CollectInterval, entryHandler: args.EntryHandler, logger: log.With(args.Logger, "collector", "QuerySample"), @@ -153,8 +156,8 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error { Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), Line: fmt.Sprintf( - `level=info msg="query samples fetched" op="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`, - OP_QUERY_SAMPLE, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText, + `level=info msg="query samples fetched" op="%s" instance="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`, + OP_QUERY_SAMPLE, c.instanceKey, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText, ), }, } @@ -166,8 +169,8 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error { Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), Line: fmt.Sprintf( - `level=info msg="table name parsed" op="%s" digest="%s" table="%s"`, - OP_QUERY_PARSED_TABLE_NAME, digest, table, + `level=info msg="table name parsed" op="%s" instance="%s" digest="%s" table="%s"`, + OP_QUERY_PARSED_TABLE_NAME, c.instanceKey, digest, table, ), }, } diff --git a/internal/component/database_observability/mysql/collector/query_sample_test.go b/internal/component/database_observability/mysql/collector/query_sample_test.go index 76e323a630..3bd50675d0 100644 --- a/internal/component/database_observability/mysql/collector/query_sample_test.go +++ b/internal/component/database_observability/mysql/collector/query_sample_test.go @@ -34,8 +34,8 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, + `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, }, }, { @@ -47,8 +47,8 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table(id, name) values (:redacted1, :redacted2)"`, - `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table(id, name) values (:redacted1, :redacted2)"`, + `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, }, }, { @@ -60,8 +60,8 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="update" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="update some_table set active = false, reason = null where id = :redacted1 and name = :redacted2"`, - `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="update" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="update some_table set active = false, reason = null where id = :redacted1 and name = :redacted2"`, + `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, }, }, { @@ -73,8 +73,8 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="delete" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="delete from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="delete" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="delete from some_table where id = :redacted1"`, + `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, }, }, { @@ -86,9 +86,9 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select t.id, t.val1, o.val2 from some_table as t join other_table as o on t.id = o.id where o.val2 = :redacted1 order by t.val1 desc"`, - `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, - `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="other_table"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select t.id, t.val1, o.val2 from some_table as t join other_table as o on t.id = o.id where o.val2 = :redacted1 order by t.val1 desc"`, + `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, + `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="other_table"`, }, }, { @@ -103,12 +103,12 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" ` + + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" ` + `query_sample_redacted="select ifnull(schema_name, :redacted1) as schema_name, digest, count_star from (select * from ` + `performance_schema.events_statements_summary_by_digest where schema_name not in ::redacted2 ` + `and last_seen > date_sub(now(), interval :redacted3 second) order by last_seen desc) as q ` + `group by q.schema_name, q.digest, q.count_star limit :redacted4"`, - `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="performance_schema.events_statements_summary_by_digest"`, + `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="performance_schema.events_statements_summary_by_digest"`, }, }, { @@ -125,8 +125,8 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, + `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, }, }, { @@ -138,7 +138,7 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="begin"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="begin"`, }, }, { @@ -150,7 +150,7 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="commit"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="commit"`, }, }, { @@ -162,7 +162,7 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="alter table some_table"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="alter table some_table"`, }, }, { @@ -179,8 +179,8 @@ func TestQuerySample(t *testing.T) { "1000", }}, logs: []string{ - `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, + `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, + `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, }, }, } @@ -195,6 +195,7 @@ func TestQuerySample(t *testing.T) { collector, err := NewQuerySample(QuerySampleArguments{ DB: db, + InstanceKey: "mysql-db", CollectInterval: time.Minute, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(os.Stderr), @@ -282,6 +283,7 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) { collector, err := NewQuerySample(QuerySampleArguments{ DB: db, + InstanceKey: "mysql-db", CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(os.Stderr), @@ -328,8 +330,8 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) { require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) } - require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line) + require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" digest="abc123" table="some_table"`, lokiEntries[1].Line) err = mock.ExpectationsWereMet() require.NoError(t, err) diff --git a/internal/component/database_observability/mysql/collector/schema_table.go b/internal/component/database_observability/mysql/collector/schema_table.go index 67dbd07f0f..5ff54a126d 100644 --- a/internal/component/database_observability/mysql/collector/schema_table.go +++ b/internal/component/database_observability/mysql/collector/schema_table.go @@ -49,6 +49,7 @@ const ( type SchemaTableArguments struct { DB *sql.DB + InstanceKey string CollectInterval time.Duration EntryHandler loki.EntryHandler CacheTTL time.Duration @@ -58,6 +59,7 @@ type SchemaTableArguments struct { type SchemaTable struct { dbConnection *sql.DB + instanceKey string collectInterval time.Duration entryHandler loki.EntryHandler // Cache of table definitions. Entries are removed after a configurable TTL. @@ -84,6 +86,7 @@ type tableInfo struct { func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) { return &SchemaTable{ dbConnection: args.DB, + instanceKey: args.InstanceKey, collectInterval: args.CollectInterval, entryHandler: args.EntryHandler, cache: expirable.NewLRU[string, tableInfo](0, nil, args.CacheTTL), @@ -166,7 +169,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error { Labels: model.LabelSet{"job": database_observability.JobName}, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), - Line: fmt.Sprintf(`level=info msg="schema detected" op="%s" schema="%s"`, OP_SCHEMA_DETECTION, schema), + Line: fmt.Sprintf(`level=info msg="schema detected" op="%s" instance="%s" schema="%s"`, OP_SCHEMA_DETECTION, c.instanceKey, schema), }, } } @@ -204,7 +207,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error { Labels: model.LabelSet{"job": database_observability.JobName}, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), - Line: fmt.Sprintf(`level=info msg="table detected" op="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, schema, table), + Line: fmt.Sprintf(`level=info msg="table detected" op="%s" instance="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, c.instanceKey, schema, table), }, } } @@ -240,7 +243,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error { Labels: model.LabelSet{"job": database_observability.JobName}, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), - Line: fmt.Sprintf(`level=info msg="create table" op="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, table.schema, table.tableName, createStmt), + Line: fmt.Sprintf(`level=info msg="create table" op="%s" instance="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, c.instanceKey, table.schema, table.tableName, createStmt), }, } } diff --git a/internal/component/database_observability/mysql/collector/schema_table_test.go b/internal/component/database_observability/mysql/collector/schema_table_test.go index 7bf29a2977..36295e7aa2 100644 --- a/internal/component/database_observability/mysql/collector/schema_table_test.go +++ b/internal/component/database_observability/mysql/collector/schema_table_test.go @@ -28,6 +28,7 @@ func TestSchemaTable(t *testing.T) { collector, err := NewSchemaTable(SchemaTableArguments{ DB: db, + InstanceKey: "mysql-db", CollectInterval: time.Second, EntryHandler: lokiClient, CacheTTL: time.Minute, @@ -79,9 +80,9 @@ func TestSchemaTable(t *testing.T) { for _, entry := range lokiEntries { require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) } - require.Equal(t, `level=info msg="schema detected" op="schema_detection" schema="some_schema"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table detected" op="table_detection" schema="some_schema" table="some_table"`, lokiEntries[1].Line) - require.Equal(t, `level=info msg="create table" op="create_statement" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line) + require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line) + require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, `level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line) err = mock.ExpectationsWereMet() require.NoError(t, err) diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go index e57d890f23..72fcd47560 100644 --- a/internal/component/database_observability/mysql/component.go +++ b/internal/component/database_observability/mysql/component.go @@ -101,6 +101,7 @@ type Component struct { registry *prometheus.Registry baseTarget discovery.Target collectors []Collector + instanceKey string dbConnection *sql.DB healthErr *atomic.String } @@ -115,6 +116,12 @@ func New(opts component.Options, args Arguments) (*Component, error) { healthErr: atomic.NewString(""), } + instance, err := instanceKey(string(args.DataSourceName)) + if err != nil { + return nil, err + } + c.instanceKey = instance + baseTarget, err := c.getBaseTarget() if err != nil { return nil, err @@ -166,7 +173,7 @@ func (c *Component) getBaseTarget() (discovery.Target, error) { model.AddressLabel: httpData.MemoryListenAddr, model.SchemeLabel: "http", model.MetricsPathLabel: path.Join(httpData.HTTPPathForComponent(c.opts.ID), "metrics"), - "instance": c.instanceKey(), + "instance": c.instanceKey, "job": database_observability.JobName, }, nil } @@ -218,6 +225,7 @@ func (c *Component) startCollectors() error { if c.args.QuerySamplesEnabled { qsCollector, err := collector.NewQuerySample(collector.QuerySampleArguments{ DB: dbConnection, + InstanceKey: c.instanceKey, CollectInterval: c.args.CollectInterval, EntryHandler: entryHandler, Logger: c.opts.Logger, @@ -235,6 +243,7 @@ func (c *Component) startCollectors() error { stCollector, err := collector.NewSchemaTable(collector.SchemaTableArguments{ DB: dbConnection, + InstanceKey: c.instanceKey, CollectInterval: c.args.CollectInterval, EntryHandler: entryHandler, Logger: c.opts.Logger, @@ -306,8 +315,11 @@ func (c *Component) CurrentHealth() component.Health { // instanceKey returns network(hostname:port)/dbname of the MySQL server. // This is the same key as used by the mysqld_exporter integration. -func (c *Component) instanceKey() string { - m, _ := mysql.ParseDSN(string(c.args.DataSourceName)) +func instanceKey(dsn string) (string, error) { + m, err := mysql.ParseDSN(dsn) + if err != nil { + return "", err + } if m.Addr == "" { m.Addr = "localhost:3306" @@ -316,7 +328,7 @@ func (c *Component) instanceKey() string { m.Net = "tcp" } - return fmt.Sprintf("%s(%s)/%s", m.Net, m.Addr, m.DBName) + return fmt.Sprintf("%s(%s)/%s", m.Net, m.Addr, m.DBName), nil } // formatDSN appends the given parameters to the DSN.