Skip to content

Commit

Permalink
Merge pull request #599 from nyaruka/fail_msgs_to_inactive_contacts
Browse files Browse the repository at this point in the history
Messages to stopped, blocked or archived contacts should immediately fail
  • Loading branch information
rowanseymour authored Mar 21, 2022
2 parents a7fc6df + caf9b0a commit 3cec50c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 20 deletions.
27 changes: 16 additions & 11 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type MsgFailedReason null.String

const (
NilMsgFailedReason = MsgFailedReason("")
MsgFailedSuspended = MsgFailedReason("S")
MsgFailedSuspended = MsgFailedReason("S") // workspace suspended
MsgFailedContact = MsgFailedReason("C") // contact blocked, stopped or archived
MsgFailedLooping = MsgFailedReason("L")
MsgFailedErrorLimit = MsgFailedReason("E")
MsgFailedTooOld = MsgFailedReason("O")
Expand Down Expand Up @@ -321,31 +322,31 @@ return count
`)

// GetMsgRepetitions gets the number of repetitions of this msg text for the given contact in the current 5 minute window
func GetMsgRepetitions(rp *redis.Pool, contactID ContactID, msg *flows.MsgOut) (int, error) {
func GetMsgRepetitions(rp *redis.Pool, contact *flows.Contact, msg *flows.MsgOut) (int, error) {
rc := rp.Get()
defer rc.Close()

keyTime := dates.Now().UTC().Round(time.Minute * 5)
key := fmt.Sprintf("msg_repetitions:%s", keyTime.Format("2006-01-02T15:04"))
return redis.Int(msgRepetitionsScript.Do(rc, key, contactID, msg.Text()))
return redis.Int(msgRepetitionsScript.Do(rc, key, contact.ID(), msg.Text()))
}

// NewOutgoingFlowMsg creates an outgoing message for the passed in flow message
func NewOutgoingFlowMsg(rt *runtime.Runtime, org *Org, channel *Channel, session *Session, flow *Flow, out *flows.MsgOut, createdOn time.Time) (*Msg, error) {
return newOutgoingMsg(rt, org, channel, session.ContactID(), out, createdOn, session, flow, NilBroadcastID)
return newOutgoingMsg(rt, org, channel, session.Contact(), out, createdOn, session, flow, NilBroadcastID)
}

// NewOutgoingBroadcastMsg creates an outgoing message which is part of a broadcast
func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID ContactID, out *flows.MsgOut, createdOn time.Time, broadcastID BroadcastID) (*Msg, error) {
return newOutgoingMsg(rt, org, channel, contactID, out, createdOn, nil, nil, broadcastID)
func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, broadcastID BroadcastID) (*Msg, error) {
return newOutgoingMsg(rt, org, channel, contact, out, createdOn, nil, nil, broadcastID)
}

func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID ContactID, out *flows.MsgOut, createdOn time.Time, session *Session, flow *Flow, broadcastID BroadcastID) (*Msg, error) {
func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, session *Session, flow *Flow, broadcastID BroadcastID) (*Msg, error) {
msg := &Msg{}
m := &msg.m
m.UUID = out.UUID()
m.OrgID = org.ID()
m.ContactID = contactID
m.ContactID = ContactID(contact.ID())
m.BroadcastID = broadcastID
m.TopupID = NilTopupID
m.Text = out.Text()
Expand All @@ -364,21 +365,25 @@ func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID C
// we fail messages for suspended orgs right away
m.Status = MsgStatusFailed
m.FailedReason = MsgFailedSuspended
} else if contact.Status() != flows.ContactStatusActive {
// and blocked, stopped or archived contacts
m.Status = MsgStatusFailed
m.FailedReason = MsgFailedContact
} else if msg.URN() == urns.NilURN || channel == nil {
// if msg is missing the URN or channel, we also fail it
m.Status = MsgStatusFailed
m.FailedReason = MsgFailedNoDestination
} else {
// also fail right away if this looks like a loop
repetitions, err := GetMsgRepetitions(rt.RP, contactID, out)
repetitions, err := GetMsgRepetitions(rt.RP, contact, out)
if err != nil {
return nil, errors.Wrap(err, "error looking up msg repetitions")
}
if repetitions >= 20 {
m.Status = MsgStatusFailed
m.FailedReason = MsgFailedLooping

logrus.WithFields(logrus.Fields{"contact_id": contactID, "text": out.Text(), "repetitions": repetitions}).Error("too many repetitions, failing message")
logrus.WithFields(logrus.Fields{"contact_id": contact.ID(), "text": out.Text(), "repetitions": repetitions}).Error("too many repetitions, failing message")
}
}

Expand Down Expand Up @@ -1082,7 +1087,7 @@ func CreateBroadcastMessages(ctx context.Context, rt *runtime.Runtime, oa *OrgAs

// create our outgoing message
out := flows.NewMsgOut(urn, channel.ChannelReference(), text, t.Attachments, t.QuickReplies, nil, flows.NilMsgTopic)
msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), channel, c.ID(), out, time.Now(), bcast.BroadcastID())
msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), channel, contact, out, time.Now(), bcast.BroadcastID())
if err != nil {
return nil, errors.Wrapf(err, "error creating outgoing message")
}
Expand Down
37 changes: 28 additions & 9 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func TestNewOutgoingFlowMsg(t *testing.T) {

defer testsuite.Reset(testsuite.ResetData)

blake := testdata.InsertContact(db, testdata.Org1, "79b94a23-6d13-43f4-95fe-c733ee457857", "Blake", envs.NilLanguage, models.ContactStatusBlocked)
blakeURNID := testdata.InsertContactURN(db, testdata.Org1, blake, "tel:++250700000007", 1)

tcs := []struct {
ChannelUUID assets.ChannelUUID
Text string
Expand Down Expand Up @@ -118,7 +121,6 @@ func TestNewOutgoingFlowMsg(t *testing.T) {
URN: urns.NilURN,
URNID: models.URNID(0),
Flow: testdata.Favorites,
SuspendedOrg: false,
ExpectedStatus: models.MsgStatusFailed,
ExpectedFailedReason: models.MsgFailedNoDestination,
ExpectedMetadata: map[string]interface{}{},
Expand All @@ -129,16 +131,28 @@ func TestNewOutgoingFlowMsg(t *testing.T) {
ChannelUUID: "",
Text: "missing Channel",
Contact: testdata.Cathy,
URN: urns.NilURN,
URNID: models.URNID(0),
URN: urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", testdata.Cathy.URNID)),
URNID: testdata.Cathy.URNID,
Flow: testdata.Favorites,
SuspendedOrg: false,
ExpectedStatus: models.MsgStatusFailed,
ExpectedFailedReason: models.MsgFailedNoDestination,
ExpectedMetadata: map[string]interface{}{},
ExpectedMsgCount: 1,
ExpectedPriority: false,
},
{
ChannelUUID: "74729f45-7f29-4868-9dc4-90e491e3c7d8",
Text: "blocked contact",
Contact: blake,
URN: urns.URN(fmt.Sprintf("tel:+250700000007?id=%d", blakeURNID)),
URNID: blakeURNID,
Flow: testdata.Favorites,
ExpectedStatus: models.MsgStatusFailed,
ExpectedFailedReason: models.MsgFailedContact,
ExpectedMetadata: map[string]interface{}{},
ExpectedMsgCount: 1,
ExpectedPriority: false,
},
}

now := time.Now()
Expand All @@ -152,7 +166,7 @@ func TestNewOutgoingFlowMsg(t *testing.T) {
channel := oa.ChannelByUUID(tc.ChannelUUID)
flow, _ := oa.FlowByID(tc.Flow.ID)

session := insertTestSession(t, ctx, rt, testdata.Org1, testdata.Cathy, testdata.Favorites)
session := insertTestSession(t, ctx, rt, testdata.Org1, tc.Contact, testdata.Favorites)
if tc.ResponseTo != models.NilMsgID {
session.SetIncomingMsg(flows.MsgID(tc.ResponseTo), null.NullString)
}
Expand Down Expand Up @@ -188,7 +202,7 @@ func TestNewOutgoingFlowMsg(t *testing.T) {
}

// check nil failed reasons are saved as NULLs
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE failed_reason IS NOT NULL`).Returns(3)
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE failed_reason IS NOT NULL`).Returns(4)

// ensure org is unsuspended
db.MustExec(`UPDATE orgs_org SET is_suspended = FALSE`)
Expand Down Expand Up @@ -254,6 +268,8 @@ func TestMarshalMsg(t *testing.T) {
msg1, err := models.NewOutgoingFlowMsg(rt, oa.Org(), channel, session, flow, flowMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC))
require.NoError(t, err)

cathy := session.Contact()

err = models.InsertMessages(ctx, db, []*models.Msg{msg1})
require.NoError(t, err)

Expand Down Expand Up @@ -344,7 +360,7 @@ func TestMarshalMsg(t *testing.T) {
// try a broadcast message which won't have session and flow fields set
bcastID := testdata.InsertBroadcast(db, testdata.Org1, `eng`, map[envs.Language]string{`eng`: "Blast"}, models.NilScheduleID, []*testdata.Contact{testdata.Cathy}, nil)
bcastMsg1 := flows.NewMsgOut(urn, assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), "Blast", nil, nil, nil, flows.NilMsgTopic)
msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), channel, testdata.Cathy.ID, bcastMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC), bcastID)
msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), channel, cathy, bcastMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC), bcastID)
require.NoError(t, err)

err = models.InsertMessages(ctx, db, []*models.Msg{msg2})
Expand Down Expand Up @@ -466,18 +482,21 @@ func TestResendMessages(t *testing.T) {
}

func TestGetMsgRepetitions(t *testing.T) {
_, _, _, rp := testsuite.Get()
_, rt, db, rp := testsuite.Get()

defer testsuite.Reset(testsuite.ResetRedis)
defer dates.SetNowSource(dates.DefaultNowSource)

dates.SetNowSource(dates.NewFixedNowSource(time.Date(2021, 11, 18, 12, 13, 3, 234567, time.UTC)))

oa := testdata.Org1.Load(rt)
_, cathy := testdata.Cathy.Load(db, oa)

msg1 := flows.NewMsgOut(testdata.Cathy.URN, nil, "foo", nil, nil, nil, flows.NilMsgTopic)
msg2 := flows.NewMsgOut(testdata.Cathy.URN, nil, "bar", nil, nil, nil, flows.NilMsgTopic)

assertRepetitions := func(m *flows.MsgOut, expected int) {
count, err := models.GetMsgRepetitions(rp, testdata.Cathy.ID, m)
count, err := models.GetMsgRepetitions(rp, cathy, m)
require.NoError(t, err)
assert.Equal(t, expected, count)
}
Expand Down

0 comments on commit 3cec50c

Please sign in to comment.