Merge pull request #818 from nyaruka/metrics_v2
Refactor metrics so that everything is sent from Heartbeat in the backend
rowanseymour authored Dec 18, 2024
2 parents 68d55f9 + aa5809d commit 9ce2da7
Showing 9 changed files with 245 additions and 118 deletions.
15 changes: 6 additions & 9 deletions backend.go
@@ -7,7 +7,6 @@ import (
"strings"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/urns"
)
@@ -68,7 +67,7 @@ type Backend interface {
// WriteChannelLog writes the passed in channel log to our backend
WriteChannelLog(context.Context, *ChannelLog) error

// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the
// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call OnSendComplete with the
// returned message when they have dealt with the message (regardless of whether it was sent or not)
PopNextOutgoingMsg(context.Context) (MsgOut, error)

@@ -80,10 +79,11 @@ type Backend interface {
// a message is being forcibly resent by a user
ClearMsgSent(context.Context, MsgID) error

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The status parameter can be
// used to determine any sort of deduping of msg sends
MarkOutgoingMsgComplete(context.Context, MsgOut, StatusUpdate)
// OnSendComplete is called when the sender has finished trying to send a message
OnSendComplete(context.Context, MsgOut, StatusUpdate, *ChannelLog)

// OnReceiveComplete is called when the server has finished handling an incoming request
OnReceiveComplete(context.Context, Channel, []Event, *ChannelLog)

// SaveAttachment saves an attachment to backend storage
SaveAttachment(context.Context, Channel, string, []byte, string) (string, error)
@@ -106,9 +106,6 @@ type Backend interface {

// RedisPool returns the redisPool for this backend
RedisPool() *redis.Pool

// CloudWatch return the CloudWatch service for this backend
CloudWatch() *cwatch.Service
}

// Media is a resolved media object that can be used as a message attachment
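The interface change above replaces MarkOutgoingMsgComplete with two symmetric completion hooks, both of which receive the ChannelLog so the backend can record timing and outcome stats. A minimal sketch of how a sender might drive the new hook; the trySend helper is hypothetical, and SetStatus/MsgStatusErrored are assumed parts of courier's StatusUpdate API, while the other calls are the interface methods shown in this diff and its test:

package sender

import (
	"context"

	"github.com/nyaruka/courier"
)

// trySend stands in for a real channel handler's send logic (assumption
// for illustration only).
func trySend(ctx context.Context, msg courier.MsgOut) error { return nil }

// sendNext pops one queued message, attempts the send, and reports the
// outcome back to the backend.
func sendNext(ctx context.Context, b courier.Backend, clog *courier.ChannelLog) error {
	msg, err := b.PopNextOutgoingMsg(ctx)
	if err != nil || msg == nil {
		return err // queue error or nothing to send
	}

	status := b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog)
	if err := trySend(ctx, msg); err != nil {
		status.SetStatus(courier.MsgStatusErrored) // assumed setter on StatusUpdate
	}

	// called whether or not the send succeeded: it frees the per-channel
	// worker slot and, after this change, records the outgoing stats that
	// Heartbeat later flushes to CloudWatch
	b.OnSendComplete(ctx, msg, status, clog)
	return nil
}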
76 changes: 35 additions & 41 deletions backends/rapidpro/backend.go
@@ -50,13 +50,6 @@ func init() {
courier.RegisterBackend("rapidpro", newBackend)
}

type stats struct {
// both sqlx and redis provide wait stats which are cumulative, and which we need to convert into increments by
// tracking their previous values
dbWaitDuration time.Duration
redisWaitDuration time.Duration
}

type backend struct {
config *courier.Config

@@ -94,7 +87,12 @@ type backend struct {
// tracking of external ids of messages we've sent in case we need one before its status update has been written
sentExternalIDs *redisx.IntervalHash

stats stats
stats *StatsCollector

// both sqlx and redis provide wait stats which are cumulative, and which we need to convert into increments by
// tracking their previous values
dbWaitDuration time.Duration
redisWaitDuration time.Duration
}

// NewBackend creates a new RapidPro backend
@@ -131,6 +129,8 @@ func newBackend(cfg *courier.Config) courier.Backend {
receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours
sentIDs: redisx.NewIntervalSet("sent-ids", time.Hour, 2), // 1 - 2 hours
sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours

stats: NewStatsCollector(),
}
}

@@ -194,7 +194,6 @@ func (b *backend) Start() error {
if err != nil {
return err
}
b.cw.StartQueue(time.Second * 3)

// check attachment bucket access
if err := b.s3.Test(ctx, b.config.S3AttachmentsBucket); err != nil {
@@ -253,8 +252,6 @@ func (b *backend) Stop() error {
// wait for our threads to exit
b.waitGroup.Wait()

// stop cloudwatch service
b.cw.StopQueue()
return nil
}

@@ -464,8 +461,8 @@ func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error {
return b.sentIDs.Rem(rc, id.String())
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate) {
// OnSendComplete is called when the sender has finished trying to send a message
func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate, clog *courier.ChannelLog) {
rc := b.rp.Get()
defer rc.Close()

@@ -489,6 +486,13 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
}

b.stats.RecordOutgoing(msg.Channel().ChannelType(), wasSuccess, clog.Elapsed)
}

// OnReceiveComplete is called when the server has finished handling an incoming request
func (b *backend) OnReceiveComplete(ctx context.Context, ch courier.Channel, events []courier.Event, clog *courier.ChannelLog) {
b.stats.RecordIncoming(ch.ChannelType(), events, clog.Elapsed)
}

// WriteMsg writes the passed in message to our store
@@ -737,11 +741,12 @@ func (b *backend) Health() string {
return health.String()
}

// Heartbeat is called every minute; it flushes collected stats and queue depths to CloudWatch
func (b *backend) Heartbeat() error {
rc := b.rp.Get()
defer rc.Close()

metrics := b.stats.Extract().ToMetrics()

active, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:active", msgQueueName), "0", "-1"))
if err != nil {
return fmt.Errorf("error getting active queues: %w", err)
@@ -770,38 +775,32 @@ func (b *backend) Heartbeat() error {
bulkSize += count
}

// get our DB and redis stats
// calculate DB and redis pool metrics
dbStats := b.db.Stats()
redisStats := b.rp.Stats()

dbWaitDurationInPeriod := dbStats.WaitDuration - b.stats.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - b.stats.redisWaitDuration

b.stats.dbWaitDuration = dbStats.WaitDuration
b.stats.redisWaitDuration = redisStats.WaitDuration
dbWaitDurationInPeriod := dbStats.WaitDuration - b.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - b.redisWaitDuration
b.dbWaitDuration = dbStats.WaitDuration
b.redisWaitDuration = redisStats.WaitDuration

hostDim := cwatch.Dimension("Host", b.config.InstanceID)

b.CloudWatch().Queue(
metrics = append(metrics,
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Second), cwtypes.StandardUnitSeconds, hostDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
)

b.CloudWatch().Queue(
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Second), cwtypes.StandardUnitSeconds, hostDim),
cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")),
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
)

slog.Info("current metrics",
"db_inuse", dbStats.InUse,
"db_wait", dbWaitDurationInPeriod,
"redis_inuse", redisStats.ActiveCount,
"redis_wait", redisWaitDurationInPeriod,
"priority_size", prioritySize,
"bulk_size", bulkSize,
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := b.cw.Send(ctx, metrics...); err != nil {
slog.Error("error sending metrics", "error", err)
} else {
slog.Info("sent metrics to cloudwatch", "metrics", len(metrics))
}
cancel()

return nil
}

@@ -878,8 +877,3 @@ func (b *backend) Status() string {
func (b *backend) RedisPool() *redis.Pool {
return b.rp
}

// CloudWatch return the cloudwatch service
func (b *backend) CloudWatch() *cwatch.Service {
return b.cw
}
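Note how the wait-duration bookkeeping moves off the deleted stats struct onto the backend itself, while the pattern stays the same: sql.DBStats.WaitDuration and the redis pool's WaitDuration are cumulative counters, so each Heartbeat reports only their growth since the previous tick. A self-contained sketch of that pattern (the deltaMeter name is illustrative, not from the codebase):

package main

import (
	"fmt"
	"time"
)

// deltaMeter turns a monotonically growing cumulative counter into
// per-period increments, the same bookkeeping Heartbeat does with
// dbWaitDuration and redisWaitDuration.
type deltaMeter struct {
	prev time.Duration
}

func (m *deltaMeter) delta(cumulative time.Duration) time.Duration {
	d := cumulative - m.prev
	m.prev = cumulative
	return d
}

func main() {
	var m deltaMeter
	fmt.Println(m.delta(3 * time.Second)) // 3s: all growth so far
	fmt.Println(m.delta(5 * time.Second)) // 2s: only growth this period
}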
2 changes: 1 addition & 1 deletion backends/rapidpro/backend_test.go
@@ -900,7 +900,7 @@ func (ts *BackendTestSuite) TestOutgoingQueue() {
ts.Equal(msg.Text(), "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog))
ts.b.OnSendComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog), clog)

// this message should now be marked as sent
sent, err := ts.b.WasMsgSent(ctx, msg.ID())
6 changes: 1 addition & 5 deletions backends/rapidpro/contact.go
@@ -10,10 +10,8 @@ import (
"time"
"unicode/utf8"

cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
@@ -218,9 +216,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// store this URN on our contact
contact.URNID_ = contactURN.ID

// report that we created a new contact
b.cw.Queue(cwatch.Datum("ContactCreated", float64(1), cwtypes.StandardUnitCount))
b.stats.RecordContactCreated()

// and return it
return contact, nil
}
139 changes: 139 additions & 0 deletions backends/rapidpro/stats.go
@@ -0,0 +1,139 @@
package rapidpro

import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/aws/cwatch"
)

type CountByType map[courier.ChannelType]int

// converts per-channel-type counts into CloudWatch metrics with the channel type as a dimension
func (c CountByType) metrics(name string) []types.MetricDatum {
m := make([]types.MetricDatum, 0, len(c))
for typ, count := range c {
m = append(m, cwatch.Datum(name, float64(count), types.StandardUnitCount, cwatch.Dimension("ChannelType", string(typ))))
}
return m
}

type DurationByType map[courier.ChannelType]time.Duration

type Stats struct {
IncomingRequests CountByType // number of handler requests
IncomingMessages CountByType // number of messages received
IncomingStatuses CountByType // number of status updates received
IncomingEvents CountByType // number of other events received
IncomingIgnored CountByType // number of requests ignored
IncomingDuration DurationByType // total time spent handling requests

OutgoingSends CountByType // number of sends that succeeded
OutgoingErrors CountByType // number of sends that errored
OutgoingDuration DurationByType // total time spent sending messages

ContactsCreated int
}

func newStats() *Stats {
return &Stats{
IncomingRequests: make(CountByType),
IncomingMessages: make(CountByType),
IncomingStatuses: make(CountByType),
IncomingEvents: make(CountByType),
IncomingIgnored: make(CountByType),
IncomingDuration: make(DurationByType),

OutgoingSends: make(CountByType),
OutgoingErrors: make(CountByType),
OutgoingDuration: make(DurationByType),

ContactsCreated: 0,
}
}

func (s *Stats) ToMetrics() []types.MetricDatum {
metrics := make([]types.MetricDatum, 0, 20)
metrics = append(metrics, s.IncomingRequests.metrics("IncomingRequests")...)
metrics = append(metrics, s.IncomingMessages.metrics("IncomingMessages")...)
metrics = append(metrics, s.IncomingStatuses.metrics("IncomingStatuses")...)
metrics = append(metrics, s.IncomingEvents.metrics("IncomingEvents")...)
metrics = append(metrics, s.IncomingIgnored.metrics("IncomingIgnored")...)

for typ, d := range s.IncomingDuration { // convert to average seconds per request
avgTime := d.Seconds() / float64(s.IncomingRequests[typ])
metrics = append(metrics, cwatch.Datum("IncomingDuration", avgTime, types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
}

metrics = append(metrics, s.OutgoingSends.metrics("OutgoingSends")...)
metrics = append(metrics, s.OutgoingErrors.metrics("OutgoingErrors")...)

for typ, d := range s.OutgoingDuration { // convert to average seconds per send attempt
avgTime := d.Seconds() / float64(s.OutgoingSends[typ]+s.OutgoingErrors[typ])
metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgTime, types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
}

metrics = append(metrics, cwatch.Datum("ContactsCreated", float64(s.ContactsCreated), types.StandardUnitCount))
return metrics
}

// StatsCollector provides thread-safe stats collection
type StatsCollector struct {
mutex sync.Mutex
stats *Stats
}

// NewStatsCollector creates a new stats collector
func NewStatsCollector() *StatsCollector {
return &StatsCollector{stats: newStats()}
}

func (c *StatsCollector) RecordIncoming(typ courier.ChannelType, evts []courier.Event, d time.Duration) {
c.mutex.Lock()
c.stats.IncomingRequests[typ]++

for _, e := range evts {
switch e.(type) {
case courier.MsgIn:
c.stats.IncomingMessages[typ]++
case courier.StatusUpdate:
c.stats.IncomingStatuses[typ]++
case courier.ChannelEvent:
c.stats.IncomingEvents[typ]++
}
}
if len(evts) == 0 {
c.stats.IncomingIgnored[typ]++
}

c.stats.IncomingDuration[typ] += d
c.mutex.Unlock()
}

func (c *StatsCollector) RecordOutgoing(typ courier.ChannelType, success bool, d time.Duration) {
c.mutex.Lock()
if success {
c.stats.OutgoingSends[typ]++
} else {
c.stats.OutgoingErrors[typ]++
}
c.stats.OutgoingDuration[typ] += d
c.mutex.Unlock()
}

func (c *StatsCollector) RecordContactCreated() {
c.mutex.Lock()
c.stats.ContactsCreated++
c.mutex.Unlock()
}

// Extract returns the stats collected since the last call and starts a fresh period
func (c *StatsCollector) Extract() *Stats {
c.mutex.Lock()
defer c.mutex.Unlock()
s := c.stats
c.stats = newStats()
return s
}
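Taken together, the collector is a swap-on-read accumulator: handlers and senders record under the mutex, and once a minute Heartbeat calls Extract to swap in a fresh Stats so each CloudWatch datum covers exactly one reporting period. A sketch of that cycle in the same package; the "T" channel type and the durations are made up for illustration:

package rapidpro

import (
	"fmt"
	"time"
)

func ExampleStatsCollector() {
	sc := NewStatsCollector()

	// a handler request that produced no events is counted as ignored
	sc.RecordIncoming("T", nil, 10*time.Millisecond)

	// one successful and one failed send, plus a created contact
	sc.RecordOutgoing("T", true, 200*time.Millisecond)
	sc.RecordOutgoing("T", false, 50*time.Millisecond)
	sc.RecordContactCreated()

	// Extract returns the accumulated Stats and resets the collector
	stats := sc.Extract()
	fmt.Println(len(stats.ToMetrics()))          // 7 datums: requests, ignored, sends, errors, two durations, contacts
	fmt.Println(len(sc.Extract().OutgoingSends)) // 0: the new period starts empty
}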
