Skip to content

Commit

Permalink
Merge pull request #31 from nyaruka/broadcast_attachments
Browse files Browse the repository at this point in the history
Update creating messages from broadcasts and resolving translations
  • Loading branch information
rowanseymour authored Feb 9, 2023
2 parents 790579f + c3d83f7 commit c910dae
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 207 deletions.
295 changes: 108 additions & 187 deletions core/models/broadcasts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package models

import (
"context"
"database/sql/driver"
"encoding/json"
"time"

"github.com/nyaruka/gocommon/dates"
Expand All @@ -14,11 +12,9 @@ import (
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/null/v2"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// BroadcastID is our internal type for broadcast ids, which can be null/0
Expand All @@ -35,45 +31,24 @@ const (
TemplateStateUnevaluated = TemplateState("unevaluated")
)

// BroadcastTranslation is the translation for the passed in language
type BroadcastTranslation struct {
Text string `json:"text"`
Attachments []utils.Attachment `json:"attachments,omitempty"`
QuickReplies []string `json:"quick_replies,omitempty"`
}

type BroadcastTranslations map[envs.Language]*BroadcastTranslation

func (t *BroadcastTranslations) Scan(value any) error {
b, ok := value.([]byte)
if !ok {
return errors.New("failed type assertion to []byte")
}
return json.Unmarshal(b, &t)
}

func (t BroadcastTranslations) Value() (driver.Value, error) {
return json.Marshal(t)
}

// Broadcast represents a broadcast that needs to be sent
type Broadcast struct {
ID BroadcastID `json:"broadcast_id,omitempty" db:"id"`
OrgID OrgID `json:"org_id" db:"org_id"`
Translations BroadcastTranslations `json:"translations" db:"translations"`
TemplateState TemplateState `json:"template_state"`
BaseLanguage envs.Language `json:"base_language" db:"base_language"`
URNs []urns.URN `json:"urns,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
GroupIDs []GroupID `json:"group_ids,omitempty"`
Query null.String `json:"query,omitempty" db:"query"`
CreatedByID UserID `json:"created_by_id,omitempty" db:"created_by_id"`
ParentID BroadcastID `json:"parent_id,omitempty" db:"parent_id"`
TicketID TicketID `json:"ticket_id,omitempty" db:"ticket_id"`
ID BroadcastID `json:"broadcast_id,omitempty" db:"id"`
OrgID OrgID `json:"org_id" db:"org_id"`
Translations flows.BroadcastTranslations `json:"translations" db:"translations"`
TemplateState TemplateState `json:"template_state"`
BaseLanguage envs.Language `json:"base_language" db:"base_language"`
URNs []urns.URN `json:"urns,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
GroupIDs []GroupID `json:"group_ids,omitempty"`
Query null.String `json:"query,omitempty" db:"query"`
CreatedByID UserID `json:"created_by_id,omitempty" db:"created_by_id"`
ParentID BroadcastID `json:"parent_id,omitempty" db:"parent_id"`
TicketID TicketID `json:"ticket_id,omitempty" db:"ticket_id"`
}

// NewBroadcast creates a new broadcast with the passed in parameters
func NewBroadcast(orgID OrgID, translations map[envs.Language]*BroadcastTranslation,
func NewBroadcast(orgID OrgID, translations flows.BroadcastTranslations,
state TemplateState, baseLanguage envs.Language, urns []urns.URN, contactIDs []ContactID, groupIDs []GroupID, query string, ticketID TicketID, createdByID UserID) *Broadcast {

return &Broadcast{
Expand Down Expand Up @@ -181,16 +156,6 @@ const sqlInsertBroadcastURNs = `INSERT INTO msgs_broadcast_urns(broadcast_id, co

// NewBroadcastFromEvent creates a broadcast object from the passed in broadcast event
func NewBroadcastFromEvent(ctx context.Context, tx Queryer, oa *OrgAssets, event *events.BroadcastCreatedEvent) (*Broadcast, error) {
// converst our translations to our type
translations := make(map[envs.Language]*BroadcastTranslation)
for l, t := range event.Translations {
translations[l] = &BroadcastTranslation{
Text: t.Text,
Attachments: t.Attachments,
QuickReplies: t.QuickReplies,
}
}

// resolve our contact references
contactIDs, err := GetContactIDsFromReferences(ctx, tx, oa.OrgID(), event.Contacts)
if err != nil {
Expand All @@ -206,7 +171,7 @@ func NewBroadcastFromEvent(ctx context.Context, tx Queryer, oa *OrgAssets, event
}
}

return NewBroadcast(oa.OrgID(), translations, TemplateStateEvaluated, event.BaseLanguage, event.URNs, contactIDs, groupIDs, event.ContactQuery, NilTicketID, NilUserID), nil
return NewBroadcast(oa.OrgID(), event.Translations, TemplateStateEvaluated, event.BaseLanguage, event.URNs, contactIDs, groupIDs, event.ContactQuery, NilTicketID, NilUserID), nil
}

func (b *Broadcast) CreateBatch(contactIDs []ContactID) *BroadcastBatch {
Expand All @@ -224,16 +189,16 @@ func (b *Broadcast) CreateBatch(contactIDs []ContactID) *BroadcastBatch {

// BroadcastBatch represents a batch of contacts that need messages sent for
type BroadcastBatch struct {
BroadcastID BroadcastID `json:"broadcast_id,omitempty"`
OrgID OrgID `json:"org_id"`
Translations BroadcastTranslations `json:"translations"`
BaseLanguage envs.Language `json:"base_language"`
TemplateState TemplateState `json:"template_state"`
URNs map[ContactID]urns.URN `json:"urns,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
IsLast bool `json:"is_last"`
CreatedByID UserID `json:"created_by_id"`
TicketID TicketID `json:"ticket_id"`
BroadcastID BroadcastID `json:"broadcast_id,omitempty"`
OrgID OrgID `json:"org_id"`
Translations flows.BroadcastTranslations `json:"translations"`
BaseLanguage envs.Language `json:"base_language"`
TemplateState TemplateState `json:"template_state"`
URNs map[ContactID]urns.URN `json:"urns,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
IsLast bool `json:"is_last"`
CreatedByID UserID `json:"created_by_id"`
TicketID TicketID `json:"ticket_id"`
}

func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime, oa *OrgAssets) ([]*Msg, error) {
Expand Down Expand Up @@ -266,140 +231,14 @@ func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime
return nil, errors.Wrapf(err, "error loading contacts for broadcast")
}

channels := oa.SessionAssets().Channels()

// for each contact, build our message
msgs := make([]*Msg, 0, len(contacts))

// utility method to build up our message
buildMessage := func(c *Contact, forceURN urns.URN) (*Msg, error) {
if c.Status() != ContactStatusActive {
return nil, nil
}

contact, err := c.FlowContact(oa)
if err != nil {
return nil, errors.Wrapf(err, "error creating flow contact")
}

urn := urns.NilURN
var channel *Channel

// we are forcing to send to a non-preferred URN, find the channel
if forceURN != urns.NilURN {
for _, u := range contact.URNs() {
if u.URN().Identity() == forceURN.Identity() {
c := channels.GetForURN(u, assets.ChannelRoleSend)
if c == nil {
return nil, nil
}
urn = u.URN()
channel = oa.ChannelByUUID(c.UUID())
break
}
}
} else {
// no forced URN, find the first URN we can send to
for _, u := range contact.URNs() {
c := channels.GetForURN(u, assets.ChannelRoleSend)
if c != nil {
urn = u.URN()
channel = oa.ChannelByUUID(c.UUID())
break
}
}
}

// no urn and channel? move on
if channel == nil {
return nil, nil
}

// resolve our translations, the order is:
// 1) valid contact language
// 2) org default language
// 3) broadcast base language
lang := contact.Language()
if lang != envs.NilLanguage {
found := false
for _, l := range oa.Env().AllowedLanguages() {
if l == lang {
found = true
break
}
}
if !found {
lang = envs.NilLanguage
}
}

// have a valid contact language, try that
trans := b.Translations
t := trans[lang]

// not found? try org default language
if t == nil {
lang = oa.Env().DefaultLanguage()
t = trans[lang]
}

// not found? use broadcast base language
if t == nil {
lang = b.BaseLanguage
t = trans[lang]
}

if t == nil {
logrus.WithField("base_language", b.BaseLanguage).WithField("translations", trans).Error("unable to find translation for broadcast")
return nil, nil
}

template := ""
if b.TemplateState == TemplateStateUnevaluated {
template = t.Text
}

text := t.Text

// if we have a template, evaluate it
if template != "" {
// build up the minimum viable context for templates
templateCtx := types.NewXObject(map[string]types.XValue{
"contact": flows.Context(oa.Env(), contact),
"fields": flows.Context(oa.Env(), contact.Fields()),
"globals": flows.Context(oa.Env(), oa.SessionAssets().Globals()),
"urns": flows.ContextFunc(oa.Env(), contact.URNs().MapContext),
})
text, _ = excellent.EvaluateTemplate(oa.Env(), templateCtx, template, nil)
}

// don't do anything if we have no text or attachments
if text == "" && len(t.Attachments) == 0 {
return nil, nil
}

unsendableReason := flows.NilUnsendableReason
if contact.Status() != flows.ContactStatusActive {
unsendableReason = flows.UnsendableReasonContactStatus
} else if urn == urns.NilURN || channel == nil {
unsendableReason = flows.UnsendableReasonNoDestination
}

// create our outgoing message
out := flows.NewMsgOut(urn, channel.ChannelReference(), text, t.Attachments, t.QuickReplies, nil, flows.NilMsgTopic, envs.NewLocale(lang, envs.NilCountry), unsendableReason)
msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), channel, contact, out, time.Now(), b.BroadcastID)
if err != nil {
return nil, errors.Wrapf(err, "error creating outgoing message")
}

return msg, nil
}

// run through all our contacts to create our messages
for _, c := range contacts {
// use the preferred URN if present
urn := broadcastURNs[c.ID()]
msg, err := buildMessage(c, urn)
msg, err := b.createMessage(rt, oa, c, urn)
if err != nil {
return nil, errors.Wrapf(err, "error creating broadcast message")
}
Expand All @@ -409,7 +248,7 @@ func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime

// if this is a contact that will receive two messages, calculate that one as well
if repeatedContacts[c.ID()] {
m2, err := buildMessage(c, urns.NilURN)
m2, err := b.createMessage(rt, oa, c, urns.NilURN)
if err != nil {
return nil, errors.Wrapf(err, "error creating broadcast message")
}
Expand Down Expand Up @@ -437,6 +276,88 @@ func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime
return msgs, nil
}

// creates an outgoing message for the given contact - can return nil if resultant message has no content and thus is a noop
func (b *BroadcastBatch) createMessage(rt *runtime.Runtime, oa *OrgAssets, c *Contact, forceURN urns.URN) (*Msg, error) {
contact, err := c.FlowContact(oa)
if err != nil {
return nil, errors.Wrapf(err, "error creating flow contact for broadcast message")
}

// resolve URN + channel for this contact
urn := urns.NilURN
var channel *Channel

if forceURN != urns.NilURN {
// we are forcing to send to a non-preferred URN, find the channel
channels := oa.SessionAssets().Channels()
for _, u := range contact.URNs() {
if u.URN().Identity() == forceURN.Identity() {
c := channels.GetForURN(u, assets.ChannelRoleSend)
if c == nil {
return nil, nil
}
urn = u.URN()
channel = oa.ChannelByUUID(c.UUID())
break
}
}
} else {
for _, dest := range contact.ResolveDestinations(false) {
urn = dest.URN.URN()
channel = oa.ChannelByUUID(dest.Channel.UUID())
break
}
}

trans, lang := b.Translations.ForContact(oa.Env(), contact, b.BaseLanguage)
if trans == nil {
// in theory shoud never happen because we shouldn't save a broadcast like this
return nil, errors.New("broadcast has no translation in base language")
}

text := trans.Text
attachments := trans.Attachments
quickReplies := trans.QuickReplies
locale := envs.NewLocale(lang, envs.NilCountry)

if b.TemplateState == TemplateStateUnevaluated {
// build up the minimum viable context for templates
templateCtx := types.NewXObject(map[string]types.XValue{
"contact": flows.Context(oa.Env(), contact),
"fields": flows.Context(oa.Env(), contact.Fields()),
"globals": flows.Context(oa.Env(), oa.SessionAssets().Globals()),
"urns": flows.ContextFunc(oa.Env(), contact.URNs().MapContext),
})
text, _ = excellent.EvaluateTemplate(oa.Env(), templateCtx, text, nil)
}

// don't create a message if we have no content
if text == "" && len(attachments) == 0 && len(trans.QuickReplies) == 0 {
return nil, nil
}

var channelRef *assets.ChannelReference
if channel != nil {
channelRef = channel.ChannelReference()
}

unsendableReason := flows.NilUnsendableReason
if contact.Status() != flows.ContactStatusActive {
unsendableReason = flows.UnsendableReasonContactStatus
} else if urn == urns.NilURN || channel == nil {
unsendableReason = flows.UnsendableReasonNoDestination
}

// create our outgoing message
out := flows.NewMsgOut(urn, channelRef, text, attachments, quickReplies, nil, flows.NilMsgTopic, locale, unsendableReason)
msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), channel, contact, out, time.Now(), b.BroadcastID)
if err != nil {
return nil, errors.Wrapf(err, "error creating outgoing message")
}

return msg, nil
}

func (b *BroadcastBatch) updateTicket(ctx context.Context, db Queryer, oa *OrgAssets) error {
firstReplyTime, err := TicketRecordReplied(ctx, db, b.TicketID, dates.Now())
if err != nil {
Expand Down
Loading

0 comments on commit c910dae

Please sign in to comment.