Skip to content

Commit

Permalink
Add retries to sendgrid emailer (#618) (#6164)
Browse files Browse the repository at this point in the history
* Add retries to sendgrid emailer (#618)

Signed-off-by: Katrina Rogan <katroganGH@gmail.com>

* wat

Signed-off-by: Katrina Rogan <katroganGH@gmail.com>

---------

Signed-off-by: Katrina Rogan <katroganGH@gmail.com>
  • Loading branch information
katrogan authored Jan 13, 2025
1 parent b8fb68d commit 6ea9531
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,30 @@ import (
"io/ioutil"
"os"
"strings"
"time"

"github.com/sendgrid/rest"
"github.com/sendgrid/sendgrid-go"
"github.com/sendgrid/sendgrid-go/helpers/mail"

"github.com/flyteorg/flyte/flyteadmin/pkg/async"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

//go:generate mockery -all -case=underscore -output=../mocks -case=underscore

type SendgridClient interface {
Send(email *mail.SGMailV3) (*rest.Response, error)
}

type SendgridEmailer struct {
client *sendgrid.Client
client SendgridClient
systemMetrics emailMetrics
cfg *runtimeInterfaces.NotificationsConfig
}

func getEmailAddresses(addresses []string) []*mail.Email {
Expand Down Expand Up @@ -63,9 +73,18 @@ func getAPIKey(config runtimeInterfaces.EmailServerConfig) string {
func (s SendgridEmailer) SendEmail(ctx context.Context, email *admin.EmailMessage) error {
m := getSendgridEmail(email)
s.systemMetrics.SendTotal.Inc()
response, err := s.client.Send(m)
var response *rest.Response
var err error
err = async.Retry(s.cfg.ReconnectAttempts, time.Duration(s.cfg.ReconnectDelaySeconds)*time.Second, func() error {
response, err = s.client.Send(m)
if err != nil {
logger.Errorf(ctx, "Sendgrid error sending email: %+v with: %+v", email, err)
return err
}
return nil
})
if err != nil {
logger.Errorf(ctx, "Sendgrid error sending %s", err)
logger.Errorf(ctx, "all attempts to send email %+v via sendgrid failed: %+v", email, err)
s.systemMetrics.SendError.Inc()
return err
}
Expand All @@ -79,5 +98,6 @@ func NewSendGridEmailer(config runtimeInterfaces.NotificationsConfig, scope prom
return &SendgridEmailer{
client: sendgrid.NewSendClient(getAPIKey(config.NotificationsEmailerConfig.EmailerConfig)),
systemMetrics: newEmailMetrics(scope.NewSubScope("sendgrid")),
cfg: &config,
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
package implementations

import (
"context"
"errors"
"io/ioutil"
"os"
"path"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sendgrid/rest"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/mocks"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

func TestAddresses(t *testing.T) {
addresses := []string{"alice@example.com", "bob@example.com"}
sgAddresses := getEmailAddresses(addresses)
assert.Equal(t, sgAddresses[0].Address, "alice@example.com")
assert.Equal(t, sgAddresses[1].Address, "bob@example.com")
}

func TestGetEmail(t *testing.T) {
emailNotification := &admin.EmailMessage{
var (
emailNotification = &admin.EmailMessage{
SubjectLine: "Notice: Execution \"name\" has succeeded in \"domain\".",
SenderEmail: "no-reply@example.com",
RecipientsEmail: []string{
Expand All @@ -32,7 +31,16 @@ func TestGetEmail(t *testing.T) {
"<a href=\"https://example.com/executions/T/B/D\">" +
"https://example.com/executions/T/B/D</a>.",
}
)

func TestAddresses(t *testing.T) {
addresses := []string{"alice@example.com", "bob@example.com"}
sgAddresses := getEmailAddresses(addresses)
assert.Equal(t, sgAddresses[0].Address, "alice@example.com")
assert.Equal(t, sgAddresses[1].Address, "bob@example.com")
}

func TestGetEmail(t *testing.T) {
sgEmail := getSendgridEmail(emailNotification)
assert.Equal(t, `Notice: Execution "name" has succeeded in "domain".`, sgEmail.Personalizations[0].Subject)
assert.Equal(t, "john@example.com", sgEmail.Personalizations[0].To[1].Address)
Expand Down Expand Up @@ -98,3 +106,63 @@ func TestNoFile(t *testing.T) {
// shouldn't reach here
t.Errorf("did not panic")
}

func TestSendEmail(t *testing.T) {
ctx := context.TODO()
expectedErr := errors.New("expected")
t.Run("exhaust all retry attempts", func(t *testing.T) {
sendgridClient := &mocks.SendgridClient{}
expectedEmail := getSendgridEmail(emailNotification)
sendgridClient.OnSendMatch(expectedEmail).
Return(nil, expectedErr).Times(3)
sendgridClient.OnSendMatch(expectedEmail).
Return(&rest.Response{Body: "email body"}, nil).Once()
scope := promutils.NewScope("bademailer")
emailerMetrics := newEmailMetrics(scope)

emailer := SendgridEmailer{
client: sendgridClient,
systemMetrics: emailerMetrics,
cfg: &runtimeInterfaces.NotificationsConfig{
ReconnectAttempts: 1,
},
}

err := emailer.SendEmail(ctx, emailNotification)
assert.EqualError(t, err, expectedErr.Error())

assert.NoError(t, testutil.CollectAndCompare(emailerMetrics.SendError, strings.NewReader(`
# HELP bademailer:send_error Number of errors when sending email via Emailer
# TYPE bademailer:send_error counter
bademailer:send_error 1
`)))
})
t.Run("exhaust all retry attempts", func(t *testing.T) {
ctx := context.TODO()
sendgridClient := &mocks.SendgridClient{}
expectedEmail := getSendgridEmail(emailNotification)
sendgridClient.OnSendMatch(expectedEmail).
Return(nil, expectedErr).Once()
sendgridClient.OnSendMatch(expectedEmail).
Return(&rest.Response{Body: "email body"}, nil).Once()
scope := promutils.NewScope("goodemailer")
emailerMetrics := newEmailMetrics(scope)

emailer := SendgridEmailer{
client: sendgridClient,
systemMetrics: emailerMetrics,
cfg: &runtimeInterfaces.NotificationsConfig{
ReconnectAttempts: 1,
},
}

err := emailer.SendEmail(ctx, emailNotification)
assert.NoError(t, err)

assert.NoError(t, testutil.CollectAndCompare(emailerMetrics.SendError, strings.NewReader(`
# HELP goodemailer:send_error Number of errors when sending email via Emailer
# TYPE goodemailer:send_error counter
goodemailer:send_error 0
`)))
})
}
56 changes: 56 additions & 0 deletions flyteadmin/pkg/async/notifications/mocks/sendgrid_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6ea9531

Please sign in to comment.