Skip to content

Commit

Permalink
Merge pull request #398 from nyaruka/new_cat_counts
Browse files Browse the repository at this point in the history
Add support for new category count model
  • Loading branch information
rowanseymour authored Jan 9, 2025
2 parents ca57be2 + 232d2ac commit 005b02e
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 18 deletions.
85 changes: 71 additions & 14 deletions core/models/flow_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/buger/jsonparser"
"github.com/nyaruka/gocommon/stringsx"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/redisx"
)
Expand All @@ -20,22 +22,45 @@ const (

var storeOperandsForTypes = map[string]bool{"wait_for_response": true, "split_by_expression": true, "split_by_contact_field": true, "split_by_run_result": true}

const sqlInsertFlowActivityCount = `
INSERT INTO flows_flowactivitycount( flow_id, scope, count, is_squashed)
VALUES(:flow_id, :scope, :count, FALSE)
`

type FlowActivityCount struct {
FlowID FlowID `db:"flow_id"`
Scope string `db:"scope"`
Count int `db:"count"`
}

const sqlInsertFlowActivityCount = `
INSERT INTO flows_flowactivitycount( flow_id, scope, count, is_squashed)
VALUES(:flow_id, :scope, :count, FALSE)
`

// InsertFlowActivityCounts inserts the given flow activity counts into the database
func InsertFlowActivityCounts(ctx context.Context, db DBorTx, counts []*FlowActivityCount) error {
return BulkQuery(ctx, "insert flow activity counts", db, sqlInsertFlowActivityCount, counts)
}

type FlowResultCount struct {
FlowID FlowID `db:"flow_id"`
Result string `db:"result"`
Category string `db:"category"`
Count int `db:"count"`
}

const sqlInsertFlowResultCount = `
INSERT INTO flows_flowresultcount( flow_id, result, category, count, is_squashed)
VALUES(:flow_id, :result, :category, :count, FALSE)
`

// InsertFlowResultCounts inserts the given flow result counts into the database
func InsertFlowResultCounts(ctx context.Context, db DBorTx, counts []*FlowResultCount) error {
return BulkQuery(ctx, "insert flow result counts", db, sqlInsertFlowResultCount, counts)
}

type resultInfo struct {
flowID FlowID
result string
category string
}

type segmentInfo struct {
flowID FlowID
exitUUID flows.ExitUUID
Expand All @@ -53,6 +78,7 @@ type segmentRecentContact struct {
func RecordFlowStatistics(ctx context.Context, rt *runtime.Runtime, db DBorTx, sessions []flows.Session, sprints []flows.Sprint) error {
countsBySegment := make(map[segmentInfo]int, 10)
recentBySegment := make(map[segmentInfo][]*segmentRecentContact, 10)
categoryChanges := make(map[resultInfo]int, 10)
nodeTypeCache := make(map[flows.NodeUUID]string)

for i, sprint := range sprints {
Expand All @@ -79,19 +105,50 @@ func RecordFlowStatistics(ctx context.Context, rt *runtime.Runtime, db DBorTx, s
recentBySegment[segID] = append(recentBySegment[segID], &segmentRecentContact{contact: session.Contact(), operand: operand, time: seg.Time(), rnd: redisx.RandomBase64(10)})
}
}

for _, e := range sprint.Events() {
switch typed := e.(type) {
case *events.RunResultChangedEvent:
run, _ := session.FindStep(e.StepUUID())
flow := run.Flow().Asset().(*Flow)
resultKey := utils.Snakify(typed.Name)
if typed.Previous != nil {
categoryChanges[resultInfo{flowID: flow.ID(), result: resultKey, category: typed.Previous.Category}]--
}
categoryChanges[resultInfo{flowID: flow.ID(), result: resultKey, category: typed.Category}]++
}
}
}

activityCounts := make([]*FlowActivityCount, 0, len(countsBySegment))
for seg, count := range countsBySegment {
if count != 0 {
activityCounts = append(activityCounts, &FlowActivityCount{
FlowID: seg.flowID,
Scope: fmt.Sprintf("segment:%s:%s", seg.exitUUID, seg.destUUID),
Count: count,
})
}
}

counts := make([]*FlowActivityCount, 0, len(countsBySegment))
for segID, count := range countsBySegment {
counts = append(counts, &FlowActivityCount{
FlowID: segID.flowID,
Scope: fmt.Sprintf("segment:%s:%s", segID.exitUUID, segID.destUUID),
Count: count,
})
if err := InsertFlowActivityCounts(ctx, db, activityCounts); err != nil {
return fmt.Errorf("error inserting flow activity counts: %w", err)
}

resultCounts := make([]*FlowResultCount, 0, len(categoryChanges))
for res, count := range categoryChanges {
if count != 0 {
resultCounts = append(resultCounts, &FlowResultCount{
FlowID: res.flowID,
Result: res.result,
Category: res.category,
Count: count,
})
}
}

if err := InsertFlowActivityCounts(ctx, db, counts); err != nil {
return fmt.Errorf("error inserting flow segment counts: %w", err)
if err := InsertFlowResultCounts(ctx, db, resultCounts); err != nil {
return fmt.Errorf("error inserting flow result counts: %w", err)
}

rc := rt.RP.Get()
Expand Down
38 changes: 34 additions & 4 deletions core/models/flow_stats_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package models_test

import (
"fmt"
"testing"

"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/gocommon/random"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/test"
Expand Down Expand Up @@ -42,13 +44,17 @@ func TestRecordFlowStatistics(t *testing.T) {
require.NoError(t, err)

// should have a single record of all 3 contacts going through the first segment
var counts []*models.FlowActivityCount
err = rt.DB.SelectContext(ctx, &counts, "SELECT flow_id, scope, count FROM flows_flowactivitycount ORDER BY flow_id, scope")
var activityCounts []*models.FlowActivityCount
err = rt.DB.SelectContext(ctx, &activityCounts, "SELECT flow_id, scope, count FROM flows_flowactivitycount ORDER BY flow_id, scope")
require.NoError(t, err)
assert.Len(t, counts, 1)
assert.Equal(t, &models.FlowActivityCount{flow.ID, "segment:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", 3}, counts[0])
assert.Len(t, activityCounts, 1)
assert.Equal(t, &models.FlowActivityCount{flow.ID, "segment:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", 3}, activityCounts[0])

// should have no result counts yet
assertdb.Query(t, rt.DB, "SELECT count(*) FROM flows_flowresultcount").Returns(0)

assertFlowActivityCounts(t, rt, flow.ID, map[string]int{"segment:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94": 3})
assertFlowResultCounts(t, rt, flow.ID, map[string]int{})

assertredis.Keys(t, rc, "*", []string{
"recent_contacts:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", // "what's your fav color" -> color split
Expand Down Expand Up @@ -78,6 +84,7 @@ func TestRecordFlowStatistics(t *testing.T) {
"segment:97cd44ce-dec2-4e19-8ca2-4e20db51dc08:0e1fe072-6f03-4f29-98aa-7bedbe930dab": 2, // "X is a great color" -> split by expression
"segment:614e7451-e0bd-43d9-b317-2aded3c8d790:a1e649db-91e0-47c4-ab14-eba0d1475116": 2, // "you have X tickets" -> group split
})
assertFlowResultCounts(t, rt, flow.ID, map[string]int{"color/Blue": 2, "color/Other": 1})

_, session3Sprint3, err := test.ResumeSession(session3, sa3, "azure")
require.NoError(t, err)
Expand All @@ -94,6 +101,7 @@ func TestRecordFlowStatistics(t *testing.T) {
"segment:97cd44ce-dec2-4e19-8ca2-4e20db51dc08:0e1fe072-6f03-4f29-98aa-7bedbe930dab": 2, // "X is a great color" -> split by expression
"segment:614e7451-e0bd-43d9-b317-2aded3c8d790:a1e649db-91e0-47c4-ab14-eba0d1475116": 2, // "you have X tickets" -> group split
})
assertFlowResultCounts(t, rt, flow.ID, map[string]int{"color/Blue": 2, "color/Other": 1})

assertredis.Keys(t, rc, "*", []string{
"recent_contacts:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", // "what's your fav color" -> color split
Expand All @@ -119,6 +127,15 @@ func TestRecordFlowStatistics(t *testing.T) {
assertredis.ZRange(t, rc, "recent_contacts:2b698218-87e5-4ab8-922e-e65f91d12c10:88d8bf00-51ce-4e5e-aae8-4f957a0761a0", 0, -1,
[]string{"PLQQFoOgV9|123|0", "/cgnkcW6vA|234|0"},
)

// check that category counts are updated correctly when result changes
_, session3Sprint4, err := test.ResumeSession(session3, sa3, "blue")
require.NoError(t, err)

err = models.RecordFlowStatistics(ctx, rt, rt.DB, []flows.Session{session3}, []flows.Sprint{session3Sprint4})
require.NoError(t, err)

assertFlowResultCounts(t, rt, flow.ID, map[string]int{"color/Blue": 3, "color/Other": 0})
}

func assertFlowActivityCounts(t *testing.T, rt *runtime.Runtime, flowID models.FlowID, expected map[string]int) {
Expand All @@ -133,3 +150,16 @@ func assertFlowActivityCounts(t *testing.T, rt *runtime.Runtime, flowID models.F

assert.Equal(t, expected, actual)
}

func assertFlowResultCounts(t *testing.T, rt *runtime.Runtime, flowID models.FlowID, expected map[string]int) {
var counts []*models.FlowResultCount
err := rt.DB.Select(&counts, "SELECT flow_id, result, category, SUM(count) AS count FROM flows_flowresultcount WHERE flow_id = $1 GROUP BY flow_id, result, category", flowID)
require.NoError(t, err)

actual := make(map[string]int)
for _, c := range counts {
actual[fmt.Sprintf("%s/%s", c.Result, c.Category)] = c.Count
}

assert.Equal(t, expected, actual)
}

0 comments on commit 005b02e

Please sign in to comment.