Skip to content

Commit

Permalink
Add metrics cardinality control for Application Signals (#1014)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxiamxia authored Jan 29, 2024
1 parent 5077593 commit fe3013d
Show file tree
Hide file tree
Showing 23 changed files with 1,137 additions and 30 deletions.
27 changes: 27 additions & 0 deletions plugins/processors/awsappsignals/common/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package common

// Attribute keys in dotted "aws.*" form.
// NOTE(review): the names suggest these are span/resource attribute keys for
// the remote service and the hosting environment — confirm against the code
// that reads them.
const (
AttributeRemoteService = "aws.remote.service"
AttributeHostedInEnvironment = "aws.hostedin.environment"
)

// Metric attribute (dimension-style) names for the local and remote side of a
// call. NOTE(review): exact semantics of each name are defined by the
// consumers of these constants, not visible here.
const (
MetricAttributeRemoteNamespace = "K8s.RemoteNamespace"
MetricAttributeLocalService = "Service"
MetricAttributeLocalOperation = "Operation"
MetricAttributeRemoteService = "RemoteService"
MetricAttributeRemoteOperation = "RemoteOperation"
MetricAttributeRemoteTarget = "RemoteTarget"
)
// "HostedIn.*" attribute names: EKS cluster, Kubernetes namespace, and a
// generic environment.
const (
HostedInAttributeClusterName = "HostedIn.EKS.Cluster"
HostedInAttributeK8SNamespace = "HostedIn.K8s.Namespace"
HostedInAttributeEnvironment = "HostedIn.Environment"
)

// AttributeTmpReserved is the "aws.tmp.reserved" attribute key.
// NOTE(review): presumably a temporary marker attribute — confirm its use at
// the call sites.
const (
AttributeTmpReserved = "aws.tmp.reserved"
)
41 changes: 39 additions & 2 deletions plugins/processors/awsappsignals/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,47 @@
package config

import (
"context"
"errors"
"time"

"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsappsignals/rules"
)

// Config is the configuration of the awsappsignals processor, decoded via
// mapstructure from the "resolvers", "rules" and "limiter" keys.
type Config struct {
Resolvers []Resolver `mapstructure:"resolvers"`
Rules []rules.Rule `mapstructure:"rules"`
Resolvers []Resolver `mapstructure:"resolvers"`
Rules []rules.Rule `mapstructure:"rules"`
// Limiter is optional; Config.Validate only validates it when it is non-nil.
Limiter *LimiterConfig `mapstructure:"limiter"`
}

// LimiterConfig holds the settings of the metrics cardinality limiter.
// Fields tagged `mapstructure:"-"` are not read from user configuration.
type LimiterConfig struct {
// Threshold is read from "drop_threshold".
// NOTE(review): presumably the cardinality above which metrics are dropped — confirm in the limiter.
Threshold int `mapstructure:"drop_threshold"`
// Disabled is read from "disabled"; NewDefaultLimiterConfig sets it to true.
Disabled bool `mapstructure:"disabled"`
// LogDroppedMetrics is read from "log_dropped_metrics".
LogDroppedMetrics bool `mapstructure:"log_dropped_metrics"`
// RotationInterval is read from "rotation_interval".
RotationInterval time.Duration `mapstructure:"rotation_interval"`
// GarbageCollectionInterval is not configurable; Validate replaces a zero
// value with DefaultGCInterval.
GarbageCollectionInterval time.Duration `mapstructure:"-"`
// ParentContext is not configurable and must be set by code.
// NOTE(review): presumably bounds the lifetime of limiter background work — confirm.
ParentContext context.Context `mapstructure:"-"`
}

// Default limiter settings. DefaultThreshold and DefaultRotationInterval are
// applied by NewDefaultLimiterConfig; DefaultGCInterval is applied by
// LimiterConfig.Validate when no garbage-collection interval is set.
const (
DefaultThreshold = 500
DefaultRotationInterval = 1 * time.Hour
DefaultGCInterval = 10 * time.Minute
)

// NewDefaultLimiterConfig returns a freshly allocated LimiterConfig populated
// with the package defaults: the default threshold and rotation interval, with
// the limiter disabled and dropped-metric logging off. The garbage-collection
// interval and parent context are left at their zero values.
func NewDefaultLimiterConfig() *LimiterConfig {
	defaults := new(LimiterConfig)
	defaults.Threshold = DefaultThreshold
	defaults.Disabled = true
	defaults.LogDroppedMetrics = false
	defaults.RotationInterval = DefaultRotationInterval
	return defaults
}

// Validate fills in settings that cannot come from user configuration.
// Currently that is only the garbage-collection interval, which falls back to
// DefaultGCInterval when it has not been set.
func (lc *LimiterConfig) Validate() {
	if lc.GarbageCollectionInterval != 0 {
		// Already set explicitly; nothing to default.
		return
	}
	lc.GarbageCollectionInterval = DefaultGCInterval
}

func (cfg *Config) Validate() error {
Expand All @@ -33,5 +66,9 @@ func (cfg *Config) Validate() error {
return errors.New("unknown resolver")
}
}

if cfg.Limiter != nil {
cfg.Limiter.Validate()
}
return nil
}
4 changes: 2 additions & 2 deletions plugins/processors/awsappsignals/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func createTracesProcessor(
next,
ap.processTraces,
processorhelper.WithCapabilities(consumerCapabilities),
processorhelper.WithStart(ap.Start),
processorhelper.WithStart(ap.StartTraces),
processorhelper.WithShutdown(ap.Shutdown))
}

