diff --git a/extension/entitystore/extension.go b/extension/entitystore/extension.go index 7f5c71389f..776330d77d 100644 --- a/extension/entitystore/extension.go +++ b/extension/entitystore/extension.go @@ -44,6 +44,7 @@ type serviceProviderInterface interface { logFileServiceAttribute(LogFileGlob, LogGroupName) ServiceAttribute getServiceNameAndSource() (string, string) getAutoScalingGroup() string + setAutoScalingGroup(string) } type EntityStore struct { @@ -185,6 +186,12 @@ func (e *EntityStore) GetAutoScalingGroup() string { return e.serviceprovider.getAutoScalingGroup() } +func (e *EntityStore) SetAutoScalingGroup(asg string) { + if e.serviceprovider != nil { + e.serviceprovider.setAutoScalingGroup(asg) + } +} + // AddServiceAttrEntryForLogFile adds an entry to the entity store for the provided file glob -> (serviceName, environmentName) key-value pair func (e *EntityStore) AddServiceAttrEntryForLogFile(fileGlob LogFileGlob, serviceName string, environmentName string) { if e.serviceprovider != nil { diff --git a/extension/entitystore/extension_test.go b/extension/entitystore/extension_test.go index 982be1ee47..9bb4da8365 100644 --- a/extension/entitystore/extension_test.go +++ b/extension/entitystore/extension_test.go @@ -69,6 +69,10 @@ func (s *mockServiceProvider) getAutoScalingGroup() string { return args.Get(0).(string) } +func (s *mockServiceProvider) setAutoScalingGroup(asg string) { + s.Called(asg) +} + type mockMetadataProvider struct { InstanceIdentityDocument *ec2metadata.EC2InstanceIdentityDocument Tags map[string]string @@ -688,6 +692,17 @@ func TestEntityStore_ServiceProviderInDifferentEnv(t *testing.T) { } +func TestEntityStore_SetAutoScalingGroup(t *testing.T) { + e := &EntityStore{} + sp := new(mockServiceProvider) + sp.On("setAutoScalingGroup", "asg-name").Return() + e.serviceprovider = sp + + e.SetAutoScalingGroup("asg-name") + + sp.AssertExpectations(t) +} + func assertIfNonEmpty(t *testing.T, message string, pattern string) { if pattern != "" { assert.NotContains(t, message, pattern) diff --git a/extension/entitystore/serviceprovider.go b/extension/entitystore/serviceprovider.go index 5acee79a6a..188d69c5d3 100644 --- a/extension/entitystore/serviceprovider.go +++ b/extension/entitystore/serviceprovider.go @@ -53,13 +53,18 @@ type ServiceAttribute struct { type LogGroupName string type LogFileGlob string +type autoscalinggroup struct { + name string + once sync.Once +} + type serviceprovider struct { mode string ec2Info *EC2Info metadataProvider ec2metadataprovider.MetadataProvider iamRole string imdsServiceName string - autoScalingGroup string + autoScalingGroup autoscalinggroup region string done chan struct{} logger *zap.Logger @@ -101,7 +106,21 @@ func (s *serviceprovider) GetIMDSServiceName() string { func (s *serviceprovider) getAutoScalingGroup() string { s.mutex.RLock() defer s.mutex.RUnlock() - return s.autoScalingGroup + return s.autoScalingGroup.name +} + +func (s *serviceprovider) setAutoScalingGroup(asg string) { + s.autoScalingGroup.once.Do(func() { + s.mutex.Lock() + defer s.mutex.Unlock() + + if asgLength := len(asg); asgLength > autoScalingGroupSizeMax { + s.logger.Warn("AutoScalingGroup length exceeds characters limit and will be ignored", zap.Int("length", asgLength), zap.Int("character limit", autoScalingGroupSizeMax)) + s.autoScalingGroup.name = "" + } else { + s.autoScalingGroup.name = asg + } + }) } // addEntryForLogFile adds an association between a log file glob and a service attribute, as configured in the @@ -275,15 +294,9 @@ func (s *serviceprovider) scrapeImdsServiceNameAndASG() error { // case sensitive if originalCaseKey := lowerTagKeys[strings.ToLower(ec2tagger.Ec2InstanceTagKeyASG)]; originalCaseKey == ec2tagger.Ec2InstanceTagKeyASG { asg, err := s.metadataProvider.InstanceTagValue(context.Background(), ec2tagger.Ec2InstanceTagKeyASG) - if err == nil { + if err == nil && asg != "" { s.logger.Debug("AutoScalingGroup retrieved through IMDS") - s.mutex.Lock() - s.autoScalingGroup = asg - if asgLength := len(s.autoScalingGroup); asgLength > autoScalingGroupSizeMax { - s.logger.Warn("AutoScalingGroup length exceeds characters limit and will be ignored", zap.Int("length", asgLength), zap.Int("character limit", autoScalingGroupSizeMax)) - s.autoScalingGroup = "" - } - s.mutex.Unlock() + s.setAutoScalingGroup(asg) } } diff --git a/extension/entitystore/serviceprovider_test.go b/extension/entitystore/serviceprovider_test.go index e2c5bcad08..21cdb1d985 100644 --- a/extension/entitystore/serviceprovider_test.go +++ b/extension/entitystore/serviceprovider_test.go @@ -203,10 +203,10 @@ func Test_serviceprovider_serviceAttributeFromAsg(t *testing.T) { s := &serviceprovider{} assert.Equal(t, ServiceAttribute{}, s.serviceAttributeFromAsg()) - s = &serviceprovider{autoScalingGroup: ""} + s = &serviceprovider{autoScalingGroup: autoscalinggroup{name: ""}} assert.Equal(t, ServiceAttribute{}, s.serviceAttributeFromAsg()) - s = &serviceprovider{autoScalingGroup: "test-asg"} + s = &serviceprovider{autoScalingGroup: autoscalinggroup{name: "test-asg"}} assert.Equal(t, ServiceAttribute{Environment: "ec2:test-asg"}, s.serviceAttributeFromAsg()) } @@ -231,7 +231,7 @@ func Test_serviceprovider_logFileServiceAttribute(t *testing.T) { assert.Equal(t, ServiceAttribute{ServiceName: ServiceNameUnknown, ServiceNameSource: ServiceNameSourceUnknown, Environment: "ec2:default"}, s.logFileServiceAttribute("glob", "group")) - s.autoScalingGroup = "test-asg" + s.autoScalingGroup = autoscalinggroup{name: "test-asg"} assert.Equal(t, ServiceAttribute{ServiceName: ServiceNameUnknown, ServiceNameSource: ServiceNameSourceUnknown, Environment: "ec2:test-asg"}, s.logFileServiceAttribute("glob", "group")) s.iamRole = "test-role" @@ -414,3 +414,36 @@ func Test_serviceprovider_scrapeAndgetImdsServiceNameAndASG(t *testing.T) { }) } } + +func Test_serviceprovider_setAutoScalingGroup(t *testing.T) { + tests := []struct { + name string + asgs []string + want string + }{ + { + name: "setAutoScalingGroup called once", + asgs: []string{"test-asg"}, + want: "test-asg", + }, + { + name: "setAutoScalingGroup called multiple times", + asgs: []string{"test-asg", "test-asg2", "test-asg3", "test-asg4"}, + want: "test-asg", + }, + { + name: "setAutoScalingGroup not called", + asgs: []string{}, + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &serviceprovider{} + for _, asg := range tt.asgs { + s.setAutoScalingGroup(asg) + } + assert.Equal(t, tt.want, s.getAutoScalingGroup()) + }) + } +} diff --git a/plugins/processors/awsentity/processor.go b/plugins/processors/awsentity/processor.go index bd1a63b079..7334bebd46 100644 --- a/plugins/processors/awsentity/processor.go +++ b/plugins/processors/awsentity/processor.go @@ -20,11 +20,12 @@ import ( ) const ( - attributeAwsLogGroupNames = "aws.log.group.names" - attributeDeploymentEnvironment = "deployment.environment" - attributeServiceName = "service.name" - attributeService = "Service" - EMPTY = "" + attributeAwsLogGroupNames = "aws.log.group.names" + attributeDeploymentEnvironment = "deployment.environment" + attributeServiceName = "service.name" + attributeService = "Service" + attributeEC2TagAwsAutoscalingGroupName = "ec2.tag.aws:autoscaling:groupName" + EMPTY = "" ) type scraper interface { @@ -67,6 +68,14 @@ var addPodToServiceEnvironmentMap = func(podName string, serviceName string, env es.AddPodServiceEnvironmentMapping(podName, serviceName, environmentName, serviceNameSource) } +var setAutoScalingGroup = func(asg string) { + es := entitystore.GetEntityStore() + if es == nil { + return + } + es.SetAutoScalingGroup(asg) +} + var getEC2InfoFromEntityStore = func() entitystore.EC2Info { es := entitystore.GetEntityStore() if es == nil { @@ -143,6 +152,10 @@ func (p *awsEntityProcessor) processMetrics(_ context.Context, md pmetric.Metric if serviceNameSource, sourceExists := resourceAttrs.Get(entityattributes.AttributeEntityServiceNameSource); sourceExists { entityServiceNameSource = serviceNameSource.Str() } + // resourcedetection processor may have picked up the ASG name from an ec2:DescribeTags call + if autoScalingGroupNameAttr, ok := resourceAttrs.Get(attributeEC2TagAwsAutoscalingGroupName); ok { + setAutoScalingGroup(autoScalingGroupNameAttr.Str()) + } entityServiceName := getServiceAttributes(resourceAttrs) entityEnvironmentName := environmentName diff --git a/plugins/processors/awsentity/processor_test.go b/plugins/processors/awsentity/processor_test.go index f9051975bd..5506c986c2 100644 --- a/plugins/processors/awsentity/processor_test.go +++ b/plugins/processors/awsentity/processor_test.go @@ -23,6 +23,7 @@ import ( type mockEntityStore struct { entries []entityStoreEntry podToServiceEnvironmentMap map[string]entitystore.ServiceEnvironment + autoScalingGroup string } type entityStoreEntry struct { @@ -75,6 +76,12 @@ func newMockGetAutoScalingGroupFromEntityStore(asg string) func() string { } } +func newMockSetAutoScalingGroup(es *mockEntityStore) func(string) { + return func(asg string) { + es.autoScalingGroup = asg + } +} + // This helper function creates a test logger // so that it can send the log messages into a // temporary buffer for pattern matching @@ -551,6 +558,58 @@ func TestAWSEntityProcessorNoSensitiveInfoInLogs(t *testing.T) { } } +func TestAWSEntityProcessorSetAutoScalingGroup(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + resourceAttrs []string + want string + }{ + { + name: "ASGPopulatedFromResourceDetection", + resourceAttrs: []string{attributeEC2TagAwsAutoscalingGroupName, "test-asg"}, + want: "test-asg", + }, + { + name: "MultipleResourceAttributes", + resourceAttrs: []string{attributeEC2TagAwsAutoscalingGroupName, "test-asg", attributeAwsLogGroupNames, "log-group"}, + want: "test-asg", + }, + { + name: "ASGNotPopulated", + resourceAttrs: []string{attributeAwsLogGroupNames, "log-group"}, + want: "", + }, + { + name: "ResourceAttributesEmpty", + resourceAttrs: []string{}, + want: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resetSetAutoScalingGroup := setAutoScalingGroup + defer func() { + setAutoScalingGroup = resetSetAutoScalingGroup + }() + + es := newMockEntityStore() + setAutoScalingGroup = newMockSetAutoScalingGroup(es) + metrics := generateMetrics(tt.resourceAttrs...) + + p := newAwsEntityProcessor(&Config{EntityType: attributeService}, zap.NewNop()) + _, err := p.processMetrics(ctx, metrics) + assert.NoError(t, err) + + if len(tt.resourceAttrs) > 0 { + assert.Equal(t, tt.want, es.autoScalingGroup) + } + }) + } +} + func generateTestMetrics() pmetric.Metrics { md := pmetric.NewMetrics() rm := md.ResourceMetrics().AppendEmpty() diff --git a/translator/tocwconfig/sampleConfig/base_appsignals_config.yaml b/translator/tocwconfig/sampleConfig/base_appsignals_config.yaml index 2c5d844ff3..ce4711920e 100644 --- a/translator/tocwconfig/sampleConfig/base_appsignals_config.yaml +++ b/translator/tocwconfig/sampleConfig/base_appsignals_config.yaml @@ -213,6 +213,9 @@ processors: resolvers: - name: "" platform: generic + awsentity/service/application_signals: + entity_type: Service + platform: onPremise metricstransform/application_signals: transforms: - action: update @@ -1371,6 +1374,7 @@ service: - metricstransform/application_signals - resourcedetection - awsapplicationsignals + - awsentity/service/application_signals receivers: - otlp/application_signals traces/application_signals: diff --git a/translator/tocwconfig/sampleConfig/base_appsignals_fallback_config.yaml b/translator/tocwconfig/sampleConfig/base_appsignals_fallback_config.yaml index cf91c88e64..9f050852d0 100644 --- a/translator/tocwconfig/sampleConfig/base_appsignals_fallback_config.yaml +++ b/translator/tocwconfig/sampleConfig/base_appsignals_fallback_config.yaml @@ -209,6 +209,9 @@ processors: resolvers: - name: "" platform: generic + awsentity/service/application_signals: + entity_type: Service + platform: onPremise metricstransform/application_signals: transforms: - action: update @@ -1366,6 +1369,7 @@ service: - metricstransform/application_signals - resourcedetection - awsapplicationsignals + - awsentity/service/application_signals receivers: - otlp/application_signals traces/application_signals: diff --git a/translator/translate/otel/pipeline/applicationsignals/translator.go b/translator/translate/otel/pipeline/applicationsignals/translator.go index 2f8afbd37c..47706a1317 100644 --- a/translator/translate/otel/pipeline/applicationsignals/translator.go +++ b/translator/translate/otel/pipeline/applicationsignals/translator.go @@ -9,7 +9,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" - "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awsemf" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awsxray" @@ -21,6 +20,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricstransformprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/resourcedetection" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/otlp" + "github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil" ) type translator struct { @@ -59,11 +59,12 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators translators.Processors.Set(metricstransformprocessor.NewTranslatorWithName(common.AppSignals)) } - mode := context.CurrentContext().KubernetesMode() translators.Processors.Set(resourcedetection.NewTranslator(resourcedetection.WithDataType(t.dataType))) translators.Processors.Set(awsapplicationsignals.NewTranslator(awsapplicationsignals.WithDataType(t.dataType))) - if t.dataType == component.DataTypeMetrics && mode != "" { + // ECS is not in scope for entity association, so we only add the entity processor in non-ECS platforms + isECS := ecsutil.GetECSUtilSingleton().IsECS() + if t.dataType == component.DataTypeMetrics && !isECS { translators.Processors.Set(awsentity.NewTranslatorWithEntityType(awsentity.Service, common.AppSignals, false)) } diff --git a/translator/translate/otel/pipeline/applicationsignals/translator_test.go b/translator/translate/otel/pipeline/applicationsignals/translator_test.go index ae749c20c5..ae3c94ead3 100644 --- a/translator/translate/otel/pipeline/applicationsignals/translator_test.go +++ b/translator/translate/otel/pipeline/applicationsignals/translator_test.go @@ -16,6 +16,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/config" "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" + "github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil" "github.com/aws/amazon-cloudwatch-agent/translator/util/eksdetector" ) @@ -195,6 +196,7 @@ func TestTranslatorMetricsForKubernetes(t *testing.T) { }) } } + func TestTranslatorMetricsForEC2(t *testing.T) { type want struct { receivers []string @@ -225,7 +227,7 @@ func TestTranslatorMetricsForEC2(t *testing.T) { }, want: &want{ receivers: []string{"otlp/application_signals"}, - processors: []string{"metricstransform/application_signals", "resourcedetection", "awsapplicationsignals"}, + processors: []string{"metricstransform/application_signals", "resourcedetection", "awsapplicationsignals", "awsentity/service/application_signals"}, exporters: []string{"awsemf/application_signals"}, extensions: []string{"agenthealth/logs", "agenthealth/statuscode"}, }, @@ -245,7 +247,7 @@ func TestTranslatorMetricsForEC2(t *testing.T) { }, want: &want{ receivers: []string{"otlp/application_signals"}, - processors: []string{"metricstransform/application_signals", "resourcedetection", "awsapplicationsignals"}, + processors: []string{"metricstransform/application_signals", "resourcedetection", "awsapplicationsignals", "awsentity/service/application_signals"}, exporters: []string{"debug/application_signals", "awsemf/application_signals"}, extensions: []string{"agenthealth/logs", "agenthealth/statuscode"}, }, @@ -273,3 +275,78 @@ func TestTranslatorMetricsForEC2(t *testing.T) { }) } } + +// TestTranslatorMetricsForECS tests that the awsentity processor is not added +func TestTranslatorMetricsForECS(t *testing.T) { + type want struct { + receivers []string + processors []string + exporters []string + extensions []string + } + tt := NewTranslator(component.DataTypeMetrics) + assert.EqualValues(t, "metrics/application_signals", tt.ID().String()) + testCases := map[string]struct { + input map[string]interface{} + want *want + wantErr error + }{ + "WithoutMetricsCollectedKey": { + input: map[string]interface{}{}, + wantErr: &common.MissingKeyError{ID: tt.ID(), JsonKey: fmt.Sprint(common.AppSignalsMetrics)}, + }, + "WithAppSignalsEnabledMetrics": { + input: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "application_signals": map[string]interface{}{}, + }, + }, + }, + want: &want{ + receivers: []string{"otlp/application_signals"}, + processors: []string{"metricstransform/application_signals", "resourcedetection", "awsapplicationsignals"}, + exporters: []string{"awsemf/application_signals"}, + extensions: []string{"agenthealth/logs", "agenthealth/statuscode"}, + }, + }, + "WithAppSignalsAndLoggingEnabled": { + input: map[string]interface{}{ + "agent": map[string]interface{}{ + "debug": true, + }, + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "application_signals": map[string]interface{}{}, + }, + }, + }, + want: &want{ + receivers: []string{"otlp/application_signals"}, + processors: []string{"metricstransform/application_signals", "resourcedetection", "awsapplicationsignals"}, + exporters: []string{"debug/application_signals", "awsemf/application_signals"}, + extensions: []string{"agenthealth/logs", "agenthealth/statuscode"}, + }, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + context.CurrentContext().SetRunInContainer(true) + t.Setenv(config.RUN_IN_CONTAINER, config.RUN_IN_CONTAINER_TRUE) + ecsutil.GetECSUtilSingleton().Region = "test" + + conf := confmap.NewFromStringMap(testCase.input) + got, err := tt.Translate(conf) + assert.Equal(t, testCase.wantErr, err) + if testCase.want == nil { + assert.Nil(t, got) + } else { + require.NotNil(t, got) + assert.Equal(t, testCase.want.receivers, collections.MapSlice(got.Receivers.Keys(), component.ID.String)) + assert.Equal(t, testCase.want.processors, collections.MapSlice(got.Processors.Keys(), component.ID.String)) + assert.Equal(t, testCase.want.exporters, collections.MapSlice(got.Exporters.Keys(), component.ID.String)) + assert.Equal(t, testCase.want.extensions, collections.MapSlice(got.Extensions.Keys(), component.ID.String)) + } + }) + } +}