Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for new category count model #398

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 71 additions & 14 deletions core/models/flow_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/buger/jsonparser"
"github.com/nyaruka/gocommon/stringsx"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/redisx"
)
Expand All @@ -20,22 +22,45 @@ const (

var storeOperandsForTypes = map[string]bool{"wait_for_response": true, "split_by_expression": true, "split_by_contact_field": true, "split_by_run_result": true}

const sqlInsertFlowActivityCount = `
INSERT INTO flows_flowactivitycount( flow_id, scope, count, is_squashed)
VALUES(:flow_id, :scope, :count, FALSE)
`

type FlowActivityCount struct {
FlowID FlowID `db:"flow_id"`
Scope string `db:"scope"`
Count int `db:"count"`
}

const sqlInsertFlowActivityCount = `
INSERT INTO flows_flowactivitycount( flow_id, scope, count, is_squashed)
VALUES(:flow_id, :scope, :count, FALSE)
`

// InsertFlowActivityCounts inserts the given flow activity counts into the database
func InsertFlowActivityCounts(ctx context.Context, db DBorTx, counts []*FlowActivityCount) error {
return BulkQuery(ctx, "insert flow activity counts", db, sqlInsertFlowActivityCount, counts)
}

type FlowResultCount struct {
FlowID FlowID `db:"flow_id"`
Result string `db:"result"`
Category string `db:"category"`
Count int `db:"count"`
}

const sqlInsertFlowResultCount = `
INSERT INTO flows_flowresultcount( flow_id, result, category, count, is_squashed)
VALUES(:flow_id, :result, :category, :count, FALSE)
`

// InsertFlowResultCounts inserts the given flow result counts into the database
func InsertFlowResultCounts(ctx context.Context, db DBorTx, counts []*FlowResultCount) error {
return BulkQuery(ctx, "insert flow result counts", db, sqlInsertFlowResultCount, counts)
}

type resultInfo struct {
flowID FlowID
result string
category string
}

type segmentInfo struct {
flowID FlowID
exitUUID flows.ExitUUID
Expand All @@ -53,6 +78,7 @@ type segmentRecentContact struct {
func RecordFlowStatistics(ctx context.Context, rt *runtime.Runtime, db DBorTx, sessions []flows.Session, sprints []flows.Sprint) error {
countsBySegment := make(map[segmentInfo]int, 10)
recentBySegment := make(map[segmentInfo][]*segmentRecentContact, 10)
categoryChanges := make(map[resultInfo]int, 10)
nodeTypeCache := make(map[flows.NodeUUID]string)

for i, sprint := range sprints {
Expand All @@ -79,19 +105,50 @@ func RecordFlowStatistics(ctx context.Context, rt *runtime.Runtime, db DBorTx, s
recentBySegment[segID] = append(recentBySegment[segID], &segmentRecentContact{contact: session.Contact(), operand: operand, time: seg.Time(), rnd: redisx.RandomBase64(10)})
}
}

for _, e := range sprint.Events() {
switch typed := e.(type) {
case *events.RunResultChangedEvent:
run, _ := session.FindStep(e.StepUUID())
flow := run.Flow().Asset().(*Flow)
resultKey := utils.Snakify(typed.Name)
if typed.Previous != nil {
categoryChanges[resultInfo{flowID: flow.ID(), result: resultKey, category: typed.Previous.Category}]--
}
categoryChanges[resultInfo{flowID: flow.ID(), result: resultKey, category: typed.Category}]++
}
}
}

activityCounts := make([]*FlowActivityCount, 0, len(countsBySegment))
for seg, count := range countsBySegment {
if count != 0 {
activityCounts = append(activityCounts, &FlowActivityCount{
FlowID: seg.flowID,
Scope: fmt.Sprintf("segment:%s:%s", seg.exitUUID, seg.destUUID),
Count: count,
})
}
}

counts := make([]*FlowActivityCount, 0, len(countsBySegment))
for segID, count := range countsBySegment {
counts = append(counts, &FlowActivityCount{
FlowID: segID.flowID,
Scope: fmt.Sprintf("segment:%s:%s", segID.exitUUID, segID.destUUID),
Count: count,
})
if err := InsertFlowActivityCounts(ctx, db, activityCounts); err != nil {
return fmt.Errorf("error inserting flow activity counts: %w", err)
}

resultCounts := make([]*FlowResultCount, 0, len(categoryChanges))
for res, count := range categoryChanges {
if count != 0 {
resultCounts = append(resultCounts, &FlowResultCount{
FlowID: res.flowID,
Result: res.result,
Category: res.category,
Count: count,
})
}
}

if err := InsertFlowActivityCounts(ctx, db, counts); err != nil {
return fmt.Errorf("error inserting flow segment counts: %w", err)
if err := InsertFlowResultCounts(ctx, db, resultCounts); err != nil {
return fmt.Errorf("error inserting flow result counts: %w", err)
}

rc := rt.RP.Get()
Expand Down
38 changes: 34 additions & 4 deletions core/models/flow_stats_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package models_test

import (
"fmt"
"testing"

"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/gocommon/random"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/test"
Expand Down Expand Up @@ -42,13 +44,17 @@ func TestRecordFlowStatistics(t *testing.T) {
require.NoError(t, err)

// should have a single record of all 3 contacts going through the first segment
var counts []*models.FlowActivityCount
err = rt.DB.SelectContext(ctx, &counts, "SELECT flow_id, scope, count FROM flows_flowactivitycount ORDER BY flow_id, scope")
var activityCounts []*models.FlowActivityCount
err = rt.DB.SelectContext(ctx, &activityCounts, "SELECT flow_id, scope, count FROM flows_flowactivitycount ORDER BY flow_id, scope")
require.NoError(t, err)
assert.Len(t, counts, 1)
assert.Equal(t, &models.FlowActivityCount{flow.ID, "segment:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", 3}, counts[0])
assert.Len(t, activityCounts, 1)
assert.Equal(t, &models.FlowActivityCount{flow.ID, "segment:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", 3}, activityCounts[0])

// should have no result counts yet
assertdb.Query(t, rt.DB, "SELECT count(*) FROM flows_flowresultcount").Returns(0)

assertFlowActivityCounts(t, rt, flow.ID, map[string]int{"segment:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94": 3})
assertFlowResultCounts(t, rt, flow.ID, map[string]int{})

assertredis.Keys(t, rc, "*", []string{
"recent_contacts:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", // "what's your fav color" -> color split
Expand Down Expand Up @@ -78,6 +84,7 @@ func TestRecordFlowStatistics(t *testing.T) {
"segment:97cd44ce-dec2-4e19-8ca2-4e20db51dc08:0e1fe072-6f03-4f29-98aa-7bedbe930dab": 2, // "X is a great color" -> split by expression
"segment:614e7451-e0bd-43d9-b317-2aded3c8d790:a1e649db-91e0-47c4-ab14-eba0d1475116": 2, // "you have X tickets" -> group split
})
assertFlowResultCounts(t, rt, flow.ID, map[string]int{"color/Blue": 2, "color/Other": 1})

_, session3Sprint3, err := test.ResumeSession(session3, sa3, "azure")
require.NoError(t, err)
Expand All @@ -94,6 +101,7 @@ func TestRecordFlowStatistics(t *testing.T) {
"segment:97cd44ce-dec2-4e19-8ca2-4e20db51dc08:0e1fe072-6f03-4f29-98aa-7bedbe930dab": 2, // "X is a great color" -> split by expression
"segment:614e7451-e0bd-43d9-b317-2aded3c8d790:a1e649db-91e0-47c4-ab14-eba0d1475116": 2, // "you have X tickets" -> group split
})
assertFlowResultCounts(t, rt, flow.ID, map[string]int{"color/Blue": 2, "color/Other": 1})

assertredis.Keys(t, rc, "*", []string{
"recent_contacts:5fd2e537-0534-4c12-8425-bef87af09d46:072b95b3-61c3-4e0e-8dd1-eb7481083f94", // "what's your fav color" -> color split
Expand All @@ -119,6 +127,15 @@ func TestRecordFlowStatistics(t *testing.T) {
assertredis.ZRange(t, rc, "recent_contacts:2b698218-87e5-4ab8-922e-e65f91d12c10:88d8bf00-51ce-4e5e-aae8-4f957a0761a0", 0, -1,
[]string{"PLQQFoOgV9|123|0", "/cgnkcW6vA|234|0"},
)

// check that category counts are updated correctly when result changes
_, session3Sprint4, err := test.ResumeSession(session3, sa3, "blue")
require.NoError(t, err)

err = models.RecordFlowStatistics(ctx, rt, rt.DB, []flows.Session{session3}, []flows.Sprint{session3Sprint4})
require.NoError(t, err)

assertFlowResultCounts(t, rt, flow.ID, map[string]int{"color/Blue": 3, "color/Other": 0})
}

func assertFlowActivityCounts(t *testing.T, rt *runtime.Runtime, flowID models.FlowID, expected map[string]int) {
Expand All @@ -133,3 +150,16 @@ func assertFlowActivityCounts(t *testing.T, rt *runtime.Runtime, flowID models.F

assert.Equal(t, expected, actual)
}

func assertFlowResultCounts(t *testing.T, rt *runtime.Runtime, flowID models.FlowID, expected map[string]int) {
var counts []*models.FlowResultCount
err := rt.DB.Select(&counts, "SELECT flow_id, result, category, SUM(count) AS count FROM flows_flowresultcount WHERE flow_id = $1 GROUP BY flow_id, result, category", flowID)
require.NoError(t, err)

actual := make(map[string]int)
for _, c := range counts {
actual[fmt.Sprintf("%s/%s", c.Result, c.Category)] = c.Count
}

assert.Equal(t, expected, actual)
}
Loading