diff --git a/.github/workflows/deploy-pr-preview.yml b/.github/workflows/deploy-pr-preview.yml new file mode 100644 index 0000000000..983b5d57e7 --- /dev/null +++ b/.github/workflows/deploy-pr-preview.yml @@ -0,0 +1,24 @@ +name: Deploy pr preview + +on: + pull_request: + types: + - opened + - synchronize + - closed + paths: + - "docs/sources/**" + +jobs: + deploy-pr-preview: + if: github.repository == 'grafana/alloy' + uses: grafana/writers-toolkit/.github/workflows/deploy-preview.yml@main + with: + sha: ${{ github.event.pull_request.head.sha }} + branch: ${{ github.head_ref }} + event_number: ${{ github.event.number }} + title: ${{ github.event.pull_request.title }} + repo: alloy + website_directory: content/docs/alloy/latest + relative_prefix: /docs/alloy/latest/ + index_file: true diff --git a/.github/workflows/helm-test.yml b/.github/workflows/helm-test.yml index 43fc01757e..aec51de3f0 100644 --- a/.github/workflows/helm-test.yml +++ b/.github/workflows/helm-test.yml @@ -71,7 +71,7 @@ jobs: run: ct lint --config ./operations/helm/ct.yaml - name: Create kind cluster - uses: helm/kind-action@v1.10.0 + uses: helm/kind-action@v1.11.0 if: steps.list-changed.outputs.changed == 'true' - name: Add dependency chart repos diff --git a/CHANGELOG.md b/CHANGELOG.md index b8d10ea49b..a54941c628 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,11 +50,15 @@ Main (unreleased) - Change processlist query to support ONLY_FULL_GROUP_BY sql_mode - Add perf_schema quantile columns to collector +- Live Debugging button should appear in UI only for supported components (@ravishankar15) - Add three new stdlib functions to_base64, from_URLbase64 and to_URLbase64 (@ravishankar15) +- Add `ignore_older_than` option for local.file_match (@ravishankar15) +- Add livedebugging support for `discovery.relabel` (@ravishankar15) +- Performance optimization for live debugging feature (@ravishankar15) -- Use a forked `github.com/goccy/go-json` module which reduces the memory consumption of an Alloy
instance by 20MB. +- Upgrade `github.com/goccy/go-json` to v0.10.4, which reduces the memory consumption of an Alloy instance by 20MB. If Alloy is running certain otelcol components, this reduction will not apply. (@ptodev) - + - Update `prometheus.write.queue` library for performance increases in cpu. (@mattdurham) ### Bugfixes @@ -105,7 +109,7 @@ v1.5.1 - Fixed a crash when updating the configuration of `remote.http`. (@kinolaev) -- Fixed an issue in the `otelcol.processor.attribute` component where the actions `delete` and `hash` could not be used with the `pattern` argument. (@wildum) +- Fixed an issue in the `otelcol.processor.attribute` component where the actions `delete` and `hash` could not be used with the `pattern` argument. (@wildum) - Fixed an issue in the `prometheus.exporter.postgres` component that would leak goroutines when the target was not reachable (@dehaansa) @@ -305,7 +309,7 @@ v1.4.0 - Add the label `alloy_cluster` in the metric `alloy_config_hash` when the flag `cluster.name` is set to help differentiate between configs from the same alloy cluster or different alloy clusters. (@wildum) - + - Add support for discovering the cgroup path(s) of a process in `process.discovery`. (@mahendrapaipuri) ### Bugfixes diff --git a/CODEOWNERS b/CODEOWNERS index 750a939cd5..3d3b770f0c 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -9,6 +9,7 @@ * @grafana/grafana-alloy-maintainers #`make docs` procedure and related workflows are owned by @jdbaldry. 
+/.github/workflows/deploy-pr-preview.yml @jdbaldry /.github/workflows/publish-technical-documentation-next.yml @jdbaldry /.github/workflows/publish-technical-documentation-release.yml @jdbaldry /.github/workflows/update-make-docs.yml @jdbaldry diff --git a/docs/sources/reference/components/local/local.file_match.md b/docs/sources/reference/components/local/local.file_match.md index 70f036f1ba..fd7d4048b9 100644 --- a/docs/sources/reference/components/local/local.file_match.md +++ b/docs/sources/reference/components/local/local.file_match.md @@ -24,16 +24,19 @@ local.file_match "LABEL" { The following arguments are supported: -Name | Type | Description | Default | Required ---------------- | ------------------- | ------------------------------------------------------------------------------------------ |---------| -------- -`path_targets` | `list(map(string))` | Targets to expand; looks for glob patterns on the `__path__` and `__path_exclude__` keys. | | yes -`sync_period` | `duration` | How often to sync filesystem and targets. | `"10s"` | no +Name | Type | Description | Default | Required +--------------- | ------------------- | ------------------------------------------------------------------------------------------ |---------| -------- +`path_targets` | `list(map(string))` | Targets to expand; looks for glob patterns on the `__path__` and `__path_exclude__` keys. | | yes +`sync_period` | `duration` | How often to sync filesystem and targets. | `"10s"` | no +`ignore_older_than` | `duration` | Ignores files whose last modification time is older than this duration. | `"0s"` | no `path_targets` uses [doublestar][] style paths. * `/tmp/**/*.log` will match all subfolders of `tmp` and include any files that end in `*.log`. * `/tmp/apache/*.log` will match only files in `/tmp/apache/` that end in `*.log`. * `/tmp/**` will match all subfolders of `tmp`, `tmp` itself, and all files. +`local.file_match` doesn't ignore files when `ignore_older_than` is set to the default, `0s`.
+ ## Exported fields diff --git a/docs/sources/reference/components/prometheus/prometheus.remote_write.md b/docs/sources/reference/components/prometheus/prometheus.remote_write.md index d2614a2e52..a919d750f0 100644 --- a/docs/sources/reference/components/prometheus/prometheus.remote_write.md +++ b/docs/sources/reference/components/prometheus/prometheus.remote_write.md @@ -427,7 +427,7 @@ To troubleshoot, take the following steps in order: You can use [Promtool][promtool] to inspect it and find out which metric series were sent by this {{< param "PRODUCT_NAME" >}} instance since the last WAL truncation event. For example: ``` - ./promtool tsdb dump --match='{__name__=\"otelcol_connector_spanmetrics_duration_seconds_bucket\", http_method=\"GET\", job=\"ExampleJobName\"' /path/to/wal/ + ./promtool tsdb dump --match='{__name__="otelcol_connector_spanmetrics_duration_seconds_bucket", http_method="GET", job="ExampleJobName"}' /path/to/wal/ ``` [clustering]: ../../configure/clustering diff --git a/docs/sources/reference/config-blocks/livedebugging.md b/docs/sources/reference/config-blocks/livedebugging.md index 4587d95117..af30cb4f2e 100644 --- a/docs/sources/reference/config-blocks/livedebugging.md +++ b/docs/sources/reference/config-blocks/livedebugging.md @@ -30,8 +30,8 @@ livedebugging { The following arguments are supported: -| Name | Type | Description | Default | Required | -| --------- | ------ | ----------------------------------- | ------- | -------- | -| `enabled` | `bool` | Enables the live debugging feature. | `false` | no | +| Name | Type | Description | Default | Required | +| -------------------- | ----- | --------------------------------------------------------------- | ------- | -------- | +| `enabled` | `bool`| Enables the live debugging feature. 
| `false` | no | [debug]: ../../../troubleshoot/debug/ diff --git a/docs/sources/troubleshoot/debug.md b/docs/sources/troubleshoot/debug.md index 0e674aa1a1..5b8518515d 100644 --- a/docs/sources/troubleshoot/debug.md +++ b/docs/sources/troubleshoot/debug.md @@ -111,6 +111,7 @@ Supported components: * `otelcol.receiver.*` * `prometheus.relabel` +* `discovery.relabel` {{< /admonition >}} ## Debug using the UI diff --git a/go.mod b/go.mod index 0c662da968..4dbf6351b1 100644 --- a/go.mod +++ b/go.mod @@ -518,7 +518,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/go-zookeeper/zk v1.0.3 // indirect github.com/gobwas/glob v0.2.3 // indirect - github.com/goccy/go-json v0.10.3 // indirect + github.com/goccy/go-json v0.10.4 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/googleapis v1.4.1 // indirect @@ -945,6 +945,3 @@ exclude ( ) replace github.com/prometheus/procfs => github.com/prometheus/procfs v0.12.0 - -// TODO(ptodev): Remove when this PR has been merged: https://github.com/goccy/go-json/pull/490 -replace github.com/goccy/go-json => github.com/grafana/go-json v0.0.0-20241106155216-71a03f133f5c diff --git a/go.sum b/go.sum index 77ad0b47c0..6076b1a759 100644 --- a/go.sum +++ b/go.sum @@ -1619,6 +1619,9 @@ github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBT github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= +github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM= +github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus
v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -1853,8 +1856,6 @@ github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb h1:AWE6+kvtE18HP+lRW github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb/go.mod h1:kkWM4WUV230bNG3urVRWPBnSJHs64y/0RmWjftnnn0c= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= -github.com/grafana/go-json v0.0.0-20241106155216-71a03f133f5c h1:yKBKEC347YZpgii1KazRCfxHsTaxMqWZzoivM1OTT50= -github.com/grafana/go-json v0.0.0-20241106155216-71a03f133f5c/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/grafana/go-offsets-tracker v0.1.7 h1:2zBQ7iiGzvyXY7LA8kaaSiEqH/Yx82UcfRabbY5aOG4= github.com/grafana/go-offsets-tracker v0.1.7/go.mod h1:qcQdu7zlUKIFNUdBJlLyNHuJGW0SKWKjkrN6jtt+jds= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I= diff --git a/internal/cmd/integration-tests/docker-compose.yaml b/internal/cmd/integration-tests/docker-compose.yaml index 1632286bab..451fd1b779 100644 --- a/internal/cmd/integration-tests/docker-compose.yaml +++ b/internal/cmd/integration-tests/docker-compose.yaml @@ -12,7 +12,7 @@ services: - "9009:9009" tempo: - image: grafana/tempo:latest + image: grafana/tempo:2.6.1 command: [ "-config.file=/etc/tempo.yaml" ] volumes: - ./configs/tempo/tempo.yaml:/etc/tempo.yaml diff --git a/internal/component/component_provider.go b/internal/component/component_provider.go index 1f2c981447..bc685ae597 100644 --- a/internal/component/component_provider.go +++ b/internal/component/component_provider.go @@ -123,9 +123,10 @@ type Info struct { ComponentName string // Name of the 
component. Health Health // Current component health. - Arguments Arguments // Current arguments value of the component. - Exports Exports // Current exports value of the component. - DebugInfo interface{} // Current debug info of the component. + Arguments Arguments // Current arguments value of the component. + Exports Exports // Current exports value of the component. + DebugInfo interface{} // Current debug info of the component. + LiveDebuggingEnabled bool } // MarshalJSON returns a JSON representation of cd. The format of the @@ -139,19 +140,20 @@ func (info *Info) MarshalJSON() ([]byte, error) { } componentDetailJSON struct { - Name string `json:"name"` - Type string `json:"type,omitempty"` - LocalID string `json:"localID"` - ModuleID string `json:"moduleID"` - Label string `json:"label,omitempty"` - References []string `json:"referencesTo"` - ReferencedBy []string `json:"referencedBy"` - Health *componentHealthJSON `json:"health"` - Original string `json:"original"` - Arguments json.RawMessage `json:"arguments,omitempty"` - Exports json.RawMessage `json:"exports,omitempty"` - DebugInfo json.RawMessage `json:"debugInfo,omitempty"` - CreatedModuleIDs []string `json:"createdModuleIDs,omitempty"` + Name string `json:"name"` + Type string `json:"type,omitempty"` + LocalID string `json:"localID"` + ModuleID string `json:"moduleID"` + Label string `json:"label,omitempty"` + References []string `json:"referencesTo"` + ReferencedBy []string `json:"referencedBy"` + Health *componentHealthJSON `json:"health"` + Original string `json:"original"` + Arguments json.RawMessage `json:"arguments,omitempty"` + Exports json.RawMessage `json:"exports,omitempty"` + DebugInfo json.RawMessage `json:"debugInfo,omitempty"` + CreatedModuleIDs []string `json:"createdModuleIDs,omitempty"` + LiveDebuggingEnabled bool `json:"liveDebuggingEnabled"` } ) @@ -196,10 +198,11 @@ func (info *Info) MarshalJSON() ([]byte, error) { Message: info.Health.Message, UpdatedTime: info.Health.UpdateTime, 
}, - Arguments: arguments, - Exports: exports, - DebugInfo: debugInfo, - CreatedModuleIDs: info.ModuleIDs, + Arguments: arguments, + Exports: exports, + DebugInfo: debugInfo, + CreatedModuleIDs: info.ModuleIDs, + LiveDebuggingEnabled: info.LiveDebuggingEnabled, }) } diff --git a/internal/component/discovery/relabel/relabel.go b/internal/component/discovery/relabel/relabel.go index 551f361d76..6613c6c801 100644 --- a/internal/component/discovery/relabel/relabel.go +++ b/internal/component/discovery/relabel/relabel.go @@ -2,12 +2,14 @@ package relabel import ( "context" + "fmt" "sync" "github.com/grafana/alloy/internal/component" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/service/livedebugging" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" ) @@ -46,13 +48,23 @@ type Component struct { mut sync.RWMutex rcs []*relabel.Config + + debugDataPublisher livedebugging.DebugDataPublisher } var _ component.Component = (*Component)(nil) +var _ component.LiveDebugging = (*Component)(nil) // New creates a new discovery.relabel component. func New(o component.Options, args Arguments) (*Component, error) { - c := &Component{opts: o} + debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName) + if err != nil { + return nil, err + } + c := &Component{ + opts: o, + debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), + } // Call to Update() to set the output once at the start if err := c.Update(args); err != nil { @@ -81,9 +93,13 @@ func (c *Component) Update(args component.Arguments) error { for _, t := range newArgs.Targets { lset := componentMapToPromLabels(t) - lset, keep := relabel.Process(lset, relabelConfigs...) + relabelled, keep := relabel.Process(lset, relabelConfigs...) 
if keep { - targets = append(targets, promLabelsToComponent(lset)) + targets = append(targets, promLabelsToComponent(relabelled)) + } + componentID := livedebugging.ComponentID(c.opts.ID) + if c.debugDataPublisher.IsActive(componentID) { + c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lset.String(), relabelled.String())) } } @@ -95,6 +111,8 @@ func (c *Component) Update(args component.Arguments) error { return nil } +func (c *Component) LiveDebugging(_ int) {} + func componentMapToPromLabels(ls discovery.Target) labels.Labels { res := make([]labels.Label, 0, len(ls)) for k, v := range ls { diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index e5b9766e25..102f81beee 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -26,8 +26,9 @@ func init() { // Arguments holds values which are used to configure the local.file_match // component. type Arguments struct { - PathTargets []discovery.Target `alloy:"path_targets,attr"` - SyncPeriod time.Duration `alloy:"sync_period,attr,optional"` + PathTargets []discovery.Target `alloy:"path_targets,attr"` + SyncPeriod time.Duration `alloy:"sync_period,attr,optional"` + IgnoreOlderThan time.Duration `alloy:"ignore_older_than,attr,optional"` } var _ component.Component = (*Component)(nil) @@ -80,8 +81,9 @@ func (c *Component) Update(args component.Arguments) error { c.watches = c.watches[:0] for _, v := range c.args.PathTargets { c.watches = append(c.watches, watch{ - target: v, - log: c.opts.Logger, + target: v, + log: c.opts.Logger, + ignoreOlderThan: c.args.IgnoreOlderThan, }) } diff --git a/internal/component/local/file_match/file_test.go b/internal/component/local/file_match/file_test.go index bec538ce2f..63645315bd 100644 --- a/internal/component/local/file_match/file_test.go +++ b/internal/component/local/file_match/file_test.go @@ -63,6 +63,35 @@ func TestDirectoryFile(t *testing.T) { 
require.True(t, contains(foundFiles, "t1.txt")) } +func TestFileIgnoreOlder(t *testing.T) { + dir := path.Join(os.TempDir(), "alloy_testing", "t1") + err := os.MkdirAll(dir, 0755) + require.NoError(t, err) + writeFile(t, dir, "t1.txt") + t.Cleanup(func() { + os.RemoveAll(dir) + }) + c := createComponent(t, dir, []string{path.Join(dir, "*.txt")}, nil) + ct := context.Background() + ct, ccl := context.WithTimeout(ct, 5*time.Second) + defer ccl() + c.args.SyncPeriod = 10 * time.Millisecond + c.args.IgnoreOlderThan = 100 * time.Millisecond + c.Update(c.args) + go c.Run(ct) + + foundFiles := c.getWatchedFiles() + require.Len(t, foundFiles, 1) + require.True(t, contains(foundFiles, "t1.txt")) + time.Sleep(150 * time.Millisecond) + + writeFile(t, dir, "t2.txt") + ct.Done() + foundFiles = c.getWatchedFiles() + require.Len(t, foundFiles, 1) + require.True(t, contains(foundFiles, "t2.txt")) +} + func TestAddingFile(t *testing.T) { dir := path.Join(os.TempDir(), "alloy_testing", "t2") err := os.MkdirAll(dir, 0755) diff --git a/internal/component/local/file_match/watch.go b/internal/component/local/file_match/watch.go index 709d821151..04d8e456c3 100644 --- a/internal/component/local/file_match/watch.go +++ b/internal/component/local/file_match/watch.go @@ -3,6 +3,7 @@ package file_match import ( "os" "path/filepath" + "time" "github.com/bmatcuk/doublestar" "github.com/go-kit/log" @@ -14,8 +15,9 @@ import ( // watch handles a single discovery.target for file watching. 
type watch struct { - target discovery.Target - log log.Logger + target discovery.Target + log log.Logger + ignoreOlderThan time.Duration } func (w *watch) getPaths() ([]discovery.Target, error) { @@ -48,9 +50,15 @@ func (w *watch) getPaths() ([]discovery.Target, error) { } continue } + if fi.IsDir() { continue } + + if w.ignoreOlderThan != 0 && fi.ModTime().Before(time.Now().Add(-w.ignoreOlderThan)) { + continue + } + dt := discovery.Target{} for dk, v := range w.target { dt[dk] = v diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 30a035124d..7558b0fc6c 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -53,7 +53,7 @@ func TestJSONLabelsStage(t *testing.T) { // The third stage will set some labels from the extracted values above. // Again, if the value is empty, it is inferred that we want to use the // populate the label with extracted value of the same name. 
- stg := `stage.json { + stg := `stage.json { expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } drop_malformed = true } @@ -62,7 +62,7 @@ func TestJSONLabelsStage(t *testing.T) { source = "extra" } stage.labels { - values = { + values = { stream = "", user = "", ts = "timestamp", @@ -679,7 +679,7 @@ func TestLeakyUpdate(t *testing.T) { numLogsToSend := 1 cfg1 := ` - stage.metrics { + stage.metrics { metric.counter { name = "paulin_test1" action = "inc" @@ -688,7 +688,7 @@ func TestLeakyUpdate(t *testing.T) { }` + forwardArgs cfg2 := ` - stage.metrics { + stage.metrics { metric.counter { name = "paulin_test2" action = "inc" @@ -731,7 +731,7 @@ func TestMetricsStageRefresh(t *testing.T) { numLogsToSend := 3 cfgWithMetric := ` - stage.metrics { + stage.metrics { metric.counter { name = "paulin_test" action = "inc" @@ -776,7 +776,7 @@ func TestMetricsStageRefresh(t *testing.T) { // We try having a metric with the same name as before so that we can see if there // is some sort of double registration error for that metric. cfgWithTwoMetrics := ` - stage.metrics { + stage.metrics { metric.counter { name = "paulin_test_3" action = "inc" diff --git a/internal/component/otelcol/auth/auth.go b/internal/component/otelcol/auth/auth.go index eb20434dec..9e9cc5fdf2 100644 --- a/internal/component/otelcol/auth/auth.go +++ b/internal/component/otelcol/auth/auth.go @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error { }) // Schedule the components to run once our component is running. - a.sched.Schedule(host, components...) + a.sched.Schedule(a.ctx, func() {}, host, components...) 
return nil } diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index bb9b3a151e..643730000f 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -214,9 +214,12 @@ func (p *Connector) Update(args component.Arguments) error { return errors.New("unsupported connector type") } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + } + // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) - p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 63e0f1cdd0..4f19b5a625 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -253,9 +253,12 @@ func (e *Exporter) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + } + // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) - e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + e.sched.Schedule(e.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/extension/extension.go b/internal/component/otelcol/extension/extension.go index 1e494e71c1..7eca6e7349 100644 --- a/internal/component/otelcol/extension/extension.go +++ b/internal/component/otelcol/extension/extension.go @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) 
+ e.sched.Schedule(e.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 2c731616a5..fbf70e22c0 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -37,9 +37,7 @@ type Scheduler struct { schedMut sync.Mutex schedComponents []otelcomponent.Component // Most recently created components host otelcomponent.Host - - // newComponentsCh is written to when schedComponents gets updated. - newComponentsCh chan struct{} + running bool // onPause is called when scheduler is making changes to running components. onPause func() @@ -51,89 +49,102 @@ type Scheduler struct { // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: func() {}, - onResume: func() {}, + log: l, + onPause: func() {}, + onResume: func() {}, } } -// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to -// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running -// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it -// will call onResume as a last step. +// NewWithPauseCallbacks is like New, but allows specifying onPause() and onResume() callbacks. +// The callbacks are a useful way of pausing and resuming the ingestion of data by the components: +// * onPause() is called before the scheduler stops the components. +// * onResume() is called after the scheduler starts the components. +// The callbacks are used by the Schedule() and Run() functions. +// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() has never been called.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: onPause, - onResume: onResume, + log: l, + onPause: onPause, + onResume: onResume, } } -// Schedule schedules a new set of OpenTelemetry Components to run. Components -// will only be scheduled when the Scheduler is running. +// Schedule a new set of OpenTelemetry Components to run. +// Components will only be started when the Scheduler's Run() function has been called. +// +// Schedule() completely overrides the set of previously running components. +// Components which have been removed since the last call to Schedule will be stopped. // -// Schedule completely overrides the set of previously running components; -// components which have been removed since the last call to Schedule will be -// stopped. -func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) { +// updateConsumers is called after the components are paused and before starting the new components. +// It is expected that this function will set the new set of consumers to the wrapping consumer that's assigned to the Alloy component. +func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() - cs.schedComponents = cc + // If the scheduler isn't running yet, just update the state. + // That way the Run function is ready to go. + if !cs.running { + cs.schedComponents = cc + cs.host = h + updateConsumers() + return + } + + // The new components must be setup in this order: + // + // 1. Pause consumers + // 2. Stop the old components + // 3. Change the consumers + // 4. Start the new components + // 5. Start the consumer + // + // There could be race conditions if the order above is not followed. + + // 1. 
Pause consumers + // This prevents them from accepting new data while we're shutting them down. + cs.onPause() + + // 2. Stop the old components + cs.stopComponents(ctx, cs.schedComponents...) + + // 3. Change the consumers + // This can only be done after stopping the previous components and before starting the new ones. + updateConsumers() + + // 4. Start the new components + level.Debug(cs.log).Log("msg", "scheduling otelcol components", "count", len(cs.schedComponents)) + cs.schedComponents = cs.startComponents(ctx, h, cc...) cs.host = h + //TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers? - select { - case cs.newComponentsCh <- struct{}{}: - // Queued new message. - default: - // A message is already queued for refreshing running components so we - // don't have to do anything here. - } + // 5. Start the consumer + // The new components will now start accepting telemetry data. + cs.onResume() } -// Run starts the Scheduler. Run will watch for schedule components to appear -// and run them, terminating previously running components if they exist. +// Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { - firstRun := true - var components []otelcomponent.Component + cs.schedMut.Lock() + cs.running = true + + cs.onPause() + cs.startComponents(ctx, cs.host, cs.schedComponents...) + cs.onResume() + + cs.schedMut.Unlock() // Make sure we terminate all of our running components on shutdown. defer func() { - if !firstRun { // always handle the callbacks correctly - cs.onPause() - } - cs.stopComponents(context.Background(), components...) + cs.schedMut.Lock() + defer cs.schedMut.Unlock() + cs.stopComponents(context.Background(), cs.schedComponents...) + // this Resume call should not be needed but is added for robustness to ensure that + // it does not ever exit in "paused" state. 
cs.onResume() }() - // Wait for a write to cs.newComponentsCh. The initial list of components is - // always empty so there's nothing to do until cs.newComponentsCh is written - // to. - for { - select { - case <-ctx.Done(): - return nil - case <-cs.newComponentsCh: - if !firstRun { - cs.onPause() // do not pause on first run - } else { - firstRun = false - } - // Stop the old components before running new scheduled ones. - cs.stopComponents(ctx, components...) - - cs.schedMut.Lock() - components = cs.schedComponents - host := cs.host - cs.schedMut.Unlock() - - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components)) - components = cs.startComponents(ctx, host, components...) - cs.onResume() - } - } + <-ctx.Done() + return nil } func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) { diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 469d679b7f..c034e262f8 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -5,10 +5,11 @@ import ( "testing" "time" + "go.uber.org/atomic" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" otelcomponent "go.opentelemetry.io/collector/component" - "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/runtime/componenttest" @@ -32,7 +33,7 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started trigger once it is // running. 
component, started, _ := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), func() {}, h, component) require.NoError(t, started.Wait(5*time.Second), "component did not start") }) @@ -52,12 +53,12 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), func() {}, h, component) // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(context.Background(), func() {}, h) require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) @@ -81,26 +82,32 @@ func TestScheduler(t *testing.T) { require.NoError(t, err) }() + toInt := func(a *atomic.Int32) int { return int(a.Load()) } + + // The Run function starts the components. They should be paused and then resumed. + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 1, toInt(pauseCalls), "pause callbacks should be called on run") + assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on run") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. 
component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) - - toInt := func(a *atomic.Int32) int { return int(a.Load()) } + cs.Schedule(ctx, func() {}, h, component) require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run") - assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run") + assert.Equal(t, 2, toInt(pauseCalls), "pause callbacks should be called on schedule") + assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on schedule") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(ctx, func() {}, h) require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run") - assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run") + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should be called on second schedule") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on second schedule") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") @@ -109,8 +116,8 @@ func TestScheduler(t *testing.T) { cancel() require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown") - assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown") + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should not be called on shutdown") + assert.Equal(t, 4, toInt(resumeCalls), "resume callback should be called on 
shutdown") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") }) @@ -133,7 +140,7 @@ func TestScheduler(t *testing.T) { // Schedule our component which will notify our trigger when Shutdown gets // called. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(ctx, func() {}, h, component) // Wait for the component to start, and then stop our scheduler, which // should cause our running component to terminate. diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 5072d65233..08a7a6181c 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -237,9 +237,13 @@ func (p *Processor) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + } + // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) - p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) + return nil } diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 80b82efb06..ff4c3c21d1 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - r.sched.Schedule(host, components...) + r.sched.Schedule(r.ctx, func() {}, host, components...) 
return nil } diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index 35a1240273..c17868b427 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" "github.com/grafana/alloy/internal/component" @@ -18,6 +19,7 @@ import ( "github.com/grafana/alloy/internal/component/pyroscope/write" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/util" ) const ( @@ -53,16 +55,21 @@ func (a *Arguments) SetToDefault() { } type Component struct { - opts component.Options - server *fnet.TargetServer - appendables []pyroscope.Appendable - mut sync.Mutex + opts component.Options + server *fnet.TargetServer + uncheckedCollector *util.UncheckedCollector + appendables []pyroscope.Appendable + mut sync.Mutex } func New(opts component.Options, args Arguments) (*Component, error) { + uncheckedCollector := util.NewUncheckedCollector(nil) + opts.Registerer.MustRegister(uncheckedCollector) + c := &Component{ - opts: opts, - appendables: args.ForwardTo, + opts: opts, + uncheckedCollector: uncheckedCollector, + appendables: args.ForwardTo, } if err := c.Update(args); err != nil { @@ -116,7 +123,14 @@ func (c *Component) Update(args component.Arguments) error { c.shutdownServer() - srv, err := fnet.NewTargetServer(c.opts.Logger, "pyroscope_receive_http", c.opts.Registerer, newArgs.Server) + // [server.Server] registers new metrics every time it is created. To + // avoid issues with re-registering metrics with the same name, we create a + // new registry for the server every time we create one, and pass it to an + // unchecked collector to bypass uniqueness checking. 
+ serverRegistry := prometheus.NewRegistry() + c.uncheckedCollector.SetCollector(serverRegistry) + + srv, err := fnet.NewTargetServer(c.opts.Logger, "pyroscope_receive_http", serverRegistry, newArgs.Server) if err != nil { return fmt.Errorf("failed to create server: %w", err) } diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go index 71929abce6..5c507016cb 100644 --- a/internal/component/pyroscope/receive_http/receive_http_test.go +++ b/internal/component/pyroscope/receive_http/receive_http_test.go @@ -287,3 +287,45 @@ func testOptions(t *testing.T) component.Options { Registerer: prometheus.NewRegistry(), } } + +// TestUpdateArgs verifies that the component can be updated with new arguments. It also explicitly makes sure that the server is restarted when the server configuration changes and that there are no metric registration conflicts. +func TestUpdateArgs(t *testing.T) { + ports, err := freeport.GetFreePorts(2) + require.NoError(t, err) + + forwardTo := []pyroscope.Appendable{testAppendable(nil)} + + args := Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: ports[0], + }, + }, + ForwardTo: forwardTo, + } + + comp, err := New(testOptions(t), args) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go func() { + require.NoError(t, comp.Run(ctx)) + }() + + waitForServerReady(t, ports[0]) + + comp.Update(Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: ports[1], + }, + }, + ForwardTo: forwardTo, + }) + + waitForServerReady(t, ports[1]) +} diff --git a/internal/runtime/alloy_components.go b/internal/runtime/alloy_components.go index fef0061a83..439d96949d 100644 --- a/internal/runtime/alloy_components.go +++ b/internal/runtime/alloy_components.go @@ -128,6 +128,10 @@ func (f *Runtime) 
getComponentDetail(cn controller.ComponentNode, graph *dag.Gra componentInfo.DebugInfo = builtinComponent.DebugInfo() } } + + _, liveDebuggingEnabled := componentInfo.Component.(component.LiveDebugging) + componentInfo.LiveDebuggingEnabled = liveDebuggingEnabled + return componentInfo } diff --git a/internal/web/api/api.go b/internal/web/api/api.go index 53592b744a..5164f91854 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -11,6 +11,7 @@ import ( "path" "strconv" "strings" + "time" "github.com/google/uuid" "github.com/gorilla/mux" @@ -165,13 +166,11 @@ func getClusteringPeersHandler(host service.Host) http.HandlerFunc { } } -func liveDebugging(host service.Host, callbackManager livedebugging.CallbackManager) http.HandlerFunc { +func liveDebugging(_ service.Host, callbackManager livedebugging.CallbackManager) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) componentID := livedebugging.ComponentID(vars["id"]) - // Buffer of 1000 entries to handle load spikes and prevent this functionality from eating up too much memory. 
- // TODO: in the future we may want to make this value configurable to handle heavy load dataCh := make(chan string, 1000) ctx := r.Context() @@ -200,9 +199,12 @@ func liveDebugging(host service.Host, callbackManager livedebugging.CallbackMana return } + flushTicker := time.NewTicker(time.Second) + defer func() { close(dataCh) callbackManager.DeleteCallback(id, componentID) + flushTicker.Stop() }() for { @@ -216,7 +218,7 @@ func liveDebugging(host service.Host, callbackManager livedebugging.CallbackMana if writeErr != nil { return } - // TODO: flushing at a regular interval might be better performance wise + case <-flushTicker.C: w.(http.Flusher).Flush() case <-ctx.Done(): return diff --git a/internal/web/ui/src/features/component/ComponentView.tsx b/internal/web/ui/src/features/component/ComponentView.tsx index 81b81c1d6f..5aa29c1375 100644 --- a/internal/web/ui/src/features/component/ComponentView.tsx +++ b/internal/web/ui/src/features/component/ComponentView.tsx @@ -23,6 +23,7 @@ export const ComponentView: FC = (props) => { const referencedBy = props.component.referencedBy.filter((id) => props.info[id] !== undefined).map((id) => props.info[id]); const referencesTo = props.component.referencesTo.filter((id) => props.info[id] !== undefined).map((id) => props.info[id]); + const liveDebuggingEnabled = props.component.liveDebuggingEnabled; const argsPartition = partitionBody(props.component.arguments, 'Arguments'); const exportsPartition = props.component.exports && partitionBody(props.component.exports, 'Exports'); @@ -47,6 +48,24 @@ export const ComponentView: FC = (props) => { ); } + function liveDebuggingButton(): ReactElement | string { + if (useRemotecfg) { + return 'Live debugging is not yet available for remote components'; + } + + if (!liveDebuggingEnabled) { + return 'Live debugging is not yet available for this component'; + } + + return ( +
+ + Live debugging + +
+ ); + } + return (
- {useRemotecfg ? ( - 'Live debugging is not yet available for remote components' - ) : ( -
- - Live debugging - -
- )} + {liveDebuggingButton()} {props.component.health.message && (
diff --git a/internal/web/ui/src/features/component/types.ts b/internal/web/ui/src/features/component/types.ts index 4678b4ca17..b8e8162ec8 100644 --- a/internal/web/ui/src/features/component/types.ts +++ b/internal/web/ui/src/features/component/types.ts @@ -42,6 +42,11 @@ export interface ComponentInfo { * IDs of components which this component is referencing. */ referencesTo: string[]; + + /** + * Used to indicate if live debugging is available for the component + */ + liveDebuggingEnabled: boolean; } /**