Skip to content

Commit

Permalink
Remove no longer used batching/queuing functionality from cloudwatch …
Browse files Browse the repository at this point in the history
…service
  • Loading branch information
rowanseymour committed Dec 18, 2024
1 parent 8dc29fe commit 64da854
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 74 deletions.
48 changes: 2 additions & 46 deletions aws/cwatch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,21 @@ package cwatch

import (
"context"
"log/slog"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
awsx "github.com/nyaruka/gocommon/aws"
"github.com/nyaruka/gocommon/syncx"
)

type Service struct {
Client Client
namespace string
deployment string

batcher *syncx.Batcher[types.MetricDatum]
batcherWG *sync.WaitGroup
}

// NewService creates a new Cloudwatch service with the given credentials and configuration. Some behaviours depend on
// the given deployment value:
// - "test": metrics just logged, Queue(..) sends synchronously
// - "dev": metrics just logged, Queue(..) adds to batcher
// - "*": metrics sent to Cloudwatch, Queue(..) adds to batcher
// NewService creates a new Cloudwatch service with the given credentials and configuration. If deployment is given as
// "dev" or "test", then metrics are logged and not sent to Cloudwatch.
func NewService(accessKey, secretKey, region, namespace, deployment string) (*Service, error) {
var client Client

Expand All @@ -43,34 +33,6 @@ func NewService(accessKey, secretKey, region, namespace, deployment string) (*Se
return &Service{Client: client, namespace: namespace, deployment: deployment}, nil
}

func (s *Service) StartQueue(maxAge time.Duration) {
if s.batcher != nil {
panic("queue already started")
}

s.batcherWG = &sync.WaitGroup{}
s.batcher = syncx.NewBatcher(s.processBatch, 100, maxAge, 1000, s.batcherWG)
s.batcher.Start()
}

func (s *Service) StopQueue() {
if s.batcher == nil {
panic("queue wasn't started")
}
s.batcher.Stop()
s.batcherWG.Wait()
}

func (s *Service) Queue(data ...types.MetricDatum) {
if s.deployment == "test" {
s.Send(context.TODO(), data...)
} else {
for _, d := range data {
s.batcher.Queue(d)
}
}
}

func (s *Service) Send(ctx context.Context, data ...types.MetricDatum) error {
_, err := s.Client.PutMetricData(ctx, s.prepare(data))
return err
Expand All @@ -87,9 +49,3 @@ func (s *Service) prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInp
MetricData: data,
}
}

func (s *Service) processBatch(batch []types.MetricDatum) {
if err := s.Send(context.TODO(), batch...); err != nil {
slog.Error("error sending metric data batch", "error", err, "count", len(batch))
}
}
30 changes: 2 additions & 28 deletions aws/cwatch/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cwatch_test
import (
"context"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
Expand All @@ -12,39 +11,14 @@ import (
)

func TestService(t *testing.T) {
// create service for test environment
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "test")
// create service for dev environment
svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "dev")
assert.NoError(t, err)

err = svc.Send(context.Background(), types.MetricDatum{MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)})
assert.NoError(t, err)
assert.Equal(t, 1, svc.Client.(*cwatch.DevClient).CallCount())

// check Queue sends synchronously
svc.Queue(types.MetricDatum{MetricName: aws.String("NumGoats"), Value: aws.Float64(10), Unit: types.StandardUnitCount})
assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount())

// create service for dev environment
svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "dev")
assert.NoError(t, err)

svc.StartQueue(time.Millisecond * 100)

svc.Queue(cwatch.Datum("NumGoats", 10, types.StandardUnitCount, cwatch.Dimension("Host", "foo1")))
svc.Queue(cwatch.Datum("NumSheep", 20, types.StandardUnitCount))
assert.Equal(t, 0, svc.Client.(*cwatch.DevClient).CallCount()) // not sent yet

time.Sleep(time.Millisecond * 200)

assert.Equal(t, 1, svc.Client.(*cwatch.DevClient).CallCount()) // sent as one call

svc.Queue(cwatch.Datum("SleepTime", 30, types.StandardUnitSeconds))

svc.StopQueue()

// check the queued metric was sent
assert.Equal(t, 2, svc.Client.(*cwatch.DevClient).CallCount())

// create service for prod environment
svc, err = cwatch.NewService("root", "key", "us-east-1", "Foo", "prod")
assert.NoError(t, err)
Expand Down

0 comments on commit 64da854

Please sign in to comment.