Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add worker pool for log pushers #1499

Merged
merged 5 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ PKG_WITH_DATA_RACE += internal/tls
PKG_WITH_DATA_RACE += plugins/inputs/logfile
PKG_WITH_DATA_RACE += plugins/inputs/logfile/tail
PKG_WITH_DATA_RACE += plugins/outputs/cloudwatch
PKG_WITH_DATA_RACE += plugins/outputs/cloudwatchlogs
jefchien marked this conversation as resolved.
Show resolved Hide resolved
PKG_WITH_DATA_RACE += plugins/processors/awsapplicationsignals
PKG_WITH_DATA_RACE += plugins/processors/ec2tagger
PKG_WITH_DATA_RACE_PATTERN := $(shell echo '$(PKG_WITH_DATA_RACE)' | tr ' ' '|')
Expand Down
6 changes: 5 additions & 1 deletion logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ type LogEvent interface {
Done()
}

// LogEntityProvider is implemented by anything that can supply the
// CloudWatch Logs entity returned by Entity.
type LogEntityProvider interface {
// Entity returns a *cloudwatchlogs.Entity. Whether nil is a valid result is
// not visible here — TODO confirm against implementations.
Entity() *cloudwatchlogs.Entity
}

// LogSrc is a single source where log events are generated,
// e.g. a single log file.
type LogSrc interface {
// LogEntityProvider is embedded, so every LogSrc also exposes Entity().
LogEntityProvider
// SetOutput takes a callback that accepts a LogEvent — presumably invoked
// for each event the source produces; confirm against implementations.
SetOutput(func(LogEvent))
// Group, Stream, Retention and Class presumably describe the log group name,
// log stream name, retention and log group class for this source — TODO
// confirm; only the signatures are visible here.
Group() string
Stream() string
Destination() string
Description() string
Retention() int
Class() string
// NOTE(review): Entity() is declared explicitly here and is also provided by
// the embedded LogEntityProvider with the same signature; one of the two
// declarations is redundant.
Entity() *cloudwatchlogs.Entity
Stop()
}

Expand Down
34 changes: 34 additions & 0 deletions plugins/outputs/cloudwatchlogs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Amazon CloudWatch Logs Output Plugin

For each configured target (log group/stream), the output plugin maintains a queue of log events and groups them into batches.
Once a batch is full or the flush interval elapses, the current batch is sent to Amazon CloudWatch Logs using the PutLogEvents API.

When concurrency is enabled (the `concurrency` setting is greater than 0), the pushers for all targets share a single worker pool, which allows multiple batches to be sent concurrently.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't this the default behavior? For example, a default of min(4, number of cores).

Are we considering it experimental until we get some real world feedback?

Copy link
Contributor Author

@jefchien jefchien Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's the current plan.

```
Target #1 (Log Group/Stream) ┌──Shared Worker Pool──┐
┌──────────────────────────────────────────────────────────────────┐ │ │
│ │ │ ┌──Worker 1──┐ │
│ ┌────────Event Queue────────┐ ┌─────────Batch─────────┐ │ │ │ ┌────────┐ │ │
│ │ │ │ ┌───────────────────┐ │ │ ┌──────┼───►│ │ Sender │ │ │
│ │ ┌───┐ ┌───┐┌───┐┌───┐ │ │ │ │ │ │ │ │ │ └────────┘ │ │
AddEvent───│───►│ │ n │ ... │ 3 ││ 2 ││ 1 │ ├─────►│ │ PutLogEventsInput │ ├──┼────┤ │ └────────────┘ │
│ │ └───┘ └───┘└───┘└───┘ │ │ │ │ │ │ │ │ │
│ │ │ │ └───────────────────┘ │ │ │ │ ┌──Worker 2──┐ │
│ └───────────────────────────┘ └───────────────────────┘ │ │ │ │ ┌────────┐ │ │
│ │ ┼──────┼───►│ │ Sender │ │ │
└──────────────────────────────────────────────────────────────────┘ │ │ │ └────────┘ │ │
│ │ └────────────┘ │
│ │ │
Target #2 (Log Group/Stream) │ │ . │
┌──────────────────────────────────────────────────────────────────┐ │ │ . │
│ │ │ │ . │
│ ┌────────Event Queue────────┐ ┌─────────Batch─────────┐ │ │ │ │
│ │ │ │ ┌───────────────────┐ │ │ │ │ │
│ │ ┌───┐ ┌───┐┌───┐┌───┐ │ │ │ │ │ │ │ │ ┌──Worker n──┐ │
AddEvent───│───►│ │ n │ ... │ 3 ││ 2 ││ 1 │ ├─────►│ │ PutLogEventsInput │ ├──┼────┤ │ │ ┌────────┐ │ │
│ │ └───┘ └───┘└───┘└───┘ │ │ │ │ │ │ └──────┼───►│ │ Sender │ │ │
│ │ │ │ └───────────────────┘ │ │ │ │ └────────┘ │ │
│ └───────────────────────────┘ └───────────────────────┘ │ │ └────────────┘ │
│ │ │ │
└──────────────────────────────────────────────────────────────────┘ └──────────────────────┘
```
80 changes: 41 additions & 39 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"go.uber.org/zap"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/internal"
"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/tool/util"
)
Expand All @@ -39,9 +39,6 @@ const (
LogEntryField = "value"

defaultFlushTimeout = 5 * time.Second
eventHeaderSize = 200
truncatedSuffix = "[Truncated...]"
msgSizeLimit = 256*1024 - eventHeaderSize

maxRetryTimeout = 14*24*time.Hour + 10*time.Minute
metricRetryTimeout = 2 * time.Minute
Expand Down Expand Up @@ -71,14 +68,18 @@ type CloudWatchLogs struct {

// Retention for log group
RetentionInDays int `toml:"retention_in_days"`
Concurrency int `toml:"concurrency"`

ForceFlushInterval internal.Duration `toml:"force_flush_interval"` // unit is second

Log telegraf.Logger `toml:"-"`

pusherStopChan chan struct{}
pusherWaitGroup sync.WaitGroup
cwDests map[Target]*cwDest
cwDests map[pusher.Target]*cwDest
workerPool pusher.WorkerPool
targetManager pusher.TargetManager
once sync.Once
middleware awsmiddleware.Middleware
}

Expand All @@ -93,6 +94,9 @@ func (c *CloudWatchLogs) Close() error {
for _, d := range c.cwDests {
d.Stop()
}
if c.workerPool != nil {
c.workerPool.Stop()
}

return nil
}
Expand All @@ -115,7 +119,7 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGrou
retention = -1
}

