Skip to content

Commit

Permalink
Support Application Signals .NET runtime metrics exporting (#1471)
Browse files Browse the repository at this point in the history
Co-authored-by: Kaushik Surya <108111936+sky333999@users.noreply.github.com>
  • Loading branch information
bjrara and sky333999 authored Jan 8, 2025
1 parent 20f6bb4 commit da53e1c
Show file tree
Hide file tree
Showing 13 changed files with 2,101 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package metrichandlers

import (
"context"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type aggregationType int

const (
defaultAggregation aggregationType = iota
lastValueAggregation
)

// AggregationMutator is used to convert predefined ObservableUpDownCounter metrics to use LastValue metrichandlers. This
// is necessary for cases where metrics are instrumented as cumulative, yet reported with snapshot values.
//
// For example, metrics like DotNetGCGen0HeapSize may report values such as 1000, 2000, 1000, with cumulative temporality
// When exporters, such as the EMF exporter, detect these as cumulative, they convert the values to deltas,
// resulting in outputs like -, 1000, -1000, which misrepresent the data.
//
// Normally, this issue could be resolved by configuring a view with LastValue metrichandlers within the SDK.
// However, since the view feature is not fully supported in .NET, this workaround implements the required
// conversion to LastValue metrichandlers to ensure accurate metric reporting.
// See https://github.com/open-telemetry/opentelemetry-dotnet/issues/2618.
type AggregationMutator struct {
includes map[string]aggregationType
}

func NewAggregationMutator() AggregationMutator {
return newAggregationMutatorWithConfig(map[string]aggregationType{
"DotNetGCGen0HeapSize": lastValueAggregation,
"DotNetGCGen1HeapSize": lastValueAggregation,
"DotNetGCGen2HeapSize": lastValueAggregation,
"DotNetGCLOHHeapSize": lastValueAggregation,
"DotNetGCPOHHeapSize": lastValueAggregation,
"DotNetThreadCount": lastValueAggregation,
"DotNetThreadQueueLength": lastValueAggregation,
})
}

func newAggregationMutatorWithConfig(includes map[string]aggregationType) AggregationMutator {
return AggregationMutator{
includes,
}
}

func (t *AggregationMutator) ProcessMetrics(_ context.Context, m pmetric.Metric, _ pcommon.Map) {
aggType, exists := t.includes[m.Name()]
if !exists || aggType == defaultAggregation {
return
}
switch m.Type() {
case pmetric.MetricTypeSum:
switch aggType {
case lastValueAggregation:
m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
default:
}
default:
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package metrichandlers

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func TestAggregationMutator_ProcessMetrics(t *testing.T) {
tests := []struct {
name string
config map[string]aggregationType
metrics []pmetric.Metric
expectedTemporality map[string]pmetric.AggregationTemporality
}{
{
"testCumulativeToDelta",
map[string]aggregationType{
"test0": lastValueAggregation,
},

[]pmetric.Metric{
generateMetricWithSumAggregation("test0", pmetric.AggregationTemporalityCumulative),
},
map[string]pmetric.AggregationTemporality{
"test0": pmetric.AggregationTemporalityDelta,
},
},
{
"testNoChange",
map[string]aggregationType{
"test0": lastValueAggregation,
"test1": defaultAggregation,
},
[]pmetric.Metric{
generateMetricWithSumAggregation("test0", pmetric.AggregationTemporalityDelta),
generateMetricWithSumAggregation("test1", pmetric.AggregationTemporalityCumulative),
generateMetricWithSumAggregation("test2", pmetric.AggregationTemporalityCumulative),
},
map[string]pmetric.AggregationTemporality{
"test0": pmetric.AggregationTemporalityDelta,
"test1": pmetric.AggregationTemporalityCumulative,
"test2": pmetric.AggregationTemporalityCumulative,
},
},
}

ctx := context.Background()
for _, tt := range tests {
t.Run(tt.name, func(t1 *testing.T) {
mutator := newAggregationMutatorWithConfig(tt.config)

for _, m := range tt.metrics {
mutator.ProcessMetrics(ctx, m, pcommon.NewMap())
assert.Equal(t1, tt.expectedTemporality[m.Name()], m.Sum().AggregationTemporality())
}
})
}

mutator := NewAggregationMutator()

m := generateMetricWithSumAggregation("DotNetGCGen0HeapSize", pmetric.AggregationTemporalityCumulative)
mutator.ProcessMetrics(ctx, m, pcommon.NewMap())
assert.Equal(t, pmetric.MetricTypeSum, m.Type())
assert.Equal(t, pmetric.AggregationTemporalityDelta, m.Sum().AggregationTemporality())

m.SetEmptyHistogram()
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
mutator.ProcessMetrics(ctx, m, pcommon.NewMap())
assert.Equal(t, pmetric.MetricTypeHistogram, m.Type())
assert.Equal(t, pmetric.AggregationTemporalityCumulative, m.Histogram().AggregationTemporality())

}

func generateMetricWithSumAggregation(metricName string, temporality pmetric.AggregationTemporality) pmetric.Metric {
m := pmetric.NewMetrics().ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
m.SetName(metricName)
m.SetEmptySum()
m.Sum().SetAggregationTemporality(temporality)
return m
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package prune
package metrichandlers

import (
"errors"
Expand All @@ -12,10 +12,10 @@ import (
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common"
)

type MetricPruner struct {
type Pruner struct {
}

func (p *MetricPruner) ShouldBeDropped(attributes pcommon.Map) (bool, error) {
func (p *Pruner) ShouldBeDropped(attributes pcommon.Map) (bool, error) {
for _, attributeKey := range common.CWMetricAttributes {
if val, ok := attributes.Get(attributeKey); ok {
if !isAsciiPrintable(val.Str()) {
Expand All @@ -29,8 +29,8 @@ func (p *MetricPruner) ShouldBeDropped(attributes pcommon.Map) (bool, error) {
return false, nil
}

func NewPruner() *MetricPruner {
return &MetricPruner{}
func NewPruner() *Pruner {
return &Pruner{}
}

func isAsciiPrintable(val string) bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package prune
package metrichandlers

import (
"testing"
Expand Down Expand Up @@ -41,7 +41,7 @@ func TestMetricPrunerWithIndexableAttribute(t *testing.T) {
},
}

p := &MetricPruner{}
p := &Pruner{}
for _, tt := range tests {
attributes := pcommon.NewMap()
attributes.PutStr(common.MetricAttributeTelemetrySource, "UnitTest")
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestMetricPrunerWithNonIndexableAttribute(t *testing.T) {
},
}

p := &MetricPruner{}
p := &Pruner{}
for _, tt := range tests {
attributes := pcommon.NewMap()
attributes.PutStr(common.MetricAttributeTelemetrySource, "UnitTest")
Expand All @@ -99,7 +99,7 @@ func TestMetricPrunerWithNoTelemetrySourceAttribute(t *testing.T) {
},
}

p := &MetricPruner{}
p := &Pruner{}
for _, tt := range tests {
attributes := pcommon.NewMap()
attributes.PutStr(common.AttributeEC2InstanceId, tt.val)
Expand Down
24 changes: 14 additions & 10 deletions plugins/processors/awsapplicationsignals/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (

appsignalsconfig "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/cardinalitycontrol"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/metrichandlers"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/normalizer"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/prune"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/resolver"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/rules"
)
Expand All @@ -44,14 +44,15 @@ type stopper interface {
}

type awsapplicationsignalsprocessor struct {
logger *zap.Logger
config *appsignalsconfig.Config
replaceActions *rules.ReplaceActions
allowlistMutators []allowListMutator
metricMutators []attributesMutator
traceMutators []attributesMutator
limiter cardinalitycontrol.Limiter
stoppers []stopper
logger *zap.Logger
config *appsignalsconfig.Config
replaceActions *rules.ReplaceActions
allowlistMutators []allowListMutator
metricMutators []attributesMutator
traceMutators []attributesMutator
limiter cardinalitycontrol.Limiter
aggregationMutator metrichandlers.AggregationMutator
stoppers []stopper
}

func (ap *awsapplicationsignalsprocessor) StartMetrics(ctx context.Context, _ component.Host) error {
Expand All @@ -76,11 +77,13 @@ func (ap *awsapplicationsignalsprocessor) StartMetrics(ctx context.Context, _ co

ap.replaceActions = rules.NewReplacer(ap.config.Rules, !limiterConfig.Disabled)

pruner := prune.NewPruner()
pruner := metrichandlers.NewPruner()
keeper := rules.NewKeeper(ap.config.Rules, !limiterConfig.Disabled)
dropper := rules.NewDropper(ap.config.Rules)
ap.allowlistMutators = []allowListMutator{pruner, keeper, dropper}

ap.aggregationMutator = metrichandlers.NewAggregationMutator()

return nil
}

Expand Down Expand Up @@ -143,6 +146,7 @@ func (ap *awsapplicationsignalsprocessor) processMetrics(ctx context.Context, md
m.SetName(metricCaser.String(m.Name())) // Ensure metric name is in sentence case
}
ap.processMetricAttributes(ctx, m, resourceAttributes)
ap.aggregationMutator.ProcessMetrics(ctx, m, resourceAttributes)
}
}
}
Expand Down
Loading

0 comments on commit da53e1c

Please sign in to comment.