Skip to content

Commit

Permalink
Merge pull request #27 from nyaruka/simpler_flow_start_batches
Browse files Browse the repository at this point in the history
Simplify FlowStartBatch
  • Loading branch information
rowanseymour authored Feb 6, 2023
2 parents 33af3fd + 67e1275 commit acac56d
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 99 deletions.
2 changes: 1 addition & 1 deletion core/ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func RequestCall(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets,

// create our call object
conn, err := models.InsertCall(
ctx, rt.DB, oa.OrgID(), channel.ID(), start.StartID(), contact.ID(), models.URNID(urnID),
ctx, rt.DB, oa.OrgID(), channel.ID(), start.StartID, contact.ID(), models.URNID(urnID),
models.CallDirectionOut, models.CallStatusPending, "",
)
if err != nil {
Expand Down
89 changes: 35 additions & 54 deletions core/models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,46 +89,27 @@ func MarkStartFailed(ctx context.Context, db Queryer, startID StartID) error {

// FlowStartBatch represents a single flow batch that needs to be started
type FlowStartBatch struct {
b struct {
StartID StartID `json:"start_id"`
StartType StartType `json:"start_type"`
OrgID OrgID `json:"org_id"`
CreatedByID UserID `json:"created_by_id"`
FlowID FlowID `json:"flow_id"`
FlowType FlowType `json:"flow_type"`
ContactIDs []ContactID `json:"contact_ids"`

ParentSummary null.JSON `json:"parent_summary,omitempty"`
SessionHistory null.JSON `json:"session_history,omitempty"`
Extra null.JSON `json:"extra,omitempty"`

RestartParticipants bool `json:"restart_participants"`
IncludeActive bool `json:"include_active"`

IsLast bool `json:"is_last,omitempty"`
TotalContacts int `json:"total_contacts"`

CreatedBy string `json:"created_by"` // deprecated
}
StartID StartID `json:"start_id"`
StartType StartType `json:"start_type"`
OrgID OrgID `json:"org_id"`
CreatedByID UserID `json:"created_by_id"`
FlowID FlowID `json:"flow_id"`
FlowType FlowType `json:"flow_type"`
ContactIDs []ContactID `json:"contact_ids"`

ParentSummary null.JSON `json:"parent_summary,omitempty"`
SessionHistory null.JSON `json:"session_history,omitempty"`
Extra null.JSON `json:"extra,omitempty"`

RestartParticipants bool `json:"restart_participants"`
IncludeActive bool `json:"include_active"`

IsLast bool `json:"is_last,omitempty"`
TotalContacts int `json:"total_contacts"`
}

func (b *FlowStartBatch) StartID() StartID { return b.b.StartID }
func (b *FlowStartBatch) StartType() StartType { return b.b.StartType }
func (b *FlowStartBatch) OrgID() OrgID { return b.b.OrgID }
func (b *FlowStartBatch) CreatedByID() UserID { return b.b.CreatedByID }
func (b *FlowStartBatch) FlowID() FlowID { return b.b.FlowID }
func (b *FlowStartBatch) ContactIDs() []ContactID { return b.b.ContactIDs }
func (b *FlowStartBatch) ExcludeStartedPreviously() bool { return !b.b.RestartParticipants }
func (b *FlowStartBatch) ExcludeInAFlow() bool { return !b.b.IncludeActive }
func (b *FlowStartBatch) IsLast() bool { return b.b.IsLast }
func (b *FlowStartBatch) TotalContacts() int { return b.b.TotalContacts }

func (b *FlowStartBatch) ParentSummary() null.JSON { return b.b.ParentSummary }
func (b *FlowStartBatch) SessionHistory() null.JSON { return b.b.SessionHistory }
func (b *FlowStartBatch) Extra() null.JSON { return b.b.Extra }

func (b *FlowStartBatch) MarshalJSON() ([]byte, error) { return json.Marshal(b.b) }
func (b *FlowStartBatch) UnmarshalJSON(data []byte) error { return json.Unmarshal(data, &b.b) }
func (b *FlowStartBatch) ExcludeStartedPreviously() bool { return !b.RestartParticipants }
func (b *FlowStartBatch) ExcludeInAFlow() bool { return !b.IncludeActive }

// FlowStart represents the top level flow start in our system
type FlowStart struct {
Expand Down Expand Up @@ -336,22 +317,22 @@ INSERT INTO flows_flowstart_groups(flowstart_id, contactgroup_id) VALUES(:start_

// CreateBatch creates a batch for this start using the passed in contact ids
func (s *FlowStart) CreateBatch(contactIDs []ContactID, last bool, totalContacts int) *FlowStartBatch {
b := &FlowStartBatch{}
b.b.StartID = s.ID()
b.b.StartType = s.s.StartType
b.b.OrgID = s.OrgID()
b.b.FlowID = s.FlowID()
b.b.FlowType = s.FlowType()
b.b.ContactIDs = contactIDs
b.b.RestartParticipants = s.s.RestartParticipants
b.b.IncludeActive = s.s.IncludeActive
b.b.ParentSummary = s.ParentSummary()
b.b.SessionHistory = s.SessionHistory()
b.b.Extra = s.Extra()
b.b.IsLast = last
b.b.TotalContacts = totalContacts
b.b.CreatedByID = s.s.CreatedByID
return b
return &FlowStartBatch{
StartID: s.ID(),
StartType: s.s.StartType,
OrgID: s.OrgID(),
FlowID: s.FlowID(),
FlowType: s.FlowType(),
ContactIDs: contactIDs,
RestartParticipants: s.s.RestartParticipants,
IncludeActive: s.s.IncludeActive,
ParentSummary: s.ParentSummary(),
SessionHistory: s.SessionHistory(),
Extra: s.Extra(),
IsLast: last,
TotalContacts: totalContacts,
CreatedByID: s.s.CreatedByID,
}
}

func (i *StartID) Scan(value any) error { return null.ScanInt(value, i) }
Expand Down
22 changes: 11 additions & 11 deletions core/models/starts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,21 @@ func TestStarts(t *testing.T) {
assertdb.Query(t, db, `SELECT count(*) FROM flows_flowstart_contacts WHERE flowstart_id = $1`, startID).Returns(3)

batch := start.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, false, 3)
assert.Equal(t, startID, batch.StartID())
assert.Equal(t, models.StartTypeManual, batch.StartType())
assert.Equal(t, testdata.SingleMessage.ID, batch.FlowID())
assert.Equal(t, []models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, batch.ContactIDs())
assert.Equal(t, startID, batch.StartID)
assert.Equal(t, models.StartTypeManual, batch.StartType)
assert.Equal(t, testdata.SingleMessage.ID, batch.FlowID)
assert.Equal(t, []models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, batch.ContactIDs)
assert.False(t, batch.ExcludeStartedPreviously())
assert.False(t, batch.ExcludeInAFlow())
assert.Equal(t, testdata.Admin.ID, batch.CreatedByID())
assert.False(t, batch.IsLast())
assert.Equal(t, 3, batch.TotalContacts())
assert.Equal(t, testdata.Admin.ID, batch.CreatedByID)
assert.False(t, batch.IsLast)
assert.Equal(t, 3, batch.TotalContacts)

assert.Equal(t, null.JSON(`{"uuid": "b65b1a22-db6d-4f5a-9b3d-7302368a82e6"}`), batch.ParentSummary())
assert.Equal(t, null.JSON(`{"parent_uuid": "532a3899-492f-4ffe-aed7-e75ad524efab", "ancestors": 3, "ancestors_since_input": 1}`), batch.SessionHistory())
assert.Equal(t, null.JSON(`{"foo": "bar"}`), batch.Extra())
assert.Equal(t, null.JSON(`{"uuid": "b65b1a22-db6d-4f5a-9b3d-7302368a82e6"}`), batch.ParentSummary)
assert.Equal(t, null.JSON(`{"parent_uuid": "532a3899-492f-4ffe-aed7-e75ad524efab", "ancestors": 3, "ancestors_since_input": 1}`), batch.SessionHistory)
assert.Equal(t, null.JSON(`{"foo": "bar"}`), batch.Extra)

history, err := models.ReadSessionHistory(batch.SessionHistory())
history, err := models.ReadSessionHistory(batch.SessionHistory)
assert.NoError(t, err)
assert.Equal(t, flows.SessionUUID("532a3899-492f-4ffe-aed7-e75ad524efab"), history.ParentUUID)

Expand Down
40 changes: 20 additions & 20 deletions core/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,85 +144,85 @@ func StartFlowBatch(
start := time.Now()

// if this is our last start, no matter what try to set the start as complete as a last step
if batch.IsLast() {
if batch.IsLast {
defer func() {
err := models.MarkStartComplete(ctx, rt.DB, batch.StartID())
err := models.MarkStartComplete(ctx, rt.DB, batch.StartID)
if err != nil {
logrus.WithError(err).WithField("start_id", batch.StartID).Error("error marking start as complete")
}
}()
}

// create our org assets
oa, err := models.GetOrgAssets(ctx, rt, batch.OrgID())
oa, err := models.GetOrgAssets(ctx, rt, batch.OrgID)
if err != nil {
return nil, errors.Wrapf(err, "error creating assets for org: %d", batch.OrgID())
return nil, errors.Wrapf(err, "error creating assets for org: %d", batch.OrgID)
}

// try to load our flow
flow, err := oa.FlowByID(batch.FlowID())
flow, err := oa.FlowByID(batch.FlowID)
if err == models.ErrNotFound {
logrus.WithField("flow_id", batch.FlowID()).Info("skipping flow start, flow no longer active or archived")
logrus.WithField("flow_id", batch.FlowID).Info("skipping flow start, flow no longer active or archived")
return nil, nil
}
if err != nil {
return nil, errors.Wrapf(err, "error loading campaign flow: %d", batch.FlowID())
return nil, errors.Wrapf(err, "error loading campaign flow: %d", batch.FlowID)
}

// get the user that created this flow start if there was one
var flowUser *flows.User
if batch.CreatedByID() != models.NilUserID {
user := oa.UserByID(batch.CreatedByID())
if batch.CreatedByID != models.NilUserID {
user := oa.UserByID(batch.CreatedByID)
if user != nil {
flowUser = oa.SessionAssets().Users().Get(user.Email())
}
}

var params *types.XObject
if !batch.Extra().IsNull() {
params, err = types.ReadXObject(batch.Extra())
if !batch.Extra.IsNull() {
params, err = types.ReadXObject(batch.Extra)
if err != nil {
return nil, errors.Wrap(err, "unable to read JSON from flow start extra")
}
}

var history *flows.SessionHistory
if !batch.SessionHistory().IsNull() {
history, err = models.ReadSessionHistory(batch.SessionHistory())
if !batch.SessionHistory.IsNull() {
history, err = models.ReadSessionHistory(batch.SessionHistory)
if err != nil {
return nil, errors.Wrap(err, "unable to read JSON from flow start history")
}
}

// whether engine allows some functions is based on whether there is more than one contact being started
batchStart := batch.TotalContacts() > 1
batchStart := batch.TotalContacts > 1

// this will build our trigger for each contact started
triggerBuilder := func(contact *flows.Contact) flows.Trigger {
if !batch.ParentSummary().IsNull() {
tb := triggers.NewBuilder(oa.Env(), flow.Reference(), contact).FlowAction(history, json.RawMessage(batch.ParentSummary()))
if !batch.ParentSummary.IsNull() {
tb := triggers.NewBuilder(oa.Env(), flow.Reference(), contact).FlowAction(history, json.RawMessage(batch.ParentSummary))
if batchStart {
tb = tb.AsBatch()
}
return tb.Build()
}

tb := triggers.NewBuilder(oa.Env(), flow.Reference(), contact).Manual()
if batch.Extra() != nil {
if batch.Extra != nil {
tb = tb.WithParams(params)
}
if batchStart {
tb = tb.AsBatch()
}
return tb.WithUser(flowUser).WithOrigin(startTypeToOrigin[batch.StartType()]).Build()
return tb.WithUser(flowUser).WithOrigin(startTypeToOrigin[batch.StartType]).Build()
}

// before committing our runs we want to set the start they are associated with
updateStartID := func(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, sessions []*models.Session) error {
// for each run in our sessions, set the start id
for _, s := range sessions {
for _, r := range s.Runs() {
r.SetStartID(batch.StartID())
r.SetStartID(batch.StartID)
}
}
return nil
Expand All @@ -236,7 +236,7 @@ func StartFlowBatch(
options.TriggerBuilder = triggerBuilder
options.CommitHook = updateStartID

sessions, err := StartFlow(ctx, rt, oa, flow, batch.ContactIDs(), options)
sessions, err := StartFlow(ctx, rt, oa, flow, batch.ContactIDs, options)
if err != nil {
return nil, errors.Wrapf(err, "error starting flow batch")
}
Expand Down
26 changes: 13 additions & 13 deletions core/tasks/ivr/start_ivr_flow_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ func HandleFlowStartBatch(ctx context.Context, rt *runtime.Runtime, batch *model
// filter out anybody who has has a flow run in this flow if appropriate
if batch.ExcludeStartedPreviously() {
// find all participants that have been in this flow
started, err := models.FindFlowStartedOverlap(ctx, rt.DB, batch.FlowID(), batch.ContactIDs())
started, err := models.FindFlowStartedOverlap(ctx, rt.DB, batch.FlowID, batch.ContactIDs)
if err != nil {
return errors.Wrapf(err, "error finding others started flow: %d", batch.FlowID())
return errors.Wrapf(err, "error finding others started flow: %d", batch.FlowID)
}
for _, c := range started {
exclude[c] = true
Expand All @@ -56,27 +56,27 @@ func HandleFlowStartBatch(ctx context.Context, rt *runtime.Runtime, batch *model
// filter out our list of contacts to only include those that should be started
if batch.ExcludeInAFlow() {
// find all participants active in other sessions
active, err := models.FilterByWaitingSession(ctx, rt.DB, batch.ContactIDs())
active, err := models.FilterByWaitingSession(ctx, rt.DB, batch.ContactIDs)
if err != nil {
return errors.Wrapf(err, "error finding other active sessions: %d", batch.FlowID())
return errors.Wrapf(err, "error finding other active sessions: %d", batch.FlowID)
}
for _, c := range active {
exclude[c] = true
}
}

// filter into our final list of contacts
contactIDs := make([]models.ContactID, 0, len(batch.ContactIDs()))
for _, c := range batch.ContactIDs() {
contactIDs := make([]models.ContactID, 0, len(batch.ContactIDs))
for _, c := range batch.ContactIDs {
if !exclude[c] {
contactIDs = append(contactIDs, c)
}
}

// load our org assets
oa, err := models.GetOrgAssets(ctx, rt, batch.OrgID())
oa, err := models.GetOrgAssets(ctx, rt, batch.OrgID)
if err != nil {
return errors.Wrapf(err, "error loading org assets for org: %d", batch.OrgID())
return errors.Wrapf(err, "error loading org assets for org: %d", batch.OrgID)
}

// ok, we can initiate calls for the remaining contacts
Expand All @@ -93,29 +93,29 @@ func HandleFlowStartBatch(ctx context.Context, rt *runtime.Runtime, batch *model
session, err := ivr.RequestCall(ctx, rt, oa, batch, contact)
cancel()
if err != nil {
logrus.WithError(err).Errorf("error starting ivr flow for contact: %d and flow: %d", contact.ID(), batch.FlowID())
logrus.WithError(err).Errorf("error starting ivr flow for contact: %d and flow: %d", contact.ID(), batch.FlowID)
continue
}
if session == nil {
logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"contact_id": contact.ID(),
"start_id": batch.StartID(),
"start_id": batch.StartID,
}).Info("call start skipped, no suitable channel")
continue
}
logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"contact_id": contact.ID(),
"status": session.Status(),
"start_id": batch.StartID(),
"start_id": batch.StartID,
"external_id": session.ExternalID(),
}).Info("requested call for contact")
}

// if this is a last batch, mark our start as started
if batch.IsLast() {
err := models.MarkStartComplete(ctx, rt.DB, batch.StartID())
if batch.IsLast {
err := models.MarkStartComplete(ctx, rt.DB, batch.StartID)
if err != nil {
return errors.Wrapf(err, "error trying to set batch as complete")
}
Expand Down

0 comments on commit acac56d

Please sign in to comment.