Skip to content

Commit

Permalink
[awsentityprocessor] Use existing attributes from resource detection …
Browse files Browse the repository at this point in the history
…processor in application signals to populate entity (#1486)
  • Loading branch information
duhminick authored Jan 14, 2025
1 parent f7ef67a commit 09f8b09
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 23 deletions.
7 changes: 7 additions & 0 deletions extension/entitystore/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type serviceProviderInterface interface {
logFileServiceAttribute(LogFileGlob, LogGroupName) ServiceAttribute
getServiceNameAndSource() (string, string)
getAutoScalingGroup() string
setAutoScalingGroup(string)
}

type EntityStore struct {
Expand Down Expand Up @@ -185,6 +186,12 @@ func (e *EntityStore) GetAutoScalingGroup() string {
return e.serviceprovider.getAutoScalingGroup()
}

func (e *EntityStore) SetAutoScalingGroup(asg string) {
if e.serviceprovider != nil {
e.serviceprovider.setAutoScalingGroup(asg)
}
}

// AddServiceAttrEntryForLogFile adds an entry to the entity store for the provided file glob -> (serviceName, environmentName) key-value pair
func (e *EntityStore) AddServiceAttrEntryForLogFile(fileGlob LogFileGlob, serviceName string, environmentName string) {
if e.serviceprovider != nil {
Expand Down
15 changes: 15 additions & 0 deletions extension/entitystore/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (s *mockServiceProvider) getAutoScalingGroup() string {
return args.Get(0).(string)
}

func (s *mockServiceProvider) setAutoScalingGroup(asg string) {
s.Called(asg)
}

type mockMetadataProvider struct {
InstanceIdentityDocument *ec2metadata.EC2InstanceIdentityDocument
Tags map[string]string
Expand Down Expand Up @@ -688,6 +692,17 @@ func TestEntityStore_ServiceProviderInDifferentEnv(t *testing.T) {

}

func TestEntityStore_SetAutoScalingGroup(t *testing.T) {
e := &EntityStore{}
sp := new(mockServiceProvider)
sp.On("setAutoScalingGroup", "asg-name").Return()
e.serviceprovider = sp

e.SetAutoScalingGroup("asg-name")

sp.AssertExpectations(t)
}

func assertIfNonEmpty(t *testing.T, message string, pattern string) {
if pattern != "" {
assert.NotContains(t, message, pattern)
Expand Down
33 changes: 23 additions & 10 deletions extension/entitystore/serviceprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@ type ServiceAttribute struct {
type LogGroupName string
type LogFileGlob string

type autoscalinggroup struct {
name string
once sync.Once
}

type serviceprovider struct {
mode string
ec2Info *EC2Info
metadataProvider ec2metadataprovider.MetadataProvider
iamRole string
imdsServiceName string
autoScalingGroup string
autoScalingGroup autoscalinggroup
region string
done chan struct{}
logger *zap.Logger
Expand Down Expand Up @@ -101,7 +106,21 @@ func (s *serviceprovider) GetIMDSServiceName() string {
func (s *serviceprovider) getAutoScalingGroup() string {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.autoScalingGroup
return s.autoScalingGroup.name
}

func (s *serviceprovider) setAutoScalingGroup(asg string) {
s.autoScalingGroup.once.Do(func() {
s.mutex.Lock()
defer s.mutex.Unlock()

if asgLength := len(asg); asgLength > autoScalingGroupSizeMax {
s.logger.Warn("AutoScalingGroup length exceeds characters limit and will be ignored", zap.Int("length", asgLength), zap.Int("character limit", autoScalingGroupSizeMax))
s.autoScalingGroup.name = ""
} else {
s.autoScalingGroup.name = asg
}
})
}

// addEntryForLogFile adds an association between a log file glob and a service attribute, as configured in the
Expand Down Expand Up @@ -275,15 +294,9 @@ func (s *serviceprovider) scrapeImdsServiceNameAndASG() error {
// case sensitive
if originalCaseKey := lowerTagKeys[strings.ToLower(ec2tagger.Ec2InstanceTagKeyASG)]; originalCaseKey == ec2tagger.Ec2InstanceTagKeyASG {
asg, err := s.metadataProvider.InstanceTagValue(context.Background(), ec2tagger.Ec2InstanceTagKeyASG)
if err == nil {
if err == nil && asg != "" {
s.logger.Debug("AutoScalingGroup retrieved through IMDS")
s.mutex.Lock()
s.autoScalingGroup = asg
if asgLength := len(s.autoScalingGroup); asgLength > autoScalingGroupSizeMax {
s.logger.Warn("AutoScalingGroup length exceeds characters limit and will be ignored", zap.Int("length", asgLength), zap.Int("character limit", autoScalingGroupSizeMax))
s.autoScalingGroup = ""
}
s.mutex.Unlock()
s.setAutoScalingGroup(asg)
}
}

Expand Down
39 changes: 36 additions & 3 deletions extension/entitystore/serviceprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ func Test_serviceprovider_serviceAttributeFromAsg(t *testing.T) {
s := &serviceprovider{}
assert.Equal(t, ServiceAttribute{}, s.serviceAttributeFromAsg())

s = &serviceprovider{autoScalingGroup: ""}
s = &serviceprovider{autoScalingGroup: autoscalinggroup{name: ""}}
assert.Equal(t, ServiceAttribute{}, s.serviceAttributeFromAsg())

s = &serviceprovider{autoScalingGroup: "test-asg"}
s = &serviceprovider{autoScalingGroup: autoscalinggroup{name: "test-asg"}}
assert.Equal(t, ServiceAttribute{Environment: "ec2:test-asg"}, s.serviceAttributeFromAsg())
}

Expand All @@ -231,7 +231,7 @@ func Test_serviceprovider_logFileServiceAttribute(t *testing.T) {

assert.Equal(t, ServiceAttribute{ServiceName: ServiceNameUnknown, ServiceNameSource: ServiceNameSourceUnknown, Environment: "ec2:default"}, s.logFileServiceAttribute("glob", "group"))

s.autoScalingGroup = "test-asg"
s.autoScalingGroup = autoscalinggroup{name: "test-asg"}
assert.Equal(t, ServiceAttribute{ServiceName: ServiceNameUnknown, ServiceNameSource: ServiceNameSourceUnknown, Environment: "ec2:test-asg"}, s.logFileServiceAttribute("glob", "group"))

s.iamRole = "test-role"
Expand Down Expand Up @@ -414,3 +414,36 @@ func Test_serviceprovider_scrapeAndgetImdsServiceNameAndASG(t *testing.T) {
})
}
}

func Test_serviceprovider_setAutoScalingGroup(t *testing.T) {
tests := []struct {
name string
asgs []string
want string
}{
{
name: "setAutoScalingGroup called once",
asgs: []string{"test-asg"},
want: "test-asg",
},
{
name: "setAutoScalingGroup called multiple times",
asgs: []string{"test-asg", "test-asg2", "test-asg3", "test-asg4"},
want: "test-asg",
},
{
name: "setAutoScalingGroup not called",
asgs: []string{},
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &serviceprovider{}
for _, asg := range tt.asgs {
s.setAutoScalingGroup(asg)
}
assert.Equal(t, tt.want, s.getAutoScalingGroup())
})
}
}
23 changes: 18 additions & 5 deletions plugins/processors/awsentity/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
)

const (
attributeAwsLogGroupNames = "aws.log.group.names"
attributeDeploymentEnvironment = "deployment.environment"
attributeServiceName = "service.name"
attributeService = "Service"
EMPTY = ""
attributeAwsLogGroupNames = "aws.log.group.names"
attributeDeploymentEnvironment = "deployment.environment"
attributeServiceName = "service.name"
attributeService = "Service"
attributeEC2TagAwsAutoscalingGroupName = "ec2.tag.aws:autoscaling:groupName"
EMPTY = ""
)

type scraper interface {
Expand Down Expand Up @@ -67,6 +68,14 @@ var addPodToServiceEnvironmentMap = func(podName string, serviceName string, env
es.AddPodServiceEnvironmentMapping(podName, serviceName, environmentName, serviceNameSource)
}

var setAutoScalingGroup = func(asg string) {
es := entitystore.GetEntityStore()
if es == nil {
return
}
es.SetAutoScalingGroup(asg)
}

var getEC2InfoFromEntityStore = func() entitystore.EC2Info {
es := entitystore.GetEntityStore()
if es == nil {
Expand Down Expand Up @@ -143,6 +152,10 @@ func (p *awsEntityProcessor) processMetrics(_ context.Context, md pmetric.Metric
if serviceNameSource, sourceExists := resourceAttrs.Get(entityattributes.AttributeEntityServiceNameSource); sourceExists {
entityServiceNameSource = serviceNameSource.Str()
}
// resourcedetection processor may have picked up the ASG name from an ec2:DescribeTags call
if autoScalingGroupNameAttr, ok := resourceAttrs.Get(attributeEC2TagAwsAutoscalingGroupName); ok {
setAutoScalingGroup(autoScalingGroupNameAttr.Str())
}

entityServiceName := getServiceAttributes(resourceAttrs)
entityEnvironmentName := environmentName
Expand Down
59 changes: 59 additions & 0 deletions plugins/processors/awsentity/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type mockEntityStore struct {
entries []entityStoreEntry
podToServiceEnvironmentMap map[string]entitystore.ServiceEnvironment
autoScalingGroup string
}

type entityStoreEntry struct {
Expand Down Expand Up @@ -75,6 +76,12 @@ func newMockGetAutoScalingGroupFromEntityStore(asg string) func() string {
}
}

func newMockSetAutoScalingGroup(es *mockEntityStore) func(string) {
return func(asg string) {
es.autoScalingGroup = asg
}
}

// This helper function creates a test logger
// so that it can send the log messages into a
// temporary buffer for pattern matching
Expand Down Expand Up @@ -551,6 +558,58 @@ func TestAWSEntityProcessorNoSensitiveInfoInLogs(t *testing.T) {
}
}

func TestAWSEntityProcessorSetAutoScalingGroup(t *testing.T) {
ctx := context.Background()

tests := []struct {
name string
resourceAttrs []string
want string
}{
{
name: "ASGPopulatedFromResourceDetection",
resourceAttrs: []string{attributeEC2TagAwsAutoscalingGroupName, "test-asg"},
want: "test-asg",
},
{
name: "MultipleResourceAttributes",
resourceAttrs: []string{attributeEC2TagAwsAutoscalingGroupName, "test-asg", attributeAwsLogGroupNames, "log-group"},
want: "test-asg",
},
{
name: "ASGNotPopulated",
resourceAttrs: []string{attributeAwsLogGroupNames, "log-group"},
want: "",
},
{
name: "ResourceAttributesEmpty",
resourceAttrs: []string{},
want: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resetSetAutoScalingGroup := setAutoScalingGroup
defer func() {
setAutoScalingGroup = resetSetAutoScalingGroup
}()

es := newMockEntityStore()
setAutoScalingGroup = newMockSetAutoScalingGroup(es)
metrics := generateMetrics(tt.resourceAttrs...)

p := newAwsEntityProcessor(&Config{EntityType: attributeService}, zap.NewNop())
_, err := p.processMetrics(ctx, metrics)
assert.NoError(t, err)

if len(tt.resourceAttrs) > 0 {
assert.Equal(t, tt.want, es.autoScalingGroup)
}
})
}
}

func generateTestMetrics() pmetric.Metrics {
md := pmetric.NewMetrics()
rm := md.ResourceMetrics().AppendEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ processors:
resolvers:
- name: ""
platform: generic
awsentity/service/application_signals:
entity_type: Service
platform: onPremise
metricstransform/application_signals:
transforms:
- action: update
Expand Down Expand Up @@ -1371,6 +1374,7 @@ service:
- metricstransform/application_signals
- resourcedetection
- awsapplicationsignals
- awsentity/service/application_signals
receivers:
- otlp/application_signals
traces/application_signals:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ processors:
resolvers:
- name: ""
platform: generic
awsentity/service/application_signals:
entity_type: Service
platform: onPremise
metricstransform/application_signals:
transforms:
- action: update
Expand Down Expand Up @@ -1366,6 +1369,7 @@ service:
- metricstransform/application_signals
- resourcedetection
- awsapplicationsignals
- awsentity/service/application_signals
receivers:
- otlp/application_signals
traces/application_signals:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"

"github.com/aws/amazon-cloudwatch-agent/translator/context"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awsemf"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awsxray"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricstransformprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/resourcedetection"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/otlp"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
)

type translator struct {
Expand Down Expand Up @@ -59,11 +59,12 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators
translators.Processors.Set(metricstransformprocessor.NewTranslatorWithName(common.AppSignals))
}

mode := context.CurrentContext().KubernetesMode()
translators.Processors.Set(resourcedetection.NewTranslator(resourcedetection.WithDataType(t.dataType)))
translators.Processors.Set(awsapplicationsignals.NewTranslator(awsapplicationsignals.WithDataType(t.dataType)))

if t.dataType == component.DataTypeMetrics && mode != "" {
// ECS is not in scope for entity association, so we only add the entity processor in non-ECS platforms
isECS := ecsutil.GetECSUtilSingleton().IsECS()
if t.dataType == component.DataTypeMetrics && !isECS {
translators.Processors.Set(awsentity.NewTranslatorWithEntityType(awsentity.Service, common.AppSignals, false))
}

Expand Down
Loading

0 comments on commit 09f8b09

Please sign in to comment.