From d4ddc5b358d04c639b1508c4c79b3f4bcf9f93fa Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Wed, 8 Jan 2025 11:47:26 -0500 Subject: [PATCH 1/2] Add support for new category count model --- core/models/flow_stats.go | 85 ++++++++++++++++++++++++++++++++------- 1 file changed, 71 insertions(+), 14 deletions(-) diff --git a/core/models/flow_stats.go b/core/models/flow_stats.go index c1a64572e..69437c023 100644 --- a/core/models/flow_stats.go +++ b/core/models/flow_stats.go @@ -8,6 +8,8 @@ import ( "github.com/buger/jsonparser" "github.com/nyaruka/gocommon/stringsx" "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/events" + "github.com/nyaruka/goflow/utils" "github.com/nyaruka/mailroom/runtime" "github.com/nyaruka/redisx" ) @@ -20,22 +22,45 @@ const ( var storeOperandsForTypes = map[string]bool{"wait_for_response": true, "split_by_expression": true, "split_by_contact_field": true, "split_by_run_result": true} -const sqlInsertFlowActivityCount = ` -INSERT INTO flows_flowactivitycount( flow_id, scope, count, is_squashed) - VALUES(:flow_id, :scope, :count, FALSE) -` - type FlowActivityCount struct { FlowID FlowID `db:"flow_id"` Scope string `db:"scope"` Count int `db:"count"` } +const sqlInsertFlowActivityCount = ` +INSERT INTO flows_flowactivitycount( flow_id, scope, count, is_squashed) + VALUES(:flow_id, :scope, :count, FALSE) +` + // InsertFlowActivityCounts inserts the given flow activity counts into the database func InsertFlowActivityCounts(ctx context.Context, db DBorTx, counts []*FlowActivityCount) error { return BulkQuery(ctx, "insert flow activity counts", db, sqlInsertFlowActivityCount, counts) } +type FlowResultCount struct { + FlowID FlowID `db:"flow_id"` + Result string `db:"result"` + Category string `db:"category"` + Count int `db:"count"` +} + +const sqlInsertFlowResultCount = ` +INSERT INTO flows_flowresultcount( flow_id, result, category, count, is_squashed) + VALUES(:flow_id, :result, :category, :count, FALSE) +` + +// InsertFlowResultCounts inserts the given flow result counts into the database +func InsertFlowResultCounts(ctx context.Context, db DBorTx, counts []*FlowResultCount) error { + return BulkQuery(ctx, "insert flow result counts", db, sqlInsertFlowResultCount, counts) +} + +type resultInfo struct { + flowID FlowID + result string + category string +} + type segmentInfo struct { flowID FlowID exitUUID flows.ExitUUID @@ -53,6 +78,7 @@ type segmentRecentContact struct { func RecordFlowStatistics(ctx context.Context, rt *runtime.Runtime, db DBorTx, sessions []flows.Session, sprints []flows.Sprint) error { countsBySegment := make(map[segmentInfo]int, 10) recentBySegment := make(map[segmentInfo][]*segmentRecentContact, 10) + categoryChanges := make(map[resultInfo]int, 10) nodeTypeCache := make(map[flows.NodeUUID]string) for i, sprint := range sprints { @@ -79,19 +105,50 @@ func RecordFlowStatistics(ctx context.Context, rt *runtime.Runtime, db DBorTx, s recentBySegment[segID] = append(recentBySegment[segID], &segmentRecentContact{contact: session.Contact(), operand: operand, time: seg.Time(), rnd: redisx.RandomBase64(10)}) } } + + for _, e := range sprint.Events() { + switch typed := e.(type) { + case *events.RunResultChangedEvent: + run, _ := session.FindStep(e.StepUUID()) + flow := run.Flow().Asset().(*Flow) + resultKey := utils.Snakify(typed.Name) + if typed.Previous != nil { + categoryChanges[resultInfo{flowID: flow.ID(), result: resultKey, category: typed.Previous.Category}]-- + } + categoryChanges[resultInfo{flowID: flow.ID(), result: resultKey, category: typed.Category}]++ + } + } + } + + activityCounts := make([]*FlowActivityCount, 0, len(countsBySegment)) + for seg, count := range countsBySegment { + if count != 0 { + activityCounts = append(activityCounts, &FlowActivityCount{ + FlowID: seg.flowID, + Scope: fmt.Sprintf("segment:%s:%s", seg.exitUUID, seg.destUUID), + Count: count, + }) + } } - counts := make([]*FlowActivityCount, 0, len(countsBySegment)) - for segID, count := range countsBySegment { - counts = append(counts, &FlowActivityCount{ - FlowID: segID.flowID, - Scope: fmt.Sprintf("segment:%s:%s", segID.exitUUID, segID.destUUID), - Count: count, - }) + if err := InsertFlowActivityCounts(ctx, db, activityCounts); err != nil { + return fmt.Errorf("error inserting flow activity counts: %w", err) + } + + resultCounts := make([]*FlowResultCount, 0, len(categoryChanges)) + for res, count := range categoryChanges { + if count != 0 { + resultCounts = append(resultCounts, &FlowResultCount{ + FlowID: res.flowID, + Result: res.result, + Category: res.category, + Count: count, + }) + } } - if err := InsertFlowActivityCounts(ctx, db, counts); err != nil { - return fmt.Errorf("error inserting flow segment counts: %w", err) + if err := InsertFlowResultCounts(ctx, db, resultCounts); err != nil { + return fmt.Errorf("error inserting flow result counts: %w", err) } rc := rt.RP.Get() From 232d2ac29777974378dd8dd8458b6f05426cefa3 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 9 Jan 2025 10:26:50 -0500 Subject: [PATCH 2/2] Add tests for new category counts --- core/models/flow_stats_test.go | 38 ++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/core/models/flow_stats_test.go b/core/models/flow_stats_test.go index 23ef3d7d3..a6108dd63 100644 --- a/core/models/flow_stats_test.go +++ b/core/models/flow_stats_test.go @@ -1,8 +1,10 @@ package models_test import ( + "fmt" "testing" + "github.com/nyaruka/gocommon/dbutil/assertdb" "github.com/nyaruka/gocommon/random" "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/test" @@ -42,13 +44,17 @@ func TestRecordFlowStatistics(t *testing.T) { require.NoError(t, err) // should have a single record of all 3 contacts going through the first segment - var counts []*models.FlowActivityCount - err = rt.DB.SelectContext(ctx, &counts, "SELECT flow_id, scope, count FROM flows_flowactivitycount ORDER BY flow_id, scope") + var activityCounts []*models.FlowActivityCount + err = rt.DB.SelectContext(ctx, &activityCounts, "SELECT flow_id, scope, count FROM flows_flowactivitycount ORDER BY flow_id, scope") require.NoError(t, err) - assert.Len(t, counts, 1) - assert.Equal(t, &models.FlowActivityCount{flow.ID, "segment:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", 3}, counts[0]) + assert.Len(t, activityCounts, 1) + assert.Equal(t, &models.FlowActivityCount{flow.ID, "segment:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", 3}, activityCounts[0]) + + // should have no result counts yet + assertdb.Query(t, rt.DB, "SELECT count(*) FROM flows_flowresultcount").Returns(0) assertFlowActivityCounts(t, rt, flow.ID, map[string]int{"segment:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94": 3}) + assertFlowResultCounts(t, rt, flow.ID, map[string]int{}) assertredis.Keys(t, rc, "*", []string{ "recent_contacts:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", // "what's your fav color" -> color split @@ -78,6 +84,7 @@ func TestRecordFlowStatistics(t *testing.T) { "segment:97cd44ce-dec2-4e19-8ca2-4e20db51dc08:0e1fe072-6f03-4f29-98aa-7bedbe930dab": 2, // "X is a great color" -> split by expression "segment:614e7451-e0bd-43d9-b317-2aded3c8d790:a1e649db-91e0-47c4-ab14-eba0d1475116": 2, // "you have X tickets" -> group split }) + assertFlowResultCounts(t, rt, flow.ID, map[string]int{"color/Blue": 2, "color/Other": 1}) _, session3Sprint3, err := test.ResumeSession(session3, sa3, "azure") require.NoError(t, err) @@ -94,6 +101,7 @@ func TestRecordFlowStatistics(t *testing.T) { "segment:97cd44ce-dec2-4e19-8ca2-4e20db51dc08:0e1fe072-6f03-4f29-98aa-7bedbe930dab": 2, // "X is a great color" -> split by expression "segment:614e7451-e0bd-43d9-b317-2aded3c8d790:a1e649db-91e0-47c4-ab14-eba0d1475116": 2, // "you have X tickets" -> group split }) + assertFlowResultCounts(t, rt, flow.ID, map[string]int{"color/Blue": 2, "color/Other": 1}) assertredis.Keys(t, rc, "*", []string{ "recent_contacts:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", // "what's your fav color" -> color split @@ -119,6 +127,15 @@ func TestRecordFlowStatistics(t *testing.T) { assertredis.ZRange(t, rc, "recent_contacts:2b698218-87e5-4ab8-922e-e65f91d12c10:88d8bf00-51ce-4e5e-aae8-4f957a0761a0", 0, -1, []string{"PLQQFoOgV9|123|0", "/cgnkcW6vA|234|0"}, ) + + // check that category counts are updated correctly when result changes + _, session3Sprint4, err := test.ResumeSession(session3, sa3, "blue") + require.NoError(t, err) + + err = models.RecordFlowStatistics(ctx, rt, rt.DB, []flows.Session{session3}, []flows.Sprint{session3Sprint4}) + require.NoError(t, err) + + assertFlowResultCounts(t, rt, flow.ID, map[string]int{"color/Blue": 3, "color/Other": 0}) } func assertFlowActivityCounts(t *testing.T, rt *runtime.Runtime, flowID models.FlowID, expected map[string]int) { @@ -133,3 +150,16 @@ func assertFlowActivityCounts(t *testing.T, rt *runtime.Runtime, flowID models.F assert.Equal(t, expected, actual) } + +func assertFlowResultCounts(t *testing.T, rt *runtime.Runtime, flowID models.FlowID, expected map[string]int) { + var counts []*models.FlowResultCount + err := rt.DB.Select(&counts, "SELECT flow_id, result, category, SUM(count) AS count FROM flows_flowresultcount WHERE flow_id = $1 GROUP BY flow_id, result, category", flowID) + require.NoError(t, err) + + actual := make(map[string]int) + for _, c := range counts { + actual[fmt.Sprintf("%s/%s", c.Result, c.Category)] = c.Count + } + + assert.Equal(t, expected, actual) +}