Skip to content

Commit

Permalink
Send metrics to cloudwatch
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed Dec 16, 2024
1 parent d4660c8 commit 9b91ac2
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 3 deletions.
80 changes: 78 additions & 2 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/s3/types"
cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
Expand Down Expand Up @@ -642,7 +644,7 @@ func (b *backend) SaveAttachment(ctx context.Context, ch courier.Channel, conten

path := filepath.Join("attachments", strconv.FormatInt(int64(orgID), 10), filename[:4], filename[4:8], filename)

storageURL, err := b.s3.PutObject(ctx, b.config.S3AttachmentsBucket, path, contentType, data, types.ObjectCannedACLPublicRead)
storageURL, err := b.s3.PutObject(ctx, b.config.S3AttachmentsBucket, path, contentType, data, s3types.ObjectCannedACLPublicRead)
if err != nil {
return "", fmt.Errorf("error saving attachment to storage (bytes=%d): %w", len(data), err)
}
Expand Down Expand Up @@ -782,6 +784,80 @@ func (b *backend) Heartbeat() error {
b.stats.redisWaitDuration = redisStats.WaitDuration
b.stats.redisWaitCount = redisStats.WaitCount

dims := []cwtypes.Dimension{
{Name: aws.String("Host"), Value: aws.String(b.config.InstanceID)},
{Name: aws.String("App"), Value: aws.String("courier")},
}

// metrics is the batch of CloudWatch datums sent for this heartbeat. Every datum carries
// the same dims, so the values can be filtered by those shared dimensions.
metrics := []cwtypes.MetricDatum{
	// database connection pool
	{
		MetricName: aws.String("DBBusy"),
		Dimensions: dims,
		Value:      aws.Float64(float64(dbStats.InUse)),
		Unit:       cwtypes.StandardUnitCount,
	},
	{
		// idle connections come from dbStats.Idle (not InUse), matching the
		// courier.db_idle analytics gauge reported alongside these metrics
		MetricName: aws.String("DBIdle"),
		Dimensions: dims,
		Value:      aws.Float64(float64(dbStats.Idle)),
		Unit:       cwtypes.StandardUnitCount,
	},
	{
		// dividing by time.Millisecond yields a whole number of milliseconds
		MetricName: aws.String("DBWaitMS"),
		Dimensions: dims,
		Value:      aws.Float64(float64(dbWaitDurationInPeriod / time.Millisecond)),
		Unit:       cwtypes.StandardUnitMilliseconds,
	},
	{
		MetricName: aws.String("DBWaitCount"),
		Dimensions: dims,
		Value:      aws.Float64(float64(dbWaitCountInPeriod)),
		Unit:       cwtypes.StandardUnitCount,
	},

	// redis connection pool
	{
		MetricName: aws.String("RedisActive"),
		Dimensions: dims,
		Value:      aws.Float64(float64(redisStats.ActiveCount)),
		Unit:       cwtypes.StandardUnitCount,
	},
	{
		MetricName: aws.String("RedisIdle"),
		Dimensions: dims,
		Value:      aws.Float64(float64(redisStats.IdleCount)),
		Unit:       cwtypes.StandardUnitCount,
	},
	{
		// dividing by time.Millisecond yields a whole number of milliseconds
		MetricName: aws.String("RedisWaitMS"),
		Dimensions: dims,
		Value:      aws.Float64(float64(redisWaitDurationInPeriod / time.Millisecond)),
		Unit:       cwtypes.StandardUnitMilliseconds,
	},
	{
		MetricName: aws.String("RedisWaitCount"),
		Dimensions: dims,
		Value:      aws.Float64(float64(redisWaitCountInPeriod)),
		Unit:       cwtypes.StandardUnitCount,
	},

	// queue sizes
	{
		MetricName: aws.String("BulkQueue"),
		Dimensions: dims,
		Value:      aws.Float64(float64(bulkSize)),
		Unit:       cwtypes.StandardUnitCount,
	},
	{
		MetricName: aws.String("PriorityQueue"),
		Dimensions: dims,
		Value:      aws.Float64(float64(prioritySize)),
		Unit:       cwtypes.StandardUnitCount,
	},
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
if err = b.CloudWatchService().Send(ctx, metrics...); err != nil {
slog.Error("error putting metrics", "error", err)
}

Check warning on line 859 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L858-L859

Added lines #L858 - L859 were not covered by tests

analytics.Gauge("courier.db_busy", float64(dbStats.InUse))
analytics.Gauge("courier.db_idle", float64(dbStats.Idle))
analytics.Gauge("courier.db_wait_ms", float64(dbWaitDurationInPeriod/time.Millisecond))
Expand Down
3 changes: 3 additions & 0 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"
"unicode/utf8"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/analytics"
Expand Down Expand Up @@ -219,6 +221,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,

// log that we created a new contact to librato and cloudwatch
analytics.Gauge("courier.new_contact", float64(1))
b.cw.Send(ctx, types.MetricDatum{MetricName: aws.String("NewContact"), Value: aws.Float64(float64(1)), Unit: types.StandardUnitCount})

// and return it
return contact, nil
Expand Down
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"log/slog"
"net"
"os"
"strings"

"github.com/nyaruka/courier/utils"
Expand All @@ -31,6 +32,7 @@ type Config struct {

CloudwatchNamespace string `help:"the namespace to use for cloudwatch metrics"`
DeploymentID string `help:"the deployment identifier to use for metrics"`
InstanceID string `help:"the instance identifier to use for metrics"`

DynamoEndpoint string `help:"DynamoDB service endpoint, e.g. https://dynamodb.us-east-1.amazonaws.com"`
DynamoTablePrefix string `help:"prefix to use for DynamoDB tables"`
Expand Down Expand Up @@ -63,6 +65,7 @@ type Config struct {

// NewDefaultConfig returns a new default configuration object
func NewDefaultConfig() *Config {
hostname, _ := os.Hostname()
return &Config{
Backend: "rapidpro",
Domain: "localhost",
Expand All @@ -78,6 +81,7 @@ func NewDefaultConfig() *Config {

CloudwatchNamespace: "Temba",
DeploymentID: "dev",
InstanceID: hostname,

DynamoEndpoint: "", // let library generate it
DynamoTablePrefix: "Temba",
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
require (
cloud.google.com/go/compute/metadata v0.5.2 // indirect
github.com/antchfx/xpath v1.3.2 // indirect
github.com/aws/aws-sdk-go v1.55.5
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect
github.com/aws/aws-sdk-go-v2/config v1.28.5 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.46 // indirect
Expand All @@ -41,7 +42,7 @@ require (
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.24 // indirect
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3 // indirect
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/antchfx/xmlquery v1.4.2 h1:MZKd9+wblwxfQ1zd1AdrTsqVaMjMCwow3IqkCSe00K
github.com/antchfx/xmlquery v1.4.2/go.mod h1:QXhvf5ldTuGqhd1SHNvvtlhhdQLks4dD0awIVhXIDTA=
github.com/antchfx/xpath v1.3.2 h1:LNjzlsSjinu3bQpw9hWMY9ocB80oLOWuQqFvO6xt51U=
github.com/antchfx/xpath v1.3.2/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4=
github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8=
Expand Down
5 changes: 5 additions & 0 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/courier/utils/clogs"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/urns"
Expand Down Expand Up @@ -336,8 +338,11 @@ func (w *Sender) sendMessage(msg MsgOut) {
// report to librato and cloudwatch
if status.Status() == MsgStatusErrored || status.Status() == MsgStatusFailed {
analytics.Gauge(fmt.Sprintf("courier.msg_send_error_%s", msg.Channel().ChannelType()), secondDuration)
backend.CloudWatchService().Send(sendCTX, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("MsgSendError_%s", msg.Channel().ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})

} else {
analytics.Gauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration)
backend.CloudWatchService().Send(sendCTX, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("MsgSend_%s", msg.Channel().ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})
}
}

Expand Down
7 changes: 7 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/nyaruka/courier/utils/clogs"
Expand Down Expand Up @@ -319,8 +321,10 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe
// if we have a channel but no events were created, we still log this to analytics
if len(events) == 0 {
if hErr != nil {
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("ChannelError_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})
analytics.Gauge(fmt.Sprintf("courier.channel_error_%s", channel.ChannelType()), secondDuration)
} else {
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("ChannelIgnored_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})

Check warning on line 327 in server.go

View check run for this annotation

Codecov / codecov/patch

server.go#L327

Added line #L327 was not covered by tests
analytics.Gauge(fmt.Sprintf("courier.channel_ignored_%s", channel.ChannelType()), secondDuration)
}
}
Expand All @@ -329,13 +333,16 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe
switch e := event.(type) {
case MsgIn:
clog.SetAttached(true)
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("MsgReceive_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})
analytics.Gauge(fmt.Sprintf("courier.msg_receive_%s", channel.ChannelType()), secondDuration)
LogMsgReceived(r, e)
case StatusUpdate:
clog.SetAttached(true)
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("MsgStatus_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})

Check warning on line 341 in server.go

View check run for this annotation

Codecov / codecov/patch

server.go#L341

Added line #L341 was not covered by tests
analytics.Gauge(fmt.Sprintf("courier.msg_status_%s", channel.ChannelType()), secondDuration)
LogMsgStatusReceived(r, e)
case ChannelEvent:
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("EvtReceive_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})

Check warning on line 345 in server.go

View check run for this annotation

Codecov / codecov/patch

server.go#L345

Added line #L345 was not covered by tests
analytics.Gauge(fmt.Sprintf("courier.evt_receive_%s", channel.ChannelType()), secondDuration)
LogChannelEventReceived(r, e)
}
Expand Down

0 comments on commit 9b91ac2

Please sign in to comment.