diff --git a/exporter/awsemfexporter/internal/appsignals/useragent.go b/exporter/awsemfexporter/internal/appsignals/useragent.go index d05d5929b6d1..12d5817e0cb7 100644 --- a/exporter/awsemfexporter/internal/appsignals/useragent.go +++ b/exporter/awsemfexporter/internal/appsignals/useragent.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package appsignals +package appsignals // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/appsignals" import ( "context" diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index b3c1ecf42c54..f432a07dcaa3 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -132,7 +132,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri if serviceName, ok := rm.Resource().Attributes().Get("service.name"); ok { if strings.HasPrefix(serviceName.Str(), "containerInsightsKubeAPIServerScraper") || - strings.HasPrefix(serviceName.Str(), "containerInsightsDCGMExporterScraper") { + strings.HasPrefix(serviceName.Str(), "containerInsightsDCGMExporterScraper") || + strings.HasPrefix(serviceName.Str(), "containerInsightsNeuronMonitorScraper") { // the prometheus metrics that come from the container insight receiver need to be clearly tagged as coming from container insights metricReceiver = containerInsightsReceiver } diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index 9986e87bd641..852ccdcc54d5 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -282,6 +282,8 @@ func TestTranslateOtToGroupedMetric(t *testing.T) { containerInsightMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsKubeAPIServerScraper") gpuMetric := 
createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1) gpuMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsDCGMExporterScraper") + neuronMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1) + neuronMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsNeuronMonitorScraper") counterSumMetrics := map[string]*metricInfo{ "spanCounter": { @@ -390,6 +392,20 @@ func TestTranslateOtToGroupedMetric(t *testing.T) { "myServiceNS/containerInsightsDCGMExporterScraper", containerInsightsReceiver, }, + { + "neuron monitor receiver", + neuronMetric, + map[string]string{ + "isItAnError": "false", + "spanName": "testSpan", + }, + map[string]string{ + oTellibDimensionKey: "cloudwatch-lib", + "spanName": "testSpan", + }, + "myServiceNS/containerInsightsNeuronMonitorScraper", + containerInsightsReceiver, + }, } for _, tc := range testCases { diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go index 57b8ddbb7904..5d794b459096 100644 --- a/internal/aws/containerinsight/const.go +++ b/internal/aws/containerinsight/const.go @@ -157,11 +157,12 @@ const ( TypeContainerDiskIO = "ContainerDiskIO" // Special type for pause container // because containerd does not set container name pause container name to POD like docker does. 
- TypeInfraContainer = "InfraContainer" - TypeGpuContainer = "ContainerGPU" - TypeGpuPod = "PodGPU" - TypeGpuNode = "NodeGPU" - TypeGpuCluster = "ClusterGPU" + TypeInfraContainer = "InfraContainer" + TypeGpuContainer = "ContainerGPU" + TypeGpuPod = "PodGPU" + TypeGpuNode = "NodeGPU" + TypeGpuCluster = "ClusterGPU" + TypeNeuronContainer = "ContainerNeuron" // unit UnitBytes = "Bytes" diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractorhelpers.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractorhelpers.go index 6bc91a48dfe5..402a598b63ed 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractorhelpers.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractorhelpers.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package extractors +package extractors // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" import ( "fmt" diff --git a/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper.go b/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper_config.go similarity index 57% rename from receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper.go rename to receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper_config.go index 475c5bc996e2..4d11945d0db3 100644 --- a/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper.go +++ b/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper_config.go @@ -1,12 +1,9 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package gpu +package gpu // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/gpu" import ( - "context" - "errors" - "fmt" "time" configutil "github.com/prometheus/common/config" @@ -15,14 +12,8 @@ import ( 
"github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/kubernetes" "github.com/prometheus/prometheus/model/relabel" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" - "go.uber.org/zap" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" ) const ( @@ -33,77 +24,13 @@ const ( scraperK8sServiceSelector = "k8s-app=dcgm-exporter-service" ) -type DcgmScraper struct { - ctx context.Context - settings component.TelemetrySettings - host component.Host - hostInfoProvider hostInfoProvider - prometheusReceiver receiver.Metrics - k8sDecorator Decorator - running bool -} - -type DcgmScraperOpts struct { - Ctx context.Context - TelemetrySettings component.TelemetrySettings - Consumer consumer.Metrics - Host component.Host - HostInfoProvider hostInfoProvider - K8sDecorator Decorator - Logger *zap.Logger -} - type hostInfoProvider interface { GetClusterName() string GetInstanceID() string GetInstanceType() string } -func NewDcgmScraper(opts DcgmScraperOpts) (*DcgmScraper, error) { - if opts.Consumer == nil { - return nil, errors.New("consumer cannot be nil") - } - if opts.Host == nil { - return nil, errors.New("host cannot be nil") - } - if opts.HostInfoProvider == nil { - return nil, errors.New("cluster name provider cannot be nil") - } - - promConfig := prometheusreceiver.Config{ - PrometheusConfig: &config.Config{ - ScrapeConfigs: []*config.ScrapeConfig{getScraperConfig(opts.HostInfoProvider)}, - }, - } - - params := receiver.CreateSettings{ - TelemetrySettings: opts.TelemetrySettings, - } - - decoConsumer := decorateConsumer{ - containerOrchestrator: ci.EKS, - nextConsumer: opts.Consumer, - k8sDecorator: opts.K8sDecorator, - logger: opts.Logger, - } - - promFactory := 
prometheusreceiver.NewFactory() - promReceiver, err := promFactory.CreateMetricsReceiver(opts.Ctx, params, &promConfig, &decoConsumer) - if err != nil { - return nil, fmt.Errorf("failed to create prometheus receiver: %w", err) - } - - return &DcgmScraper{ - ctx: opts.Ctx, - settings: opts.TelemetrySettings, - host: opts.Host, - hostInfoProvider: opts.HostInfoProvider, - prometheusReceiver: promReceiver, - k8sDecorator: opts.K8sDecorator, - }, nil -} - -func getScraperConfig(hostInfoProvider hostInfoProvider) *config.ScrapeConfig { +func GetScraperConfig(hostInfoProvider hostInfoProvider) *config.ScrapeConfig { return &config.ScrapeConfig{ HTTPClientConfig: configutil.HTTPClientConfig{ TLSConfig: configutil.TLSConfig{ @@ -209,35 +136,3 @@ func getMetricRelabelConfig(hostInfoProvider hostInfoProvider) []*relabel.Config }, } } - -func (ds *DcgmScraper) GetMetrics() []pmetric.Metrics { - // This method will never return metrics because the metrics are collected by the scraper. - // This method will ensure the scraper is running - if !ds.running { - ds.settings.Logger.Info("The scraper is not running, starting up the scraper") - err := ds.prometheusReceiver.Start(ds.ctx, ds.host) - if err != nil { - ds.settings.Logger.Error("Unable to start PrometheusReceiver", zap.Error(err)) - } - ds.running = err == nil - } - - return nil -} - -func (ds *DcgmScraper) Shutdown() { - if ds.running { - err := ds.prometheusReceiver.Shutdown(ds.ctx) - if err != nil { - ds.settings.Logger.Error("Unable to shutdown PrometheusReceiver", zap.Error(err)) - } - ds.running = false - } - - if ds.k8sDecorator != nil { - err := ds.k8sDecorator.Shutdown() - if err != nil { - ds.settings.Logger.Error("Unable to shutdown K8sDecorator", zap.Error(err)) - } - } -} diff --git a/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper_test.go b/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper_test.go index 060fb94b12df..b711da990b92 100644 --- 
a/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper_test.go +++ b/receiver/awscontainerinsightreceiver/internal/gpu/dcgmscraper_test.go @@ -21,6 +21,7 @@ import ( ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/mocks" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" ) @@ -108,42 +109,6 @@ func (m mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) erro return nil } -func TestNewDcgmScraperBadInputs(t *testing.T) { - settings := componenttest.NewNopTelemetrySettings() - settings.Logger, _ = zap.NewDevelopment() - - tests := []DcgmScraperOpts{ - { - Ctx: context.TODO(), - TelemetrySettings: settings, - Consumer: nil, - Host: componenttest.NewNopHost(), - HostInfoProvider: mockHostInfoProvider{}, - }, - { - Ctx: context.TODO(), - TelemetrySettings: settings, - Consumer: mockConsumer{}, - Host: nil, - HostInfoProvider: mockHostInfoProvider{}, - }, - { - Ctx: context.TODO(), - TelemetrySettings: settings, - Consumer: mockConsumer{}, - Host: componenttest.NewNopHost(), - HostInfoProvider: nil, - }, - } - - for _, tt := range tests { - scraper, err := NewDcgmScraper(tt) - - assert.Error(t, err) - assert.Nil(t, scraper) - } -} - func TestNewDcgmScraperEndToEnd(t *testing.T) { expected := map[string]struct { value float64 @@ -189,16 +154,17 @@ func TestNewDcgmScraperEndToEnd(t *testing.T) { settings := componenttest.NewNopTelemetrySettings() settings.Logger, _ = zap.NewDevelopment() - scraper, err := NewDcgmScraper(DcgmScraperOpts{ + scraper, err := 
prometheusscraper.NewSimplePrometheusScraper(prometheusscraper.SimplePrometheusScraperOpts{ Ctx: context.TODO(), TelemetrySettings: settings, Consumer: consumer, Host: componenttest.NewNopHost(), HostInfoProvider: mockHostInfoProvider{}, - K8sDecorator: mockDecorator{}, + ScraperConfigs: GetScraperConfig(mockHostInfoProvider{}), + Logger: settings.Logger, }) assert.NoError(t, err) - assert.Equal(t, mockHostInfoProvider{}, scraper.hostInfoProvider) + assert.Equal(t, mockHostInfoProvider{}, scraper.HostInfoProvider) // build up a new PR promFactory := prometheusreceiver.NewFactory() @@ -214,7 +180,7 @@ func TestNewDcgmScraperEndToEnd(t *testing.T) { mp, cfg, err := mocks.SetupMockPrometheus(targets...) assert.NoError(t, err) - scrapeConfig := getScraperConfig(scraper.hostInfoProvider) + scrapeConfig := scraper.ScraperConfigs scrapeConfig.ScrapeInterval = cfg.ScrapeConfigs[0].ScrapeInterval scrapeConfig.ScrapeTimeout = cfg.ScrapeConfigs[0].ScrapeInterval scrapeConfig.Scheme = "http" @@ -245,9 +211,10 @@ func TestNewDcgmScraperEndToEnd(t *testing.T) { // replace the prom receiver params := receiver.CreateSettings{ - TelemetrySettings: scraper.settings, + TelemetrySettings: scraper.Settings, } - scraper.prometheusReceiver, err = promFactory.CreateMetricsReceiver(scraper.ctx, params, &promConfig, consumer) + scraper.PrometheusReceiver, err = promFactory.CreateMetricsReceiver(scraper.Ctx, params, &promConfig, consumer) + assert.NoError(t, err) assert.NotNil(t, mp) defer mp.Close() diff --git a/receiver/awscontainerinsightreceiver/internal/gpu/decorator_test.go b/receiver/awscontainerinsightreceiver/internal/gpu/decorator_test.go deleted file mode 100644 index a81e39268b65..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/gpu/decorator_test.go +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package gpu - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - 
"github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" -) - -var _ Decorator = (*MockK8sDecorator)(nil) - -type MockK8sDecorator struct { -} - -func (m *MockK8sDecorator) Decorate(metric stores.CIMetric) stores.CIMetric { - return metric -} - -func (m *MockK8sDecorator) Shutdown() error { - return nil -} - -func TestConsumeMetrics(t *testing.T) { - logger, _ := zap.NewDevelopment() - dc := &decorateConsumer{ - containerOrchestrator: "EKS", - nextConsumer: consumertest.NewNop(), - k8sDecorator: &MockK8sDecorator{}, - logger: logger, - } - ctx := context.Background() - - testcases := map[string]struct { - metrics pmetric.Metrics - want pmetric.Metrics - shouldError bool - }{ - "empty": { - metrics: pmetric.NewMetrics(), - want: pmetric.NewMetrics(), - shouldError: false, - }, - "unit": { - metrics: generateMetrics(map[string]map[string]string{ - gpuUtil: { - "device": "test0", - }, - gpuMemUtil: { - "device": "test0", - }, - gpuMemTotal: { - "device": "test0", - }, - gpuMemUsed: { - "device": "test0", - }, - gpuPowerDraw: { - "device": "test0", - }, - gpuTemperature: { - "device": "test0", - }, - }), - want: generateMetrics(map[string]map[string]string{ - gpuUtil: { - "device": "test0", - "Unit": "Percent", - }, - gpuMemUtil: { - "device": "test0", - "Unit": "Percent", - }, - gpuMemTotal: { - "device": "test0", - "Unit": "Bytes", - }, - gpuMemUsed: { - "device": "test0", - "Unit": "Bytes", - }, - gpuPowerDraw: { - "device": "test0", - "Unit": "None", - }, - gpuTemperature: { - "device": "test0", - "Unit": "None", - }, - }), - shouldError: false, - }, - "noUnit": { - metrics: generateMetrics(map[string]map[string]string{ - "test": { - "device": "test0", - }, - }), - want: 
generateMetrics(map[string]map[string]string{ - "test": { - "device": "test0", - }, - }), - shouldError: false, - }, - "typeUnchanged": { - metrics: generateMetrics(map[string]map[string]string{ - gpuUtil: { - "device": "test0", - "Type": "TestType", - }, - }), - want: generateMetrics(map[string]map[string]string{ - gpuUtil: { - "device": "test0", - "Type": "TestType", - "Unit": "Percent", - }, - }), - shouldError: false, - }, - } - - for _, tc := range testcases { - err := dc.ConsumeMetrics(ctx, tc.metrics) - if tc.shouldError { - assert.Error(t, err) - return - } - require.NoError(t, err) - assert.Equal(t, tc.want.MetricCount(), tc.metrics.MetricCount()) - if tc.want.MetricCount() == 0 { - continue - } - actuals := tc.metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() - actuals.Sort(func(a, b pmetric.Metric) bool { - return a.Name() < b.Name() - }) - wants := tc.want.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() - wants.Sort(func(a, b pmetric.Metric) bool { - return a.Name() < b.Name() - }) - for i := 0; i < wants.Len(); i++ { - actual := actuals.At(i) - want := wants.At(i) - assert.Equal(t, want.Name(), actual.Name()) - assert.Equal(t, want.Unit(), actual.Unit()) - actualAttrs := actual.Gauge().DataPoints().At(0).Attributes() - wantAttrs := want.Gauge().DataPoints().At(0).Attributes() - assert.Equal(t, wantAttrs.Len(), actualAttrs.Len()) - wantAttrs.Range(func(k string, v pcommon.Value) bool { - av, ok := actualAttrs.Get(k) - assert.True(t, ok) - assert.Equal(t, v, av) - return true - }) - } - } -} - -func generateMetrics(nameToDims map[string]map[string]string) pmetric.Metrics { - md := pmetric.NewMetrics() - ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() - for name, dims := range nameToDims { - m := ms.AppendEmpty() - m.SetName(name) - gauge := m.SetEmptyGauge().DataPoints().AppendEmpty() - gauge.SetIntValue(10) - for k, v := range dims { - if k == "Unit" { - m.SetUnit(v) - continue - } - 
gauge.Attributes().PutStr(k, v) - } - } - return md -} diff --git a/receiver/awscontainerinsightreceiver/internal/gpu/metric_unit.go b/receiver/awscontainerinsightreceiver/internal/gpu/metric_unit.go new file mode 100644 index 000000000000..c46a507d8c5d --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/gpu/metric_unit.go @@ -0,0 +1,22 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package gpu // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/gpu" + +const ( + gpuUtil = "DCGM_FI_DEV_GPU_UTIL" + gpuMemUtil = "DCGM_FI_DEV_FB_USED_PERCENT" + gpuMemUsed = "DCGM_FI_DEV_FB_USED" + gpuMemTotal = "DCGM_FI_DEV_FB_TOTAL" + gpuTemperature = "DCGM_FI_DEV_GPU_TEMP" + gpuPowerDraw = "DCGM_FI_DEV_POWER_USAGE" +) + +var MetricToUnit = map[string]string{ + gpuUtil: "Percent", + gpuMemUtil: "Percent", + gpuMemUsed: "Bytes", + gpuMemTotal: "Bytes", + gpuTemperature: "None", + gpuPowerDraw: "None", +} diff --git a/receiver/awscontainerinsightreceiver/internal/mocks/prometheus.go b/receiver/awscontainerinsightreceiver/internal/mocks/prometheus.go index d9b90220df72..3efce32e9f07 100644 --- a/receiver/awscontainerinsightreceiver/internal/mocks/prometheus.go +++ b/receiver/awscontainerinsightreceiver/internal/mocks/prometheus.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package mocks +package mocks // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/mocks" import ( "fmt" diff --git a/receiver/awscontainerinsightreceiver/internal/neuron/neuron_monitor_scraper_config.go b/receiver/awscontainerinsightreceiver/internal/neuron/neuron_monitor_scraper_config.go new file mode 100644 index 000000000000..242583e22703 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/neuron/neuron_monitor_scraper_config.go @@ -0,0 +1,114 @@ +// Copyright The 
OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package neuron // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/neuron" + +import ( + "os" + "time" + + configutil "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/kubernetes" + "github.com/prometheus/prometheus/model/relabel" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper" +) + +const ( + caFile = "/etc/amazon-cloudwatch-observability-agent-cert/tls-ca.crt" + collectionInterval = 60 * time.Second + jobName = "containerInsightsNeuronMonitorScraper" + scraperMetricsPath = "/metrics" + scraperK8sServiceSelector = "k8s-app=neuron-monitor-service" +) + +func GetNeuronScrapeConfig(hostinfo prometheusscraper.HostInfoProvider) *config.ScrapeConfig { + + return &config.ScrapeConfig{ + HTTPClientConfig: configutil.HTTPClientConfig{ + TLSConfig: configutil.TLSConfig{ + CAFile: caFile, + InsecureSkipVerify: false, + }, + }, + ScrapeInterval: model.Duration(collectionInterval), + ScrapeTimeout: model.Duration(collectionInterval), + JobName: jobName, + Scheme: "https", + MetricsPath: scraperMetricsPath, + ServiceDiscoveryConfigs: discovery.Configs{ + &kubernetes.SDConfig{ + Role: kubernetes.RoleService, + NamespaceDiscovery: kubernetes.NamespaceDiscovery{ + IncludeOwnNamespace: true, + }, + Selectors: []kubernetes.SelectorConfig{ + { + Role: kubernetes.RoleService, + Label: scraperK8sServiceSelector, + }, + }, + }, + }, + MetricRelabelConfigs: GetNeuronMetricRelabelConfigs(hostinfo), + } +} + +func GetNeuronMetricRelabelConfigs(hostinfo prometheusscraper.HostInfoProvider) []*relabel.Config { + + return 
[]*relabel.Config{ + { + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("neuron.*|system_.*|execution_.*"), + Action: relabel.Keep, + }, + { + SourceLabels: model.LabelNames{"instance_id"}, + TargetLabel: ci.InstanceID, + Regex: relabel.MustNewRegexp("(.*)"), + Replacement: "${1}", + Action: relabel.Replace, + }, + { + SourceLabels: model.LabelNames{"instance_type"}, + TargetLabel: ci.InstanceType, + Regex: relabel.MustNewRegexp("(.*)"), + Replacement: "${1}", + Action: relabel.Replace, + }, + { + SourceLabels: model.LabelNames{"neuroncore"}, + TargetLabel: "NeuronCore", + Regex: relabel.MustNewRegexp("(.*)"), + Replacement: "${1}", + Action: relabel.Replace, + }, + { + SourceLabels: model.LabelNames{"neuron_device_index"}, + TargetLabel: "NeuronDevice", + Regex: relabel.MustNewRegexp("(.*)"), + Replacement: "${1}", + Action: relabel.Replace, + }, + // hacky way to inject static values (clusterName) to label set without additional processor + // relabel looks up an existing label then creates another label with given key (TargetLabel) and value (static) + { + SourceLabels: model.LabelNames{"instance_id"}, + TargetLabel: ci.ClusterNameKey, + Regex: relabel.MustNewRegexp("(.*)"), + Replacement: hostinfo.GetClusterName(), + Action: relabel.Replace, + }, + { + SourceLabels: model.LabelNames{"instance_id"}, + TargetLabel: ci.NodeNameKey, + Regex: relabel.MustNewRegexp("(.*)"), + Replacement: os.Getenv("HOST_NAME"), + Action: relabel.Replace, + }, + } +} diff --git a/receiver/awscontainerinsightreceiver/internal/neuron/neuron_monitor_scraper_test.go b/receiver/awscontainerinsightreceiver/internal/neuron/neuron_monitor_scraper_test.go new file mode 100644 index 000000000000..8628a57498f0 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/neuron/neuron_monitor_scraper_test.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package neuron + +import ( + "context" + "strings" + 
"testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" +) + +const renameMetric = ` +# HELP python_gc_objects_collected_total Objects collected during gc +# TYPE python_gc_objects_collected_total counter +python_gc_objects_collected_total{generation="0"} 75.0 +# HELP execution_errors_created Execution errors total +# TYPE execution_errors_created gauge +execution_errors_created{availability_zone="us-east-1c",error_type="generic",instance_id="i-09db9b55e0095612f",instance_name="",instance_type="trn1n.32xlarge",region="us-east-1",runtime_tag="367",subnet_id="subnet-06a7754948e8a000f"} 1.7083389404380567e+09 +# HELP neuron_runtime_memory_used_bytes Runtime memory used bytes +# TYPE neuron_runtime_memory_used_bytes gauge +neuron_runtime_memory_used_bytes{availability_zone="us-east-1c",instance_id="i-09db9b55e0095612f",instance_name="",instance_type="trn1n.32xlarge",memory_location="host",region="us-east-1",runtime_tag="367",subnet_id="subnet-06a7754948e8a000f"} 9.043968e+06 +# HELP neuroncore_utilization_ratio NeuronCore utilization ratio +# TYPE neuroncore_utilization_ratio gauge +neuroncore_utilization_ratio{availability_zone="us-east-1c",instance_id="i-09db9b55e0095612f",instance_name="",instance_type="trn1n.32xlarge",neuroncore="0",region="us-east-1",runtime_tag="367",subnet_id="subnet-06a7754948e8a000f"} 0.1 +# HELP system_memory_total_bytes System memory total_bytes bytes +# TYPE system_memory_total_bytes gauge +system_memory_total_bytes{availability_zone="us-east-1c",instance_id="i-09db9b55e0095612f",instance_name="",instance_type="trn1n.32xlarge",region="us-east-1",subnet_id="subnet-06a7754948e8a000f"} 5.32523487232e+011 +# HELP neurondevice_hw_ecc_events_total_mem_ecc_corrected Neuron hardware errors +# TYPE neurondevice_hw_ecc_events_total_mem_ecc_corrected gauge 
+neurondevice_hw_ecc_events_total_mem_ecc_corrected{availability_zone="us-east-1c",instance_id="i-09db9b55e0095612f",instance_name="",instance_type="trn1n.32xlarge",neuron_device_index="5",region="us-east-1",runtime_tag="367",subnet_id="subnet-06a7754948e8a000f"} 3 +` + +const dummyClusterName = "cluster-name" +const dummyHostName = "i-000000000" +const dummyNodeName = "dummy-nodeName" + +type mockHostInfoProvider struct { +} + +func (m mockHostInfoProvider) GetClusterName() string { + return dummyClusterName +} + +func (m mockHostInfoProvider) GetInstanceID() string { + return dummyHostName +} + +func TestNewNeuronScraperEndToEnd(t *testing.T) { + t.Setenv("HOST_NAME", dummyNodeName) + expectedMetrics := make(map[string]prometheusscraper.ExpectedMetricStruct) + expectedMetrics["neuroncore_utilization_ratio"] = prometheusscraper.ExpectedMetricStruct{ + MetricValue: 0.1, + MetricLabels: []prometheusscraper.MetricLabel{ + {LabelName: "InstanceId", LabelValue: "i-09db9b55e0095612f"}, + {LabelName: "ClusterName", LabelValue: dummyClusterName}, + {LabelName: "NeuronCore", LabelValue: "0"}, + {LabelName: "NodeName", LabelValue: dummyNodeName}, + }, + } + expectedMetrics["neurondevice_hw_ecc_events_total_mem_ecc_corrected"] = prometheusscraper.ExpectedMetricStruct{ + MetricValue: 3, + MetricLabels: []prometheusscraper.MetricLabel{ + {LabelName: "InstanceId", LabelValue: "i-09db9b55e0095612f"}, + {LabelName: "ClusterName", LabelValue: dummyClusterName}, + {LabelName: "NeuronDevice", LabelValue: "5"}, + {LabelName: "NodeName", LabelValue: dummyNodeName}, + }, + } + expectedMetrics["neuron_runtime_memory_used_bytes"] = prometheusscraper.ExpectedMetricStruct{ + MetricValue: 9.043968e+06, + MetricLabels: []prometheusscraper.MetricLabel{ + {LabelName: "InstanceId", LabelValue: "i-09db9b55e0095612f"}, + {LabelName: "ClusterName", LabelValue: dummyClusterName}, + {LabelName: "NodeName", LabelValue: dummyNodeName}, + }, + } + + expectedMetrics["execution_errors_created"] = 
prometheusscraper.ExpectedMetricStruct{ + MetricValue: 1.7083389404380567e+09, + MetricLabels: []prometheusscraper.MetricLabel{ + {LabelName: "InstanceId", LabelValue: "i-09db9b55e0095612f"}, + {LabelName: "ClusterName", LabelValue: dummyClusterName}, + {LabelName: "NodeName", LabelValue: dummyNodeName}, + }, + } + + expectedMetrics["system_memory_total_bytes"] = prometheusscraper.ExpectedMetricStruct{ + MetricValue: 5.32523487232e+011, + MetricLabels: []prometheusscraper.MetricLabel{ + {LabelName: "InstanceId", LabelValue: "i-09db9b55e0095612f"}, + {LabelName: "ClusterName", LabelValue: dummyClusterName}, + {LabelName: "NodeName", LabelValue: dummyNodeName}, + }, + } + + expectedMetrics["up"] = prometheusscraper.ExpectedMetricStruct{ + MetricValue: 1.0, + MetricLabels: []prometheusscraper.MetricLabel{}, + } + + consumer := prometheusscraper.MockConsumer{ + T: t, + ExpectedMetrics: expectedMetrics, + } + + mockedScraperOpts := prometheusscraper.SimplePrometheusScraperOpts{ + Ctx: context.TODO(), + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + Consumer: consumer, + Host: componenttest.NewNopHost(), + ScraperConfigs: GetNeuronScrapeConfig(mockHostInfoProvider{}), + HostInfoProvider: mockHostInfoProvider{}, + } + + prometheusscraper.TestSimplePrometheusEndToEnd(prometheusscraper.TestSimplePrometheusEndToEndOpts{ + T: t, + Consumer: consumer, + DataReturned: renameMetric, + ScraperOpts: mockedScraperOpts, + MetricRelabelConfig: GetNeuronMetricRelabelConfigs(mockHostInfoProvider{}), + }) +} + +func TestNeuronMonitorScraperJobName(t *testing.T) { + // needs to start with containerInsights + assert.True(t, strings.HasPrefix(jobName, "containerInsightsNeuronMonitorScraper")) +} diff --git a/receiver/awscontainerinsightreceiver/internal/neuron/pod_attribute_decorator.go b/receiver/awscontainerinsightreceiver/internal/neuron/pod_attribute_decorator.go new file mode 100644 index 000000000000..4879e3f3305a --- /dev/null +++ 
b/receiver/awscontainerinsightreceiver/internal/neuron/pod_attribute_decorator.go
@@ -0,0 +1,179 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package neuron // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/neuron"
+
+import (
+	"context"
+	"strconv"
+
+	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.uber.org/zap"
+
+	ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
+)
+
+const (
+	neuronHardwareInfoKey       = "neuron_hardware"
+	neuronCorePerDeviceKey      = "neuroncore_per_device_count"
+	neuronCoreAttributeKey      = "neuroncore"
+	neuronDeviceAttributeKey    = "NeuronDevice"
+	neuronCoreResourceName      = "aws.amazon.com/neuroncore"
+	neuronDeviceResourceName    = "aws.amazon.com/neurondevice"
+	neuronDeviceResourceNameAlt = "aws.amazon.com/neuron"
+)
+
+// PodResourcesStoreInterface resolves a device index and resource name to the
+// matching container; callers treat a nil result as "no match".
+type PodResourcesStoreInterface interface {
+	GetContainerInfo(string, string) *stores.ContainerInfo
+}
+
+// PodAttributesDecoratorConsumer adds container, pod and namespace attributes
+// to neuron datapoints before handing the metrics to NextConsumer.
+type PodAttributesDecoratorConsumer struct {
+	NextConsumer      consumer.Metrics
+	PodResourcesStore PodResourcesStoreInterface
+	Logger            *zap.Logger
+}
+
+// Capabilities reports that this consumer mutates the metrics it receives.
+func (pdc *PodAttributesDecoratorConsumer) Capabilities() consumer.Capabilities {
+	return consumer.Capabilities{
+		MutatesData: true,
+	}
+}
+
+// ConsumeMetrics decorates md in place and forwards it to NextConsumer.
+func (pdc *PodAttributesDecoratorConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
+	pdc.neuronMetricsProcess(md)
+	return pdc.NextConsumer.ConsumeMetrics(ctx, md)
+}
+
+// neuronMetricsProcess decorates every metric of each scope that also carries a
+// usable neuron_hardware metric; scopes without one are left untouched.
+func (pdc *PodAttributesDecoratorConsumer) neuronMetricsProcess(md pmetric.Metrics) {
+	rms := md.ResourceMetrics()
+	for i := 0; i < rms.Len(); i++ {
+		rs := rms.At(i)
+		ilms := rs.ScopeMetrics()
+		for j := 0; j < ilms.Len(); j++ {
+			ils := ilms.At(j)
+			metrics := ils.Metrics()
+
+			neuronHardwareInfo, neuronHardwareInfoFound := findNeuronHardwareInfo(metrics)
+			if neuronHardwareInfoFound {
+				neuronCoresPerDevice, extracted := getNeuronCoresPerDevice(neuronHardwareInfo)
+				if extracted {
+					for k := 0; k < metrics.Len(); k++ {
+						m := metrics.At(k)
+						pdc.addPodCorrelationAttributes(getMetricDatapoints(m), neuronCoresPerDevice)
+					}
+				}
+			}
+		}
+	}
+}
+
+// findNeuronHardwareInfo returns the first metric named neuron_hardware, if any.
+func findNeuronHardwareInfo(metrics pmetric.MetricSlice) (pmetric.Metric, bool) {
+	var neuronHardwareInfo pmetric.Metric
+	neuronHardwareInfoFound := false
+	for k := 0; k < metrics.Len(); k++ {
+		m := metrics.At(k)
+		if m.Name() == neuronHardwareInfoKey {
+			neuronHardwareInfo = m
+			neuronHardwareInfoFound = true
+			break
+		}
+	}
+	return neuronHardwareInfo, neuronHardwareInfoFound
+}
+
+// addPodCorrelationAttributes tags each datapoint with the container info resolved
+// from its NeuronDevice or neuroncore attribute. neuronCoresPerDevice must be positive.
+func (pdc *PodAttributesDecoratorConsumer) addPodCorrelationAttributes(metricDatapoints pmetric.NumberDataPointSlice, neuronCoresPerDevice int) {
+	for i := 0; i < metricDatapoints.Len(); i++ {
+		attributes := metricDatapoints.At(i).Attributes()
+		var containerInfo *stores.ContainerInfo
+
+		if neuronDeviceIndex, neuronDeviceIndexPresent := attributes.Get(neuronDeviceAttributeKey); neuronDeviceIndexPresent {
+			// get container info from neuronDeviceIndex
+			containerInfo = pdc.getContainerInfoForNeuronDeviceIndex(neuronDeviceIndex.AsString())
+		} else if neuronCoreIndex, neuronCoreIndexPresent := attributes.Get(neuronCoreAttributeKey); neuronCoreIndexPresent {
+			// get container info from neuronCore
+			containerInfo = pdc.PodResourcesStore.GetContainerInfo(neuronCoreIndex.AsString(), neuronCoreResourceName)
+			neuronDeviceIndex := getNeuronDeviceIndexFromCoreAttribute(neuronCoreIndex, neuronCoresPerDevice)
+			if containerInfo == nil {
+				// else get container info from calculated neuronDeviceIndex
+				containerInfo = pdc.getContainerInfoForNeuronDeviceIndex(neuronDeviceIndex)
+			}
+			attributes.PutStr(neuronDeviceAttributeKey, neuronDeviceIndex)
+		}
+		populateAttributes(&attributes, containerInfo)
+	}
+}
+
+// getContainerInfoForNeuronDeviceIndex looks the device up under the primary
+// resource name and falls back to the alternative one.
+func (pdc *PodAttributesDecoratorConsumer) getContainerInfoForNeuronDeviceIndex(neuronDeviceIndex string) *stores.ContainerInfo {
+	containerInfo := pdc.PodResourcesStore.GetContainerInfo(neuronDeviceIndex, neuronDeviceResourceName)
+	if containerInfo == nil {
+		// Alt resource name is to support backward compatibility in neuron monitor : https://awsdocs-neuron.readthedocs-hosted.com/en/latest/containers/tutorials/k8s-setup.html
+		containerInfo = pdc.PodResourcesStore.GetContainerInfo(neuronDeviceIndex, neuronDeviceResourceNameAlt)
+	}
+	return containerInfo
+}
+
+// populateAttributes writes the container, pod and namespace names; nil is a no-op.
+func populateAttributes(attributes *pcommon.Map, containerInfo *stores.ContainerInfo) {
+	if containerInfo != nil {
+		attributes.PutStr(ci.AttributeContainerName, containerInfo.ContainerName)
+		attributes.PutStr(ci.AttributeK8sPodName, containerInfo.PodName)
+		attributes.PutStr(ci.AttributeK8sNamespace, containerInfo.Namespace)
+	}
+}
+
+// getMetricDatapoints returns the number datapoints of a gauge or sum metric and
+// an empty slice for every other metric type.
+func getMetricDatapoints(m pmetric.Metric) pmetric.NumberDataPointSlice {
+	switch m.Type() {
+	case pmetric.MetricTypeGauge:
+		return m.Gauge().DataPoints()
+	case pmetric.MetricTypeSum:
+		return m.Sum().DataPoints()
+	default:
+		return pmetric.NewNumberDataPointSlice()
+	}
+}
+
+// We extract the attribute named `neuroncore_per_device_count` from the metric to get the value
+// https://awsdocs-neuron.readthedocs-hosted.com/en/latest/tools/neuron-sys-tools/neuron-monitor-user-guide.html
+//
+// The bool result is false when the attribute is missing or is not a positive
+// integer: the count is later used as a divisor, so zero must never be reported.
+func getNeuronCoresPerDevice(neuronHardwareInfo pmetric.Metric) (int, bool) {
+	// getMetricDatapoints handles both gauge and sum; calling Sum() on a metric
+	// of another type yields an invalid instance.
+	neuronCoreHardwareInfoDatapoints := getMetricDatapoints(neuronHardwareInfo)
+	if neuronCoreHardwareInfoDatapoints.Len() > 0 {
+		neuronCoresPerDeviceValue, found := neuronCoreHardwareInfoDatapoints.At(0).Attributes().Get(neuronCorePerDeviceKey)
+		if found {
+			neuronCoresPerDevice, err := strconv.Atoi(neuronCoresPerDeviceValue.AsString())
+			if err == nil && neuronCoresPerDevice > 0 {
+				return neuronCoresPerDevice, true
+			}
+		}
+	}
+	return -1, false
+}
+
+// To get the device index from core index we divide the index by cores in a single 
device +// https://awsdocs-neuron.readthedocs-hosted.com/en/latest/tools/neuron-sys-tools/neuron-monitor-user-guide.html +func getNeuronDeviceIndexFromCoreAttribute(neuronCoreIndex pcommon.Value, neuronCoresPerDevice int) string { + neuronCoreIndexIntVal, _ := strconv.Atoi(neuronCoreIndex.AsString()) + return strconv.Itoa(neuronCoreIndexIntVal / neuronCoresPerDevice) +} diff --git a/receiver/awscontainerinsightreceiver/internal/neuron/pod_attribute_decorator_test.go b/receiver/awscontainerinsightreceiver/internal/neuron/pod_attribute_decorator_test.go new file mode 100644 index 000000000000..01fb143a0305 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/neuron/pod_attribute_decorator_test.go @@ -0,0 +1,190 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package neuron + +import ( + "context" + "testing" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +var dummyPodName = "pod-name" +var dummyPodNameForAltResource = "pod-name-alt" +var dummyContainerName = "container-name" +var dummyNamespace = "namespace" + +type mockPodResourcesStore struct { +} + +func (m mockPodResourcesStore) GetContainerInfo(_ string, _ string) *stores.ContainerInfo { + return &stores.ContainerInfo{ + PodName: dummyPodName, + ContainerName: dummyContainerName, + Namespace: dummyNamespace, + } +} + +type mockPodResourcesStoreWithAltResourceName struct { +} + +func (m mockPodResourcesStoreWithAltResourceName) GetContainerInfo(_ string, resourceName string) *stores.ContainerInfo { + if resourceName == 
neuronDeviceResourceNameAlt { + return &stores.ContainerInfo{ + PodName: dummyPodNameForAltResource, + ContainerName: dummyContainerName, + Namespace: dummyNamespace, + } + } + return nil +} + +func TestConsumeMetricsForPodAttributeDecorator(t *testing.T) { + logger, _ := zap.NewDevelopment() + dc := &PodAttributesDecoratorConsumer{ + NextConsumer: consumertest.NewNop(), + PodResourcesStore: mockPodResourcesStore{}, + Logger: logger, + } + ctx := context.Background() + + testcases1 := map[string]decoratorconsumer.TestCase{ + "empty": { + Metrics: pmetric.NewMetrics(), + Want: pmetric.NewMetrics(), + ShouldError: false, + }, + "neuron_hardware_info_not_found": { + Metrics: decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + }, + }), + + Want: decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + }, + }), + ShouldError: false, + }, + "correlation_via_neuron_device_index": { + Metrics: decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: neuronHardwareInfoKey, MetricType: pmetric.MetricTypeSum}: { + neuronCorePerDeviceKey: "2", + }, + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + neuronDeviceAttributeKey: "1", + }, + }), + Want: decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: neuronHardwareInfoKey, MetricType: pmetric.MetricTypeSum}: { + neuronCorePerDeviceKey: "2", + }, + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + neuronDeviceAttributeKey: "1", + ci.AttributeContainerName: dummyContainerName, + ci.AttributeK8sPodName: dummyPodName, + ci.AttributeK8sNamespace: dummyNamespace, + }, + }), + ShouldError: false, + }, + "correlation_via_neuron_core": { + Metrics: 
decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: neuronHardwareInfoKey, MetricType: pmetric.MetricTypeSum}: { + neuronCorePerDeviceKey: "2", + }, + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + neuronCoreAttributeKey: "10", + }, + }), + Want: decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: neuronHardwareInfoKey, MetricType: pmetric.MetricTypeSum}: { + neuronCorePerDeviceKey: "2", + }, + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + neuronCoreAttributeKey: "10", + neuronDeviceAttributeKey: "5", + ci.AttributeContainerName: dummyContainerName, + ci.AttributeK8sPodName: dummyPodName, + ci.AttributeK8sNamespace: dummyNamespace, + }, + }), + ShouldError: false, + }, + "correlation_when_both_present": { + Metrics: decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: neuronHardwareInfoKey, MetricType: pmetric.MetricTypeSum}: { + neuronCorePerDeviceKey: "2", + }, + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + neuronDeviceAttributeKey: "5", + neuronCoreAttributeKey: "10", + }, + }), + Want: decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: neuronHardwareInfoKey, MetricType: pmetric.MetricTypeSum}: { + neuronCorePerDeviceKey: "2", + }, + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + neuronCoreAttributeKey: "10", + neuronDeviceAttributeKey: "5", + ci.AttributeContainerName: dummyContainerName, + ci.AttributeK8sPodName: dummyPodName, + ci.AttributeK8sNamespace: dummyNamespace, + }, + }), + ShouldError: false, + }, + } + + decoratorconsumer.RunDecoratorTestScenarios(ctx, t, dc, testcases1) + + dc = &PodAttributesDecoratorConsumer{ + NextConsumer: consumertest.NewNop(), + PodResourcesStore: mockPodResourcesStoreWithAltResourceName{}, + Logger: logger, 
+ } + + testcases2 := map[string]decoratorconsumer.TestCase{ + "correlation_via_neuron_device_index_alt_name": { + Metrics: decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: neuronHardwareInfoKey, MetricType: pmetric.MetricTypeSum}: { + neuronCorePerDeviceKey: "2", + }, + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + neuronDeviceAttributeKey: "1", + }, + }), + Want: decoratorconsumer.GenerateMetrics(map[decoratorconsumer.MetricIdentifier]map[string]string{ + {Name: neuronHardwareInfoKey, MetricType: pmetric.MetricTypeSum}: { + neuronCorePerDeviceKey: "2", + }, + {Name: "test", MetricType: pmetric.MetricTypeGauge}: { + "device": "test0", + neuronDeviceAttributeKey: "1", + ci.AttributeContainerName: dummyContainerName, + ci.AttributeK8sPodName: dummyPodNameForAltResource, + ci.AttributeK8sNamespace: dummyNamespace, + }, + }), + ShouldError: false, + }, + } + + decoratorconsumer.RunDecoratorTestScenarios(ctx, t, dc, testcases2) +} diff --git a/receiver/awscontainerinsightreceiver/internal/gpu/decorator.go b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer/decorator.go similarity index 61% rename from receiver/awscontainerinsightreceiver/internal/gpu/decorator.go rename to receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer/decorator.go index ee3f9a8c36c1..d999ca7999b7 100644 --- a/receiver/awscontainerinsightreceiver/internal/gpu/decorator.go +++ b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer/decorator.go @@ -1,53 +1,36 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package gpu +package decoratorconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer" import ( "context" + ci 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" - - ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" ) -const ( - gpuUtil = "DCGM_FI_DEV_GPU_UTIL" - gpuMemUtil = "DCGM_FI_DEV_FB_USED_PERCENT" - gpuMemUsed = "DCGM_FI_DEV_FB_USED" - gpuMemTotal = "DCGM_FI_DEV_FB_TOTAL" - gpuTemperature = "DCGM_FI_DEV_GPU_TEMP" - gpuPowerDraw = "DCGM_FI_DEV_POWER_USAGE" -) - -var metricToUnit = map[string]string{ - gpuUtil: "Percent", - gpuMemUtil: "Percent", - gpuMemUsed: "Bytes", - gpuMemTotal: "Bytes", - gpuTemperature: "None", - gpuPowerDraw: "None", -} - -// GPU decorator acts as an interceptor of metrics before the scraper sends them to the next designated consumer -type decorateConsumer struct { - containerOrchestrator string - nextConsumer consumer.Metrics - k8sDecorator Decorator - logger *zap.Logger +// Decorator acts as an interceptor of metrics before the scraper sends them to the next designated consumer +type DecorateConsumer struct { + ContainerOrchestrator string + NextConsumer consumer.Metrics + K8sDecorator Decorator + MetricType string + MetricToUnitMap map[string]string + Logger *zap.Logger } -func (dc *decorateConsumer) Capabilities() consumer.Capabilities { +func (dc *DecorateConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{ MutatesData: true, } } -func (dc *decorateConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (dc *DecorateConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { resourceTags := make(map[string]string) rms := 
md.ResourceMetrics() for i := 0; i < rms.Len(); i++ { @@ -62,21 +45,21 @@ func (dc *decorateConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metri ms := ilms.At(j).Metrics() for k := 0; k < ms.Len(); k++ { m := ms.At(k) - converted := ci.ConvertToFieldsAndTags(m, dc.logger) + converted := ci.ConvertToFieldsAndTags(m, dc.Logger) var rcis []*stores.RawContainerInsightsMetric for _, pair := range converted { - rcis = append(rcis, stores.NewRawContainerInsightsMetricWithData(ci.TypeGpuContainer, pair.Fields, pair.Tags, dc.logger)) + rcis = append(rcis, stores.NewRawContainerInsightsMetricWithData(dc.MetricType, pair.Fields, pair.Tags, dc.Logger)) } decorated := dc.decorateMetrics(rcis) dc.updateAttributes(m, decorated) - if unit, ok := metricToUnit[m.Name()]; ok { + if unit, ok := dc.MetricToUnitMap[m.Name()]; ok { m.SetUnit(unit) } } } } - return dc.nextConsumer.ConsumeMetrics(ctx, md) + return dc.NextConsumer.ConsumeMetrics(ctx, md) } type Decorator interface { @@ -84,14 +67,14 @@ type Decorator interface { Shutdown() error } -func (dc *decorateConsumer) decorateMetrics(rcis []*stores.RawContainerInsightsMetric) []*stores.RawContainerInsightsMetric { +func (dc *DecorateConsumer) decorateMetrics(rcis []*stores.RawContainerInsightsMetric) []*stores.RawContainerInsightsMetric { var result []*stores.RawContainerInsightsMetric - if dc.containerOrchestrator != ci.EKS { + if dc.ContainerOrchestrator != ci.EKS { return result } for _, rci := range rcis { // add tags for EKS - out := dc.k8sDecorator.Decorate(rci) + out := dc.K8sDecorator.Decorate(rci) if out != nil { result = append(result, out.(*stores.RawContainerInsightsMetric)) } @@ -99,7 +82,7 @@ func (dc *decorateConsumer) decorateMetrics(rcis []*stores.RawContainerInsightsM return result } -func (dc *decorateConsumer) updateAttributes(m pmetric.Metric, rcis []*stores.RawContainerInsightsMetric) { +func (dc *DecorateConsumer) updateAttributes(m pmetric.Metric, rcis []*stores.RawContainerInsightsMetric) { if 
len(rcis) == 0 { return } @@ -110,7 +93,7 @@ func (dc *decorateConsumer) updateAttributes(m pmetric.Metric, rcis []*stores.Ra case pmetric.MetricTypeSum: dps = m.Sum().DataPoints() default: - dc.logger.Warn("Unsupported metric type", zap.String("metric", m.Name()), zap.String("type", m.Type().String())) + dc.Logger.Warn("Unsupported metric type", zap.String("metric", m.Name()), zap.String("type", m.Type().String())) } if dps.Len() == 0 { return @@ -132,9 +115,9 @@ func (dc *decorateConsumer) updateAttributes(m pmetric.Metric, rcis []*stores.Ra } } -func (dc *decorateConsumer) Shutdown() error { - if dc.k8sDecorator != nil { - return dc.k8sDecorator.Shutdown() +func (dc *DecorateConsumer) Shutdown() error { + if dc.K8sDecorator != nil { + return dc.K8sDecorator.Shutdown() } return nil } diff --git a/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer/decorator_test.go b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer/decorator_test.go new file mode 100644 index 000000000000..be7663b93eba --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer/decorator_test.go @@ -0,0 +1,145 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package decoratorconsumer + +import ( + "context" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +var _ Decorator = (*MockK8sDecorator)(nil) + +type MockK8sDecorator struct { +} + +func (m *MockK8sDecorator) Decorate(metric stores.CIMetric) stores.CIMetric { + return metric +} + +func (m *MockK8sDecorator) Shutdown() error { + return nil +} + +const ( + util = "UTIL" + memUtil = "USED_PERCENT" + memUsed = "FB_USED" + memTotal = "FB_TOTAL" + temp = "TEMP" + powerDraw = "POWER_USAGE" +) + +var 
metricToUnit = map[string]string{ + util: "Percent", + memUtil: "Percent", + memUsed: "Bytes", + memTotal: "Bytes", + temp: "None", + powerDraw: "None", +} + +func TestConsumeMetrics(t *testing.T) { + logger, _ := zap.NewDevelopment() + dc := &DecorateConsumer{ + ContainerOrchestrator: "EKS", + NextConsumer: consumertest.NewNop(), + K8sDecorator: &MockK8sDecorator{}, + MetricToUnitMap: metricToUnit, + Logger: logger, + } + ctx := context.Background() + + testcases := map[string]TestCase{ + "empty": { + Metrics: pmetric.NewMetrics(), + Want: pmetric.NewMetrics(), + ShouldError: false, + }, + "unit": { + Metrics: GenerateMetrics(map[MetricIdentifier]map[string]string{ + {util, pmetric.MetricTypeGauge}: { + "device": "test0", + }, + {memUtil, pmetric.MetricTypeGauge}: { + "device": "test0", + }, + {memTotal, pmetric.MetricTypeGauge}: { + "device": "test0", + }, + {memUsed, pmetric.MetricTypeGauge}: { + "device": "test0", + }, + {powerDraw, pmetric.MetricTypeGauge}: { + "device": "test0", + }, + {temp, pmetric.MetricTypeGauge}: { + "device": "test0", + }, + }), + Want: GenerateMetrics(map[MetricIdentifier]map[string]string{ + {util, pmetric.MetricTypeGauge}: { + "device": "test0", + "Unit": "Percent", + }, + {memUtil, pmetric.MetricTypeGauge}: { + "device": "test0", + "Unit": "Percent", + }, + {memTotal, pmetric.MetricTypeGauge}: { + "device": "test0", + "Unit": "Bytes", + }, + {memUsed, pmetric.MetricTypeGauge}: { + "device": "test0", + "Unit": "Bytes", + }, + {powerDraw, pmetric.MetricTypeGauge}: { + "device": "test0", + "Unit": "None", + }, + {temp, pmetric.MetricTypeGauge}: { + "device": "test0", + "Unit": "None", + }, + }), + ShouldError: false, + }, + "noUnit": { + Metrics: GenerateMetrics(map[MetricIdentifier]map[string]string{ + {"test", pmetric.MetricTypeGauge}: { + "device": "test0", + }, + }), + Want: GenerateMetrics(map[MetricIdentifier]map[string]string{ + {"test", pmetric.MetricTypeGauge}: { + "device": "test0", + }, + }), + ShouldError: false, + }, + 
"typeUnchanged": { + Metrics: GenerateMetrics(map[MetricIdentifier]map[string]string{ + {util, pmetric.MetricTypeGauge}: { + "device": "test0", + "Type": "TestType", + }, + }), + Want: GenerateMetrics(map[MetricIdentifier]map[string]string{ + {util, pmetric.MetricTypeGauge}: { + "device": "test0", + "Type": "TestType", + "Unit": "Percent", + }, + }), + ShouldError: false, + }, + } + + RunDecoratorTestScenarios(ctx, t, dc, testcases) +} diff --git a/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer/decorator_testutils.go b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer/decorator_testutils.go new file mode 100644 index 000000000000..171a0110db88 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer/decorator_testutils.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package decoratorconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type MetricIdentifier struct { + Name string + MetricType pmetric.MetricType +} + +type TestCase struct { + Metrics pmetric.Metrics + Want pmetric.Metrics + ShouldError bool +} + +func RunDecoratorTestScenarios(ctx context.Context, t *testing.T, dc consumer.Metrics, testcases map[string]TestCase) { + for _, tc := range testcases { + err := dc.ConsumeMetrics(ctx, tc.Metrics) + if tc.ShouldError { + assert.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, tc.Want.MetricCount(), tc.Metrics.MetricCount()) + if tc.Want.MetricCount() == 0 { + continue + } + actuals := 
tc.Metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + actuals.Sort(func(a, b pmetric.Metric) bool { + return a.Name() < b.Name() + }) + wants := tc.Want.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + wants.Sort(func(a, b pmetric.Metric) bool { + return a.Name() < b.Name() + }) + for i := 0; i < wants.Len(); i++ { + actual := actuals.At(i) + want := wants.At(i) + assert.Equal(t, want.Name(), actual.Name()) + assert.Equal(t, want.Unit(), actual.Unit()) + actualAttrs := getAttributesFromMetric(&actual) + wantAttrs := getAttributesFromMetric(&want) + assert.Equal(t, wantAttrs.Len(), actualAttrs.Len()) + wantAttrs.Range(func(k string, v pcommon.Value) bool { + av, ok := actualAttrs.Get(k) + assert.True(t, ok) + assert.Equal(t, v, av) + return true + }) + } + } +} + +func GenerateMetrics(nameToDimsGauge map[MetricIdentifier]map[string]string) pmetric.Metrics { + md := pmetric.NewMetrics() + ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + for metric, dims := range nameToDimsGauge { + m := ms.AppendEmpty() + m.SetName(metric.Name) + metricBody := m.SetEmptyGauge().DataPoints().AppendEmpty() + if metric.MetricType == pmetric.MetricTypeSum { + metricBody = m.SetEmptySum().DataPoints().AppendEmpty() + } + metricBody.SetIntValue(10) + for k, v := range dims { + if k == "Unit" { + m.SetUnit(v) + continue + } + metricBody.Attributes().PutStr(k, v) + } + } + return md +} + +func getAttributesFromMetric(m *pmetric.Metric) pcommon.Map { + if m.Type() == pmetric.MetricTypeGauge { + return m.Gauge().DataPoints().At(0).Attributes() + } + return m.Sum().DataPoints().At(0).Attributes() +} diff --git a/receiver/awscontainerinsightreceiver/internal/prometheusscraper/prometheus_scraper_testutils.go b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/prometheus_scraper_testutils.go new file mode 100644 index 000000000000..bce5bc78e715 --- /dev/null +++ 
b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/prometheus_scraper_testutils.go @@ -0,0 +1,154 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper" + +import ( + "context" + "fmt" + "strings" + "testing" + + configutil "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/model/relabel" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/mocks" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" +) + +type MetricLabel struct { + LabelName string + LabelValue string +} + +type ExpectedMetricStruct struct { + MetricValue float64 + MetricLabels []MetricLabel +} + +type TestSimplePrometheusEndToEndOpts struct { + T *testing.T + Consumer consumer.Metrics + DataReturned string + ScraperOpts SimplePrometheusScraperOpts + MetricRelabelConfig []*relabel.Config +} + +type MockConsumer struct { + T *testing.T + ExpectedMetrics map[string]ExpectedMetricStruct +} + +func (m MockConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: false, + } +} + +func (m MockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { + expectedMetricsCount := len(m.ExpectedMetrics) + metricFoundCount := 0 + + assert.Equal(m.T, 1, md.ResourceMetrics().Len()) + + scopeMetrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + for i := 0; i < scopeMetrics.Len(); i++ { + metric := scopeMetrics.At(i) + metricsStruct, 
ok := m.ExpectedMetrics[metric.Name()] + if ok { + assert.Equal(m.T, metricsStruct.MetricValue, metric.Gauge().DataPoints().At(0).DoubleValue()) + for _, expectedLabel := range metricsStruct.MetricLabels { + labelValue, isFound := metric.Gauge().DataPoints().At(0).Attributes().Get(expectedLabel.LabelName) + assert.True(m.T, isFound) + assert.Equal(m.T, expectedLabel.LabelValue, labelValue.Str()) + } + metricFoundCount++ + } + } + + assert.Equal(m.T, expectedMetricsCount, metricFoundCount) + + return nil +} + +func TestSimplePrometheusEndToEnd(opts TestSimplePrometheusEndToEndOpts) { + scraper, err := NewSimplePrometheusScraper(opts.ScraperOpts) + assert.NoError(opts.T, err) + + // build up a new PR + promFactory := prometheusreceiver.NewFactory() + + targets := []*mocks.TestData{ + { + Name: "neuron", + Pages: []mocks.MockPrometheusResponse{ + {Code: 200, Data: opts.DataReturned}, + }, + }, + } + mp, cfg, err := mocks.SetupMockPrometheus(targets...) + assert.NoError(opts.T, err) + + split := strings.Split(mp.Srv.URL, "http://") + + mockedScrapeConfig := &config.ScrapeConfig{ + HTTPClientConfig: configutil.HTTPClientConfig{ + TLSConfig: configutil.TLSConfig{ + InsecureSkipVerify: true, + }, + }, + ScrapeInterval: cfg.ScrapeConfigs[0].ScrapeInterval, + ScrapeTimeout: cfg.ScrapeConfigs[0].ScrapeInterval, + JobName: fmt.Sprintf("%s/%s", "jobName", cfg.ScrapeConfigs[0].MetricsPath), + HonorTimestamps: true, + Scheme: "http", + MetricsPath: cfg.ScrapeConfigs[0].MetricsPath, + ServiceDiscoveryConfigs: discovery.Configs{ + // using dummy static config to avoid service discovery initialization + &discovery.StaticConfig{ + { + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue(split[1]), + }, + }, + }, + }, + }, + RelabelConfigs: []*relabel.Config{}, + MetricRelabelConfigs: opts.MetricRelabelConfig, + } + + promConfig := prometheusreceiver.Config{ + PrometheusConfig: &config.Config{ + ScrapeConfigs: []*config.ScrapeConfig{mockedScrapeConfig}, + }, + } + + 
// replace the prom receiver + params := receiver.CreateSettings{ + TelemetrySettings: scraper.Settings, + } + scraper.PrometheusReceiver, err = promFactory.CreateMetricsReceiver(scraper.Ctx, params, &promConfig, opts.Consumer) + assert.NoError(opts.T, err) + assert.NotNil(opts.T, mp) + defer mp.Close() + + // perform a single scrape, this will kick off the scraper process for additional scrapes + scraper.GetMetrics() + + opts.T.Cleanup(func() { + scraper.Shutdown() + }) + + // wait for 2 scrapes, one initiated by us, another by the new scraper process + mp.Wg.Wait() + mp.Wg.Wait() +} diff --git a/receiver/awscontainerinsightreceiver/internal/prometheusscraper/simple_prometheus_scraper.go b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/simple_prometheus_scraper.go new file mode 100644 index 000000000000..50815d3d9591 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/simple_prometheus_scraper.go @@ -0,0 +1,105 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper" + +import ( + "context" + "errors" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" + "github.com/prometheus/prometheus/config" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" +) + +type SimplePrometheusScraper struct { + Ctx context.Context + Settings component.TelemetrySettings + host component.Host + HostInfoProvider HostInfoProvider + PrometheusReceiver receiver.Metrics + ScraperConfigs *config.ScrapeConfig + running bool +} + +type SimplePrometheusScraperOpts struct { + Ctx context.Context + TelemetrySettings component.TelemetrySettings + Consumer consumer.Metrics + 
Host component.Host + HostInfoProvider HostInfoProvider + ScraperConfigs *config.ScrapeConfig + Logger *zap.Logger +} + +type HostInfoProvider interface { + GetClusterName() string + GetInstanceID() string +} + +func NewSimplePrometheusScraper(opts SimplePrometheusScraperOpts) (*SimplePrometheusScraper, error) { + if opts.Consumer == nil { + return nil, errors.New("consumer cannot be nil") + } + if opts.Host == nil { + return nil, errors.New("host cannot be nil") + } + if opts.HostInfoProvider == nil { + return nil, errors.New("cluster name provider cannot be nil") + } + + promConfig := prometheusreceiver.Config{ + PrometheusConfig: &config.Config{ + ScrapeConfigs: []*config.ScrapeConfig{opts.ScraperConfigs}, + }, + } + + params := receiver.CreateSettings{ + TelemetrySettings: opts.TelemetrySettings, + } + + promFactory := prometheusreceiver.NewFactory() + promReceiver, err := promFactory.CreateMetricsReceiver(opts.Ctx, params, &promConfig, opts.Consumer) + if err != nil { + return nil, fmt.Errorf("failed to create prometheus receiver: %w", err) + } + + return &SimplePrometheusScraper{ + Ctx: opts.Ctx, + Settings: opts.TelemetrySettings, + host: opts.Host, + HostInfoProvider: opts.HostInfoProvider, + ScraperConfigs: opts.ScraperConfigs, + PrometheusReceiver: promReceiver, + }, nil +} + +func (ds *SimplePrometheusScraper) GetMetrics() []pmetric.Metrics { + // This method will never return metrics because the metrics are collected by the scraper. 
+ // This method will ensure the scraper is running + + if !ds.running { + ds.Settings.Logger.Info("The scraper is not running, starting up the scraper") + err := ds.PrometheusReceiver.Start(ds.Ctx, ds.host) + if err != nil { + ds.Settings.Logger.Error("Unable to start PrometheusReceiver", zap.Error(err)) + } + ds.running = err == nil + } + return nil +} + +func (ds *SimplePrometheusScraper) Shutdown() { + if ds.running { + err := ds.PrometheusReceiver.Shutdown(ds.Ctx) + if err != nil { + ds.Settings.Logger.Error("Unable to shutdown PrometheusReceiver", zap.Error(err)) + } + ds.running = false + } +} diff --git a/receiver/awscontainerinsightreceiver/internal/prometheusscraper/simple_prometheus_scraper_test.go b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/simple_prometheus_scraper_test.go new file mode 100644 index 000000000000..c97dafc76429 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/prometheusscraper/simple_prometheus_scraper_test.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusscraper + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.uber.org/zap" +) + +type mockHostInfoProvider struct { +} + +func (m mockHostInfoProvider) GetClusterName() string { + return "cluster-name" +} + +func (m mockHostInfoProvider) GetInstanceID() string { + return "i-000000000" +} + +func TestSimplePrometheusScraperBadInputs(t *testing.T) { + settings := componenttest.NewNopTelemetrySettings() + settings.Logger, _ = zap.NewDevelopment() + + tests := []SimplePrometheusScraperOpts{ + { + Ctx: context.TODO(), + TelemetrySettings: settings, + Consumer: nil, + Host: componenttest.NewNopHost(), + HostInfoProvider: mockHostInfoProvider{}, + }, + { + Ctx: context.TODO(), + TelemetrySettings: settings, + Consumer: MockConsumer{}, + Host: nil, + HostInfoProvider: mockHostInfoProvider{}, + }, + 
{ + Ctx: context.TODO(), + TelemetrySettings: settings, + Consumer: MockConsumer{}, + Host: componenttest.NewNopHost(), + HostInfoProvider: nil, + }, + } + + for _, tt := range tests { + scraper, err := NewSimplePrometheusScraper(tt) + + assert.Error(t, err) + assert.Nil(t, scraper) + } +} diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils.go b/receiver/awscontainerinsightreceiver/internal/stores/utils.go index f82751e45398..db3e65f370ff 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils.go @@ -133,6 +133,8 @@ func tagMetricSourceLinux(metric CIMetric) { sources = append(sources, []string{"cadvisor"}...) case ci.TypeGpuContainer: sources = append(sources, []string{"dcgm", "pod", "calculated"}...) + case ci.TypeNeuronContainer: + sources = append(sources, []string{"neuron", "pod", "calculated"}...) } if len(sources) > 0 { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go index 9bdb74a42445..f75f9d68f8f5 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go @@ -176,6 +176,7 @@ func TestUtils_TagMetricSource(t *testing.T) { ci.TypeContainerFS, ci.TypeContainerDiskIO, ci.TypeGpuContainer, + ci.TypeNeuronContainer, } expectedSources := []string{ @@ -190,6 +191,7 @@ func TestUtils_TagMetricSource(t *testing.T) { "[\"cadvisor\",\"calculated\"]", "[\"cadvisor\"]", "[\"dcgm\",\"pod\",\"calculated\"]", + "[\"neuron\",\"pod\",\"calculated\"]", } for i, mtype := range types { tags := map[string]string{ diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index b1a9e48cd9f1..65e772a96ac1 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -24,6 +24,9 
@@ import ( hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/neuron" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" ) @@ -43,7 +46,10 @@ type awsContainerInsightReceiver struct { containerMetricsProvider metricsProvider k8sapiserver metricsProvider prometheusScraper *k8sapiserver.PrometheusScraper - dcgmScraper *gpu.DcgmScraper + k8sDecorator *stores.K8sDecorator + podResourcesStore *stores.PodResourcesStore + dcgmScraper *prometheusscraper.SimplePrometheusScraper + neuronMonitorScraper *prometheusscraper.SimplePrometheusScraper } // newAWSContainerInsightReceiver creates the aws container insight receiver with the given parameters. 
@@ -73,18 +79,18 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone } if acir.config.ContainerOrchestrator == ci.EKS { - k8sDecorator, err := stores.NewK8sDecorator(ctx, acir.config.TagService, acir.config.PrefFullPodName, acir.config.AddFullPodNameMetricLabel, acir.config.AddContainerNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.settings.Logger) + acir.k8sDecorator, err = stores.NewK8sDecorator(ctx, acir.config.TagService, acir.config.PrefFullPodName, acir.config.AddFullPodNameMetricLabel, acir.config.AddContainerNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.settings.Logger) if err != nil { return err } if runtime.GOOS == ci.OperatingSystemWindows { - acir.containerMetricsProvider, err = k8swindows.New(acir.settings.Logger, k8sDecorator, *hostinfo) + acir.containerMetricsProvider, err = k8swindows.New(acir.settings.Logger, acir.k8sDecorator, *hostinfo) if err != nil { return err } } else { - decoratorOption := cadvisor.WithDecorator(k8sDecorator) + decoratorOption := cadvisor.WithDecorator(acir.k8sDecorator) acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, decoratorOption) if err != nil { return err @@ -105,10 +111,14 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone if err != nil { acir.settings.Logger.Debug("Unable to start kube apiserver prometheus scraper", zap.Error(err)) } - err = acir.initDcgmScraper(ctx, host, hostinfo, k8sDecorator) + err = acir.initDcgmScraper(ctx, host, hostinfo, acir.k8sDecorator) if err != nil { acir.settings.Logger.Debug("Unable to start dcgm scraper", zap.Error(err)) } + err = acir.initNeuronScraper(ctx, host, hostinfo, acir.k8sDecorator) + if err != nil { + acir.settings.Logger.Debug("Unable to start neuron scraper", zap.Error(err)) + } } } if acir.config.ContainerOrchestrator == ci.ECS { @@ -189,16 +199,66 @@ func (acir *awsContainerInsightReceiver) initDcgmScraper(ctx 
context.Context, ho return nil } + decoConsumer := decoratorconsumer.DecorateConsumer{ + ContainerOrchestrator: ci.EKS, + NextConsumer: acir.nextConsumer, + MetricType: ci.TypeGpuContainer, + MetricToUnitMap: gpu.MetricToUnit, + K8sDecorator: decorator, + Logger: acir.settings.Logger, + } + + scraperOpts := prometheusscraper.SimplePrometheusScraperOpts{ + Ctx: ctx, + TelemetrySettings: acir.settings, + Consumer: &decoConsumer, + Host: host, + ScraperConfigs: gpu.GetScraperConfig(hostinfo), + HostInfoProvider: hostinfo, + Logger: acir.settings.Logger, + } + var err error - acir.dcgmScraper, err = gpu.NewDcgmScraper(gpu.DcgmScraperOpts{ + acir.dcgmScraper, err = prometheusscraper.NewSimplePrometheusScraper(scraperOpts) + return err +} + +func (acir *awsContainerInsightReceiver) initNeuronScraper(ctx context.Context, host component.Host, hostinfo *hostInfo.Info, decorator *stores.K8sDecorator) error { + if !acir.config.EnableAcceleratedComputeMetrics { + return nil + } + + decoConsumer := decoratorconsumer.DecorateConsumer{ + ContainerOrchestrator: ci.EKS, + NextConsumer: acir.nextConsumer, + MetricType: ci.TypeNeuronContainer, + K8sDecorator: decorator, + Logger: acir.settings.Logger, + } + + acir.podResourcesStore = stores.NewPodResourcesStore(acir.settings.Logger) + acir.podResourcesStore.AddResourceName("aws.amazon.com/neuroncore") + acir.podResourcesStore.AddResourceName("aws.amazon.com/neuron") + acir.podResourcesStore.AddResourceName("aws.amazon.com/neurondevice") + + podAttributesDecoratorConsumer := neuron.PodAttributesDecoratorConsumer{ + NextConsumer: &decoConsumer, + PodResourcesStore: acir.podResourcesStore, + Logger: acir.settings.Logger, + } + + scraperOpts := prometheusscraper.SimplePrometheusScraperOpts{ Ctx: ctx, TelemetrySettings: acir.settings, - Consumer: acir.nextConsumer, + Consumer: &podAttributesDecoratorConsumer, Host: host, + ScraperConfigs: neuron.GetNeuronScrapeConfig(hostinfo), HostInfoProvider: hostinfo, - K8sDecorator: decorator, 
Logger: acir.settings.Logger, - }) + } + + var err error + acir.neuronMonitorScraper, err = prometheusscraper.NewSimplePrometheusScraper(scraperOpts) return err } @@ -226,6 +286,18 @@ func (acir *awsContainerInsightReceiver) Shutdown(context.Context) error { acir.dcgmScraper.Shutdown() } + if acir.neuronMonitorScraper != nil { + acir.neuronMonitorScraper.Shutdown() + } + + if acir.k8sDecorator != nil { + errs = errors.Join(errs, acir.k8sDecorator.Shutdown()) + } + + if acir.podResourcesStore != nil { + acir.podResourcesStore.Shutdown() + } + return errs } @@ -257,6 +329,10 @@ func (acir *awsContainerInsightReceiver) collectData(ctx context.Context) error acir.dcgmScraper.GetMetrics() //nolint:errcheck } + if acir.neuronMonitorScraper != nil { + acir.neuronMonitorScraper.GetMetrics() //nolint:errcheck + } + for _, md := range mds { err := acir.nextConsumer.ConsumeMetrics(ctx, md) if err != nil {