t := Target{
t := pusher.Target{
Group: group,
Stream: stream,
Retention: retention,
Expand All @@ -124,11 +128,31 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGrou
return c.getDest(t, logSrc)
}

func (c *CloudWatchLogs) getDest(t Target, logSrc logs.LogSrc) *cwDest {
func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
if cwd, ok := c.cwDests[t]; ok {
return cwd
}

logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
client := c.createClient(logThrottleRetryer)
agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType)
agent.UsageFlags().SetValue(agent.FlagMode, c.Mode)
if containerInsightsRegexp.MatchString(t.Group) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought Container Insights doesn't send through the cloudwatch output plugin? Doesn't the agent only use EMF for it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see you just moved it from createClient()...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Historically, we had functionality where if the log group matched the regex (^/aws/.*containerinsights/.*/(performance|prometheus)$), then we'd count it as container insights. I don't think this path is used anymore, but we can verify that and clean it up in a separate PR.

useragent.Get().SetContainerInsightsFlag()
}
c.once.Do(func() {
if c.Concurrency > 0 {
c.workerPool = pusher.NewWorkerPool(c.Concurrency)
}
c.targetManager = pusher.NewTargetManager(c.Log, client)
})
p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup)
cwd := &cwDest{pusher: p, retryer: logThrottleRetryer}
c.cwDests[t] = cwd
return cwd
}

func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlogs.CloudWatchLogs {
credentialConfig := &configaws.CredentialConfig{
Region: c.Region,
AccessKey: c.AccessKey,
Expand All @@ -138,34 +162,24 @@ func (c *CloudWatchLogs) getDest(t Target, logSrc logs.LogSrc) *cwDest {
Filename: c.Filename,
Token: c.Token,
}

logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
client := cloudwatchlogs.New(
credentialConfig.Credentials(),
&aws.Config{
Endpoint: aws.String(c.EndpointOverride),
Retryer: logThrottleRetryer,
Retryer: retryer,
LogLevel: configaws.SDKLogLevel(),
Logger: configaws.SDKLogger{},
},
)
agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType)
agent.UsageFlags().SetValue(agent.FlagMode, c.Mode)
if containerInsightsRegexp.MatchString(t.Group) {
useragent.Get().SetContainerInsightsFlag()
}
client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"}))
if c.middleware != nil {
if err := awsmiddleware.NewConfigurer(c.middleware.Handlers()).Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil {
c.Log.Errorf("Unable to configure middleware on cloudwatch logs client: %v", err)
} else {
c.Log.Info("Configured middleware on AWS client")
c.Log.Debug("Configured middleware on AWS client")
jefchien marked this conversation as resolved.
Show resolved Hide resolved
}
}
pusher := NewPusher(c.Region, t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log, c.pusherStopChan, &c.pusherWaitGroup, logSrc)
cwd := &cwDest{pusher: pusher, retryer: logThrottleRetryer}
c.cwDests[t] = cwd
return cwd
return client
}

func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
Expand All @@ -179,7 +193,7 @@ func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
return
}
cwd.switchToEMF()
cwd.pusher.RetryDuration = metricRetryTimeout
cwd.pusher.Sender.SetRetryDuration(metricRetryTimeout)

e := c.getLogEventFromMetric(m)
if e == nil {
Expand All @@ -189,11 +203,11 @@ func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
cwd.AddEvent(e)
}

