Skip to content

Commit

Permalink
Send metrics to cloudwatch
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed Dec 16, 2024
1 parent d4660c8 commit a1a2a51
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 3 deletions.
28 changes: 26 additions & 2 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
Expand Down Expand Up @@ -642,7 +645,7 @@ func (b *backend) SaveAttachment(ctx context.Context, ch courier.Channel, conten

path := filepath.Join("attachments", strconv.FormatInt(int64(orgID), 10), filename[:4], filename[4:8], filename)

storageURL, err := b.s3.PutObject(ctx, b.config.S3AttachmentsBucket, path, contentType, data, types.ObjectCannedACLPublicRead)
storageURL, err := b.s3.PutObject(ctx, b.config.S3AttachmentsBucket, path, contentType, data, s3types.ObjectCannedACLPublicRead)
if err != nil {
return "", fmt.Errorf("error saving attachment to storage (bytes=%d): %w", len(data), err)
}
Expand Down Expand Up @@ -782,6 +785,27 @@ func (b *backend) Heartbeat() error {
b.stats.redisWaitDuration = redisStats.WaitDuration
b.stats.redisWaitCount = redisStats.WaitCount

// metrics collects the DB pool, redis pool and queue size datums that are sent
// to CloudWatch in a single call below.
metrics := []types.MetricDatum{
	{MetricName: aws.String("DBBusy"), Value: aws.Float64(float64(dbStats.InUse)), Unit: cwtypes.StandardUnitCount},
	// DBIdle must report the idle connection count, not the in-use count (same value as the courier.db_idle gauge)
	{MetricName: aws.String("DBIdle"), Value: aws.Float64(float64(dbStats.Idle)), Unit: cwtypes.StandardUnitCount},
	// wait durations are converted to a whole number of milliseconds
	{MetricName: aws.String("DBWaitMS"), Value: aws.Float64(float64(dbWaitDurationInPeriod / time.Millisecond)), Unit: types.StandardUnitMilliseconds},
	{MetricName: aws.String("DBWaitCount"), Value: aws.Float64(float64(dbWaitCountInPeriod)), Unit: cwtypes.StandardUnitCount},

	{MetricName: aws.String("RedisActive"), Value: aws.Float64(float64(redisStats.ActiveCount)), Unit: cwtypes.StandardUnitCount},
	{MetricName: aws.String("RedisIdle"), Value: aws.Float64(float64(redisStats.IdleCount)), Unit: cwtypes.StandardUnitCount},
	{MetricName: aws.String("RedisWaitMS"), Value: aws.Float64(float64(redisWaitDurationInPeriod / time.Millisecond)), Unit: types.StandardUnitMilliseconds},
	{MetricName: aws.String("RedisWaitCount"), Value: aws.Float64(float64(redisWaitCountInPeriod)), Unit: cwtypes.StandardUnitCount},

	{MetricName: aws.String("BulkQueue"), Value: aws.Float64(float64(bulkSize)), Unit: cwtypes.StandardUnitCount},
	{MetricName: aws.String("PriorityQueue"), Value: aws.Float64(float64(prioritySize)), Unit: cwtypes.StandardUnitCount},
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
if err = b.CloudWatchService().Send(ctx, metrics...); err != nil {
slog.Error("error putting metrics", "error", err)
}

analytics.Gauge("courier.db_busy", float64(dbStats.InUse))
analytics.Gauge("courier.db_idle", float64(dbStats.Idle))
analytics.Gauge("courier.db_wait_ms", float64(dbWaitDurationInPeriod/time.Millisecond))
Expand Down
3 changes: 3 additions & 0 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"
"unicode/utf8"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/analytics"
Expand Down Expand Up @@ -219,6 +221,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,

// log that we created a new contact to librato and cloudwatch
analytics.Gauge("courier.new_contact", float64(1))
b.cw.Send(ctx, types.MetricDatum{MetricName: aws.String("NewContact"), Value: aws.Float64(float64(1)), Unit: types.StandardUnitCount})

// and return it
return contact, nil
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
require (
cloud.google.com/go/compute/metadata v0.5.2 // indirect
github.com/antchfx/xpath v1.3.2 // indirect
github.com/aws/aws-sdk-go v1.55.5
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect
github.com/aws/aws-sdk-go-v2/config v1.28.5 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.46 // indirect
Expand All @@ -41,7 +42,7 @@ require (
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.24 // indirect
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3 // indirect
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/antchfx/xmlquery v1.4.2 h1:MZKd9+wblwxfQ1zd1AdrTsqVaMjMCwow3IqkCSe00K
github.com/antchfx/xmlquery v1.4.2/go.mod h1:QXhvf5ldTuGqhd1SHNvvtlhhdQLks4dD0awIVhXIDTA=
github.com/antchfx/xpath v1.3.2 h1:LNjzlsSjinu3bQpw9hWMY9ocB80oLOWuQqFvO6xt51U=
github.com/antchfx/xpath v1.3.2/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4=
github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8=
Expand Down
5 changes: 5 additions & 0 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/courier/utils/clogs"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/urns"
Expand Down Expand Up @@ -336,8 +338,11 @@ func (w *Sender) sendMessage(msg MsgOut) {
// report to librato and cloudwatch
if status.Status() == MsgStatusErrored || status.Status() == MsgStatusFailed {
analytics.Gauge(fmt.Sprintf("courier.msg_send_error_%s", msg.Channel().ChannelType()), secondDuration)
backend.CloudWatchService().Send(sendCTX, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("MsgSendError_%s", msg.Channel().ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})

} else {
analytics.Gauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration)
backend.CloudWatchService().Send(sendCTX, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("MsgSend_%s", msg.Channel().ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})
}
}

Expand Down
7 changes: 7 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/nyaruka/courier/utils/clogs"
Expand Down Expand Up @@ -319,8 +321,10 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe
// if we have a channel but no events were created, we still log this to analytics
if len(events) == 0 {
if hErr != nil {
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("ChannelError_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})
analytics.Gauge(fmt.Sprintf("courier.channel_error_%s", channel.ChannelType()), secondDuration)
} else {
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("ChannelIgnored_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})
analytics.Gauge(fmt.Sprintf("courier.channel_ignored_%s", channel.ChannelType()), secondDuration)
}
}
Expand All @@ -329,13 +333,16 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe
switch e := event.(type) {
case MsgIn:
clog.SetAttached(true)
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("MsgReceive_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})
analytics.Gauge(fmt.Sprintf("courier.msg_receive_%s", channel.ChannelType()), secondDuration)
LogMsgReceived(r, e)
case StatusUpdate:
clog.SetAttached(true)
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("MsgStatus_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})
analytics.Gauge(fmt.Sprintf("courier.msg_status_%s", channel.ChannelType()), secondDuration)
LogMsgStatusReceived(r, e)
case ChannelEvent:
s.Backend().CloudWatchService().Send(ctx, types.MetricDatum{MetricName: aws.String(fmt.Sprintf("EvtReceive_%s", channel.ChannelType())), Value: aws.Float64(float64(secondDuration)), Unit: types.StandardUnitSeconds})
analytics.Gauge(fmt.Sprintf("courier.evt_receive_%s", channel.ChannelType()), secondDuration)
LogChannelEventReceived(r, e)
}
Expand Down

0 comments on commit a1a2a51

Please sign in to comment.