Skip to content

Commit

Permalink
Add Neuron Scraper for scraping neuron monitor metrics (open-telemetry#184)
Browse files Browse the repository at this point in the history

* Add Neuron monitor scraper to scrape metrics from NeuronMonitor

---------

Co-authored-by: Hyunsoo Kim <hsookim@amazon.com>
Co-authored-by: Aditya Purang <puranga@amazon.com>
  • Loading branch information
3 people authored Mar 21, 2024
1 parent 34bd73a commit 7441665
Show file tree
Hide file tree
Showing 23 changed files with 1,323 additions and 396 deletions.
2 changes: 1 addition & 1 deletion exporter/awsemfexporter/internal/appsignals/useragent.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package appsignals
package appsignals // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/appsignals"

import (
"context"
Expand Down
3 changes: 2 additions & 1 deletion exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri

if serviceName, ok := rm.Resource().Attributes().Get("service.name"); ok {
if strings.HasPrefix(serviceName.Str(), "containerInsightsKubeAPIServerScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsDCGMExporterScraper") {
strings.HasPrefix(serviceName.Str(), "containerInsightsDCGMExporterScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsNeuronMonitorScraper") {
// the prometheus metrics that come from the container insight receiver need to be clearly tagged as coming from container insights
metricReceiver = containerInsightsReceiver
}
Expand Down
16 changes: 16 additions & 0 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
containerInsightMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsKubeAPIServerScraper")
gpuMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
gpuMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsDCGMExporterScraper")
neuronMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
neuronMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsNeuronMonitorScraper")

counterSumMetrics := map[string]*metricInfo{
"spanCounter": {
Expand Down Expand Up @@ -390,6 +392,20 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
"myServiceNS/containerInsightsDCGMExporterScraper",
containerInsightsReceiver,
},
{
"neuron monitor receiver",
neuronMetric,
map[string]string{
"isItAnError": "false",
"spanName": "testSpan",
},
map[string]string{
oTellibDimensionKey: "cloudwatch-lib",
"spanName": "testSpan",
},
"myServiceNS/containerInsightsNeuronMonitorScraper",
containerInsightsReceiver,
},
}

for _, tc := range testCases {
Expand Down
11 changes: 6 additions & 5 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,12 @@ const (
TypeContainerDiskIO = "ContainerDiskIO"
// Special type for pause container
// because containerd does not set container name pause container name to POD like docker does.
TypeInfraContainer = "InfraContainer"
TypeGpuContainer = "ContainerGPU"
TypeGpuPod = "PodGPU"
TypeGpuNode = "NodeGPU"
TypeGpuCluster = "ClusterGPU"
TypeInfraContainer = "InfraContainer"
TypeGpuContainer = "ContainerGPU"
TypeGpuPod = "PodGPU"
TypeGpuNode = "NodeGPU"
TypeGpuCluster = "ClusterGPU"
TypeNeuronContainer = "ContainerNeuron"

// unit
UnitBytes = "Bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package extractors
package extractors // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package gpu
package gpu // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/gpu"

import (
"context"
"errors"
"fmt"
"time"

configutil "github.com/prometheus/common/config"
Expand All @@ -15,14 +12,8 @@ import (
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/prometheus/prometheus/model/relabel"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
)

const (
Expand All @@ -33,77 +24,13 @@ const (
scraperK8sServiceSelector = "k8s-app=dcgm-exporter-service"
)

// DcgmScraper owns a prometheus receiver together with the context, host and
// telemetry settings needed to start it and shut it down.
type DcgmScraper struct {
// ctx is the context handed to the prometheus receiver on Start and Shutdown.
ctx context.Context
// settings supplies the logger used to report start/shutdown problems.
settings component.TelemetrySettings
// host is passed to the prometheus receiver when it is started.
host component.Host
// hostInfoProvider is the provider the scrape configuration was built from.
hostInfoProvider hostInfoProvider
// prometheusReceiver is the receiver that is started lazily by GetMetrics.
prometheusReceiver receiver.Metrics
// k8sDecorator may be nil; when set it is shut down together with the scraper.
k8sDecorator Decorator
// running records whether prometheusReceiver was started successfully.
running bool
}

// DcgmScraperOpts bundles the inputs consumed by NewDcgmScraper.
type DcgmScraperOpts struct {
// Ctx is used to create the prometheus receiver and is retained by the scraper.
Ctx context.Context
// TelemetrySettings is forwarded to the prometheus receiver's create settings.
TelemetrySettings component.TelemetrySettings
// Consumer is the downstream metrics consumer; NewDcgmScraper rejects nil.
Consumer consumer.Metrics
// Host is the component host; NewDcgmScraper rejects nil.
Host component.Host
// HostInfoProvider feeds the scrape configuration; NewDcgmScraper rejects nil.
HostInfoProvider hostInfoProvider
// K8sDecorator is handed to the decorating consumer and kept for shutdown.
K8sDecorator Decorator
// Logger is handed to the decorating consumer.
Logger *zap.Logger
}

// hostInfoProvider is the host metadata dependency of the scraper; it exposes
// three string getters that are consumed when building the scrape configuration.
// NOTE(review): presumably these describe the cluster and the instance the
// collector runs on; confirm against the concrete implementation.
type hostInfoProvider interface {
// GetClusterName returns the cluster name as a string.
GetClusterName() string
// GetInstanceID returns the instance ID as a string.
GetInstanceID() string
// GetInstanceType returns the instance type as a string.
GetInstanceType() string
}

// NewDcgmScraper validates opts and assembles a DcgmScraper around a freshly
// created prometheus receiver.
//
// Consumer, Host and HostInfoProvider must all be non-nil; otherwise an error is
// returned and no scraper is built. The supplied consumer is wrapped in a
// decorateConsumer (orchestrator ci.EKS) so that metrics pass through the
// optional K8sDecorator before reaching it. Any failure while creating the
// prometheus receiver is wrapped and returned. The receiver is only created
// here, not started.
func NewDcgmScraper(opts DcgmScraperOpts) (*DcgmScraper, error) {
	// Reject missing mandatory dependencies, in the same order as documented.
	switch {
	case opts.Consumer == nil:
		return nil, errors.New("consumer cannot be nil")
	case opts.Host == nil:
		return nil, errors.New("host cannot be nil")
	case opts.HostInfoProvider == nil:
		return nil, errors.New("cluster name provider cannot be nil")
	}

	// A single scrape config, derived from the host info provider.
	scrapeConfigs := []*config.ScrapeConfig{getScraperConfig(opts.HostInfoProvider)}
	receiverConfig := prometheusreceiver.Config{
		PrometheusConfig: &config.Config{ScrapeConfigs: scrapeConfigs},
	}

	createSettings := receiver.CreateSettings{TelemetrySettings: opts.TelemetrySettings}

	// The prometheus receiver emits into this wrapper rather than directly
	// into the caller's consumer.
	decorated := decorateConsumer{
		containerOrchestrator: ci.EKS,
		nextConsumer:          opts.Consumer,
		k8sDecorator:          opts.K8sDecorator,
		logger:                opts.Logger,
	}

	factory := prometheusreceiver.NewFactory()
	metricsReceiver, err := factory.CreateMetricsReceiver(opts.Ctx, createSettings, &receiverConfig, &decorated)
	if err != nil {
		return nil, fmt.Errorf("failed to create prometheus receiver: %w", err)
	}

	scraper := &DcgmScraper{
		ctx:                opts.Ctx,
		settings:           opts.TelemetrySettings,
		host:               opts.Host,
		hostInfoProvider:   opts.HostInfoProvider,
		prometheusReceiver: metricsReceiver,
		k8sDecorator:       opts.K8sDecorator,
	}
	return scraper, nil
}

func getScraperConfig(hostInfoProvider hostInfoProvider) *config.ScrapeConfig {
func GetScraperConfig(hostInfoProvider hostInfoProvider) *config.ScrapeConfig {
return &config.ScrapeConfig{
HTTPClientConfig: configutil.HTTPClientConfig{
TLSConfig: configutil.TLSConfig{
Expand Down Expand Up @@ -209,35 +136,3 @@ func getMetricRelabelConfig(hostInfoProvider hostInfoProvider) []*relabel.Config
},
}
}

// GetMetrics always returns nil: metrics are delivered by the prometheus
// receiver itself, not through this method's return value. Its only job is to
// make sure the receiver has been started. A failed start is logged and the
// scraper stays marked as not running, so the next call tries again.
func (ds *DcgmScraper) GetMetrics() []pmetric.Metrics {
	if ds.running {
		return nil
	}

	ds.settings.Logger.Info("The scraper is not running, starting up the scraper")
	if err := ds.prometheusReceiver.Start(ds.ctx, ds.host); err != nil {
		ds.settings.Logger.Error("Unable to start PrometheusReceiver", zap.Error(err))
		ds.running = false
		return nil
	}

	ds.running = true
	return nil
}

// Shutdown stops the prometheus receiver if it was started and then shuts down
// the k8s decorator if one was configured. Errors from either step are logged
// rather than returned; the scraper is marked as not running even when the
// receiver's shutdown reports an error.
func (ds *DcgmScraper) Shutdown() {
	if ds.running {
		if err := ds.prometheusReceiver.Shutdown(ds.ctx); err != nil {
			ds.settings.Logger.Error("Unable to shutdown PrometheusReceiver", zap.Error(err))
		}
		ds.running = false
	}

	// The decorator is optional; nothing more to do without one.
	if ds.k8sDecorator == nil {
		return
	}
	if err := ds.k8sDecorator.Shutdown(); err != nil {
		ds.settings.Logger.Error("Unable to shutdown K8sDecorator", zap.Error(err))
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/mocks"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
)
Expand Down Expand Up @@ -108,42 +109,6 @@ func (m mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) erro
return nil
}

// TestNewDcgmScraperBadInputs checks that NewDcgmScraper returns an error and a
// nil scraper when the Consumer, the Host or the HostInfoProvider is nil.
func TestNewDcgmScraperBadInputs(t *testing.T) {
	settings := componenttest.NewNopTelemetrySettings()
	settings.Logger, _ = zap.NewDevelopment()

	// baseline populates Ctx, TelemetrySettings, Consumer, Host and
	// HostInfoProvider; each case below clears exactly one of the last three.
	baseline := func() DcgmScraperOpts {
		return DcgmScraperOpts{
			Ctx:               context.TODO(),
			TelemetrySettings: settings,
			Consumer:          mockConsumer{},
			Host:              componenttest.NewNopHost(),
			HostInfoProvider:  mockHostInfoProvider{},
		}
	}

	noConsumer := baseline()
	noConsumer.Consumer = nil

	noHost := baseline()
	noHost.Host = nil

	noHostInfo := baseline()
	noHostInfo.HostInfoProvider = nil

	for _, opts := range []DcgmScraperOpts{noConsumer, noHost, noHostInfo} {
		scraper, err := NewDcgmScraper(opts)

		assert.Error(t, err)
		assert.Nil(t, scraper)
	}
}

func TestNewDcgmScraperEndToEnd(t *testing.T) {
expected := map[string]struct {
value float64
Expand Down Expand Up @@ -189,16 +154,17 @@ func TestNewDcgmScraperEndToEnd(t *testing.T) {
settings := componenttest.NewNopTelemetrySettings()
settings.Logger, _ = zap.NewDevelopment()

scraper, err := NewDcgmScraper(DcgmScraperOpts{
scraper, err := prometheusscraper.NewSimplePrometheusScraper(prometheusscraper.SimplePrometheusScraperOpts{
Ctx: context.TODO(),
TelemetrySettings: settings,
Consumer: consumer,
Host: componenttest.NewNopHost(),
HostInfoProvider: mockHostInfoProvider{},
K8sDecorator: mockDecorator{},
ScraperConfigs: GetScraperConfig(mockHostInfoProvider{}),
Logger: settings.Logger,
})
assert.NoError(t, err)
assert.Equal(t, mockHostInfoProvider{}, scraper.hostInfoProvider)
assert.Equal(t, mockHostInfoProvider{}, scraper.HostInfoProvider)

// build up a new PR
promFactory := prometheusreceiver.NewFactory()
Expand All @@ -214,7 +180,7 @@ func TestNewDcgmScraperEndToEnd(t *testing.T) {
mp, cfg, err := mocks.SetupMockPrometheus(targets...)
assert.NoError(t, err)

scrapeConfig := getScraperConfig(scraper.hostInfoProvider)
scrapeConfig := scraper.ScraperConfigs
scrapeConfig.ScrapeInterval = cfg.ScrapeConfigs[0].ScrapeInterval
scrapeConfig.ScrapeTimeout = cfg.ScrapeConfigs[0].ScrapeInterval
scrapeConfig.Scheme = "http"
Expand Down Expand Up @@ -245,9 +211,10 @@ func TestNewDcgmScraperEndToEnd(t *testing.T) {

// replace the prom receiver
params := receiver.CreateSettings{
TelemetrySettings: scraper.settings,
TelemetrySettings: scraper.Settings,
}
scraper.prometheusReceiver, err = promFactory.CreateMetricsReceiver(scraper.ctx, params, &promConfig, consumer)
scraper.PrometheusReceiver, err = promFactory.CreateMetricsReceiver(scraper.Ctx, params, &promConfig, consumer)

assert.NoError(t, err)
assert.NotNil(t, mp)
defer mp.Close()
Expand Down
Loading

0 comments on commit 7441665

Please sign in to comment.