func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (Target, error) {
func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (pusher.Target, error) {
tags := m.Tags()
logGroup, ok := tags[LogGroupNameTag]
if !ok {
return Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name())
return pusher.Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name())
} else {
m.RemoveTag(LogGroupNameTag)
}
Expand All @@ -205,7 +219,7 @@ func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (Target, error)
logStream = c.LogStreamName
}

return Target{logGroup, logStream, util.StandardLogGroupClass, -1}, nil
return pusher.Target{Group: logGroup, Stream: logStream, Class: util.StandardLogGroupClass, Retention: -1}, nil
}

func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent {
Expand Down Expand Up @@ -299,7 +313,7 @@ func (e *structuredLogEvent) Time() time.Time {
// Done is a no-op for structuredLogEvent: the body is empty. It is presumably
// present so the type provides the Done method that LogEvent declares — TODO
// confirm.
func (e *structuredLogEvent) Done() {}

type cwDest struct {
*pusher
pusher *pusher.Pusher
sync.Mutex
isEMF bool
stopped bool
Expand Down Expand Up @@ -341,25 +355,13 @@ func (cd *cwDest) switchToEMF() {
defer cd.Unlock()
if !cd.isEMF {
cd.isEMF = true
cwl, ok := cd.Service.(*cloudwatchlogs.CloudWatchLogs)
cwl, ok := cd.pusher.Service.(*cloudwatchlogs.CloudWatchLogs)
if ok {
cwl.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("x-amzn-logs-format", "json/emf"))
}
}
}

// setRetryer replaces the retryer on the destination's service client.
// It only takes effect when the service is a *cloudwatchlogs.CloudWatchLogs;
// any other service type is left untouched.
func (cd *cwDest) setRetryer(r request.Retryer) {
	if client, isCWL := cd.Service.(*cloudwatchlogs.CloudWatchLogs); isCWL {
		client.Retryer = r
	}
}

// Target identifies a CloudWatch Logs destination. It is a comparable value
// type and is used as the key of the destination map, so two Targets with the
// same field values refer to the same destination.
type Target struct {
	// Group is the log group name.
	Group string
	// Stream is the log stream name.
	Stream string
	// Class is the log group class.
	Class string
	// Retention is the log group retention; CreateDest and
	// getTargetFromMetric use -1 for targets without an explicit retention.
	Retention int
}

// Description returns a one-sentence description on the Output
func (c *CloudWatchLogs) Description() string {
return "Configuration for AWS CloudWatchLogs output."
Expand Down Expand Up @@ -398,7 +400,7 @@ func init() {
return &CloudWatchLogs{
ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout},
pusherStopChan: make(chan struct{}),
cwDests: make(map[Target]*cwDest),
cwDests: make(map[pusher.Target]*cwDest),
middleware: agenthealth.NewAgentHealth(
zap.NewNop(),
&agenthealth.Config{
Expand Down
11 changes: 7 additions & 4 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ package cloudwatchlogs
import (
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher"
"github.com/aws/amazon-cloudwatch-agent/tool/util"
)

// TestCreateDestination verifies that the destination created for the cloudwatchlogs endpoint reflects the configured
// log group, log stream, log group retention, and log group class.
func TestCreateDestination(t *testing.T) {

testCases := map[string]struct {
cfgLogGroup string
cfgLogStream string
Expand Down Expand Up @@ -68,28 +69,30 @@ func TestCreateDestination(t *testing.T) {
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
c := &CloudWatchLogs{
Log: testutil.Logger{Name: "test"},
LogGroupName: "G1",
LogStreamName: "S1",
AccessKey: "access_key",
SecretKey: "secret_key",
pusherStopChan: make(chan struct{}),
cwDests: make(map[Target]*cwDest),
cwDests: make(map[pusher.Target]*cwDest),
}
dest := c.CreateDest(testCase.cfgLogGroup, testCase.cfgLogStream, testCase.cfgLogRetention, testCase.cfgLogClass, testCase.cfgTailerSrc).(*cwDest)
require.Equal(t, testCase.expectedLogGroup, dest.pusher.Group)
require.Equal(t, testCase.expectedLogStream, dest.pusher.Stream)
require.Equal(t, testCase.expectedLogGroupRetention, dest.pusher.Retention)
require.Equal(t, testCase.expectedLogClass, dest.pusher.Class)
require.Equal(t, testCase.expectedTailerSrc, dest.pusher.logSrc)
require.Equal(t, testCase.expectedTailerSrc, dest.pusher.EntityProvider)
})
}
}

func TestDuplicateDestination(t *testing.T) {
c := &CloudWatchLogs{
Log: testutil.Logger{Name: "test"},
AccessKey: "access_key",
SecretKey: "secret_key",
cwDests: make(map[Target]*cwDest),
cwDests: make(map[pusher.Target]*cwDest),
pusherStopChan: make(chan struct{}),
}
// Given the same log group, log stream, same retention, and logClass
Expand Down
Loading
Loading