Expand All @@ -80,7 +80,7 @@ func createMetricsProcessor(
nextMetricsConsumer,
ap.processMetrics,
processorhelper.WithCapabilities(consumerCapabilities),
processorhelper.WithStart(ap.Start),
processorhelper.WithStart(ap.StartMetrics),
processorhelper.WithShutdown(ap.Shutdown))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package cardinalitycontrol

import (
"hash/adler32"
"hash/crc32"
"hash/fnv"
)

// CountMinSketchHashFunc maps an entry's hash key to an int64. The sketch
// reduces the result modulo its width to choose a column in one row.
type CountMinSketchHashFunc func(hashKey string) int64

// CountMinSketchEntry is an item that can be counted by a CountMinSketch.
type CountMinSketchEntry interface {
// HashKey returns the string that is fed to every hash function.
HashKey() string
// Frequency returns the amount added to the sketch counters on Insert.
Frequency() int
}

// CountMinSketch is a count-min sketch: a depth x width matrix of counters in
// which each row is addressed by its own hash function.
type CountMinSketch struct {
// depth is the number of rows actually used: the smaller of maxDepth and the
// number of registered hash functions (see RegisterHashFunc).
depth int
// maxDepth is the number of rows allocated in matrix.
maxDepth int
// width is the number of counters per row.
width int
// matrix holds the counters, indexed as matrix[row][column].
matrix [][]int
// hashFuncs holds the registered hash functions; row i uses hashFuncs[i].
hashFuncs []CountMinSketchHashFunc
}

// Insert adds obj's frequency to one counter in each active row of the sketch.
//
// For every row i below cms.depth, the i-th hash function is applied to
// obj.HashKey(), the result is reduced modulo cms.width to pick a column, and
// that counter is incremented by obj.Frequency().
func (cms *CountMinSketch) Insert(obj CountMinSketchEntry) {
	// Neither value depends on the row, so resolve them once.
	key, freq := obj.HashKey(), obj.Frequency()
	width := int64(cms.width)
	for i := 0; i < cms.depth; i++ {
		pos := cms.hashFuncs[i](key) % width
		if pos < 0 {
			// Go's % keeps the sign of the dividend, so a hash function that
			// returns a negative value would otherwise index out of range.
			pos += width
		}
		cms.matrix[i][pos] += freq
	}
}

// NewCountMinSketch allocates a sketch with depth rows of width counters each.
//
// When one or more hashFuncs are supplied they are registered in order;
// otherwise the default hash functions are registered. The number of rows
// actually used is capped by the number of registered hash functions (see
// RegisterHashFunc).
func NewCountMinSketch(depth, width int, hashFuncs ...CountMinSketchHashFunc) *CountMinSketch {
	matrix := make([][]int, depth)
	for i := range matrix {
		matrix[i] = make([]int, width)
	}
	cms := &CountMinSketch{
		depth:    0,
		maxDepth: depth,
		width:    width,
		matrix:   matrix,
	}
	// Test the length rather than nil-ness: a caller spreading an empty but
	// non-nil slice (fns...) supplies no functions either, and must not end up
	// with a sketch of depth 0 that silently records nothing.
	if len(hashFuncs) > 0 {
		cms.RegisterHashFunc(hashFuncs...)
	} else {
		RegisterDefaultHashFuncs(cms)
	}
	return cms
}

// RegisterDefaultHashFuncs registers the three built-in hash functions on cms,
// in this order: 32-bit FNV-1a, CRC-32 (IEEE polynomial) and Adler-32. Each
// widens its unsigned 32-bit result to int64, so the values are never negative.
func RegisterDefaultHashFuncs(cms *CountMinSketch) {
	defaults := []CountMinSketchHashFunc{
		// FNV-1a, 32 bit.
		func(hashKey string) int64 {
			hasher := fnv.New32a()
			hasher.Write([]byte(hashKey))
			return int64(hasher.Sum32())
		},
		// CRC-32 with the IEEE polynomial.
		func(hashKey string) int64 {
			return int64(crc32.ChecksumIEEE([]byte(hashKey)))
		},
		// Adler-32.
		func(hashKey string) int64 {
			return int64(adler32.Checksum([]byte(hashKey)))
		},
	}
	cms.RegisterHashFunc(defaults...)
}

