From caf9b0ae897db17ce0dda5dd08a522a175623455 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 14 Mar 2022 16:09:28 -0500 Subject: [PATCH] Messages to stopped, blocked or archived contacts should immediately fail --- core/models/msgs.go | 27 ++++++++++++++++----------- core/models/msgs_test.go | 37 ++++++++++++++++++++++++++++--------- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/core/models/msgs.go b/core/models/msgs.go index 698ea39de..87fbb225d 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -79,7 +79,8 @@ type MsgFailedReason null.String const ( NilMsgFailedReason = MsgFailedReason("") - MsgFailedSuspended = MsgFailedReason("S") + MsgFailedSuspended = MsgFailedReason("S") // workspace suspended + MsgFailedContact = MsgFailedReason("C") // contact blocked, stopped or archived MsgFailedLooping = MsgFailedReason("L") MsgFailedErrorLimit = MsgFailedReason("E") MsgFailedTooOld = MsgFailedReason("O") @@ -321,31 +322,31 @@ return count `) // GetMsgRepetitions gets the number of repetitions of this msg text for the given contact in the current 5 minute window -func GetMsgRepetitions(rp *redis.Pool, contactID ContactID, msg *flows.MsgOut) (int, error) { +func GetMsgRepetitions(rp *redis.Pool, contact *flows.Contact, msg *flows.MsgOut) (int, error) { rc := rp.Get() defer rc.Close() keyTime := dates.Now().UTC().Round(time.Minute * 5) key := fmt.Sprintf("msg_repetitions:%s", keyTime.Format("2006-01-02T15:04")) - return redis.Int(msgRepetitionsScript.Do(rc, key, contactID, msg.Text())) + return redis.Int(msgRepetitionsScript.Do(rc, key, contact.ID(), msg.Text())) } // NewOutgoingFlowMsg creates an outgoing message for the passed in flow message func NewOutgoingFlowMsg(rt *runtime.Runtime, org *Org, channel *Channel, session *Session, flow *Flow, out *flows.MsgOut, createdOn time.Time) (*Msg, error) { - return newOutgoingMsg(rt, org, channel, session.ContactID(), out, createdOn, session, flow, NilBroadcastID) + return newOutgoingMsg(rt, org, channel, session.Contact(), out, createdOn, session, flow, NilBroadcastID) } // NewOutgoingBroadcastMsg creates an outgoing message which is part of a broadcast -func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID ContactID, out *flows.MsgOut, createdOn time.Time, broadcastID BroadcastID) (*Msg, error) { - return newOutgoingMsg(rt, org, channel, contactID, out, createdOn, nil, nil, broadcastID) +func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, broadcastID BroadcastID) (*Msg, error) { + return newOutgoingMsg(rt, org, channel, contact, out, createdOn, nil, nil, broadcastID) } -func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID ContactID, out *flows.MsgOut, createdOn time.Time, session *Session, flow *Flow, broadcastID BroadcastID) (*Msg, error) { +func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, session *Session, flow *Flow, broadcastID BroadcastID) (*Msg, error) { msg := &Msg{} m := &msg.m m.UUID = out.UUID() m.OrgID = org.ID() - m.ContactID = contactID + m.ContactID = ContactID(contact.ID()) m.BroadcastID = broadcastID m.TopupID = NilTopupID m.Text = out.Text() @@ -364,13 +365,17 @@ func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID C // we fail messages for suspended orgs right away m.Status = MsgStatusFailed m.FailedReason = MsgFailedSuspended + } else if contact.Status() != flows.ContactStatusActive { + // and blocked, stopped or archived contacts + m.Status = MsgStatusFailed + m.FailedReason = MsgFailedContact } else if msg.URN() == urns.NilURN || channel == nil { // if msg is missing the URN or channel, we also fail it m.Status = MsgStatusFailed m.FailedReason = MsgFailedNoDestination } else { // also fail right away if this looks like a loop - repetitions, err := GetMsgRepetitions(rt.RP, contactID, out) + repetitions, err := GetMsgRepetitions(rt.RP, contact, out) if err != nil { return nil, errors.Wrap(err, "error looking up msg repetitions") } @@ -378,7 +383,7 @@ func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID C m.Status = MsgStatusFailed m.FailedReason = MsgFailedLooping - logrus.WithFields(logrus.Fields{"contact_id": contactID, "text": out.Text(), "repetitions": repetitions}).Error("too many repetitions, failing message") + logrus.WithFields(logrus.Fields{"contact_id": contact.ID(), "text": out.Text(), "repetitions": repetitions}).Error("too many repetitions, failing message") } } @@ -1082,7 +1087,7 @@ func CreateBroadcastMessages(ctx context.Context, rt *runtime.Runtime, oa *OrgAs // create our outgoing message out := flows.NewMsgOut(urn, channel.ChannelReference(), text, t.Attachments, t.QuickReplies, nil, flows.NilMsgTopic) - msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), channel, c.ID(), out, time.Now(), bcast.BroadcastID()) + msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), channel, contact, out, time.Now(), bcast.BroadcastID()) if err != nil { return nil, errors.Wrapf(err, "error creating outgoing message") } diff --git a/core/models/msgs_test.go b/core/models/msgs_test.go index de41a7d4a..2ab317cd8 100644 --- a/core/models/msgs_test.go +++ b/core/models/msgs_test.go @@ -32,6 +32,9 @@ func TestNewOutgoingFlowMsg(t *testing.T) { defer testsuite.Reset(testsuite.ResetData) + blake := testdata.InsertContact(db, testdata.Org1, "79b94a23-6d13-43f4-95fe-c733ee457857", "Blake", envs.NilLanguage, models.ContactStatusBlocked) + blakeURNID := testdata.InsertContactURN(db, testdata.Org1, blake, "tel:++250700000007", 1) + tcs := []struct { ChannelUUID assets.ChannelUUID Text string @@ -118,7 +121,6 @@ func TestNewOutgoingFlowMsg(t *testing.T) { URN: urns.NilURN, URNID: models.URNID(0), Flow: testdata.Favorites, - SuspendedOrg: false, ExpectedStatus: models.MsgStatusFailed, ExpectedFailedReason: models.MsgFailedNoDestination, ExpectedMetadata: map[string]interface{}{}, @@ -129,16 +131,28 @@ func TestNewOutgoingFlowMsg(t *testing.T) { ChannelUUID: "", Text: "missing Channel", Contact: testdata.Cathy, - URN: urns.NilURN, - URNID: models.URNID(0), + URN: urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", testdata.Cathy.URNID)), + URNID: testdata.Cathy.URNID, Flow: testdata.Favorites, - SuspendedOrg: false, ExpectedStatus: models.MsgStatusFailed, ExpectedFailedReason: models.MsgFailedNoDestination, ExpectedMetadata: map[string]interface{}{}, ExpectedMsgCount: 1, ExpectedPriority: false, }, + { + ChannelUUID: "74729f45-7f29-4868-9dc4-90e491e3c7d8", + Text: "blocked contact", + Contact: blake, + URN: urns.URN(fmt.Sprintf("tel:+250700000007?id=%d", blakeURNID)), + URNID: blakeURNID, + Flow: testdata.Favorites, + ExpectedStatus: models.MsgStatusFailed, + ExpectedFailedReason: models.MsgFailedContact, + ExpectedMetadata: map[string]interface{}{}, + ExpectedMsgCount: 1, + ExpectedPriority: false, + }, } now := time.Now() @@ -152,7 +166,7 @@ func TestNewOutgoingFlowMsg(t *testing.T) { channel := oa.ChannelByUUID(tc.ChannelUUID) flow, _ := oa.FlowByID(tc.Flow.ID) - session := insertTestSession(t, ctx, rt, testdata.Org1, testdata.Cathy, testdata.Favorites) + session := insertTestSession(t, ctx, rt, testdata.Org1, tc.Contact, testdata.Favorites) if tc.ResponseTo != models.NilMsgID { session.SetIncomingMsg(flows.MsgID(tc.ResponseTo), null.NullString) } @@ -188,7 +202,7 @@ func TestNewOutgoingFlowMsg(t *testing.T) { } // check nil failed reasons are saved as NULLs - assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE failed_reason IS NOT NULL`).Returns(3) + assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE failed_reason IS NOT NULL`).Returns(4) // ensure org is unsuspended db.MustExec(`UPDATE orgs_org SET is_suspended = FALSE`) @@ -254,6 +268,8 @@ func TestMarshalMsg(t *testing.T) { msg1, err := models.NewOutgoingFlowMsg(rt, oa.Org(), channel, session, flow, flowMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC)) require.NoError(t, err) + cathy := session.Contact() + err = models.InsertMessages(ctx, db, []*models.Msg{msg1}) require.NoError(t, err) @@ -344,7 +360,7 @@ func TestMarshalMsg(t *testing.T) { // try a broadcast message which won't have session and flow fields set bcastID := testdata.InsertBroadcast(db, testdata.Org1, `eng`, map[envs.Language]string{`eng`: "Blast"}, models.NilScheduleID, []*testdata.Contact{testdata.Cathy}, nil) bcastMsg1 := flows.NewMsgOut(urn, assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), "Blast", nil, nil, nil, flows.NilMsgTopic) - msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), channel, testdata.Cathy.ID, bcastMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC), bcastID) + msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), channel, cathy, bcastMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC), bcastID) require.NoError(t, err) err = models.InsertMessages(ctx, db, []*models.Msg{msg2}) @@ -466,18 +482,21 @@ func TestResendMessages(t *testing.T) { } func TestGetMsgRepetitions(t *testing.T) { - _, _, _, rp := testsuite.Get() + _, rt, db, rp := testsuite.Get() defer testsuite.Reset(testsuite.ResetRedis) defer dates.SetNowSource(dates.DefaultNowSource) dates.SetNowSource(dates.NewFixedNowSource(time.Date(2021, 11, 18, 12, 13, 3, 234567, time.UTC))) + oa := testdata.Org1.Load(rt) + _, cathy := testdata.Cathy.Load(db, oa) + msg1 := flows.NewMsgOut(testdata.Cathy.URN, nil, "foo", nil, nil, nil, flows.NilMsgTopic) msg2 := flows.NewMsgOut(testdata.Cathy.URN, nil, "bar", nil, nil, nil, flows.NilMsgTopic) assertRepetitions := func(m *flows.MsgOut, expected int) { - count, err := models.GetMsgRepetitions(rp, testdata.Cathy.ID, m) + count, err := models.GetMsgRepetitions(rp, cathy, m) require.NoError(t, err) assert.Equal(t, expected, count) }