Skip to content

Commit

Permalink
Merge pull request #49 from weni-ai/feat/msg-catalog
Browse files Browse the repository at this point in the history
Catalog message support
  • Loading branch information
Robi9 authored Nov 6, 2023
2 parents 64bf2a7 + fde0319 commit 813f85b
Show file tree
Hide file tree
Showing 20 changed files with 1,391 additions and 5 deletions.
1 change: 1 addition & 0 deletions cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
_ "github.com/nyaruka/mailroom/core/tasks/timeouts"
_ "github.com/nyaruka/mailroom/services/external/omie"
_ "github.com/nyaruka/mailroom/services/external/openai/chatgpt"
_ "github.com/nyaruka/mailroom/services/external/weni"
_ "github.com/nyaruka/mailroom/services/ivr/twiml"
_ "github.com/nyaruka/mailroom/services/ivr/vonage"
_ "github.com/nyaruka/mailroom/services/tickets/intern"
Expand Down
7 changes: 7 additions & 0 deletions core/goflow/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var classificationFactory func(*runtime.Config) engine.ClassificationServiceFact
var ticketFactory func(*runtime.Config) engine.TicketServiceFactory
var airtimeFactory func(*runtime.Config) engine.AirtimeServiceFactory
var externalServiceFactory func(*runtime.Config) engine.ExternalServiceServiceFactory
var msgCatalogFactory func(*runtime.Config) engine.MsgCatalogServiceFactory

// RegisterEmailServiceFactory can be used by outside callers to register a email factory
// for use by the engine
Expand Down Expand Up @@ -49,6 +50,10 @@ func RegisterExternalServiceServiceFactory(f func(*runtime.Config) engine.Extern
externalServiceFactory = f
}

func RegisterMsgCatalogServiceFactory(f func(*runtime.Config) engine.MsgCatalogServiceFactory) {
msgCatalogFactory = f
}