// RegisterHashFunc adds hashFuncs to the sketch and recomputes the number of
// rows in use. The first registration adopts the given slice directly; later
// registrations append to it. The resulting depth is the number of registered
// functions, capped at the number of rows allocated (maxDepth).
func (cms *CountMinSketch) RegisterHashFunc(hashFuncs ...CountMinSketchHashFunc) {
	switch {
	case cms.hashFuncs == nil:
		cms.hashFuncs = hashFuncs
	default:
		cms.hashFuncs = append(cms.hashFuncs, hashFuncs...)
	}

	// Only rows that have both storage and a hash function can be used.
	usable := len(cms.hashFuncs)
	if usable > cms.maxDepth {
		usable = cms.maxDepth
	}
	cms.depth = usable
}

// Get returns the estimated count for obj: the smallest counter found across
// the active rows at the columns obj.HashKey() hashes to.
//
// When the sketch has no active rows (depth 0) the loop does not run and the
// largest int value is returned.
func (cms *CountMinSketch) Get(obj CountMinSketchEntry) int {
	minCount := int(^uint(0) >> 1) // start from the largest value an int can hold
	// The key does not depend on the row, so resolve it once.
	key := obj.HashKey()
	width := int64(cms.width)
	for i := 0; i < cms.depth; i++ {
		pos := cms.hashFuncs[i](key) % width
		if pos < 0 {
			// Go's % keeps the sign of the dividend, so a hash function that
			// returns a negative value would otherwise index out of range.
			pos += width
		}
		if count := cms.matrix[i][pos]; count < minCount {
			minCount = count
		}
	}
	return minCount
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package cardinalitycontrol

import (
"math/rand"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

// metricNames lists the metric names inserted for every generated label set in
// TestWriteMultipleEntries.
var metricNames = []string{"latency", "error", "fault"}

// TestUpdateFrequency inserts the same entry ten times and checks after each
// insert that the sketch reports the running total.
func TestUpdateFrequency(t *testing.T) {
	sketch := NewCountMinSketch(3, 10)
	for want := 1; want <= 10; want++ {
		entry := MetricData{
			hashKey:   "xxx",
			name:      "latency",
			service:   "app1",
			frequency: 1,
		}
		sketch.Insert(entry)
		assert.Equal(t, want, sketch.Get(entry))
	}
}

// testCases lists how many distinct label sets TestWriteMultipleEntries
// generates in each round.
var testCases = []int{50, 100, 200, 500, 1000, 2000}

// TestWriteMultipleEntries fills one shared sketch with randomly generated
// entries, round by round, and measures how often the sketch's estimate differs
// from the exact accumulated frequency. The worst per-round mismatch rate must
// stay below 30 percent.
func TestWriteMultipleEntries(t *testing.T) {
	sketch := NewCountMinSketch(3, 5000)

	worstRate := 0
	for _, total := range testCases {
		entries := make([]*MetricData, total)
		for idx := range entries {
			labels := map[string]string{
				"operation": "/api/customers/" + strconv.Itoa(rand.Int()),
			}
			// Every metric name shares the same hash key, so the exact
			// frequency for this slot is the sum over all names.
			for _, name := range metricNames {
				freq := rand.Intn(5000)
				md := MetricData{
					hashKey:   sortAndConcatLabels(labels),
					name:      name,
					service:   "app",
					frequency: freq,
				}
				sketch.Insert(md)
				if entries[idx] != nil {
					entries[idx].frequency += freq
					continue
				}
				entries[idx] = &md
			}
		}

		mismatches := 0
		for _, entry := range entries {
			estimate := sketch.Get(entry)
			if entry.frequency != estimate {
				mismatches++
			}
		}
		rate := mismatches * 100 / len(entries)
		if rate > worstRate {
			worstRate = rate
		}
		t.Logf("When the item count is %d with even distribution, the collision rate is %d.\n", total, rate)
	}

	// revisit the count min sketch setting if the assertion fails.
	assert.True(t, worstRate < 30)
}

// TestAdjustUnsupportedDepth checks that the effective depth follows the number
// of registered hash functions but never exceeds the requested depth: three
// defaults give depth 3, two more give 5, and further ones leave it at 5.
func TestAdjustUnsupportedDepth(t *testing.T) {
	sketch := NewCountMinSketch(5, 10)
	assert.Equal(t, 3, sketch.depth)

	registerTwo := func() {
		for n := 0; n < 2; n++ {
			sketch.RegisterHashFunc(func(hashKey string) int64 {
				return int64(0)
			})
		}
	}

	registerTwo()
	assert.Equal(t, 5, sketch.depth)

	registerTwo()
	assert.Equal(t, 5, sketch.depth)
}
Loading

0 comments on commit fe3013d

Please sign in to comment.