From 43ec0f134c9c6e2ee73c4c830a303b2f376057d7 Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Mon, 13 Jan 2025 10:04:00 -0500 Subject: [PATCH 1/3] Add worker pool for log pushers. Split up queue/batch from sender. --- Makefile | 1 - logs/logs.go | 6 +- plugins/outputs/cloudwatchlogs/README.md | 34 ++ .../outputs/cloudwatchlogs/cloudwatchlogs.go | 80 +-- .../cloudwatchlogs/cloudwatchlogs_test.go | 11 +- .../cloudwatchlogs/internal/pusher/batch.go | 146 +++++ .../internal/pusher/batch_test.go | 155 ++++++ .../cloudwatchlogs/internal/pusher/convert.go | 73 +++ .../internal/pusher/convert_test.go | 105 ++++ .../cloudwatchlogs/internal/pusher/pool.go | 110 ++++ .../internal/pusher/pool_test.go | 135 +++++ .../cloudwatchlogs/internal/pusher/pusher.go | 66 +++ .../internal/pusher/pusher_test.go | 100 ++++ .../cloudwatchlogs/internal/pusher/queue.go | 229 ++++++++ .../pusher/queue_test.go} | 517 ++++++------------ .../{ => internal/pusher}/retry.go | 31 +- .../{ => internal/pusher}/retry_test.go | 2 +- .../cloudwatchlogs/internal/pusher/sender.go | 140 +++++ .../internal/pusher/sender_test.go | 196 +++++++ .../cloudwatchlogs/internal/pusher/target.go | 140 +++++ .../internal/pusher/target_test.go | 148 +++++ plugins/outputs/cloudwatchlogs/pusher.go | 474 ---------------- translator/config/schema.json | 5 + .../tocwconfig/sampleConfig/log_filter.conf | 1 + .../tocwconfig/sampleConfig/log_filter.json | 1 + translator/translate/logs/ruleConcurrency.go | 24 + 26 files changed, 2043 insertions(+), 887 deletions(-) create mode 100644 plugins/outputs/cloudwatchlogs/README.md create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/batch.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/convert.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/pool.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/queue.go rename plugins/outputs/cloudwatchlogs/{pusher_test.go => internal/pusher/queue_test.go} (53%) rename plugins/outputs/cloudwatchlogs/{ => internal/pusher}/retry.go (85%) rename plugins/outputs/cloudwatchlogs/{ => internal/pusher}/retry_test.go (99%) create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/sender.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/target.go create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go delete mode 100644 plugins/outputs/cloudwatchlogs/pusher.go create mode 100644 translator/translate/logs/ruleConcurrency.go diff --git a/Makefile b/Makefile index 03e22d2f20..9e5dd23d3f 100644 --- a/Makefile +++ b/Makefile @@ -211,7 +211,6 @@ PKG_WITH_DATA_RACE += internal/tls PKG_WITH_DATA_RACE += plugins/inputs/logfile PKG_WITH_DATA_RACE += plugins/inputs/logfile/tail PKG_WITH_DATA_RACE += plugins/outputs/cloudwatch -PKG_WITH_DATA_RACE += plugins/outputs/cloudwatchlogs PKG_WITH_DATA_RACE += plugins/processors/awsapplicationsignals PKG_WITH_DATA_RACE += plugins/processors/ec2tagger PKG_WITH_DATA_RACE_PATTERN := $(shell echo '$(PKG_WITH_DATA_RACE)' | tr ' ' '|') diff --git a/logs/logs.go 
b/logs/logs.go index a497883009..33998edffe 100644 --- a/logs/logs.go +++ b/logs/logs.go @@ -30,9 +30,14 @@ type LogEvent interface { Done() } +type LogEntityProvider interface { + Entity() *cloudwatchlogs.Entity +} + // A LogSrc is a single source where log events are generated // e.g. a single log file type LogSrc interface { + LogEntityProvider SetOutput(func(LogEvent)) Group() string Stream() string @@ -40,7 +45,6 @@ type LogSrc interface { Description() string Retention() int Class() string - Entity() *cloudwatchlogs.Entity Stop() } diff --git a/plugins/outputs/cloudwatchlogs/README.md b/plugins/outputs/cloudwatchlogs/README.md new file mode 100644 index 0000000000..61f53c86b7 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/README.md @@ -0,0 +1,34 @@ +# Amazon CloudWatch Logs Output Plugin + +For each configured target (log group/stream), the output plugin maintains a queue for log events that it batches. +Once each batch is full or the flush interval is reached, the current batch is sent using the PutLogEvents API to Amazon CloudWatch. + +When concurrency is enabled, the pusher uses a shared worker pool to allow multiple concurrent sends. +``` + Target #1 (Log Group/Stream) ┌──Shared Worker Pool──┐ + ┌──────────────────────────────────────────────────────────────────┐ │ │ + │ │ │ ┌──Worker 1──┐ │ + │ ┌────────Event Queue────────┐ ┌─────────Batch─────────┐ │ │ │ ┌────────┐ │ │ + │ │ │ │ ┌───────────────────┐ │ │ ┌──────┼───►│ │ Sender │ │ │ + │ │ ┌───┐ ┌───┐┌───┐┌───┐ │ │ │ │ │ │ │ │ │ └────────┘ │ │ +AddEvent───│───►│ │ n │ ... │ 3 ││ 2 ││ 1 │ ├─────►│ │ PutLogEventsInput │ ├──┼────┤ │ └────────────┘ │ + │ │ └───┘ └───┘└───┘└───┘ │ │ │ │ │ │ │ │ │ + │ │ │ │ └───────────────────┘ │ │ │ │ ┌──Worker 2──┐ │ + │ └───────────────────────────┘ └───────────────────────┘ │ │ │ │ ┌────────┐ │ │ + │ │ ┼──────┼───►│ │ Sender │ │ │ + └──────────────────────────────────────────────────────────────────┘ │ │ │ └────────┘ │ │ + │ │ └────────────┘ │ + │ │ │ + Target #2 (Log Group/Stream) │ │ . │ + ┌──────────────────────────────────────────────────────────────────┐ │ │ . │ + │ │ │ │ . │ + │ ┌────────Event Queue────────┐ ┌─────────Batch─────────┐ │ │ │ │ + │ │ │ │ ┌───────────────────┐ │ │ │ │ │ + │ │ ┌───┐ ┌───┐┌───┐┌───┐ │ │ │ │ │ │ │ │ ┌──Worker n──┐ │ +AddEvent───│───►│ │ n │ ... 
│ 3 ││ 2 ││ 1 │ ├─────►│ │ PutLogEventsInput │ ├──┼────┤ │ │ ┌────────┐ │ │ + │ │ └───┘ └───┘└───┘└───┘ │ │ │ │ │ │ └──────┼───►│ │ Sender │ │ │ + │ │ │ │ └───────────────────┘ │ │ │ │ └────────┘ │ │ + │ └───────────────────────────┘ └───────────────────────┘ │ │ └────────────┘ │ + │ │ │ │ + └──────────────────────────────────────────────────────────────────┘ └──────────────────────┘ +``` diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go index 84a124abab..59c03d3c7a 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go @@ -13,7 +13,6 @@ import ( "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/request" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" "go.uber.org/zap" @@ -27,6 +26,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/internal" "github.com/aws/amazon-cloudwatch-agent/internal/retryer" "github.com/aws/amazon-cloudwatch-agent/logs" + "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" "github.com/aws/amazon-cloudwatch-agent/tool/util" ) @@ -39,9 +39,6 @@ const ( LogEntryField = "value" defaultFlushTimeout = 5 * time.Second - eventHeaderSize = 200 - truncatedSuffix = "[Truncated...]" - msgSizeLimit = 256*1024 - eventHeaderSize maxRetryTimeout = 14*24*time.Hour + 10*time.Minute metricRetryTimeout = 2 * time.Minute @@ -71,6 +68,7 @@ type CloudWatchLogs struct { // Retention for log group RetentionInDays int `toml:"retention_in_days"` + Concurrency int `toml:"concurrency"` ForceFlushInterval internal.Duration `toml:"force_flush_interval"` // unit is second @@ -78,7 +76,10 @@ type CloudWatchLogs struct { pusherStopChan chan struct{} pusherWaitGroup sync.WaitGroup - cwDests map[Target]*cwDest + cwDests map[pusher.Target]*cwDest + workerPool pusher.WorkerPool + targetManager pusher.TargetManager + once sync.Once middleware awsmiddleware.Middleware } @@ -93,6 +94,9 @@ func (c *CloudWatchLogs) Close() error { for _, d := range c.cwDests { d.Stop() } + if c.workerPool != nil { + c.workerPool.Stop() + } return nil } @@ -115,7 +119,7 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGrou retention = -1 } - t := Target{ + t := pusher.Target{ Group: group, Stream: stream, Retention: retention, @@ -124,11 +128,31 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGrou return c.getDest(t, logSrc) } -func (c *CloudWatchLogs) getDest(t Target, logSrc logs.LogSrc) *cwDest { +func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest { if cwd, ok := c.cwDests[t]; ok { return cwd } + logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log) + client := c.createClient(logThrottleRetryer) + agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType) + agent.UsageFlags().SetValue(agent.FlagMode, c.Mode) + if containerInsightsRegexp.MatchString(t.Group) { + useragent.Get().SetContainerInsightsFlag() + } + c.once.Do(func() { + if c.Concurrency > 0 { + c.workerPool = pusher.NewWorkerPool(c.Concurrency) + } + c.targetManager = pusher.NewTargetManager(c.Log, client) + }) + p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup) + cwd := &cwDest{pusher: p, 
retryer: logThrottleRetryer} + c.cwDests[t] = cwd + return cwd +} + +func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlogs.CloudWatchLogs { credentialConfig := &configaws.CredentialConfig{ Region: c.Region, AccessKey: c.AccessKey, @@ -138,34 +162,24 @@ func (c *CloudWatchLogs) getDest(t Target, logSrc logs.LogSrc) *cwDest { Filename: c.Filename, Token: c.Token, } - - logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log) client := cloudwatchlogs.New( credentialConfig.Credentials(), &aws.Config{ Endpoint: aws.String(c.EndpointOverride), - Retryer: logThrottleRetryer, + Retryer: retryer, LogLevel: configaws.SDKLogLevel(), Logger: configaws.SDKLogger{}, }, ) - agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType) - agent.UsageFlags().SetValue(agent.FlagMode, c.Mode) - if containerInsightsRegexp.MatchString(t.Group) { - useragent.Get().SetContainerInsightsFlag() - } client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"})) if c.middleware != nil { if err := awsmiddleware.NewConfigurer(c.middleware.Handlers()).Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil { c.Log.Errorf("Unable to configure middleware on cloudwatch logs client: %v", err) } else { - c.Log.Info("Configured middleware on AWS client") + c.Log.Debug("Configured middleware on AWS client") } } - pusher := NewPusher(c.Region, t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log, c.pusherStopChan, &c.pusherWaitGroup, logSrc) - cwd := &cwDest{pusher: pusher, retryer: logThrottleRetryer} - c.cwDests[t] = cwd - return cwd + return client } func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) { @@ -179,7 +193,7 @@ func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) { return } cwd.switchToEMF() - cwd.pusher.RetryDuration = metricRetryTimeout + cwd.pusher.Sender.SetRetryDuration(metricRetryTimeout) e := c.getLogEventFromMetric(m) if e == nil { @@ -189,11 +203,11 @@ func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) { cwd.AddEvent(e) } -func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (Target, error) { +func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (pusher.Target, error) { tags := m.Tags() logGroup, ok := tags[LogGroupNameTag] if !ok { - return Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name()) + return pusher.Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name()) } else { m.RemoveTag(LogGroupNameTag) } @@ -205,7 +219,7 @@ func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (Target, error) logStream = c.LogStreamName } - return Target{logGroup, logStream, util.StandardLogGroupClass, -1}, nil + return pusher.Target{Group: logGroup, Stream: logStream, Class: util.StandardLogGroupClass, Retention: -1}, nil } func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent { @@ -299,7 +313,7 @@ func (e *structuredLogEvent) Time() time.Time { func (e *structuredLogEvent) Done() {} type cwDest struct { - *pusher + pusher *pusher.Pusher sync.Mutex isEMF bool stopped bool @@ -341,25 +355,13 @@ func (cd *cwDest) switchToEMF() { defer cd.Unlock() if !cd.isEMF { cd.isEMF = true - cwl, ok := cd.Service.(*cloudwatchlogs.CloudWatchLogs) + cwl, ok := cd.pusher.Service.(*cloudwatchlogs.CloudWatchLogs) if ok { cwl.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("x-amzn-logs-format", "json/emf")) } } } -func (cd 
*cwDest) setRetryer(r request.Retryer) { - cwl, ok := cd.Service.(*cloudwatchlogs.CloudWatchLogs) - if ok { - cwl.Retryer = r - } -} - -type Target struct { - Group, Stream, Class string - Retention int -} - // Description returns a one-sentence description on the Output func (c *CloudWatchLogs) Description() string { return "Configuration for AWS CloudWatchLogs output." @@ -398,7 +400,7 @@ func init() { return &CloudWatchLogs{ ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout}, pusherStopChan: make(chan struct{}), - cwDests: make(map[Target]*cwDest), + cwDests: make(map[pusher.Target]*cwDest), middleware: agenthealth.NewAgentHealth( zap.NewNop(), &agenthealth.Config{ diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go index 2b4cdf5290..f7d47257e0 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go @@ -6,16 +6,17 @@ package cloudwatchlogs import ( "testing" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" "github.com/aws/amazon-cloudwatch-agent/logs" + "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher" "github.com/aws/amazon-cloudwatch-agent/tool/util" ) // TestCreateDestination would create different destination for cloudwatchlogs endpoint based on the log group, log stream, // and log group's retention func TestCreateDestination(t *testing.T) { - testCases := map[string]struct { cfgLogGroup string cfgLogStream string @@ -68,28 +69,30 @@ func TestCreateDestination(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { c := &CloudWatchLogs{ + Log: testutil.Logger{Name: "test"}, LogGroupName: "G1", LogStreamName: "S1", AccessKey: "access_key", SecretKey: "secret_key", pusherStopChan: make(chan struct{}), - cwDests: make(map[Target]*cwDest), + cwDests: make(map[pusher.Target]*cwDest), } dest := c.CreateDest(testCase.cfgLogGroup, testCase.cfgLogStream, testCase.cfgLogRetention, testCase.cfgLogClass, testCase.cfgTailerSrc).(*cwDest) require.Equal(t, testCase.expectedLogGroup, dest.pusher.Group) require.Equal(t, testCase.expectedLogStream, dest.pusher.Stream) require.Equal(t, testCase.expectedLogGroupRetention, dest.pusher.Retention) require.Equal(t, testCase.expectedLogClass, dest.pusher.Class) - require.Equal(t, testCase.expectedTailerSrc, dest.pusher.logSrc) + require.Equal(t, testCase.expectedTailerSrc, dest.pusher.EntityProvider) }) } } func TestDuplicateDestination(t *testing.T) { c := &CloudWatchLogs{ + Log: testutil.Logger{Name: "test"}, AccessKey: "access_key", SecretKey: "secret_key", - cwDests: make(map[Target]*cwDest), + cwDests: make(map[pusher.Target]*cwDest), pusherStopChan: make(chan struct{}), } // Given the same log group, log stream, same retention, and logClass diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go new file mode 100644 index 0000000000..cc46a1ec24 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go @@ -0,0 +1,146 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT
+
+package pusher
+
+import (
+	"sort"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+
+	"github.com/aws/amazon-cloudwatch-agent/logs"
+	"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
+)
+
+// CloudWatch Logs API limits
+// Taken from https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
+const (
+	// The maximum batch size in bytes. This size is calculated as the sum of all event messages in UTF-8,
+	// plus 26 bytes for each log event.
+	reqSizeLimit = 1024 * 1024
+	// The maximum number of log events in a batch.
+	reqEventsLimit = 10000
+	// The bytes required for metadata for each log event.
+	perEventHeaderBytes = 26
+	// A batch of log events in a single request cannot span more than 24 hours. Otherwise, the operation fails.
+	batchTimeRangeLimit = 24 * time.Hour
+)
+
+// logEvent represents a single cloudwatchlogs.InputLogEvent with some metadata for processing.
+type logEvent struct {
+	event        *cloudwatchlogs.InputLogEvent
+	eventBytes   int
+	timestamp    time.Time
+	doneCallback func()
+}
+
+func newLogEvent(timestamp time.Time, message string, doneCallback func()) *logEvent {
+	return &logEvent{
+		event: &cloudwatchlogs.InputLogEvent{
+			Timestamp: aws.Int64(timestamp.UnixMilli()),
+			Message:   aws.String(message),
+		},
+		eventBytes:   len(message) + perEventHeaderBytes,
+		timestamp:    timestamp,
+		doneCallback: doneCallback,
+	}
+}
+
+type logEventBatch struct {
+	Target
+	events         []*cloudwatchlogs.InputLogEvent
+	entityProvider logs.LogEntityProvider
+	// Total size of all events in the batch.
+	bufferedSize int
+	// Whether the events need to be sorted before being sent.
+	needSort bool
+	// Minimum and maximum timestamps in the batch.
+	minT, maxT time.Time
+	// Callbacks to execute when the batch is successfully sent.
+	doneCallbacks []func()
+}
+
+func newLogEventBatch(target Target, entityProvider logs.LogEntityProvider) *logEventBatch {
+	return &logEventBatch{
+		Target:         target,
+		events:         make([]*cloudwatchlogs.InputLogEvent, 0),
+		entityProvider: entityProvider,
+	}
+}
+
+// inTimeRange checks if adding an event with the timestamp would keep the batch within the 24-hour limit.
+func (b *logEventBatch) inTimeRange(timestamp time.Time) bool {
+	if b.minT.IsZero() || b.maxT.IsZero() {
+		return true
+	}
+	return timestamp.Sub(b.minT) <= batchTimeRangeLimit &&
+		b.maxT.Sub(timestamp) <= batchTimeRangeLimit
+}
+
+// hasSpace checks whether an event of the given size can be added without exceeding the batch's
+// event count and request size limits.
+func (b *logEventBatch) hasSpace(size int) bool {
+	return len(b.events) < reqEventsLimit && b.bufferedSize+size <= reqSizeLimit
+}
+
+// append adds a log event to the batch.
+func (b *logEventBatch) append(e *logEvent) {
+	if len(b.events) > 0 && *e.event.Timestamp < *b.events[len(b.events)-1].Timestamp {
+		b.needSort = true
+	}
+	b.events = append(b.events, e.event)
+	b.addDoneCallback(e.doneCallback)
+	b.bufferedSize += e.eventBytes
+	if b.minT.IsZero() || b.minT.After(e.timestamp) {
+		b.minT = e.timestamp
+	}
+	if b.maxT.IsZero() || b.maxT.Before(e.timestamp) {
+		b.maxT = e.timestamp
+	}
+}
+
+// addDoneCallback adds the callback to the end of the registered callbacks.
+func (b *logEventBatch) addDoneCallback(callback func()) {
+	if callback != nil {
+		b.doneCallbacks = append(b.doneCallbacks, callback)
+	}
+}
+
+// done runs all registered callbacks.
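+// Callbacks run in reverse registration order, so the bookkeeping callback appended last by
+// the queue (see onSuccessCallback in queue.go) executes before the per-event callbacks.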
+func (b *logEventBatch) done() { + for i := len(b.doneCallbacks) - 1; i >= 0; i-- { + done := b.doneCallbacks[i] + done() + } +} + +// build creates a cloudwatchlogs.PutLogEventsInput from the batch. The log events in the batch must be in +// chronological order by their timestamp. +func (b *logEventBatch) build() *cloudwatchlogs.PutLogEventsInput { + if b.needSort { + sort.Stable(byTimestamp(b.events)) + } + input := &cloudwatchlogs.PutLogEventsInput{ + LogGroupName: aws.String(b.Group), + LogStreamName: aws.String(b.Stream), + LogEvents: b.events, + } + if b.entityProvider != nil { + input.Entity = b.entityProvider.Entity() + } + return input +} + +type byTimestamp []*cloudwatchlogs.InputLogEvent + +func (t byTimestamp) Len() int { + return len(t) +} + +func (t byTimestamp) Swap(i, j int) { + t[i], t[j] = t[j], t[i] +} + +func (t byTimestamp) Less(i, j int) bool { + return *t[i].Timestamp < *t[j].Timestamp +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go new file mode 100644 index 0000000000..f305f4d502 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go @@ -0,0 +1,155 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package pusher + +import ( + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/aws/amazon-cloudwatch-agent/logs" + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" +) + +type mockEntityProvider struct { + mock.Mock +} + +var _ logs.LogEntityProvider = (*mockEntityProvider)(nil) + +func (m *mockEntityProvider) Entity() *cloudwatchlogs.Entity { + args := m.Called() + return args.Get(0).(*cloudwatchlogs.Entity) +} + +func newMockEntityProvider(entity *cloudwatchlogs.Entity) *mockEntityProvider { + ep := new(mockEntityProvider) + ep.On("Entity").Return(entity) + return ep +} + +func TestLogEventBatch(t *testing.T) { + t.Run("Append", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + + event1 := newLogEvent(time.Now(), "Test message 1", nil) + event2 := newLogEvent(time.Now(), "Test message 2", nil) + + batch.append(event1) + assert.Equal(t, 1, len(batch.events), "Batch should have 1 event") + + batch.append(event2) + assert.Equal(t, 2, len(batch.events), "Batch should have 2 events") + }) + + t.Run("InTimeRange", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + + now := time.Now() + assert.True(t, batch.inTimeRange(now)) + event1 := newLogEvent(now, "Test message 1", nil) + batch.append(event1) + + assert.True(t, batch.inTimeRange(now.Add(23*time.Hour)), "Time within 24 hours should be in range") + assert.False(t, batch.inTimeRange(now.Add(25*time.Hour)), "Time beyond 24 hours should not be in range") + assert.False(t, batch.inTimeRange(now.Add(-25*time.Hour)), "Time more than 24 hours in past should not be in range") + }) + + t.Run("HasSpace", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + + // Add events until close to the limit + for i := 0; i < reqEventsLimit-1; i++ { + event := newLogEvent(time.Now(), "Test message", nil) + batch.append(event) + } + + assert.True(t, batch.hasSpace(100), "Batch should have space for one more small event") + assert.False(t, batch.hasSpace(reqSizeLimit), "Batch should not have space for an event that exceeds the size limit") + + // 
Add one more event to reach the limit + event := newLogEvent(time.Now(), "Last message", nil) + batch.append(event) + + assert.False(t, batch.hasSpace(1), "Batch should not have space after reaching event limit") + }) + + t.Run("Build", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + + event1 := newLogEvent(time.Now(), "Test message 1", nil) + event2 := newLogEvent(time.Now(), "Test message 2", nil) + batch.append(event1) + batch.append(event2) + + input := batch.build() + + assert.Equal(t, "G", *input.LogGroupName, "Log group name should match") + assert.Equal(t, "S", *input.LogStreamName, "Log stream name should match") + assert.Equal(t, 2, len(input.LogEvents), "Input should have 2 log events") + }) + + t.Run("EventSort", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + + now := time.Now() + event1 := newLogEvent(now.Add(1*time.Second), "Test message 1", nil) + event2 := newLogEvent(now, "Test message 2", nil) + event3 := newLogEvent(now.Add(2*time.Second), "Test message 3", nil) + + // Add events in non-chronological order + batch.append(event1) + batch.append(event2) + batch.append(event3) + + input := batch.build() + + assert.Equal(t, 3, len(input.LogEvents), "Input should have 3 log events") + assert.True(t, *input.LogEvents[0].Timestamp < *input.LogEvents[1].Timestamp, "Events should be sorted by timestamp") + assert.True(t, *input.LogEvents[1].Timestamp < *input.LogEvents[2].Timestamp, "Events should be sorted by timestamp") + }) + + t.Run("DoneCallback", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + + callbackCalled := false + callback := func() { + callbackCalled = true + } + + event := newLogEvent(time.Now(), "Test message", callback) + batch.append(event) + + batch.done() + + assert.True(t, callbackCalled, "Done callback should have been called") + }) + + t.Run("WithEntityProvider", func(t *testing.T) { + testEntity := &cloudwatchlogs.Entity{ + Attributes: map[string]*string{ + "PlatformType": aws.String("AWS::EC2"), + "EC2.InstanceId": aws.String("i-123456789"), + "EC2.AutoScalingGroup": aws.String("test-group"), + }, + KeyAttributes: map[string]*string{ + "Name": aws.String("myService"), + "Environment": aws.String("myEnvironment"), + "AwsAccountId": aws.String("123456789"), + }, + } + mockProvider := newMockEntityProvider(testEntity) + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, mockProvider) + + event := newLogEvent(time.Now(), "Test message", nil) + batch.append(event) + + input := batch.build() + + assert.Equal(t, testEntity, input.Entity, "Entity should be set from the EntityProvider") + }) +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go b/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go new file mode 100644 index 0000000000..53eb95ec48 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go @@ -0,0 +1,73 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package pusher + +import ( + "time" + + "github.com/influxdata/telegraf" + + "github.com/aws/amazon-cloudwatch-agent/logs" +) + +const ( + // Each log event can be no larger than 256 KB. When truncating the message, assume this is the limit for + // message length. + msgSizeLimit = 256*1024 - perEventHeaderBytes + // The suffix to add to truncated log lines. + truncatedSuffix = "[Truncated...]" + + // The duration until a timestamp is considered old. 
+	warnOldTimeStamp = 24 * time.Hour
+	// The minimum interval between warnings about old timestamps.
+	warnOldTimeStampLogInterval = 5 * time.Minute
+)
+
+type converter struct {
+	Target
+	logger          telegraf.Logger
+	lastValidTime   time.Time
+	lastUpdateTime  time.Time
+	lastWarnMessage time.Time
+}
+
+func newConverter(logger telegraf.Logger, target Target) *converter {
+	return &converter{
+		logger: logger,
+		Target: target,
+	}
+}
+
+// convert handles message truncation and timestamp population for a log event.
+func (c *converter) convert(e logs.LogEvent) *logEvent {
+	message := e.Message()
+
+	if len(message) > msgSizeLimit {
+		message = message[:msgSizeLimit-len(truncatedSuffix)] + truncatedSuffix
+	}
+	var t time.Time
+	if e.Time().IsZero() {
+		if !c.lastValidTime.IsZero() {
+			// When there has been a valid time before, assume most log events have
+			// a valid timestamp and reuse the last valid timestamp for new entries
+			// that do not have one.
+			t = c.lastValidTime
+			if !c.lastUpdateTime.IsZero() {
+				// Warn if the last valid timestamp is more than a day old, rate limited
+				// to once per warnOldTimeStampLogInterval.
+				if time.Since(c.lastUpdateTime) > warnOldTimeStamp && time.Since(c.lastWarnMessage) > warnOldTimeStampLogInterval {
+					c.logger.Warnf("Unable to parse timestamp, using last valid timestamp found in the logs %v, which is at least 1 day old, for log group %v", c.lastValidTime, c.Group)
+					c.lastWarnMessage = time.Now()
+				}
+			}
+		} else {
+			t = time.Now()
+		}
+	} else {
+		t = e.Time()
+		c.lastValidTime = t
+		c.lastUpdateTime = time.Now()
+		c.lastWarnMessage = time.Time{}
+	}
+	return newLogEvent(t, message, e.Done)
+}
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go
new file mode 100644
index 0000000000..0579314857
--- /dev/null
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go
@@ -0,0 +1,105 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT + +package pusher + +import ( + "bytes" + "io" + "log" + "os" + "strings" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +type stubLogEvent struct { + message string + timestamp time.Time + done func() +} + +func (m *stubLogEvent) Message() string { + return m.message +} + +func (m *stubLogEvent) Time() time.Time { + return m.timestamp +} + +func (m *stubLogEvent) Done() { + if m.done != nil { + m.done() + } +} + +func newStubLogEvent(message string, timestamp time.Time) *stubLogEvent { + return &stubLogEvent{ + message: message, + timestamp: timestamp, + } +} + +func TestConverter(t *testing.T) { + logger := testutil.Logger{Name: "converter"} + target := Target{Group: "testGroup", Stream: "testStream"} + + t.Run("WithValidTimestamp", func(t *testing.T) { + t.Parallel() + now := time.Now() + + conv := newConverter(logger, target) + le := conv.convert(newStubLogEvent("Test message", now)) + + assert.Equal(t, now.UnixMilli(), *le.event.Timestamp) + assert.Equal(t, "Test message", *le.event.Message) + assert.Equal(t, now, conv.lastValidTime) + }) + + t.Run("WithNoTimestamp", func(t *testing.T) { + t.Parallel() + testTimestampMs := int64(12345678) + + conv := newConverter(logger, target) + conv.lastValidTime = time.UnixMilli(testTimestampMs) + + le := conv.convert(newStubLogEvent("Test message", time.Time{})) + + assert.Equal(t, testTimestampMs, *le.event.Timestamp) + assert.Equal(t, "Test message", *le.event.Message) + }) + + t.Run("TruncateMessage", func(t *testing.T) { + t.Parallel() + largeMessage := string(make([]byte, msgSizeLimit+100)) + event := newStubLogEvent(largeMessage, time.Now()) + + conv := newConverter(logger, target) + le := conv.convert(event) + + assert.Equal(t, msgSizeLimit, len(*le.event.Message)) + assert.Equal(t, truncatedSuffix, (*le.event.Message)[len(*le.event.Message)-len(truncatedSuffix):]) + }) + + t.Run("WithOldTimestampWarning", func(t *testing.T) { + oldTime := time.Now().Add(-25 * time.Hour) + conv := newConverter(logger, target) + conv.lastValidTime = oldTime + conv.lastUpdateTime = oldTime + + var logbuf bytes.Buffer + log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) + le := conv.convert(newStubLogEvent("Test message", time.Time{})) + + assert.Equal(t, oldTime.UnixMilli(), *le.event.Timestamp) + assert.Equal(t, "Test message", *le.event.Message) + loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") + assert.Len(t, loglines, 1) + logline := loglines[0] + assert.True(t, strings.Contains(logline, "W!")) + assert.True(t, strings.Contains(logline, "Unable to parse timestamp")) + }) +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go new file mode 100644 index 0000000000..fc85aee4fc --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go @@ -0,0 +1,110 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync" + "sync/atomic" + "time" +) + +type WorkerPool interface { + Submit(task func()) + Stop() +} + +type workerPool struct { + tasks chan func() + workerCount atomic.Int32 + wg sync.WaitGroup + stopCh chan struct{} + stopped atomic.Bool +} + +// NewWorkerPool creates a pool of workers of the specified size. 
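+// A minimal usage sketch:
+//
+//	pool := NewWorkerPool(4)
+//	pool.Submit(func() { /* send one batch */ })
+//	pool.Stop() // drains queued tasks, then waits for the workers to exit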
+func NewWorkerPool(size int) WorkerPool { + p := &workerPool{ + tasks: make(chan func(), size), + stopCh: make(chan struct{}), + } + for i := 0; i < size; i++ { + p.addWorker() + } + return p +} + +// addWorker creates and starts a new worker goroutine. +func (p *workerPool) addWorker() { + p.wg.Add(1) + p.workerCount.Add(1) + go p.worker() +} + +func (p *workerPool) worker() { + defer func() { + p.workerCount.Add(-1) + p.wg.Done() + }() + for task := range p.tasks { + task() + } +} + +// Submit adds a task to the pool. Blocks until a worker is available to receive the task or the pool is stopped. +func (p *workerPool) Submit(task func()) { + if !p.stopped.Load() { + select { + case p.tasks <- task: + case <-p.stopCh: + return + } + } +} + +// WorkerCount keeps track of the available workers in the pool. +func (p *workerPool) WorkerCount() int32 { + return p.workerCount.Load() +} + +// Stop gracefully shuts down the worker pool. +func (p *workerPool) Stop() { + if !p.stopped.Load() { + p.stopped.Store(true) + close(p.stopCh) + close(p.tasks) + p.wg.Wait() + } +} + +// senderPool wraps a Sender with a WorkerPool for concurrent sending. +type senderPool struct { + workerPool WorkerPool + sender Sender +} + +var _ Sender = (*senderPool)(nil) + +func newSenderPool(workerPool WorkerPool, sender Sender) Sender { + return &senderPool{ + workerPool: workerPool, + sender: sender, + } +} + +// Send submits a send task to the worker pool. +func (s *senderPool) Send(batch *logEventBatch) { + s.workerPool.Submit(func() { + s.sender.Send(batch) + }) +} + +// SetRetryDuration sets the retry duration on the wrapped Sender. +func (s *senderPool) SetRetryDuration(duration time.Duration) { + s.sender.SetRetryDuration(duration) +} + +// RetryDuration returns the retry duration of the wrapped Sender. +func (s *senderPool) RetryDuration() time.Duration { + return s.sender.RetryDuration() +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go new file mode 100644 index 0000000000..1b69645543 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go @@ -0,0 +1,135 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" +) + +func TestWorkerPool(t *testing.T) { + t.Run("BasicSubmit", func(t *testing.T) { + pool := NewWorkerPool(3).(*workerPool) + assert.EqualValues(t, 3, pool.WorkerCount()) + var wg sync.WaitGroup + var completed atomic.Int32 + + for i := 0; i < 10; i++ { + wg.Add(1) + pool.Submit(func() { + defer wg.Done() + completed.Add(1) + }) + } + + wg.Wait() + assert.EqualValues(t, 10, completed.Load()) + assert.EqualValues(t, 3, pool.WorkerCount()) + pool.Stop() + assert.EqualValues(t, 0, pool.WorkerCount()) + }) + + t.Run("GracefulStop", func(t *testing.T) { + pool := NewWorkerPool(20) + + var completed atomic.Int32 + taskCount := 500 + + for i := 0; i < taskCount; i++ { + pool.Submit(func() { + time.Sleep(time.Millisecond) + completed.Add(1) + }) + } + + pool.Stop() + assert.EqualValues(t, taskCount, completed.Load()) + }) + + t.Run("SubmitAfterStop", func(t *testing.T) { + pool := NewWorkerPool(3).(*workerPool) + pool.Stop() + assert.EqualValues(t, 0, pool.WorkerCount()) + assert.NotPanics(t, func() { + pool.Submit(func() { + assert.Fail(t, "should not reach") + }) + }) + time.Sleep(time.Millisecond) + }) + + t.Run("MultipleStops", func(t *testing.T) { + pool := NewWorkerPool(3) + pool.Stop() + assert.NotPanics(t, func() { + pool.Stop() + }) + }) + + t.Run("ConcurrentSubmitAndStop", func(t *testing.T) { + pool := NewWorkerPool(20) + var wg sync.WaitGroup + taskCount := 1000 + var completed atomic.Int32 + + // Start submitting tasks + for i := 0; i < taskCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + pool.Submit(func() { + time.Sleep(time.Millisecond) + completed.Add(1) + }) + }() + } + + // Stop the pool while tasks are being submitted + time.Sleep(5 * time.Millisecond) + pool.Stop() + + assert.LessOrEqual(t, completed.Load(), int32(taskCount)) + assert.Greater(t, completed.Load(), int32(0)) + }) +} + +func TestSenderPool(t *testing.T) { + logger := testutil.Logger{Name: "test"} + stop := make(chan struct{}) + mockService := new(mockLogsService) + mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil) + s := newSender(logger, mockService, nil, time.Second, stop) + p := NewWorkerPool(12) + sp := newSenderPool(p, s) + + assert.Equal(t, time.Second, sp.RetryDuration()) + sp.SetRetryDuration(time.Minute) + assert.Equal(t, time.Minute, sp.RetryDuration()) + + var completed atomic.Int32 + var evts []*logEvent + for i := 0; i < 200; i++ { + evts = append(evts, newLogEvent(time.Now(), "test", func() { + time.Sleep(time.Millisecond) + completed.Add(1) + })) + } + + for _, evt := range evts { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + batch.append(evt) + sp.Send(batch) + } + + p.Stop() + assert.Equal(t, int32(200), completed.Load()) +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go new file mode 100644 index 0000000000..33656c3bb2 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go @@ -0,0 +1,66 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync" + "time" + + "github.com/influxdata/telegraf" + + "github.com/aws/amazon-cloudwatch-agent/logs" +) + +// Pusher connects the Queue to the Sender. +type Pusher struct { + Target + Queue + Service cloudWatchLogsService + TargetManager TargetManager + EntityProvider logs.LogEntityProvider + Sender Sender +} + +// NewPusher creates a new Pusher instance with a new Queue and Sender. Calls PutRetentionPolicy using the +// TargetManager. +func NewPusher( + logger telegraf.Logger, + target Target, + service cloudWatchLogsService, + targetManager TargetManager, + entityProvider logs.LogEntityProvider, + workerPool WorkerPool, + flushTimeout time.Duration, + retryDuration time.Duration, + stop <-chan struct{}, + wg *sync.WaitGroup, +) *Pusher { + s := createSender(logger, service, targetManager, workerPool, retryDuration, stop) + q := newQueue(logger, target, flushTimeout, entityProvider, s, stop, wg) + targetManager.PutRetentionPolicy(target) + return &Pusher{ + Target: target, + Queue: q, + Service: service, + TargetManager: targetManager, + EntityProvider: entityProvider, + Sender: s, + } +} + +// createSender initializes a Sender. Wraps it in a senderPool if a WorkerPool is provided. +func createSender( + logger telegraf.Logger, + service cloudWatchLogsService, + targetManager TargetManager, + workerPool WorkerPool, + retryDuration time.Duration, + stop <-chan struct{}, +) Sender { + s := newSender(logger, service, targetManager, retryDuration, stop) + if workerPool == nil { + return s + } + return newSenderPool(workerPool, s) +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go new file mode 100644 index 0000000000..2ab3970f78 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go @@ -0,0 +1,100 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" +) + +const eventCount = 100000 + +func TestPusher(t *testing.T) { + t.Run("WithSender", func(t *testing.T) { + t.Parallel() + stop := make(chan struct{}) + var wg sync.WaitGroup + pusher := setupPusher(t, "single", nil, stop, &wg) + + var completed atomic.Int32 + generateEvents(t, pusher, &completed) + + close(stop) + wg.Wait() + }) + + t.Run("WithSenderPool", func(t *testing.T) { + t.Parallel() + stop := make(chan struct{}) + var wg sync.WaitGroup + wp := NewWorkerPool(5) + pusher := setupPusher(t, "pool", wp, stop, &wg) + + _, isSenderPool := pusher.Sender.(*senderPool) + assert.True(t, isSenderPool) + + var completed atomic.Int32 + generateEvents(t, pusher, &completed) + + close(stop) + wg.Wait() + wp.Stop() + }) +} + +func generateEvents(t *testing.T, pusher *Pusher, completed *atomic.Int32) { + t.Helper() + for i := 0; i < eventCount; i++ { + pusher.AddEvent(&stubLogEvent{ + message: "test message", + timestamp: time.Now(), + done: func() { + completed.Add(1) + }, + }) + } +} + +func setupPusher(t *testing.T, name string, workerPool WorkerPool, stop chan struct{}, wg *sync.WaitGroup) *Pusher { + t.Helper() + logger := testutil.Logger{Name: name} + target := Target{Group: "G", Stream: "S", Retention: 7} + service := new(stubLogsService) + service.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + // add latency + time.Sleep(50 * time.Millisecond) + return &cloudwatchlogs.PutLogEventsOutput{}, nil + } + mockManager := new(mockTargetManager) + mockManager.On("PutRetentionPolicy", target).Return() + + pusher := NewPusher( + logger, + target, + service, + mockManager, + nil, + workerPool, + time.Second, + time.Minute, + stop, + wg, + ) + + assert.NotNil(t, pusher) + assert.Equal(t, target, pusher.Target) + assert.NotNil(t, pusher.Queue) + assert.NotNil(t, pusher.Sender) + + // Verify that PutRetentionPolicy was called + mockManager.AssertCalled(t, "PutRetentionPolicy", target) + return pusher +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go new file mode 100644 index 0000000000..89f26faed4 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go @@ -0,0 +1,229 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT
+
+package pusher
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/influxdata/telegraf"
+
+	"github.com/aws/amazon-cloudwatch-agent/logs"
+	"github.com/aws/amazon-cloudwatch-agent/profiler"
+)
+
+type Queue interface {
+	AddEvent(e logs.LogEvent)
+	AddEventNonBlocking(e logs.LogEvent)
+}
+
+type queue struct {
+	target Target
+	logger telegraf.Logger
+
+	entityProvider      logs.LogEntityProvider
+	sender              Sender
+	converter           *converter
+	batch               *logEventBatch
+	eventsCh            chan logs.LogEvent
+	nonBlockingEventsCh chan logs.LogEvent
+
+	flushCh      chan struct{}
+	resetTimerCh chan struct{}
+	flushTimer   *time.Timer
+	flushTimeout time.Duration
+	stop         <-chan struct{}
+	lastSentTime atomic.Value
+
+	initNonBlockingChOnce sync.Once
+	startNonBlockCh       chan struct{}
+	wg                    *sync.WaitGroup
+}
+
+func newQueue(
+	logger telegraf.Logger,
+	target Target,
+	flushTimeout time.Duration,
+	entityProvider logs.LogEntityProvider,
+	sender Sender,
+	stop <-chan struct{},
+	wg *sync.WaitGroup,
+) Queue {
+	q := &queue{
+		target:          target,
+		logger:          logger,
+		entityProvider:  entityProvider,
+		converter:       newConverter(logger, target),
+		batch:           newLogEventBatch(target, entityProvider),
+		sender:          sender,
+		eventsCh:        make(chan logs.LogEvent, 100),
+		flushCh:         make(chan struct{}),
+		resetTimerCh:    make(chan struct{}),
+		flushTimer:      time.NewTimer(flushTimeout),
+		flushTimeout:    flushTimeout,
+		stop:            stop,
+		startNonBlockCh: make(chan struct{}),
+		wg:              wg,
+	}
+	q.wg.Add(1)
+	go q.start()
+	return q
+}
+
+// AddEvent adds an event to the queue, blocking if the queue is full.
+func (q *queue) AddEvent(e logs.LogEvent) {
+	if !hasValidTime(e) {
+		q.logger.Errorf("The log entry in (%v/%v) with timestamp (%v) compared to the current time (%v) is out of the accepted time range. Discarding the log entry.", q.target.Group, q.target.Stream, e.Time(), time.Now())
+		return
+	}
+	q.eventsCh <- e
+}
+
+// AddEventNonBlocking adds an event to the queue without blocking. If the queue is full, drops the oldest event in
+// the queue.
+func (q *queue) AddEventNonBlocking(e logs.LogEvent) {
+	if !hasValidTime(e) {
+		q.logger.Errorf("The log entry in (%v/%v) with timestamp (%v) compared to the current time (%v) is out of the accepted time range. Discarding the log entry.", q.target.Group, q.target.Stream, e.Time(), time.Now())
+		return
+	}
+
+	q.initNonBlockingChOnce.Do(func() {
+		q.nonBlockingEventsCh = make(chan logs.LogEvent, reqEventsLimit*2)
+		q.startNonBlockCh <- struct{}{} // Unblock the select loop to recognize the channel merge
+	})
+
+	// Drain the channel until the new event can be added
+	for {
+		select {
+		case q.nonBlockingEventsCh <- e:
+			return
+		default:
+			<-q.nonBlockingEventsCh
+			q.addStats("emfMetricDrop", 1)
+		}
+	}
+}
+
+// start is the main loop for processing events and managing the queue.
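+// It merges the blocking and non-blocking event channels, converts each incoming event, and sends
+// the current batch whenever it runs out of space, would span more than 24 hours, or the flush
+// timeout elapses without a send.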
+func (q *queue) start() { + defer q.wg.Done() + mergeChan := make(chan logs.LogEvent) + + // Merge events from both blocking and non-blocking channel + go func() { + for { + select { + case e := <-q.eventsCh: + mergeChan <- e + case e := <-q.nonBlockingEventsCh: + mergeChan <- e + case <-q.startNonBlockCh: + case <-q.stop: + return + } + } + }() + + go q.manageFlushTimer() + + for { + select { + case e := <-mergeChan: + // Start timer when first event of the batch is added (happens after a flush timer timeout) + if len(q.batch.events) == 0 { + q.resetFlushTimer() + } + event := q.converter.convert(e) + if !q.batch.inTimeRange(event.timestamp) || !q.batch.hasSpace(event.eventBytes) { + q.send() + } + q.batch.append(event) + case <-q.flushCh: + lastSentTime, _ := q.lastSentTime.Load().(time.Time) + if time.Since(lastSentTime) >= q.flushTimeout && len(q.batch.events) > 0 { + q.send() + } else { + q.resetFlushTimer() + } + case <-q.stop: + if len(q.batch.events) > 0 { + q.send() + } + return + } + } +} + +// send the current batch of events. +func (q *queue) send() { + if len(q.batch.events) > 0 { + q.batch.addDoneCallback(q.onSuccessCallback(q.batch.bufferedSize)) + q.sender.Send(q.batch) + q.batch = newLogEventBatch(q.target, q.entityProvider) + } +} + +// onSuccessCallback returns a callback function to be executed after a successful send. +func (q *queue) onSuccessCallback(bufferedSize int) func() { + return func() { + q.lastSentTime.Store(time.Now()) + go q.addStats("rawSize", float64(bufferedSize)) + q.resetFlushTimer() + } +} + +// addStats adds statistics to the profiler. +func (q *queue) addStats(statsName string, value float64) { + statsKey := []string{"cloudwatchlogs", q.target.Group, statsName} + profiler.Profiler.AddStats(statsKey, value) +} + +// manageFlushTimer manages the flush timer for the queue. Needed since the timer Stop/Reset functions cannot +// be called concurrently. +func (q *queue) manageFlushTimer() { + for { + select { + case <-q.flushTimer.C: + q.flushCh <- struct{}{} + case <-q.resetTimerCh: + q.stopFlushTimer() + q.flushTimer.Reset(q.flushTimeout) + case <-q.stop: + q.stopFlushTimer() + return + } + } +} + +// stopFlushTimer stops the timer and attempts to drain it. +func (q *queue) stopFlushTimer() { + if !q.flushTimer.Stop() { + select { + case <-q.flushTimer.C: + default: + } + } +} + +// resetFlushTimer sends a reset timer request if there isn't already one pending. +func (q *queue) resetFlushTimer() { + select { + case q.resetTimerCh <- struct{}{}: + default: + } +} + +func hasValidTime(e logs.LogEvent) bool { + //http://docs.aws.amazon.com/goto/SdkForGoV1/logs-2014-03-28/PutLogEvents + //* None of the log events in the logEventBatch can be more than 2 hours in the future. + //* None of the log events in the logEventBatch can be older than 14 days or the retention period of the log group. + if !e.Time().IsZero() { + now := time.Now() + dt := now.Sub(e.Time()).Hours() + if dt > 24*14 || dt < -2 { + return false + } + } + return true +} diff --git a/plugins/outputs/cloudwatchlogs/pusher_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go similarity index 53% rename from plugins/outputs/cloudwatchlogs/pusher_test.go rename to plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go index fc3957e5cc..c270145ab1 100644 --- a/plugins/outputs/cloudwatchlogs/pusher_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// SPDX-License-Identifier: MIT -package cloudwatchlogs +package pusher import ( "bytes" @@ -17,7 +17,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/influxdata/telegraf/models" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" "github.com/aws/amazon-cloudwatch-agent/logs" @@ -25,94 +25,46 @@ import ( "github.com/aws/amazon-cloudwatch-agent/tool/util" ) -type mockLogSrc struct { - logs.LogSrc - returnEmpty bool -} - -func (m *mockLogSrc) Entity() *cloudwatchlogs.Entity { - entity := &cloudwatchlogs.Entity{ - Attributes: map[string]*string{ - "PlatformType": aws.String("AWS::EC2"), - "EC2.InstanceId": aws.String("i-123456789"), - "EC2.AutoScalingGroup": aws.String("test-group"), - }, - KeyAttributes: map[string]*string{ - "Name": aws.String("myService"), - "Environment": aws.String("myEnvironment"), - "AwsAccountId": aws.String("123456789"), - }, - } - if m.returnEmpty { - return nil - } - return entity -} - -var wg sync.WaitGroup - -type svcMock struct { +type stubLogsService struct { ple func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) clg func(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) cls func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) prp func(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) } -func (s *svcMock) PutLogEvents(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { +func (s *stubLogsService) PutLogEvents(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { if s.ple != nil { return s.ple(in) } return nil, nil } -func (s *svcMock) CreateLogGroup(in *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + +func (s *stubLogsService) CreateLogGroup(in *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { if s.clg != nil { return s.clg(in) } return nil, nil } -func (s *svcMock) CreateLogStream(in *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + +func (s *stubLogsService) CreateLogStream(in *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { if s.cls != nil { return s.cls(in) } return nil, nil } -func (s *svcMock) PutRetentionPolicy(in *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { + +func (s *stubLogsService) PutRetentionPolicy(in *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { if s.prp != nil { return s.prp(in) } return nil, nil } -func TestNewPusher(t *testing.T) { - var s svcMock - stop, p := testPreparation(-1, &s, time.Second, maxRetryTimeout) - - require.Equal(t, &s, p.Service, "Pusher service does not match the service passed in") - require.Equal(t, p.Group, "G", fmt.Sprintf("Pusher initialized with the wrong target: %v", p.Target)) - require.Equal(t, p.Stream, "S", fmt.Sprintf("Pusher initialized with the wrong target: %v", p.Target)) - - close(stop) - wg.Wait() -} - -type evtMock struct { - m string - t time.Time - d func() -} - -func (e evtMock) Message() string { return e.m } -func (e evtMock) Time() time.Time { return e.t } -func (e evtMock) Done() { - if e.d != nil { - e.d() - } -} - func TestAddSingleEvent_WithAccountId(t *testing.T) { - var s svcMock + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService called := 
false - nst := "NEXT_SEQ_TOKEN" expectedEntity := &cloudwatchlogs.Entity{ Attributes: map[string]*string{ "PlatformType": aws.String("AWS::EC2"), @@ -129,10 +81,6 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) { s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { called = true - if in.SequenceToken != nil { - t.Errorf("PutLogEvents called with wrong sequenceToken, first call should not provide any token") - } - if *in.LogGroupName != "G" || *in.LogStreamName != "S" { t.Errorf("PutLogEvents called with wrong group and stream: %v/%v", *in.LogGroupName, *in.LogStreamName) } @@ -141,39 +89,33 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) { t.Errorf("PutLogEvents called with incorrect message, got: '%v'", *in.LogEvents[0].Message) } require.Equal(t, expectedEntity, in.Entity) - return &cloudwatchlogs.PutLogEventsOutput{ - NextSequenceToken: &nst, - }, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - - p.AddEvent(evtMock{"MSG", time.Now(), nil}) + ep := newMockEntityProvider(expectedEntity) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) + q.AddEvent(newStubLogEvent("MSG", time.Now())) require.False(t, called, "PutLogEvents has been called too fast, it should wait until FlushTimeout.") - p.FlushTimeout = 10 * time.Millisecond - p.resetFlushTimer() + q.flushTimeout = time.Second + q.resetFlushTimer() - time.Sleep(3 * time.Second) + time.Sleep(2 * time.Second) require.True(t, called, "PutLogEvents has not been called after FlushTimeout has been reached.") - require.NotNil(t, nst, *p.sequenceToken, "Pusher did not capture the NextSequenceToken") close(stop) wg.Wait() } func TestAddSingleEvent_WithoutAccountId(t *testing.T) { - var s svcMock + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService called := false - nst := "NEXT_SEQ_TOKEN" s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { called = true - if in.SequenceToken != nil { - t.Errorf("PutLogEvents called with wrong sequenceToken, first call should not provide any token") - } - if *in.LogGroupName != "G" || *in.LogStreamName != "S" { t.Errorf("PutLogEvents called with wrong group and stream: %v/%v", *in.LogGroupName, *in.LogStreamName) } @@ -182,44 +124,41 @@ func TestAddSingleEvent_WithoutAccountId(t *testing.T) { t.Errorf("PutLogEvents called with incorrect message, got: '%v'", *in.LogEvents[0].Message) } require.Nil(t, in.Entity) - return &cloudwatchlogs.PutLogEventsOutput{ - NextSequenceToken: &nst, - }, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - p.logSrc = &mockLogSrc{returnEmpty: true} - - p.AddEvent(evtMock{"MSG", time.Now(), nil}) + ep := newMockEntityProvider(nil) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) + q.AddEvent(newStubLogEvent("MSG", time.Now())) require.False(t, called, "PutLogEvents has been called too fast, it should wait until FlushTimeout.") - p.FlushTimeout = 10 * time.Millisecond - p.resetFlushTimer() + q.flushTimeout = time.Second + q.resetFlushTimer() - time.Sleep(3 * time.Second) + time.Sleep(2 * time.Second) require.True(t, called, "PutLogEvents has not been called after FlushTimeout has been reached.") - require.NotNil(t, nst, *p.sequenceToken, "Pusher did not capture the NextSequenceToken") close(stop) wg.Wait() } -func TestStopPusherWouldDoFinalSend(t *testing.T) { - var s svcMock +func 
TestStopQueueWouldDoFinalSend(t *testing.T) { + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService called := false - nst := "NEXT_SEQ_TOKEN" s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { called = true if len(in.LogEvents) != 1 { t.Errorf("PutLogEvents called with incorrect number of message, expecting 1, but %v received", len(in.LogEvents)) } - return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q.AddEvent(newStubLogEvent("MSG", time.Now())) - p.AddEvent(evtMock{"MSG", time.Now(), nil}) time.Sleep(10 * time.Millisecond) require.False(t, called, "PutLogEvents has been called too fast, it should wait until FlushTimeout.") @@ -231,20 +170,22 @@ func TestStopPusherWouldDoFinalSend(t *testing.T) { } func TestStopPusherWouldStopRetries(t *testing.T) { - var s svcMock + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { return nil, &cloudwatchlogs.ServiceUnavailableException{} } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - p.AddEvent(evtMock{"MSG", time.Now(), nil}) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q.AddEvent(newStubLogEvent("MSG", time.Now())) sendComplete := make(chan struct{}) go func() { defer close(sendComplete) - p.send() + q.send() }() close(stop) @@ -257,8 +198,9 @@ func TestStopPusherWouldStopRetries(t *testing.T) { } func TestLongMessageGetsTruncated(t *testing.T) { - var s svcMock - nst := "NEXT_SEQ_TOKEN" + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService longMsg := strings.Repeat("x", msgSizeLimit+1) s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { @@ -278,117 +220,117 @@ func TestLongMessageGetsTruncated(t *testing.T) { t.Errorf("Truncated long message had the wrong suffix: %v", msg[len(msg)-30:]) } - return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - p.AddEvent(evtMock{longMsg, time.Now(), nil}) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q.AddEvent(newStubLogEvent(longMsg, time.Now())) - for len(p.events) < 1 { + for len(q.batch.events) < 1 { time.Sleep(10 * time.Millisecond) } - p.send() + q.send() close(stop) wg.Wait() } func TestRequestIsLessThan1MB(t *testing.T) { - var s svcMock - nst := "NEXT_SEQ_TOKEN" + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService longMsg := strings.Repeat("x", msgSizeLimit) s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - length := 0 for _, le := range in.LogEvents { - length += len(*le.Message) + eventHeaderSize + length += len(*le.Message) + perEventHeaderBytes } - if length > 1024*1024 { + if length > reqSizeLimit { t.Fatalf("PutLogEvents called with payload larger than request limit of 1MB, %v received", length) } - return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for i := 0; i < 8; i++ { - 
p.AddEvent(evtMock{longMsg, time.Now(), nil}) + q.AddEvent(newStubLogEvent(longMsg, time.Now())) } time.Sleep(10 * time.Millisecond) - p.send() - p.send() + q.send() + q.send() close(stop) wg.Wait() } func TestRequestIsLessThan10kEvents(t *testing.T) { - var s svcMock - nst := "NEXT_SEQ_TOKEN" + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService msg := "m" s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - if len(in.LogEvents) > 10000 { t.Fatalf("PutLogEvents called with more than 10k events, %v received", len(in.LogEvents)) } - return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for i := 0; i < 30000; i++ { - p.AddEvent(evtMock{msg, time.Now(), nil}) + q.AddEvent(newStubLogEvent(msg, time.Now())) } time.Sleep(10 * time.Millisecond) for i := 0; i < 5; i++ { - p.send() + q.send() } close(stop) wg.Wait() } func TestTimestampPopulation(t *testing.T) { - var s svcMock - nst := "NEXT_SEQ_TOKEN" + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - if len(in.LogEvents) > 10000 { t.Fatalf("PutLogEvents called with more than 10k events, %v received", len(in.LogEvents)) } - return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for i := 0; i < 3; i++ { - p.AddEvent(evtMock{"msg", time.Time{}, nil}) // time.Time{} creates zero time + q.AddEvent(newStubLogEvent("msg", time.Time{})) } time.Sleep(10 * time.Millisecond) for i := 0; i < 5; i++ { - p.send() + q.send() } close(stop) wg.Wait() } func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { - var s svcMock - nst := "NEXT_SEQ_TOKEN" + var wg sync.WaitGroup + var s stubLogsService s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { t.Errorf("PutLogEvents should not be called for out of range events") - return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - stop, p := testPreparation(-1, &s, 10*time.Millisecond, maxRetryTimeout) - p.AddEvent(evtMock{"MSG", time.Now().Add(-15 * 24 * time.Hour), nil}) - p.AddEvent(evtMock{"MSG", time.Now().Add(2*time.Hour + 1*time.Minute), nil}) + stop, q := testPreparation(-1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + q.AddEvent(newStubLogEvent("MSG", time.Now().Add(-15*24*time.Hour))) + q.AddEventNonBlocking(newStubLogEvent("MSG", time.Now().Add(2*time.Hour+1*time.Minute))) loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") require.Equal(t, 2, len(loglines), fmt.Sprintf("Expecting 2 error logs, but %d received", len(loglines))) @@ -406,8 +348,9 @@ func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { } func TestAddMultipleEvents(t *testing.T) { - var s svcMock - nst := "NEXT_SEQ_TOKEN" + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { if *in.LogGroupName != "G" || *in.LogStreamName != "S" { @@ -427,41 +370,36 @@ func 
TestAddMultipleEvents(t *testing.T) {
 		}
 
-		return &cloudwatchlogs.PutLogEventsOutput{
-			NextSequenceToken: &nst,
-		}, nil
+		return &cloudwatchlogs.PutLogEventsOutput{}, nil
 	}
 
-	var evts []evtMock
+	var evts []logs.LogEvent
 	start := time.Now().Add(-100 * time.Millisecond)
 	for i := 0; i < 100; i++ {
-		e := evtMock{
+		evts = append(evts, newStubLogEvent(
 			fmt.Sprintf("MSG - %v", i),
-			start.Add(time.Duration(i) * time.Millisecond),
-			nil,
-		}
-		evts = append(evts, e)
+			start.Add(time.Duration(i)*time.Millisecond),
+		))
 	}
 	evts[10], evts[90] = evts[90], evts[10] // make events out of order
 
-	stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout)
+	stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
 	for _, e := range evts {
-		p.AddEvent(e)
+		q.AddEvent(e)
 	}
-	p.FlushTimeout = 10 * time.Millisecond
-	p.resetFlushTimer()
+	q.flushTimeout = 10 * time.Millisecond
+	q.resetFlushTimer()
 
-	time.Sleep(3 * time.Second)
-	require.NotNil(t, p.sequenceToken, "Pusher did not capture the NextSequenceToken")
-	require.Equal(t, nst, *p.sequenceToken, "Pusher did not capture the NextSequenceToken")
+	time.Sleep(time.Second)
 
 	close(stop)
 	wg.Wait()
 }
 
 func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) {
-	var s svcMock
-	nst := "NEXT_SEQ_TOKEN"
+	t.Parallel()
+	var wg sync.WaitGroup
+	var s stubLogsService
 
 	ci := 0
 	s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
@@ -477,12 +415,8 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) {
 			}
 
 			ci++
-			return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil
+			return &cloudwatchlogs.PutLogEventsOutput{}, nil
 		} else if ci == 1 {
-			if *in.SequenceToken != nst {
-				t.Errorf("PutLogEvents called without correct sequenceToken")
-			}
-
 			if len(in.LogEvents) != 1 {
 				t.Errorf("PutLogEvents called with incorrect number of messages, expecting 1, but %v received", len(in.LogEvents))
 			}
@@ -491,28 +425,28 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) {
 			if *le.Message != "MSG now" {
 				t.Errorf("PutLogEvents received wrong message: '%v'", *le.Message)
 			}
-			return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil
+			return &cloudwatchlogs.PutLogEventsOutput{}, nil
 		}
 
 		t.Errorf("PutLogEvents should not be called more than 2 times")
 		return nil, nil
 	}
 
-	stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout)
-	p.AddEvent(evtMock{"MSG 25hrs ago", time.Now().Add(-25 * time.Hour), nil})
-	p.AddEvent(evtMock{"MSG 24hrs ago", time.Now().Add(-24 * time.Hour), nil})
-	p.AddEvent(evtMock{"MSG 23hrs ago", time.Now().Add(-23 * time.Hour), nil})
-	p.AddEvent(evtMock{"MSG now", time.Now(), nil})
-	p.FlushTimeout = 10 * time.Millisecond
-	p.resetFlushTimer()
+	stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
+	q.AddEvent(newStubLogEvent("MSG 25hrs ago", time.Now().Add(-25*time.Hour)))
+	q.AddEvent(newStubLogEvent("MSG 24hrs ago", time.Now().Add(-24*time.Hour)))
+	q.AddEvent(newStubLogEvent("MSG 23hrs ago", time.Now().Add(-23*time.Hour)))
+	q.AddEvent(newStubLogEvent("MSG now", time.Now()))
+	q.flushTimeout = 10 * time.Millisecond
+	q.resetFlushTimer()
 
 	time.Sleep(20 * time.Millisecond)
 	close(stop)
 	wg.Wait()
 }
 
 func TestUnhandledErrorWouldNotResend(t *testing.T) {
-	var s svcMock
-	nst := "NEXT_SEQ_TOKEN"
+	var wg sync.WaitGroup
+	var s stubLogsService
 
 	cnt := 0
 	s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
@@ -521,15 +455,14 @@ func TestUnhandledErrorWouldNotResend(t *testing.T) {
 			return nil, 
errors.New("unhandled error") } t.Errorf("Pusher should not attempt a resend when an unhandled error has been returned") - return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - p.AddEvent(evtMock{"msg", time.Now(), nil}) - p.FlushTimeout = 10 * time.Millisecond + stop, q := testPreparation(-1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(2 * time.Second) logline := logbuf.String() @@ -544,8 +477,8 @@ func TestUnhandledErrorWouldNotResend(t *testing.T) { } func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { - var s svcMock - nst := "NEXT_SEQ_TOKEN" + var wg sync.WaitGroup + var s stubLogsService var plec, clgc, clsc int s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { @@ -554,14 +487,9 @@ func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { case 0: e = &cloudwatchlogs.ResourceNotFoundException{} case 1: - e = &cloudwatchlogs.InvalidSequenceTokenException{ - Message_: aws.String("Invalid SequenceToken"), - ExpectedSequenceToken: &nst, - } - case 2: e = awserr.New("Unknown Error", "", nil) - case 3: - return &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &nst}, nil + case 2: + return &cloudwatchlogs.PutLogEventsOutput{}, nil default: t.Errorf("Unexpected PutLogEvents call (%d time)", plec) } @@ -581,23 +509,19 @@ func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - p.AddEvent(evtMock{"msg", time.Now(), nil}) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(10 * time.Millisecond) - p.send() + q.send() - foundInvalidSeqToken, foundUnknownErr := false, false + foundUnknownErr := false loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") for _, logline := range loglines { - if (strings.Contains(logline, "W!") || strings.Contains(logline, "I!")) && strings.Contains(logline, "Invalid SequenceToken") { - foundInvalidSeqToken = true - } if strings.Contains(logline, "E!") && strings.Contains(logline, "Unknown Error") { foundUnknownErr = true } } - require.True(t, foundInvalidSeqToken, fmt.Sprintf("Expecting error log with Invalid SequenceToken, but received '%s' in the log", logbuf.String())) require.True(t, foundUnknownErr, fmt.Sprintf("Expecting error log with unknown error, but received '%s' in the log", logbuf.String())) log.SetOutput(os.Stderr) @@ -606,85 +530,12 @@ func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { wg.Wait() } -func TestCreateLogGroupWithError(t *testing.T) { - var s svcMock - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - - // test normal case. 1. creating stream fails, 2, creating group succeeds, 3, creating stream succeeds. 
- var cnt_clg int - var cnt_cls int - s.clg = func(in *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { - cnt_clg++ - return nil, nil - } - s.cls = func(in *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { - cnt_cls++ - if cnt_cls == 1 { - return nil, awserr.New(cloudwatchlogs.ErrCodeResourceNotFoundException, "", nil) - } - - if cnt_cls == 2 { - return nil, nil - } - - t.Errorf("CreateLogStream should not be called when CreateLogGroup failed.") - return nil, nil - } - - p.createLogGroupAndStream() - - require.Equal(t, 1, cnt_clg, "CreateLogGroup was not called.") - require.Equal(t, 2, cnt_cls, "CreateLogStream was not called.") - - // test creating stream succeeds - cnt_clg = 0 - cnt_cls = 0 - s.clg = func(in *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { - cnt_clg++ - return nil, awserr.New(cloudwatchlogs.ErrCodeResourceAlreadyExistsException, "", nil) - } - s.cls = func(in *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { - cnt_cls++ - return nil, nil - } - - p.createLogGroupAndStream() - - require.Equal(t, 1, cnt_cls, "CreateLogSteam was not called after CreateLogGroup returned ResourceAlreadyExistsException.") - require.Equal(t, 0, cnt_clg, "CreateLogGroup should not be called when logstream is created successfully at first time.") - - // test creating group fails - cnt_clg = 0 - cnt_cls = 0 - s.clg = func(in *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { - cnt_clg++ - return nil, awserr.New(cloudwatchlogs.ErrCodeOperationAbortedException, "", nil) - } - s.cls = func(in *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { - cnt_cls++ - return nil, awserr.New(cloudwatchlogs.ErrCodeResourceNotFoundException, "", nil) - } - - err := p.createLogGroupAndStream() - require.Error(t, err, "createLogGroupAndStream should return err.") - - awsErr, ok := err.(awserr.Error) - require.False(t, ok && awsErr.Code() != cloudwatchlogs.ErrCodeOperationAbortedException, "createLogGroupAndStream should return ErrCodeOperationAbortedException.") - - require.Equal(t, 1, cnt_cls, "CreateLogSteam should be called for one time.") - require.Equal(t, 1, cnt_clg, "CreateLogGroup should be called for one time.") - - close(stop) - wg.Wait() -} - func TestLogRejectedLogEntryInfo(t *testing.T) { - var s svcMock - nst := "NEXT_SEQ_TOKEN" + var wg sync.WaitGroup + var s stubLogsService s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { return &cloudwatchlogs.PutLogEventsOutput{ - NextSequenceToken: &nst, RejectedLogEventsInfo: &cloudwatchlogs.RejectedLogEventsInfo{ TooOldLogEventEndIndex: aws.Int64(100), TooNewLogEventStartIndex: aws.Int64(200), @@ -696,10 +547,10 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - p.AddEvent(evtMock{"msg", time.Now(), nil}) + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(10 * time.Millisecond) - p.send() + q.send() loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") require.Len(t, loglines, 4, fmt.Sprintf("Expecting 3 error logs, but %d received", len(loglines))) @@ -723,8 +574,9 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { } func TestAddEventNonBlocking(t *testing.T) { - var s 
svcMock - nst := "NEXT_SEQ_TOKEN" + t.Parallel() + var wg sync.WaitGroup + var s stubLogsService const N = 100 s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { @@ -732,95 +584,35 @@ func TestAddEventNonBlocking(t *testing.T) { t.Errorf("PutLogEvents called with incorrect number of message, only %v received", len(in.LogEvents)) } - return &cloudwatchlogs.PutLogEventsOutput{ - NextSequenceToken: &nst, - }, nil + return &cloudwatchlogs.PutLogEventsOutput{}, nil } - var evts []evtMock + var evts []logs.LogEvent start := time.Now().Add(-N * time.Millisecond) for i := 0; i < N; i++ { - e := evtMock{ + evts = append(evts, newStubLogEvent( fmt.Sprintf("MSG - %v", i), - start.Add(time.Duration(i) * time.Millisecond), - nil, - } - evts = append(evts, e) + start.Add(time.Duration(i)*time.Millisecond), + )) } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - p.FlushTimeout = 50 * time.Millisecond - p.resetFlushTimer() + stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q.flushTimeout = 50 * time.Millisecond + q.resetFlushTimer() time.Sleep(200 * time.Millisecond) // Wait until pusher started, merge channel is blocked for _, e := range evts { - p.AddEventNonBlocking(e) + q.AddEventNonBlocking(e) } - time.Sleep(3 * time.Second) - require.NotNil(t, p.sequenceToken, "Pusher did not capture the NextSequenceToken") - require.NotNil(t, nst, *p.sequenceToken, "Pusher did not capture the NextSequenceToken") + time.Sleep(time.Second) close(stop) wg.Wait() } -func TestPutRetentionNegativeInput(t *testing.T) { - var s svcMock - var prpc int - s.prp = func(in *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { - prpc++ - return nil, nil - } - stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout) - p.putRetentionPolicy() - - require.NotEqual(t, 1, prpc, "Put Retention Policy api shouldn't have been called") - - close(stop) - wg.Wait() -} - -func TestPutRetentionValidMaxInput(t *testing.T) { - var s svcMock - var prpc = 0 - s.prp = func(in *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { - prpc++ - return nil, nil - } - stop, p := testPreparation(1000000000000000000, &s, 1*time.Hour, maxRetryTimeout) - p.putRetentionPolicy() - - require.Equal(t, 2, prpc, fmt.Sprintf("Put Retention Policy api should have been called twice. 
Number of times called: %v", prpc)) - - close(stop) - wg.Wait() -} - -func TestPutRetentionWhenError(t *testing.T) { - var s svcMock - var prpc int - s.prp = func(in *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { - prpc++ - return nil, awserr.New(cloudwatchlogs.ErrCodeResourceNotFoundException, "", nil) - - } - var logbuf bytes.Buffer - log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - - stop, p := testPreparation(1, &s, 1*time.Hour, maxRetryTimeout) - time.Sleep(10 * time.Millisecond) - - loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") - logline := loglines[0] - - require.NotEqual(t, 0, prpc, fmt.Sprintf("Put Retention Policy should have been called on creation with retention of %v", p.Retention)) - require.True(t, strings.Contains(logline, "ResourceNotFound"), fmt.Sprintf("Expecting ResourceNotFoundException but got '%s' in the log", logbuf.String())) - - close(stop) - wg.Wait() -} func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { - var s svcMock + var wg sync.WaitGroup + var s stubLogsService cnt := 0 s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { @@ -831,9 +623,9 @@ func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { var logbuf bytes.Buffer log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - stop, p := testPreparation(-1, &s, 10*time.Millisecond, time.Second) - p.AddEvent(evtMock{"msg", time.Now(), nil}) - time.Sleep(4 * time.Second) + stop, q := testPreparation(-1, &s, 10*time.Millisecond, time.Second, nil, &wg) + q.AddEvent(newStubLogEvent("msg", time.Now())) + time.Sleep(2 * time.Second) loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") lastline := loglines[len(loglines)-1] @@ -846,9 +638,26 @@ func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { wg.Wait() } -func testPreparation(retention int, s *svcMock, flushTimeout time.Duration, retryDuration time.Duration) (chan struct{}, *pusher) { +func testPreparation( + retention int, + service *stubLogsService, + flushTimeout time.Duration, + retryDuration time.Duration, + entityProvider logs.LogEntityProvider, + wg *sync.WaitGroup, +) (chan struct{}, *queue) { stop := make(chan struct{}) - mockLogSrcObj := &mockLogSrc{} - p := NewPusher("us-east-1", Target{"G", "S", util.StandardLogGroupClass, retention}, s, flushTimeout, retryDuration, models.NewLogger("cloudwatchlogs", "test", ""), stop, &wg, mockLogSrcObj) - return stop, p + logger := testutil.Logger{Name: "test"} + tm := NewTargetManager(logger, service) + s := newSender(logger, service, tm, retryDuration, stop) + q := newQueue( + logger, + Target{"G", "S", util.StandardLogGroupClass, retention}, + flushTimeout, + entityProvider, + s, + stop, + wg, + ) + return stop, q.(*queue) } diff --git a/plugins/outputs/cloudwatchlogs/retry.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retry.go similarity index 85% rename from plugins/outputs/cloudwatchlogs/retry.go rename to plugins/outputs/cloudwatchlogs/internal/pusher/retry.go index 149ad77509..edb12eb49d 100644 --- a/plugins/outputs/cloudwatchlogs/retry.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retry.go @@ -1,9 +1,11 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// SPDX-License-Identifier: MIT -package cloudwatchlogs +package pusher import ( + "errors" + "math/rand" "net" "strings" "time" @@ -32,6 +34,10 @@ const ( maxRetryDelay = 1 * time.Minute ) +var ( + seededRand = rand.New(rand.NewSource(time.Now().UnixNano())) // nolint:gosec +) + type retryWaitStrategy int const ( @@ -75,7 +81,8 @@ func chooseRetryWaitStrategy(err error) retryWaitStrategy { } // Check AWS Error codes if available - if awsErr, ok := err.(awserr.Error); ok { + var awsErr awserr.Error + if errors.As(err, &awsErr) { switch awsErr.Code() { case cloudwatchlogs.ErrCodeServiceUnavailableException, @@ -86,7 +93,8 @@ func chooseRetryWaitStrategy(err error) retryWaitStrategy { } // Check HTTP status codes if available - if requestFailure, ok := err.(awserr.RequestFailure); ok { + var requestFailure awserr.RequestFailure + if errors.As(err, &requestFailure) { switch requestFailure.StatusCode() { case 500, // internal failure @@ -101,22 +109,19 @@ func chooseRetryWaitStrategy(err error) retryWaitStrategy { } func isErrConnectionTimeout(err error) bool { - netErr, ok := err.(net.Error) - return ok && netErr.Timeout() + var netErr net.Error + return errors.As(err, &netErr) && netErr.Timeout() } func isErrConnectionReset(err error) bool { - if strings.Contains(err.Error(), "read: connection reset") { + errStr := err.Error() + if strings.Contains(errStr, "read: connection reset") { return false } - if strings.Contains(err.Error(), "use of closed network connection") || - strings.Contains(err.Error(), "connection reset") || - strings.Contains(err.Error(), "broken pipe") { - return true - } - - return false + return strings.Contains(errStr, "use of closed network connection") || + strings.Contains(errStr, "connection reset") || + strings.Contains(errStr, "broken pipe") } func isErrConnectionRefused(err error) bool { diff --git a/plugins/outputs/cloudwatchlogs/retry_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retry_test.go similarity index 99% rename from plugins/outputs/cloudwatchlogs/retry_test.go rename to plugins/outputs/cloudwatchlogs/internal/pusher/retry_test.go index 095c2d66fa..116ce25239 100644 --- a/plugins/outputs/cloudwatchlogs/retry_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retry_test.go @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT -package cloudwatchlogs +package pusher import ( "errors" diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go new file mode 100644 index 0000000000..097f2d6acc --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go @@ -0,0 +1,140 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT
+
+package pusher
+
+import (
+	"errors"
+	"sync/atomic"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/influxdata/telegraf"
+
+	"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
+)
+
+type cloudWatchLogsService interface {
+	PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
+	CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
+	CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
+	PutRetentionPolicy(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error)
+}
+
+type Sender interface {
+	Send(*logEventBatch)
+	SetRetryDuration(time.Duration)
+	RetryDuration() time.Duration
+}
+
+type sender struct {
+	service       cloudWatchLogsService
+	retryDuration atomic.Value
+	targetManager TargetManager
+	logger        telegraf.Logger
+	stop          <-chan struct{}
+}
+
+func newSender(
+	logger telegraf.Logger,
+	service cloudWatchLogsService,
+	targetManager TargetManager,
+	retryDuration time.Duration,
+	stop <-chan struct{},
+) Sender {
+	s := &sender{
+		logger:        logger,
+		service:       service,
+		targetManager: targetManager,
+		stop:          stop,
+	}
+	s.retryDuration.Store(retryDuration)
+	return s
+}
+
+// Send attempts to send a batch of log events to CloudWatch Logs. It retries failed attempts until the
+// RetryDuration is exceeded or a non-retryable error is returned.
+func (s *sender) Send(batch *logEventBatch) {
+	if len(batch.events) == 0 {
+		return
+	}
+	input := batch.build()
+	startTime := time.Now()
+
+	retryCountShort := 0
+	retryCountLong := 0
+	for {
+		output, err := s.service.PutLogEvents(input)
+		if err == nil {
+			if output.RejectedLogEventsInfo != nil {
+				info := output.RejectedLogEventsInfo
+				if info.TooOldLogEventEndIndex != nil {
+					s.logger.Warnf("%d log events for log '%s/%s' are too old", *info.TooOldLogEventEndIndex, batch.Group, batch.Stream)
+				}
+				if info.TooNewLogEventStartIndex != nil {
+					s.logger.Warnf("%d log events for log '%s/%s' are too new", *info.TooNewLogEventStartIndex, batch.Group, batch.Stream)
+				}
+				if info.ExpiredLogEventEndIndex != nil {
+					s.logger.Warnf("%d log events for log '%s/%s' are expired", *info.ExpiredLogEventEndIndex, batch.Group, batch.Stream)
+				}
+			}
+			batch.done()
+			s.logger.Debugf("Pusher published %v log events to group: %v stream: %v with size %v KB in %v.", len(batch.events), batch.Group, batch.Stream, batch.bufferedSize/1024, time.Since(startTime))
+			return
+		}
+
+		var awsErr awserr.Error
+		if !errors.As(err, &awsErr) {
+			s.logger.Errorf("Non-AWS error received when sending logs to %v/%v: %v. CloudWatch agent will not retry and logs will be missing!", batch.Group, batch.Stream, err)
+			return
+		}
+
+		switch e := awsErr.(type) {
+		case *cloudwatchlogs.ResourceNotFoundException:
+			if targetErr := s.targetManager.InitTarget(batch.Target); targetErr != nil {
+				s.logger.Errorf("Unable to create log stream %v/%v: %v", batch.Group, batch.Stream, targetErr)
+				break
+			}
+		case *cloudwatchlogs.InvalidParameterException,
+			*cloudwatchlogs.DataAlreadyAcceptedException:
+			s.logger.Errorf("%v, will not retry the request", e)
+			return
+		default:
+			s.logger.Errorf("AWS error received when sending logs to %v/%v: %v", batch.Group, batch.Stream, awsErr)
+		}
+
+		// retry wait strategy depends on the type of error returned
+		var wait time.Duration
+		if chooseRetryWaitStrategy(err) == retryLong {
+			wait = retryWaitLong(retryCountLong)
+			retryCountLong++
+		} else {
+			wait = retryWaitShort(retryCountShort)
+			retryCountShort++
+		}
+
+		if time.Since(startTime)+wait > s.RetryDuration() {
+			s.logger.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
+			return
+		}
+
+		s.logger.Warnf("Retried %v times, going to sleep %v before retrying.", retryCountShort+retryCountLong-1, wait)
+
+		select {
+		case <-s.stop:
+			s.logger.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
+			return
+		case <-time.After(wait):
+		}
+	}
+}
+
+// SetRetryDuration sets the maximum duration for retrying failed log sends.
+func (s *sender) SetRetryDuration(retryDuration time.Duration) {
+	s.retryDuration.Store(retryDuration)
+}
+
+// RetryDuration returns the current maximum retry duration.
+func (s *sender) RetryDuration() time.Duration {
+	return s.retryDuration.Load().(time.Duration)
+}
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
new file mode 100644
index 0000000000..b8d0c52d38
--- /dev/null
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
@@ -0,0 +1,196 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package pusher + +import ( + "errors" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/mock" + + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" +) + +type mockLogsService struct { + mock.Mock +} + +func (m *mockLogsService) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + args := m.Called(input) + return args.Get(0).(*cloudwatchlogs.PutLogEventsOutput), args.Error(1) +} + +func (m *mockLogsService) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + args := m.Called(input) + return args.Get(0).(*cloudwatchlogs.CreateLogStreamOutput), args.Error(1) +} + +func (m *mockLogsService) CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + args := m.Called(input) + return args.Get(0).(*cloudwatchlogs.CreateLogGroupOutput), args.Error(1) +} + +func (m *mockLogsService) PutRetentionPolicy(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { + args := m.Called(input) + return args.Get(0).(*cloudwatchlogs.PutRetentionPolicyOutput), args.Error(1) +} + +type mockTargetManager struct { + mock.Mock +} + +func (m *mockTargetManager) InitTarget(target Target) error { + args := m.Called(target) + return args.Error(0) +} + +func (m *mockTargetManager) PutRetentionPolicy(target Target) { + m.Called(target) +} + +func TestSender(t *testing.T) { + logger := testutil.Logger{Name: "test"} + + t.Run("Send/RejectedLogEvents", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + batch.append(newLogEvent(time.Now(), "Test message", nil)) + + rejectedInfo := &cloudwatchlogs.RejectedLogEventsInfo{ + TooOldLogEventEndIndex: aws.Int64(1), + TooNewLogEventStartIndex: aws.Int64(2), + ExpiredLogEventEndIndex: aws.Int64(3), + } + + mockService := new(mockLogsService) + mockManager := new(mockTargetManager) + mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{RejectedLogEventsInfo: rejectedInfo}, nil).Once() + + s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s.Send(batch) + + mockService.AssertExpectations(t) + }) + + t.Run("Send/ResourceNotFound", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + batch.append(newLogEvent(time.Now(), "Test message", nil)) + + mockService := new(mockLogsService) + mockManager := new(mockTargetManager) + mockService.On("PutLogEvents", mock.Anything). 
+ Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.ResourceNotFoundException{}).Twice() + mockManager.On("InitTarget", mock.Anything).Return(errors.New("test")).Once() + mockManager.On("InitTarget", mock.Anything).Return(nil).Once() + mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() + + s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s.Send(batch) + + mockService.AssertExpectations(t) + mockManager.AssertExpectations(t) + }) + + t.Run("Error/InvalidParameter", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + batch.append(newLogEvent(time.Now(), "Test message", nil)) + + mockService := new(mockLogsService) + mockManager := new(mockTargetManager) + mockService.On("PutLogEvents", mock.Anything). + Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.InvalidParameterException{}).Once() + + s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s.Send(batch) + + mockService.AssertExpectations(t) + }) + + t.Run("Error/DataAlreadyAccepted", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + batch.append(newLogEvent(time.Now(), "Test message", nil)) + + mockService := new(mockLogsService) + mockManager := new(mockTargetManager) + mockService.On("PutLogEvents", mock.Anything). + Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.DataAlreadyAcceptedException{}).Once() + + s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s.Send(batch) + + mockService.AssertExpectations(t) + }) + + t.Run("Error/DropOnGeneric", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + batch.append(newLogEvent(time.Now(), "Test message", nil)) + + mockService := new(mockLogsService) + mockManager := new(mockTargetManager) + mockService.On("PutLogEvents", mock.Anything). + Return(&cloudwatchlogs.PutLogEventsOutput{}, errors.New("test")).Once() + + s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s.Send(batch) + + mockService.AssertExpectations(t) + }) + + t.Run("Error/RetryOnGenericAWS", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + batch.append(newLogEvent(time.Now(), "Test message", nil)) + + mockService := new(mockLogsService) + mockManager := new(mockTargetManager) + mockService.On("PutLogEvents", mock.Anything). + Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once() + mockService.On("PutLogEvents", mock.Anything). + Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() + + s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s.Send(batch) + + mockService.AssertExpectations(t) + }) + + t.Run("DropOnRetryExhaustion", func(t *testing.T) { + batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) + batch.append(newLogEvent(time.Now(), "Test message", nil)) + + mockService := new(mockLogsService) + mockManager := new(mockTargetManager) + mockService.On("PutLogEvents", mock.Anything). 
+			Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once()
+
+		s := newSender(logger, mockService, mockManager, 100*time.Millisecond, make(chan struct{}))
+		s.Send(batch)
+
+		mockService.AssertExpectations(t)
+	})
+
+	t.Run("StopChannelClosed", func(t *testing.T) {
+		batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil)
+		batch.append(newLogEvent(time.Now(), "Test message", nil))
+
+		mockService := new(mockLogsService)
+		mockManager := new(mockTargetManager)
+		mockService.On("PutLogEvents", mock.Anything).
+			Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once()
+
+		stopCh := make(chan struct{})
+		s := newSender(logger, mockService, mockManager, time.Second, stopCh)
+
+		go func() {
+			time.Sleep(50 * time.Millisecond)
+			close(stopCh)
+		}()
+
+		s.Send(batch)
+
+		mockService.AssertExpectations(t)
+	})
+}
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go
new file mode 100644
index 0000000000..a15e1fb8dd
--- /dev/null
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go
@@ -0,0 +1,140 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package pusher
+
+import (
+	"fmt"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/influxdata/telegraf"
+	"golang.org/x/sync/singleflight"
+
+	"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
+)
+
+type Target struct {
+	Group, Stream, Class string
+	Retention            int
+}
+
+func (t Target) String() string {
+	return fmt.Sprintf("%s:%s:%s:%v", t.Group, t.Stream, t.Class, t.Retention)
+}
+
+type TargetManager interface {
+	InitTarget(target Target) error
+	PutRetentionPolicy(target Target)
+}
+
+type targetManager struct {
+	logger  telegraf.Logger
+	service cloudWatchLogsService
+	// cache of initialized targets
+	cache map[Target]struct{}
+	group singleflight.Group
+}
+
+func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService) TargetManager {
+	return &targetManager{
+		logger:  logger,
+		service: service,
+		cache:   make(map[Target]struct{}),
+	}
+}
+
+// InitTarget initializes a Target if it hasn't been initialized before.
+func (m *targetManager) InitTarget(target Target) error {
+	_, err, _ := m.group.Do(target.String(), func() (any, error) {
+		if _, ok := m.cache[target]; !ok {
+			err := m.createLogGroupAndStream(target)
+			if err != nil {
+				return nil, err
+			}
+			m.PutRetentionPolicy(target)
+			m.cache[target] = struct{}{}
+		}
+		return nil, nil
+	})
+	return err
+}
+
+func (m *targetManager) createLogGroupAndStream(t Target) error {
+	err := m.createLogStream(t)
+	if err == nil {
+		return nil
+	}
+
+	m.logger.Debugf("creating stream failed due to: %v", err)
+	if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException {
+		err = m.createLogGroup(t)
+
+		// attempt to create the stream again if the group was created successfully.
+		if err == nil {
+			m.logger.Debugf("successfully created log group %v. Retrying log stream %v", t.Group, t.Stream)
+			err = m.createLogStream(t)
+		} else {
+			m.logger.Debugf("creating group failed due to: %v", err)
+		}
+	}
+
+	if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
+		m.logger.Debugf("Resource was already created. 
%v\n", err) + return nil // if the log group or log stream already exist, this is not worth returning an error for + } + + return err +} + +func (m *targetManager) createLogGroup(t Target) error { + var err error + if t.Class != "" { + _, err = m.service.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + LogGroupName: &t.Group, + LogGroupClass: &t.Class, + }) + } else { + _, err = m.service.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + LogGroupName: &t.Group, + }) + } + return err +} + +func (m *targetManager) createLogStream(t Target) error { + _, err := m.service.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + LogGroupName: &t.Group, + LogStreamName: &t.Stream, + }) + + if err == nil { + m.logger.Debugf("successfully created log stream %v", t.Stream) + return nil + } + return err +} + +// PutRetentionPolicy tries to set the retention policy for a log group. Does not retry on failure. +func (m *targetManager) PutRetentionPolicy(t Target) { + if t.Retention > 0 { + i := aws.Int64(int64(t.Retention)) + putRetentionInput := &cloudwatchlogs.PutRetentionPolicyInput{ + LogGroupName: &t.Group, + RetentionInDays: i, + } + _, err := m.service.PutRetentionPolicy(putRetentionInput) + if err != nil { + // since this gets called both before we start pushing logs, and after we first attempt + // to push a log to a non-existent log group, we don't want to dirty the log with an error + // if the error is that the log group doesn't exist (yet). + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException { + m.logger.Debugf("Log group %v not created yet: %v", t.Group, err) + } else { + m.logger.Errorf("Unable to put retention policy for log group %v: %v ", t.Group, err) + } + } else { + m.logger.Debugf("successfully updated log retention policy for log group %v", t.Group) + } + } +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go new file mode 100644 index 0000000000..c9f4ca5232 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go @@ -0,0 +1,148 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" +) + +func TestTargetManager(t *testing.T) { + logger := testutil.Logger{Name: "test"} + + t.Run("CreateLogStream", func(t *testing.T) { + target := Target{Group: "G", Stream: "S"} + + mockService := new(mockLogsService) + mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once() + + manager := NewTargetManager(logger, mockService) + err := manager.InitTarget(target) + + assert.NoError(t, err) + mockService.AssertExpectations(t) + }) + + t.Run("CreateLogGroupAndStream", func(t *testing.T) { + target := Target{Group: "G", Stream: "S", Class: "newClass"} + + mockService := new(mockLogsService) + mockService.On("CreateLogStream", mock.Anything). 
+ Return(&cloudwatchlogs.CreateLogStreamOutput{}, awserr.New(cloudwatchlogs.ErrCodeResourceNotFoundException, "Log group not found", nil)).Once() + mockService.On("CreateLogGroup", mock.Anything).Return(&cloudwatchlogs.CreateLogGroupOutput{}, nil).Once() + mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, &cloudwatchlogs.ResourceAlreadyExistsException{}).Once() + + manager := NewTargetManager(logger, mockService) + err := manager.InitTarget(target) + + assert.NoError(t, err) + mockService.AssertExpectations(t) + }) + + t.Run("CreateLogGroup/Error", func(t *testing.T) { + target := Target{Group: "G", Stream: "S"} + + mockService := new(mockLogsService) + mockService.On("CreateLogStream", mock.Anything). + Return(&cloudwatchlogs.CreateLogStreamOutput{}, awserr.New(cloudwatchlogs.ErrCodeResourceNotFoundException, "Log group not found", nil)).Once() + mockService.On("CreateLogGroup", mock.Anything). + Return(&cloudwatchlogs.CreateLogGroupOutput{}, awserr.New("SomeAWSError", "Failed to create log group", nil)).Once() + + manager := NewTargetManager(logger, mockService) + err := manager.InitTarget(target) + + assert.Error(t, err) + mockService.AssertExpectations(t) + }) + + t.Run("SetRetentionPolicy", func(t *testing.T) { + target := Target{Group: "G", Stream: "S", Retention: 7} + + mockService := new(mockLogsService) + mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once() + mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once() + + manager := NewTargetManager(logger, mockService) + err := manager.InitTarget(target) + + assert.NoError(t, err) + mockService.AssertExpectations(t) + }) + + t.Run("SetRetentionPolicy/LogGroupNotFound", func(t *testing.T) { + target := Target{Group: "G", Stream: "S", Retention: 7} + + mockService := new(mockLogsService) + mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once() + mockService.On("PutRetentionPolicy", mock.Anything). + Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, &cloudwatchlogs.ResourceNotFoundException{}).Once() + + manager := NewTargetManager(logger, mockService) + err := manager.InitTarget(target) + + assert.NoError(t, err) // The overall operation should still succeed even if setting retention policy fails + mockService.AssertExpectations(t) + }) + + t.Run("SetRetentionPolicy/Error", func(t *testing.T) { + target := Target{Group: "G", Stream: "S", Retention: 7} + + mockService := new(mockLogsService) + mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once() + mockService.On("PutRetentionPolicy", mock.Anything). 
+ Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, awserr.New("SomeAWSError", "Failed to set retention policy", nil)).Once() + + manager := NewTargetManager(logger, mockService) + err := manager.InitTarget(target) + + assert.NoError(t, err) // The overall operation should still succeed even if setting retention policy fails + mockService.AssertExpectations(t) + }) + + t.Run("SetRetentionPolicy/Negative", func(t *testing.T) { + target := Target{Group: "G", Stream: "S", Retention: -1} + + mockService := new(mockLogsService) + + manager := NewTargetManager(logger, mockService) + manager.PutRetentionPolicy(target) + + mockService.AssertNotCalled(t, "PutRetentionPolicy", mock.Anything) + }) + + t.Run("ConcurrentInit", func(t *testing.T) { + target := Target{Group: "G", Stream: "S"} + + var count atomic.Int32 + service := new(stubLogsService) + service.cls = func(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + time.Sleep(10 * time.Millisecond) + count.Add(1) + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + } + + manager := NewTargetManager(logger, service) + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := manager.InitTarget(target) + assert.NoError(t, err) + }() + } + + wg.Wait() + assert.Equal(t, int32(1), count.Load()) + }) +} diff --git a/plugins/outputs/cloudwatchlogs/pusher.go b/plugins/outputs/cloudwatchlogs/pusher.go deleted file mode 100644 index ff0bb1dd0e..0000000000 --- a/plugins/outputs/cloudwatchlogs/pusher.go +++ /dev/null @@ -1,474 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: MIT - -package cloudwatchlogs - -import ( - "math/rand" - "sort" - "sync" - "time" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/influxdata/telegraf" - - "github.com/aws/amazon-cloudwatch-agent/logs" - "github.com/aws/amazon-cloudwatch-agent/profiler" - "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" -) - -const ( - reqSizeLimit = 1024 * 1024 - reqEventsLimit = 10000 - warnOldTimeStamp = 1 * 24 * time.Hour - warnOldTimeStampLogInterval = 1 * 5 * time.Minute -) - -var ( - seededRand = rand.New(rand.NewSource(time.Now().UnixNano())) -) - -type CloudWatchLogsService interface { - PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) - CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) - CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) - PutRetentionPolicy(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) -} - -type pusher struct { - Target - Service CloudWatchLogsService - FlushTimeout time.Duration - RetryDuration time.Duration - Log telegraf.Logger - - region string - logSrc logs.LogSrc - events []*cloudwatchlogs.InputLogEvent - minT, maxT *time.Time - doneCallbacks []func() - eventsCh chan logs.LogEvent - nonBlockingEventsCh chan logs.LogEvent - bufferredSize int - flushTimer *time.Timer - sequenceToken *string - lastValidTime int64 - lastUpdateTime time.Time - lastWarnMessage time.Time - needSort bool - stop <-chan struct{} - lastSentTime time.Time - - initNonBlockingChOnce sync.Once - startNonBlockCh chan struct{} - wg *sync.WaitGroup -} - -func NewPusher(region string, target Target, service CloudWatchLogsService, flushTimeout time.Duration, retryDuration time.Duration, logger 
telegraf.Logger, stop <-chan struct{}, wg *sync.WaitGroup, logSrc logs.LogSrc) *pusher { - p := &pusher{ - Target: target, - Service: service, - FlushTimeout: flushTimeout, - RetryDuration: retryDuration, - Log: logger, - region: region, - logSrc: logSrc, - events: make([]*cloudwatchlogs.InputLogEvent, 0, 10), - eventsCh: make(chan logs.LogEvent, 100), - flushTimer: time.NewTimer(flushTimeout), - stop: stop, - startNonBlockCh: make(chan struct{}), - wg: wg, - } - p.putRetentionPolicy() - p.wg.Add(1) - go p.start() - return p -} - -func (p *pusher) AddEvent(e logs.LogEvent) { - if !hasValidTime(e) { - p.Log.Errorf("The log entry in (%v/%v) with timestamp (%v) comparing to the current time (%v) is out of accepted time range. Discard the log entry.", p.Group, p.Stream, e.Time(), time.Now()) - return - } - p.eventsCh <- e -} - -func (p *pusher) AddEventNonBlocking(e logs.LogEvent) { - if !hasValidTime(e) { - p.Log.Errorf("The log entry in (%v/%v) with timestamp (%v) comparing to the current time (%v) is out of accepted time range. Discard the log entry.", p.Group, p.Stream, e.Time(), time.Now()) - return - } - - p.initNonBlockingChOnce.Do(func() { - p.nonBlockingEventsCh = make(chan logs.LogEvent, reqEventsLimit*2) - p.startNonBlockCh <- struct{}{} // Unblock the select loop to recogonize the channel merge - }) - - // Drain the channel until new event can be added - for { - select { - case p.nonBlockingEventsCh <- e: - return - default: - <-p.nonBlockingEventsCh - p.addStats("emfMetricDrop", 1) - } - } -} - -func hasValidTime(e logs.LogEvent) bool { - //http://docs.aws.amazon.com/goto/SdkForGoV1/logs-2014-03-28/PutLogEvents - //* None of the log events in the batch can be more than 2 hours in the future. - //* None of the log events in the batch can be older than 14 days or the retention period of the log group. - if !e.Time().IsZero() { - now := time.Now() - dt := now.Sub(e.Time()).Hours() - if dt > 24*14 || dt < -2 { - return false - } - } - return true -} - -func (p *pusher) start() { - defer p.wg.Done() - - ec := make(chan logs.LogEvent) - - // Merge events from both blocking and non-blocking channel - go func() { - for { - select { - case e := <-p.eventsCh: - ec <- e - case e := <-p.nonBlockingEventsCh: - ec <- e - case <-p.startNonBlockCh: - case <-p.stop: - return - } - } - }() - - for { - select { - case e := <-ec: - // Start timer when first event of the batch is added (happens after a flush timer timeout) - if len(p.events) == 0 { - p.resetFlushTimer() - } - - ce := p.convertEvent(e) - et := time.Unix(*ce.Timestamp/1000, *ce.Timestamp%1000) // Cloudwatch Log Timestamp is in Millisecond - - // A batch of log events in a single request cannot span more than 24 hours. 
- if (p.minT != nil && et.Sub(*p.minT) > 24*time.Hour) || (p.maxT != nil && p.maxT.Sub(et) > 24*time.Hour) { - p.send() - } - - size := len(*ce.Message) + eventHeaderSize - if p.bufferredSize+size > reqSizeLimit || len(p.events) == reqEventsLimit { - p.send() - } - - if len(p.events) > 0 && *ce.Timestamp < *p.events[len(p.events)-1].Timestamp { - p.needSort = true - } - - p.events = append(p.events, ce) - p.doneCallbacks = append(p.doneCallbacks, e.Done) - p.bufferredSize += size - if p.minT == nil || p.minT.After(et) { - p.minT = &et - } - if p.maxT == nil || p.maxT.Before(et) { - p.maxT = &et - } - - case <-p.flushTimer.C: - if time.Since(p.lastSentTime) >= p.FlushTimeout && len(p.events) > 0 { - p.send() - } else { - p.resetFlushTimer() - } - case <-p.stop: - if len(p.events) > 0 { - p.send() - } - return - } - } -} - -func (p *pusher) reset() { - for i := 0; i < len(p.events); i++ { - p.events[i] = nil - } - p.events = p.events[:0] - for i := 0; i < len(p.doneCallbacks); i++ { - p.doneCallbacks[i] = nil - } - p.doneCallbacks = p.doneCallbacks[:0] - p.bufferredSize = 0 - p.needSort = false - p.minT = nil - p.maxT = nil -} - -func (p *pusher) send() { - defer p.resetFlushTimer() // Reset the flush timer after sending the request - if p.needSort { - sort.Stable(ByTimestamp(p.events)) - } - input := &cloudwatchlogs.PutLogEventsInput{ - LogEvents: p.events, - LogGroupName: &p.Group, - LogStreamName: &p.Stream, - SequenceToken: p.sequenceToken, - } - if p.logSrc != nil { - input.Entity = p.logSrc.Entity() - } - - startTime := time.Now() - - retryCountShort := 0 - retryCountLong := 0 - for { - input.SequenceToken = p.sequenceToken - output, err := p.Service.PutLogEvents(input) - if err == nil { - if output.NextSequenceToken != nil { - p.sequenceToken = output.NextSequenceToken - } - if output.RejectedLogEventsInfo != nil { - info := output.RejectedLogEventsInfo - if info.TooOldLogEventEndIndex != nil { - p.Log.Warnf("%d log events for log '%s/%s' are too old", *info.TooOldLogEventEndIndex, p.Group, p.Stream) - } - if info.TooNewLogEventStartIndex != nil { - p.Log.Warnf("%d log events for log '%s/%s' are too new", *info.TooNewLogEventStartIndex, p.Group, p.Stream) - } - if info.ExpiredLogEventEndIndex != nil { - p.Log.Warnf("%d log events for log '%s/%s' are expired", *info.ExpiredLogEventEndIndex, p.Group, p.Stream) - } - } - for i := len(p.doneCallbacks) - 1; i >= 0; i-- { - done := p.doneCallbacks[i] - done() - } - - p.Log.Debugf("Pusher published %v log events to group: %v stream: %v with size %v KB in %v.", len(p.events), p.Group, p.Stream, p.bufferredSize/1024, time.Since(startTime)) - p.addStats("rawSize", float64(p.bufferredSize)) - - p.reset() - p.lastSentTime = time.Now() - - return - } - - awsErr, ok := err.(awserr.Error) - if !ok { - p.Log.Errorf("Non aws error received when sending logs to %v/%v: %v. 
CloudWatch agent will not retry and logs will be missing!", p.Group, p.Stream, err) - // Messages will be discarded but done callbacks not called - p.reset() - return - } - - switch e := awsErr.(type) { - case *cloudwatchlogs.ResourceNotFoundException: - err := p.createLogGroupAndStream() - if err != nil { - p.Log.Errorf("Unable to create log stream %v/%v: %v", p.Group, p.Stream, e.Message()) - break - } - p.putRetentionPolicy() - case *cloudwatchlogs.InvalidSequenceTokenException: - if p.sequenceToken == nil { - p.Log.Infof("First time sending logs to %v/%v since startup so sequenceToken is nil, learned new token:(%v): %v", p.Group, p.Stream, e.ExpectedSequenceToken, e.Message()) - } else { - p.Log.Warnf("Invalid SequenceToken used (%v) while sending logs to %v/%v, will use new token and retry: %v", p.sequenceToken, p.Group, p.Stream, e.Message()) - } - if e.ExpectedSequenceToken == nil { - p.Log.Errorf("Failed to find sequence token from aws response while sending logs to %v/%v: %v", p.Group, p.Stream, e.Message()) - } - p.sequenceToken = e.ExpectedSequenceToken - case *cloudwatchlogs.InvalidParameterException, - *cloudwatchlogs.DataAlreadyAcceptedException: - p.Log.Errorf("%v, will not retry the request", e) - p.reset() - return - default: - p.Log.Errorf("Aws error received when sending logs to %v/%v: %v", p.Group, p.Stream, awsErr) - } - - // retry wait strategy depends on the type of error returned - var wait time.Duration - if chooseRetryWaitStrategy(err) == retryLong { - wait = retryWaitLong(retryCountLong) - retryCountLong++ - } else { - wait = retryWaitShort(retryCountShort) - retryCountShort++ - } - - if time.Since(startTime)+wait > p.RetryDuration { - p.Log.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, p.Group, p.Stream) - p.reset() - return - } - - p.Log.Warnf("Retried %v time, going to sleep %v before retrying.", retryCountShort+retryCountLong-1, wait) - - select { - case <-p.stop: - p.Log.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, p.Group, p.Stream) - p.reset() - return - case <-time.After(wait): - } - - } - -} - -func (p *pusher) createLogGroupAndStream() error { - _, err := p.Service.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ - LogGroupName: &p.Group, - LogStreamName: &p.Stream, - }) - - if err == nil { - p.Log.Debugf("successfully created log stream %v", p.Stream) - return nil - } - - p.Log.Debugf("creating stream fail due to : %v", err) - if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException { - err = p.createLogGroup() - - // attempt to create stream again if group created successfully. - if err == nil { - p.Log.Debugf("successfully created log group %v. Retrying log stream %v", p.Group, p.Stream) - _, err = p.Service.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ - LogGroupName: &p.Group, - LogStreamName: &p.Stream, - }) - - if err == nil { - p.Log.Debugf("successfully created log stream %v", p.Stream) - } - } else { - p.Log.Debugf("creating group fail due to : %v", err) - } - - } - - if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException { - p.Log.Debugf("Resource was already created. 
%v\n", err) - return nil // if the log group or log stream already exist, this is not worth returning an error for - } - - return err -} - -func (p *pusher) createLogGroup() error { - var err error - if p.Class != "" { - _, err = p.Service.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ - LogGroupName: &p.Group, - LogGroupClass: &p.Class, - }) - } else { - _, err = p.Service.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ - LogGroupName: &p.Group, - }) - } - return err -} - -func (p *pusher) putRetentionPolicy() { - if p.Retention > 0 { - i := aws.Int64(int64(p.Retention)) - putRetentionInput := &cloudwatchlogs.PutRetentionPolicyInput{ - LogGroupName: &p.Group, - RetentionInDays: i, - } - _, err := p.Service.PutRetentionPolicy(putRetentionInput) - if err != nil { - // since this gets called both before we start pushing logs, and after we first attempt - // to push a log to a non-existent log group, we don't want to dirty the log with an error - // if the error is that the log group doesn't exist (yet). - if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException { - p.Log.Debugf("Log group %v not created yet: %v", p.Group, err) - } else { - p.Log.Errorf("Unable to put retention policy for log group %v: %v ", p.Group, err) - } - } else { - p.Log.Debugf("successfully updated log retention policy for log group %v", p.Group) - } - } -} - -func (p *pusher) resetFlushTimer() { - p.flushTimer.Stop() - p.flushTimer.Reset(p.FlushTimeout) -} - -func (p *pusher) convertEvent(e logs.LogEvent) *cloudwatchlogs.InputLogEvent { - message := e.Message() - - if len(message) > msgSizeLimit { - message = message[:msgSizeLimit-len(truncatedSuffix)] + truncatedSuffix - } - var t int64 - if e.Time().IsZero() { - if p.lastValidTime != 0 { - // Where there has been a valid time before, assume most log events would have - // a valid timestamp and use the last valid timestamp for new entries that does - // not have a timestamp. - t = p.lastValidTime - if !p.lastUpdateTime.IsZero() { - // Check when timestamp has an interval of 1 days. 
- if (time.Since(p.lastUpdateTime) > warnOldTimeStamp) && (time.Since(p.lastWarnMessage) > warnOldTimeStampLogInterval) { - { - p.Log.Warnf("Unable to parse timestamp, using last valid timestamp found in the logs %v: which is at least older than 1 day for log group %v: ", p.lastValidTime, p.Group) - p.lastWarnMessage = time.Now() - } - } - } - } else { - t = time.Now().UnixNano() / 1000000 - } - } else { - t = e.Time().UnixNano() / 1000000 - p.lastValidTime = t - p.lastUpdateTime = time.Now() - p.lastWarnMessage = time.Time{} - } - return &cloudwatchlogs.InputLogEvent{ - Message: &message, - Timestamp: &t, - } -} - -func (p *pusher) addStats(statsName string, value float64) { - statsKey := []string{"cloudwatchlogs", p.Group, statsName} - profiler.Profiler.AddStats(statsKey, value) -} - -type ByTimestamp []*cloudwatchlogs.InputLogEvent - -func (inputLogEvents ByTimestamp) Len() int { - return len(inputLogEvents) -} - -func (inputLogEvents ByTimestamp) Swap(i, j int) { - inputLogEvents[i], inputLogEvents[j] = inputLogEvents[j], inputLogEvents[i] -} - -func (inputLogEvents ByTimestamp) Less(i, j int) bool { - return *inputLogEvents[i].Timestamp < *inputLogEvents[j].Timestamp -} diff --git a/translator/config/schema.json b/translator/config/schema.json index 74438f5a71..4938baf571 100644 --- a/translator/config/schema.json +++ b/translator/config/schema.json @@ -898,6 +898,11 @@ "type": "string", "minLength": 1, "maxLength": 259 + }, + "concurrency": { + "description": "The number of concurrent workers available for cloudwatch logs export", + "type": "integer", + "minimum": 1 } }, "additionalProperties": false, diff --git a/translator/tocwconfig/sampleConfig/log_filter.conf b/translator/tocwconfig/sampleConfig/log_filter.conf index b60a6c98a4..86078eac35 100644 --- a/translator/tocwconfig/sampleConfig/log_filter.conf +++ b/translator/tocwconfig/sampleConfig/log_filter.conf @@ -49,6 +49,7 @@ [outputs] [[outputs.cloudwatchlogs]] + concurrency = 10 force_flush_interval = "5s" log_stream_name = "LOG_STREAM_NAME" region = "us-east-1" diff --git a/translator/tocwconfig/sampleConfig/log_filter.json b/translator/tocwconfig/sampleConfig/log_filter.json index 626a9ea7cb..8ce5494f3b 100644 --- a/translator/tocwconfig/sampleConfig/log_filter.json +++ b/translator/tocwconfig/sampleConfig/log_filter.json @@ -31,6 +31,7 @@ ] } }, + "concurrency": 10, "log_stream_name": "LOG_STREAM_NAME" } } diff --git a/translator/translate/logs/ruleConcurrency.go b/translator/translate/logs/ruleConcurrency.go new file mode 100644 index 0000000000..77ce901170 --- /dev/null +++ b/translator/translate/logs/ruleConcurrency.go @@ -0,0 +1,24 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package logs + +import "github.com/aws/amazon-cloudwatch-agent/translator" + +const ConcurrencySectionKey = "concurrency" + +type Concurrency struct { +} + +func (c *Concurrency) ApplyRule(input any) (string, any) { + result := map[string]interface{}{} + _, val := translator.DefaultCase(ConcurrencySectionKey, float64(0), input) + if v, ok := val.(float64); ok && v > 0 { + result[ConcurrencySectionKey] = int(v) + } + return Output_Cloudwatch_Logs, result +} + +func init() { + RegisterRule(ConcurrencySectionKey, new(Concurrency)) +} From b71972db81720f9b519932492d144f4de93a2f3d Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Mon, 13 Jan 2025 12:46:46 -0500 Subject: [PATCH 2/3] Remove atomic.Bool from workerPool. 
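The atomic.Bool duplicated state that stopCh already carries, and the check-then-act between Load and Store/close is racy under concurrent callers. Selecting on the closed stopCh channel makes the check and the shutdown signal the same state. A minimal, self-contained sketch of the idiom this change adopts (names are illustrative, not the agent's actual types):

```go
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	tasks  chan func()
	stopCh chan struct{}
	wg     sync.WaitGroup
}

func newPool(size int) *pool {
	p := &pool{tasks: make(chan func(), size), stopCh: make(chan struct{})}
	for i := 0; i < size; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// Range exits once tasks is closed and drained.
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// Submit blocks until a worker takes the task or the pool is stopped.
func (p *pool) Submit(task func()) {
	select {
	case <-p.stopCh: // already stopped; drop the task
	default:
		select {
		case p.tasks <- task:
		case <-p.stopCh:
		}
	}
}

// Stop closes the channels once; repeated calls observe the closed
// stopCh and return immediately.
func (p *pool) Stop() {
	select {
	case <-p.stopCh:
		return
	default:
		close(p.stopCh)
		close(p.tasks)
		p.wg.Wait()
	}
}

func main() {
	p := newPool(2)
	done := make(chan struct{})
	p.Submit(func() { fmt.Println("task ran"); close(done) })
	<-done
	p.Stop()
	p.Stop() // no panic on a repeated Stop
}
```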
--- .../cloudwatchlogs/internal/pusher/batch.go | 2 +- .../cloudwatchlogs/internal/pusher/convert.go | 7 +++---- .../outputs/cloudwatchlogs/internal/pusher/pool.go | 14 +++++++++----- .../cloudwatchlogs/internal/pusher/pool_test.go | 5 +++-- translator/tocwconfig/sampleConfig/log_filter.conf | 2 +- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go index cc46a1ec24..c2cc50ba56 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go @@ -13,7 +13,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" ) -// CloudWatch Logs API limits +// CloudWatch Logs PutLogEvents API limits // Taken from https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html const ( // The maximum batch size in bytes. This size is calculated as the sum of all event messages in UTF-8, diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go b/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go index 53eb95ec48..da4bd67ac4 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go @@ -12,12 +12,10 @@ import ( ) const ( - // Each log event can be no larger than 256 KB. When truncating the message, assume this is the limit for - // message length. + // Each log event can be no larger than 256 KB. When truncating the message, this is the limit for message length. msgSizeLimit = 256*1024 - perEventHeaderBytes // The suffix to add to truncated log lines. truncatedSuffix = "[Truncated...]" - // The duration until a timestamp is considered old. warnOldTimeStamp = 24 * time.Hour // The minimum interval between logs warning about the old timestamps. @@ -39,7 +37,8 @@ func newConverter(logger telegraf.Logger, target Target) *converter { } } -// Handles message truncation and timestamp +// convert handles message truncation to remain within PutLogEvents limits and sets a timestamp if not set in the +// logs.LogEvent. func (c *converter) convert(e logs.LogEvent) *logEvent { message := e.Message() diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go index fc85aee4fc..cb9fe113a3 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go @@ -19,7 +19,6 @@ type workerPool struct { workerCount atomic.Int32 wg sync.WaitGroup stopCh chan struct{} - stopped atomic.Bool } // NewWorkerPool creates a pool of workers of the specified size. @@ -53,7 +52,10 @@ func (p *workerPool) worker() { // Submit adds a task to the pool. Blocks until a worker is available to receive the task or the pool is stopped. func (p *workerPool) Submit(task func()) { - if !p.stopped.Load() { + select { + case <-p.stopCh: + return + default: select { case p.tasks <- task: case <-p.stopCh: @@ -67,10 +69,12 @@ func (p *workerPool) WorkerCount() int32 { return p.workerCount.Load() } -// Stop gracefully shuts down the worker pool. +// Stop closes the channels and waits for the workers to stop. 
func (p *workerPool) Stop() { - if !p.stopped.Load() { - p.stopped.Store(true) + select { + case <-p.stopCh: + return + default: close(p.stopCh) close(p.tasks) p.wg.Wait() diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go index 1b69645543..d2962e85ff 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go @@ -69,9 +69,10 @@ func TestWorkerPool(t *testing.T) { t.Run("MultipleStops", func(t *testing.T) { pool := NewWorkerPool(3) - pool.Stop() assert.NotPanics(t, func() { - pool.Stop() + for i := 0; i < 10; i++ { + pool.Stop() + } }) }) diff --git a/translator/tocwconfig/sampleConfig/log_filter.conf b/translator/tocwconfig/sampleConfig/log_filter.conf index 86078eac35..9457546dcf 100644 --- a/translator/tocwconfig/sampleConfig/log_filter.conf +++ b/translator/tocwconfig/sampleConfig/log_filter.conf @@ -49,7 +49,7 @@ [outputs] [[outputs.cloudwatchlogs]] - concurrency = 10 + concurrency = 10 force_flush_interval = "5s" log_stream_name = "LOG_STREAM_NAME" region = "us-east-1" From 603f11bf67c3e0ac316db77e70e8f87426e29eb2 Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Wed, 15 Jan 2025 19:50:47 -0500 Subject: [PATCH 3/3] Added mutex to target manager to prevent concurrent read/write. --- .../cloudwatchlogs/internal/pusher/batch.go | 25 ++++++++++------ .../internal/pusher/batch_test.go | 8 +++++ .../cloudwatchlogs/internal/pusher/convert.go | 9 +++--- .../internal/pusher/convert_test.go | 20 ++++++------- .../cloudwatchlogs/internal/pusher/pool.go | 3 +- .../cloudwatchlogs/internal/pusher/target.go | 30 ++++++++----------- .../internal/pusher/target_test.go | 11 ++++--- 7 files changed, 60 insertions(+), 46 deletions(-) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go index c2cc50ba56..e8a7cb8f2a 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go @@ -29,24 +29,30 @@ const ( // logEvent represents a single cloudwatchlogs.InputLogEvent with some metadata for processing type logEvent struct { - event *cloudwatchlogs.InputLogEvent - eventBytes int timestamp time.Time + message string + eventBytes int doneCallback func() } func newLogEvent(timestamp time.Time, message string, doneCallback func()) *logEvent { return &logEvent{ - event: &cloudwatchlogs.InputLogEvent{ - Timestamp: aws.Int64(timestamp.UnixMilli()), - Message: aws.String(message), - }, - eventBytes: len(message) + perEventHeaderBytes, + message: message, timestamp: timestamp, + eventBytes: len(message) + perEventHeaderBytes, doneCallback: doneCallback, } } +// build constructs a cloudwatchlogs.InputLogEvent from the stored timestamp and message. Converts the timestamp to +// milliseconds to match the PutLogEvents specification. +func (e *logEvent) build() *cloudwatchlogs.InputLogEvent { + return &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(e.timestamp.UnixMilli()), + Message: aws.String(e.message), + } +} + type logEventBatch struct { Target events []*cloudwatchlogs.InputLogEvent @@ -85,10 +91,11 @@ func (b *logEventBatch) hasSpace(size int) bool { // append adds a log event to the batch.
func (b *logEventBatch) append(e *logEvent) { - if len(b.events) > 0 && *e.event.Timestamp < *b.events[len(b.events)-1].Timestamp { + event := e.build() + if len(b.events) > 0 && *event.Timestamp < *b.events[len(b.events)-1].Timestamp { b.needSort = true } - b.events = append(b.events, e.event) + b.events = append(b.events, event) b.addDoneCallback(e.doneCallback) b.bufferedSize += e.eventBytes if b.minT.IsZero() || b.minT.After(e.timestamp) { diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go index f305f4d502..cc7051bdbb 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go @@ -32,6 +32,14 @@ func newMockEntityProvider(entity *cloudwatchlogs.Entity) *mockEntityProvider { return ep } +func TestLogEvent(t *testing.T) { + now := time.Now() + e := newLogEvent(now, "test message", nil) + inputLogEvent := e.build() + assert.EqualValues(t, now.UnixMilli(), *inputLogEvent.Timestamp) + assert.EqualValues(t, "test message", *inputLogEvent.Message) +} + func TestLogEventBatch(t *testing.T) { t.Run("Append", func(t *testing.T) { batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go b/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go index da4bd67ac4..52c42d39fc 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/convert.go @@ -45,6 +45,7 @@ func (c *converter) convert(e logs.LogEvent) *logEvent { if len(message) > msgSizeLimit { message = message[:msgSizeLimit-len(truncatedSuffix)] + truncatedSuffix } + now := time.Now() var t time.Time if e.Time().IsZero() { if !c.lastValidTime.IsZero() { @@ -54,18 +55,18 @@ func (c *converter) convert(e logs.LogEvent) *logEvent { t = c.lastValidTime if !c.lastUpdateTime.IsZero() { // Check when timestamp has an interval of 1 day. 
- if time.Since(c.lastUpdateTime) > warnOldTimeStamp && time.Since(c.lastWarnMessage) > warnOldTimeStampLogInterval { + if now.Sub(c.lastUpdateTime) > warnOldTimeStamp && now.Sub(c.lastWarnMessage) > warnOldTimeStampLogInterval { c.logger.Warnf("Unable to parse timestamp, using last valid timestamp found in the logs %v: which is at least older than 1 day for log group %v: ", c.lastValidTime, c.Group) - c.lastWarnMessage = time.Now() + c.lastWarnMessage = now } } } else { - t = time.Now() + t = now } } else { t = e.Time() c.lastValidTime = t - c.lastUpdateTime = time.Now() + c.lastUpdateTime = now c.lastWarnMessage = time.Time{} } return newLogEvent(t, message, e.Done) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go index 0579314857..52b40ef71c 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go @@ -54,22 +54,22 @@ func TestConverter(t *testing.T) { conv := newConverter(logger, target) le := conv.convert(newStubLogEvent("Test message", now)) - assert.Equal(t, now.UnixMilli(), *le.event.Timestamp) - assert.Equal(t, "Test message", *le.event.Message) + assert.Equal(t, now, le.timestamp) + assert.Equal(t, "Test message", le.message) assert.Equal(t, now, conv.lastValidTime) }) t.Run("WithNoTimestamp", func(t *testing.T) { t.Parallel() - testTimestampMs := int64(12345678) + testTimestampMs := time.UnixMilli(12345678) conv := newConverter(logger, target) - conv.lastValidTime = time.UnixMilli(testTimestampMs) + conv.lastValidTime = testTimestampMs le := conv.convert(newStubLogEvent("Test message", time.Time{})) - assert.Equal(t, testTimestampMs, *le.event.Timestamp) - assert.Equal(t, "Test message", *le.event.Message) + assert.Equal(t, testTimestampMs, le.timestamp) + assert.Equal(t, "Test message", le.message) }) t.Run("TruncateMessage", func(t *testing.T) { @@ -80,8 +80,8 @@ func TestConverter(t *testing.T) { conv := newConverter(logger, target) le := conv.convert(event) - assert.Equal(t, msgSizeLimit, len(*le.event.Message)) - assert.Equal(t, truncatedSuffix, (*le.event.Message)[len(*le.event.Message)-len(truncatedSuffix):]) + assert.Equal(t, msgSizeLimit, len(le.message)) + assert.Equal(t, truncatedSuffix, (le.message)[len(le.message)-len(truncatedSuffix):]) }) t.Run("WithOldTimestampWarning", func(t *testing.T) { @@ -94,8 +94,8 @@ func TestConverter(t *testing.T) { log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) le := conv.convert(newStubLogEvent("Test message", time.Time{})) - assert.Equal(t, oldTime.UnixMilli(), *le.event.Timestamp) - assert.Equal(t, "Test message", *le.event.Message) + assert.Equal(t, oldTime, le.timestamp) + assert.Equal(t, "Test message", le.message) loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") assert.Len(t, loglines, 1) logline := loglines[0] diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go index cb9fe113a3..3bf9eb7f34 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go @@ -24,7 +24,7 @@ type workerPool struct { // NewWorkerPool creates a pool of workers of the specified size. 
func NewWorkerPool(size int) WorkerPool { p := &workerPool{ - tasks: make(chan func(), size), + tasks: make(chan func(), size*2), stopCh: make(chan struct{}), } for i := 0; i < size; i++ { @@ -40,6 +40,7 @@ func (p *workerPool) addWorker() { go p.worker() } +// worker receives tasks from the channel and executes them. func (p *workerPool) worker() { defer func() { p.workerCount.Add(-1) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go index a15e1fb8dd..b395e23040 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go @@ -4,12 +4,11 @@ package pusher import ( - "fmt" + "sync" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/influxdata/telegraf" - "golang.org/x/sync/singleflight" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" ) @@ -19,10 +18,6 @@ type Target struct { Retention int } -func (t Target) String() string { - return fmt.Sprintf("%s:%s:%s:%v", t.Group, t.Stream, t.Class, t.Retention) -} - type TargetManager interface { InitTarget(target Target) error PutRetentionPolicy(target Target) @@ -33,7 +28,7 @@ type targetManager struct { service cloudWatchLogsService // cache of initialized targets cache map[Target]struct{} - group singleflight.Group + mu sync.Mutex } func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService) TargetManager { @@ -46,18 +41,17 @@ func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService) Tar // InitTarget initializes a Target if it hasn't been initialized before. func (m *targetManager) InitTarget(target Target) error { - _, err, _ := m.group.Do(target.String(), func() (any, error) { - if _, ok := m.cache[target]; !ok { - err := m.createLogGroupAndStream(target) - if err != nil { - return nil, err - } - m.PutRetentionPolicy(target) - m.cache[target] = struct{}{} + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.cache[target]; !ok { + err := m.createLogGroupAndStream(target) + if err != nil { + return err } - return nil, nil - }) - return err + m.PutRetentionPolicy(target) + m.cache[target] = struct{}{} + } + return nil } func (m *targetManager) createLogGroupAndStream(t Target) error { diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go index c9f4ca5232..88a417f9e3 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go @@ -121,7 +121,10 @@ func TestTargetManager(t *testing.T) { }) t.Run("ConcurrentInit", func(t *testing.T) { - target := Target{Group: "G", Stream: "S"} + targets := []Target{ + {Group: "G1", Stream: "S1"}, + {Group: "G2", Stream: "S2"}, + } var count atomic.Int32 service := new(stubLogsService) @@ -133,16 +136,16 @@ func TestTargetManager(t *testing.T) { manager := NewTargetManager(logger, service) var wg sync.WaitGroup - for i := 0; i < 10; i++ { + for i := 0; i < 50; i++ { wg.Add(1) go func() { defer wg.Done() - err := manager.InitTarget(target) + err := manager.InitTarget(targets[i%len(targets)]) assert.NoError(t, err) }() } wg.Wait() - assert.Equal(t, int32(1), count.Load()) + assert.EqualValues(t, len(targets), count.Load()) }) }
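The target-manager change above, reduced to its essentials: a single mutex serializes the cache lookup and the one-time initialization that singleflight previously coordinated. A sketch with simplified, illustrative types (initFn stands in for createLogGroupAndStream):

```go
package main

import (
	"fmt"
	"sync"
)

type target struct{ group, stream string }

type manager struct {
	mu     sync.Mutex
	cache  map[target]struct{}
	initFn func(target) error // stands in for createLogGroupAndStream
}

// initTarget initializes a target at most once; failures are not cached,
// so a later call retries.
func (m *manager) initTarget(t target) error {
	m.mu.Lock()
	defer m.mu.Unlock() // one goroutine at a time touches the cache
	if _, ok := m.cache[t]; ok {
		return nil
	}
	if err := m.initFn(t); err != nil {
		return err
	}
	m.cache[t] = struct{}{}
	return nil
}

func main() {
	var calls int
	m := &manager{
		cache:  map[target]struct{}{},
		initFn: func(target) error { calls++; return nil }, // serialized by m.mu
	}
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = m.initTarget(target{"G1", "S1"})
		}()
	}
	wg.Wait()
	fmt.Println("init calls:", calls) // 1: duplicate work is suppressed
}
```

Unlike singleflight, the mutex also serializes initialization across different targets, but since each target is initialized once and then served from the cache, the added contention is bounded.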