From a1e56d8d641b14ea8a05dfea6a204029036240d3 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 15 Mar 2022 12:09:34 -0500 Subject: [PATCH] Allow expirations and timeouts to resume sessions for stopped, blocked and archived contacts --- core/tasks/expirations/cron.go | 16 +++++++--------- core/tasks/expirations/cron_test.go | 28 +++++++++++++++++----------- core/tasks/handler/worker.go | 5 ----- 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/core/tasks/expirations/cron.go b/core/tasks/expirations/cron.go index 442f19ffa..361cc5cbe 100644 --- a/core/tasks/expirations/cron.go +++ b/core/tasks/expirations/cron.go @@ -77,7 +77,7 @@ func HandleWaitExpirations(ctx context.Context, rt *runtime.Runtime) error { } // if it can't be resumed, add to batch to be expired - if !expiredWait.WaitResumes || expiredWait.ContactStatus != models.ContactStatusActive { + if !expiredWait.WaitResumes { expiredSessions = append(expiredSessions, expiredWait.SessionID) // batch is full? commit it @@ -135,20 +135,18 @@ func HandleWaitExpirations(ctx context.Context, rt *runtime.Runtime) error { } const sqlSelectExpiredWaits = ` - SELECT s.id as session_id, s.org_id, s.wait_expires_on, s.wait_resume_on_expire , c.id as contact_id, c.status as contact_status + SELECT s.id as session_id, s.org_id, s.wait_expires_on, s.wait_resume_on_expire , s.contact_id FROM flows_flowsession s -INNER JOIN contacts_contact c ON c.id = s.contact_id WHERE s.session_type = 'M' AND s.status = 'W' AND s.wait_expires_on <= NOW() ORDER BY s.wait_expires_on ASC LIMIT 25000` type ExpiredWait struct { - SessionID models.SessionID `db:"session_id"` - OrgID models.OrgID `db:"org_id"` - WaitExpiresOn time.Time `db:"wait_expires_on"` - WaitResumes bool `db:"wait_resume_on_expire"` - ContactID models.ContactID `db:"contact_id"` - ContactStatus models.ContactStatus `db:"contact_status"` + SessionID models.SessionID `db:"session_id"` + OrgID models.OrgID `db:"org_id"` + WaitExpiresOn time.Time `db:"wait_expires_on"` + WaitResumes bool `db:"wait_resume_on_expire"` + ContactID models.ContactID `db:"contact_id"` } // ExpireVoiceSessions looks for voice sessions that should be expired and ends them diff --git a/core/tasks/expirations/cron_test.go b/core/tasks/expirations/cron_test.go index 4282927a0..741cab636 100644 --- a/core/tasks/expirations/cron_test.go +++ b/core/tasks/expirations/cron_test.go @@ -1,11 +1,11 @@ package expirations_test import ( - "encoding/json" "testing" "time" "github.com/nyaruka/gocommon/dbutil/assertdb" + "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/goflow/envs" _ "github.com/nyaruka/mailroom/core/handlers" "github.com/nyaruka/mailroom/core/models" @@ -47,7 +47,7 @@ func TestExpirations(t *testing.T) { // create a parent/child session for the blocked contact s5ID := testdata.InsertWaitingSession(db, testdata.Org1, blake, models.FlowTypeMessaging, testdata.Favorites, models.NilConnectionID, time.Now(), time.Now(), true, nil) - r6ID := testdata.InsertFlowRun(db, testdata.Org1, s5ID, blake, testdata.Favorites, models.RunStatusWaiting) + r6ID := testdata.InsertFlowRun(db, testdata.Org1, s5ID, blake, testdata.Favorites, models.RunStatusActive) r7ID := testdata.InsertFlowRun(db, testdata.Org1, s5ID, blake, testdata.Favorites, models.RunStatusWaiting) time.Sleep(5 * time.Millisecond) @@ -73,23 +73,29 @@ func TestExpirations(t *testing.T) { assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1;`, s4ID).Columns(map[string]interface{}{"status": "W"}) assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r5ID).Columns(map[string]interface{}{"status": "W"}) - // blocked contact's session and runs should be expired because a blocked contact can't resume - assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1;`, s5ID).Columns(map[string]interface{}{"status": "X"}) - assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r6ID).Columns(map[string]interface{}{"status": "X"}) - assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r7ID).Columns(map[string]interface{}{"status": "X"}) + // blocked contact's session and runs sshould be unchanged because it's been queued for resumption.. like any other contact + assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1;`, s5ID).Columns(map[string]interface{}{"status": "W"}) + assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r6ID).Columns(map[string]interface{}{"status": "A"}) + assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r7ID).Columns(map[string]interface{}{"status": "W"}) - // should have created one task + // should have created two expiration tasks task, err := queue.PopNextTask(rc, queue.HandlerQueue) assert.NoError(t, err) assert.NotNil(t, task) - // decode the task + // check the first task eventTask := &handler.HandleEventTask{} - err = json.Unmarshal(task.Task, eventTask) + jsonx.MustUnmarshal(task.Task, eventTask) + assert.Equal(t, testdata.George.ID, eventTask.ContactID) + + task, err = queue.PopNextTask(rc, queue.HandlerQueue) assert.NoError(t, err) + assert.NotNil(t, task) - // assert its the right contact - assert.Equal(t, testdata.George.ID, eventTask.ContactID) + // check the second task + eventTask = &handler.HandleEventTask{} + jsonx.MustUnmarshal(task.Task, eventTask) + assert.Equal(t, blake.ID, eventTask.ContactID) // no other tasks task, err = queue.PopNextTask(rc, queue.HandlerQueue) diff --git a/core/tasks/handler/worker.go b/core/tasks/handler/worker.go index b03f5935d..bfb20c410 100644 --- a/core/tasks/handler/worker.go +++ b/core/tasks/handler/worker.go @@ -210,11 +210,6 @@ func handleTimedEvent(ctx context.Context, rt *runtime.Runtime, eventType string return errors.Wrapf(err, "error loading contact") } - // contact has been deleted or is blocked/stopped/archived, ignore this event - if len(contacts) == 0 || contacts[0].Status() != models.ContactStatusActive { - return nil - } - modelContact := contacts[0] // build our flow contact