// Engine returns the global engine instance for use with real sessions
func Engine(c *runtime.Config) flows.Engine {
engInit.Do(func() {
Expand All @@ -65,6 +70,7 @@ func Engine(c *runtime.Config) flows.Engine {
WithEmailServiceFactory(emailFactory(c)).
WithTicketServiceFactory(ticketFactory(c)).
WithExternalServiceServiceFactory(externalServiceFactory((c))).
WithMsgCatalogServiceFactory(msgCatalogFactory((c))). // msg catalog
WithAirtimeServiceFactory(airtimeFactory(c)).
WithMaxStepsPerSprint(c.MaxStepsPerSprint).
WithMaxResumesPerSession(c.MaxResumesPerSession).
Expand All @@ -88,6 +94,7 @@ func Simulator(c *runtime.Config) flows.Engine {
WithWebhookServiceFactory(webhooks.NewServiceFactory(httpClient, nil, httpAccess, webhookHeaders, c.WebhooksMaxBodyBytes)).
WithClassificationServiceFactory(classificationFactory(c)). // simulated sessions do real classification
WithExternalServiceServiceFactory(externalServiceFactory((c))). // and real external services
WithMsgCatalogServiceFactory(msgCatalogFactory((c))). // msg catalog
WithEmailServiceFactory(simulatorEmailServiceFactory). // but faked emails
WithTicketServiceFactory(simulatorTicketServiceFactory). // and faked tickets
WithAirtimeServiceFactory(simulatorAirtimeServiceFactory). // and faked airtime transfers
Expand Down
127 changes: 127 additions & 0 deletions core/handlers/msg_catalog_created.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package handlers

import (
"context"
"fmt"

"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/core/hooks"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"

"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func init() {
models.RegisterEventPreWriteHandler(events.TypeMsgCatalogCreated, handlePreMsgCatalogCreated)
models.RegisterEventHandler(events.TypeMsgCatalogCreated, handleMsgCatalogCreated)
}

// handlePreMsgCatalogCreated clears our timeout on our session so that courier can send it when the message is sent, that will be set by courier when sent
func handlePreMsgCatalogCreated(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.MsgCatalogCreatedEvent)

// we only clear timeouts on messaging flows
if scene.Session().SessionType() != models.FlowTypeMessaging {
return nil
}

// get our channel
var channel *models.Channel

if event.Msg.Channel() != nil {
channel = oa.ChannelByUUID(event.Msg.Channel().UUID)
if channel == nil {
return errors.Errorf("unable to load channel with uuid: %s", event.Msg.Channel().UUID)
}
}

// no channel? this is a no-op
if channel == nil {
return nil
}

// android channels get normal timeouts
if channel.Type() == models.ChannelTypeAndroid {
return nil
}

// everybody else gets their timeout cleared, will be set by courier
scene.Session().ClearTimeoutOn()

return nil
}

// handleMsgCreated creates the db msg for the passed in event
func handleMsgCatalogCreated(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.MsgCatalogCreatedEvent)

// must be in a session
if scene.Session() == nil {
return errors.Errorf("cannot handle msg created event without session")
}

logrus.WithFields(logrus.Fields{
"contact_uuid": scene.ContactUUID(),
"session_id": scene.SessionID(),
"text": event.Msg.Text(),
"header": event.Msg.Header(),
"products": event.Msg.Products(),
"urn": event.Msg.URN(),
"action": event.Msg.Action(),
}).Debug("msg created event")

// messages in messaging flows must have urn id set on them, if not, go look it up
if scene.Session().SessionType() == models.FlowTypeMessaging && event.Msg.URN() != urns.NilURN {
urn := event.Msg.URN()
if models.GetURNInt(urn, "id") == 0 {
urn, err := models.GetOrCreateURN(ctx, tx, oa, scene.ContactID(), event.Msg.URN())
if err != nil {
return errors.Wrapf(err, "unable to get or create URN: %s", event.Msg.URN())
}
// update our Msg with our full URN
event.Msg.SetURN(urn)
}
}

// get our channel
var channel *models.Channel
if event.Msg.Channel() != nil {
channel = oa.ChannelByUUID(event.Msg.Channel().UUID)
if channel == nil {
return errors.Errorf("unable to load channel with uuid: %s", event.Msg.Channel().UUID)
} else {
if fmt.Sprint(channel.Type()) == "WAC" || fmt.Sprint(channel.Type()) == "WA" {
country := envs.DeriveCountryFromTel("+" + event.Msg.URN().Path())
locale := envs.NewLocale(scene.Contact().Language(), country)
languageCode := locale.ToBCP47()

if _, valid := validLanguageCodes[languageCode]; !valid {
languageCode = ""
}

event.Msg.TextLanguage = envs.Language(languageCode)
}
}
}

msg, err := models.NewOutgoingFlowMsgCatalog(rt, oa.Org(), channel, scene.Session(), event.Msg, event.CreatedOn())
if err != nil {
return errors.Wrapf(err, "error creating outgoing message to %s", event.Msg.URN())
}

// register to have this message committed
scene.AppendToEventPreCommitHook(hooks.CommitMessagesHook, msg)

// don't send messages for surveyor flows
if scene.Session().SessionType() != models.FlowTypeSurveyor {
scene.AppendToEventPostCommitHook(hooks.SendMessagesHook, msg)
}

return nil
}
100 changes: 100 additions & 0 deletions core/handlers/msg_catalog_created_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package handlers_test

import (
"fmt"
"testing"

"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/actions"
"github.com/nyaruka/mailroom/core/handlers"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert"
)

func TestMsgCatalogCreated(t *testing.T) {
ctx, rt, db, rp := testsuite.Get()

defer testsuite.Reset(testsuite.ResetAll)

// add a URN for cathy so we can test all urn sends
testdata.InsertContactURN(db, testdata.Org1, testdata.Cathy, urns.URN("tel:+12065551212"), 10)

// delete all URNs for bob
db.MustExec(`DELETE FROM contacts_contacturn WHERE contact_id = $1`, testdata.Bob.ID)

msg1 := testdata.InsertIncomingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "start", models.MsgStatusHandled)

tcs := []handlers.TestCase{
{
Actions: handlers.ContactActionMap{
testdata.Cathy: []flows.Action{
actions.NewSendMsgCatalog(
handlers.NewActionUUID(),
"", "Some products", "", "View Products", "",
[]map[string]string{
{"product_retailer_id": "9f526c6f-b2cb-4457-8048-a7f1dc101e50"},
{"product_retailer_id": "eb2305cc-bf39-43ad-a069-bbbfb6401acc"},
},
false,
true,
),
},
testdata.George: []flows.Action{
actions.NewSendMsgCatalog(
handlers.NewActionUUID(),
"Select The Service", "", "", "View Products", "",
[]map[string]string{
{"product_retailer_id": "cbd9ba07-7156-406e-8006-5b697d18d091"},
{"product_retailer_id": "63157bd2-6f94-4dbb-b394-ea4eb07ce156"},
},
false,
true,
),
},
testdata.Bob: []flows.Action{
actions.NewSendMsgCatalog(handlers.NewActionUUID(), "No URNs", "", "", "View Products", "i want a water bottle", nil, false, false),
},
},
Msgs: handlers.ContactMsgMap{
testdata.Cathy: msg1,
},
SQLAssertions: []handlers.SQLAssertion{
{
SQL: "SELECT COUNT(*) FROM msgs_msg WHERE contact_id = $1 AND metadata = $2 AND high_priority = TRUE",
Args: []interface{}{testdata.Cathy.ID, `{"action":"View Products","body":"Some products","products":["9f526c6f-b2cb-4457-8048-a7f1dc101e50","eb2305cc-bf39-43ad-a069-bbbfb6401acc"]}`},
Count: 2,
},
{
SQL: "SELECT COUNT(*) FROM msgs_msg WHERE contact_id = $1 AND status = 'Q' AND high_priority = FALSE",
Args: []interface{}{testdata.George.ID},
Count: 1,
},
{
SQL: "SELECT COUNT(*) FROM msgs_msg WHERE contact_id=$1 AND STATUS = 'F' AND failed_reason = 'D';",
Args: []interface{}{testdata.Bob.ID},
Count: 1,
},
},
},
}

handlers.RunTestCases(t, ctx, rt, tcs)

rc := rp.Get()
defer rc.Close()

// Cathy should have 1 batch of queued messages at high priority
count, err := redis.Int(rc.Do("zcard", fmt.Sprintf("msgs:%s|10/1", testdata.TwilioChannel.UUID)))
assert.NoError(t, err)
assert.Equal(t, 1, count)

// One bulk for George
count, err = redis.Int(rc.Do("zcard", fmt.Sprintf("msgs:%s|10/0", testdata.TwilioChannel.UUID)))
assert.NoError(t, err)
assert.Equal(t, 1, count)
}
27 changes: 27 additions & 0 deletions core/models/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ type OrgAssets struct {
externalServices []assets.ExternalService
externalServicesByID map[ExternalServiceID]*ExternalService
externalServicesByUUID map[assets.ExternalServiceUUID]*ExternalService

msgCatalogs []assets.MsgCatalog
msgCatalogsByID map[CatalogID]*MsgCatalog
msgCatalogsByUUID map[assets.ChannelUUID]*MsgCatalog
}

var ErrNotFound = errors.New("not found")
Expand Down Expand Up @@ -381,6 +385,24 @@ func NewOrgAssets(ctx context.Context, rt *runtime.Runtime, orgID OrgID, prev *O
oa.externalServicesByUUID = prev.externalServicesByUUID
}

if prev == nil || refresh&RefreshMsgCatalogs > 0 {
oa.msgCatalogs, err = loadCatalog(ctx, db, orgID)
if err != nil {
return nil, errors.Wrapf(err, "error loading catalogs for org %d", orgID)
}
oa.msgCatalogsByID = make(map[CatalogID]*MsgCatalog)
oa.msgCatalogsByUUID = make(map[assets.ChannelUUID]*MsgCatalog)

for _, a := range oa.msgCatalogs {
oa.msgCatalogsByID[a.(*MsgCatalog).c.ID] = a.(*MsgCatalog)
oa.msgCatalogsByUUID[a.(*MsgCatalog).c.ChannelUUID] = a.(*MsgCatalog)
}
} else {
oa.msgCatalogs = prev.msgCatalogs
oa.msgCatalogsByID = prev.msgCatalogsByID
oa.msgCatalogsByUUID = prev.msgCatalogsByUUID
}

// intialize our session assets
oa.sessionAssets, err = engine.NewSessionAssets(oa.Env(), oa, goflow.MigrationConfig(rt.Config))
if err != nil {
Expand Down Expand Up @@ -414,6 +436,7 @@ const (
RefreshTopics = Refresh(1 << 15)
RefreshUsers = Refresh(1 << 16)
RefreshExternalServices = Refresh(1 << 17)
RefreshMsgCatalogs = Refresh(1 << 18)
)

// GetOrgAssets creates or gets org assets for the passed in org
Expand Down Expand Up @@ -706,3 +729,7 @@ func (a *OrgAssets) ExternalServiceByID(id ExternalServiceID) *ExternalService {
func (a *OrgAssets) ExternalServiceByUUID(uuid assets.ExternalServiceUUID) *ExternalService {
return a.externalServicesByUUID[uuid]
}

func (a *OrgAssets) MsgCatalogs() ([]assets.MsgCatalog, error) {
return a.msgCatalogs, nil
}
Loading

0 comments on commit 813f85b

Please sign in to comment.