Skip to content

Commit

Permalink
Merge pull request #37 from nyaruka/send_msg_task
Browse files Browse the repository at this point in the history
Add endpoint to send a single message
  • Loading branch information
rowanseymour authored Feb 15, 2023
2 parents 2114013 + 3ad7288 commit c1101a3
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 41 deletions.
45 changes: 12 additions & 33 deletions core/models/broadcasts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/excellent"
"github.com/nyaruka/goflow/excellent/types"
Expand Down Expand Up @@ -195,7 +194,7 @@ func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime
// load all our contacts
contacts, err := LoadContacts(ctx, rt.DB, oa, b.ContactIDs)
if err != nil {
return nil, errors.Wrapf(err, "error loading contacts for broadcast")
return nil, errors.Wrap(err, "error loading contacts for broadcast")
}

// for each contact, build our message
Expand All @@ -205,7 +204,7 @@ func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime
for _, c := range contacts {
msg, err := b.createMessage(rt, oa, c)
if err != nil {
return nil, errors.Wrapf(err, "error creating broadcast message")
return nil, errors.Wrap(err, "error creating broadcast message")
}
if msg != nil {
msgs = append(msgs, msg)
Expand All @@ -215,12 +214,12 @@ func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime
// insert them in a single request
err = InsertMessages(ctx, rt.DB, msgs)
if err != nil {
return nil, errors.Wrapf(err, "error inserting broadcast messages")
return nil, errors.Wrap(err, "error inserting broadcast messages")
}

// if the broadcast was a ticket reply, update the ticket
if b.TicketID != NilTicketID {
if err := b.updateTicket(ctx, rt.DB, oa); err != nil {
if err := RecordTicketReply(ctx, rt.DB, oa, b.TicketID, b.CreatedByID); err != nil {
return nil, err
}
}
Expand All @@ -232,16 +231,7 @@ func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime
func (b *BroadcastBatch) createMessage(rt *runtime.Runtime, oa *OrgAssets, c *Contact) (*Msg, error) {
contact, err := c.FlowContact(oa)
if err != nil {
return nil, errors.Wrapf(err, "error creating flow contact for broadcast message")
}

// resolve URN + channel for this contact
urn := urns.NilURN
var channel *Channel
for _, dest := range contact.ResolveDestinations(false) {
urn = dest.URN.URN()
channel = oa.ChannelByUUID(dest.Channel.UUID())
break
return nil, errors.Wrap(err, "error creating flow contact for broadcast message")
}

trans, lang := b.Translations.ForContact(oa.Env(), contact, b.BaseLanguage)
Expand Down Expand Up @@ -271,39 +261,28 @@ func (b *BroadcastBatch) createMessage(rt *runtime.Runtime, oa *OrgAssets, c *Co
return nil, nil
}

var channelRef *assets.ChannelReference
if channel != nil {
channelRef = channel.ChannelReference()
}

unsendableReason := flows.NilUnsendableReason
if contact.Status() != flows.ContactStatusActive {
unsendableReason = flows.UnsendableReasonContactStatus
} else if urn == urns.NilURN || channel == nil {
unsendableReason = flows.UnsendableReasonNoDestination
}

// create our outgoing message
out := flows.NewMsgOut(urn, channelRef, text, attachments, quickReplies, nil, flows.NilMsgTopic, locale, unsendableReason)
msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), channel, contact, out, time.Now(), b.BroadcastID)
out, ch := NewMsgOut(oa, contact, text, attachments, quickReplies, locale)

msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), ch, contact, out, time.Now(), b.BroadcastID)
if err != nil {
return nil, errors.Wrapf(err, "error creating outgoing message")
}

return msg, nil
}

func (b *BroadcastBatch) updateTicket(ctx context.Context, db Queryer, oa *OrgAssets) error {
firstReplyTime, err := TicketRecordReplied(ctx, db, b.TicketID, dates.Now())
func RecordTicketReply(ctx context.Context, db Queryer, oa *OrgAssets, ticketID TicketID, userID UserID) error {
firstReplyTime, err := TicketRecordReplied(ctx, db, ticketID, dates.Now())
if err != nil {
return err
}

// record reply counts for org, user and team
replyCounts := map[string]int{scopeOrg(oa): 1}

if b.CreatedByID != NilUserID {
user := oa.UserByID(b.CreatedByID)
if userID != NilUserID {
user := oa.UserByID(userID)
if user != nil {
replyCounts[scopeUser(oa, user)] = 1
if user.Team() != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func LoadContact(ctx context.Context, db Queryer, oa *OrgAssets, id ContactID) (
return nil, err
}
if len(contacts) == 0 {
return nil, nil
return nil, errors.Errorf("no such contact #%d in org #%d", id, oa.OrgID())
}
return contacts[0], nil
}
Expand Down
34 changes: 31 additions & 3 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, co
return newOutgoingMsg(rt, org, channel, contact, out, createdOn, nil, nil, broadcastID)
}

// NewOutgoingChatMsg creates an outgoing message from chat
func NewOutgoingChatMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time) (*Msg, error) {
return newOutgoingMsg(rt, org, channel, contact, out, createdOn, nil, nil, NilBroadcastID)
}

func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, session *Session, flow *Flow, broadcastID BroadcastID) (*Msg, error) {
msg := &Msg{}
m := &msg.m
Expand Down Expand Up @@ -601,9 +606,9 @@ msgs_msg(uuid, text, attachments, quick_replies, locale, high_priority, created_
:visibility, :msg_type, :msg_count, :error_count, :next_attempt, :failed_reason, :channel_id,
:contact_id, :contact_urn_id, :org_id, :flow_id, :broadcast_id)
RETURNING
id as id,
now() as modified_on,
now() as queued_on
id AS id,
modified_on AS modified_on,
queued_on AS queued_on
`

// UpdateMessage updates a message after handling
Expand Down Expand Up @@ -774,6 +779,29 @@ func FailChannelMessages(ctx context.Context, db Queryer, orgID OrgID, channelID
return nil
}

func NewMsgOut(oa *OrgAssets, c *flows.Contact, text string, atts []utils.Attachment, qrs []string, locale envs.Locale) (*flows.MsgOut, *Channel) {
// resolve URN + channel for this contact
urn := urns.NilURN
var channel *Channel
var channelRef *assets.ChannelReference
for _, dest := range c.ResolveDestinations(false) {
urn = dest.URN.URN()
channel = oa.ChannelByUUID(dest.Channel.UUID())
channelRef = dest.Channel.Reference()
break
}

// is this message sendable?
unsendableReason := flows.NilUnsendableReason
if c.Status() != flows.ContactStatusActive {
unsendableReason = flows.UnsendableReasonContactStatus
} else if urn == urns.NilURN || channel == nil {
unsendableReason = flows.UnsendableReasonNoDestination
}

return flows.NewMsgOut(urn, channelRef, text, atts, qrs, nil, flows.NilMsgTopic, locale, unsendableReason), channel
}

// NilID implementations

func (i *MsgID) Scan(value any) error { return null.ScanInt(value, i) }
Expand Down
33 changes: 33 additions & 0 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,39 @@ func TestNewOutgoingIVR(t *testing.T) {
assertdb.Query(t, rt.DB, `SELECT text, created_on, sent_on FROM msgs_msg WHERE uuid = $1`, dbMsg.UUID()).Columns(map[string]interface{}{"text": "Hello", "created_on": createdOn, "sent_on": createdOn})
}

func TestNewMsgOut(t *testing.T) {
ctx, rt := testsuite.Runtime()

oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
require.NoError(t, err)

_, cathy := testdata.Cathy.Load(rt, oa)

out, ch := models.NewMsgOut(oa, cathy, "hello", nil, nil, envs.Locale(`eng-US`))
assert.Equal(t, "hello", out.Text())
assert.Equal(t, urns.URN("tel:+16055741111?id=10000&priority=1000"), out.URN())
assert.Equal(t, assets.NewChannelReference("74729f45-7f29-4868-9dc4-90e491e3c7d8", "Twilio"), out.Channel())
assert.Equal(t, envs.Locale(`eng-US`), out.Locale())
assert.Equal(t, "Twilio", ch.Name())

cathy.SetStatus(flows.ContactStatusBlocked)

out, ch = models.NewMsgOut(oa, cathy, "hello", nil, nil, envs.Locale(`eng-US`))
assert.Equal(t, urns.URN("tel:+16055741111?id=10000&priority=1000"), out.URN())
assert.Equal(t, assets.NewChannelReference("74729f45-7f29-4868-9dc4-90e491e3c7d8", "Twilio"), out.Channel())
assert.Equal(t, "Twilio", ch.Name())
assert.Equal(t, flows.UnsendableReasonContactStatus, out.UnsendableReason())

cathy.SetStatus(flows.ContactStatusActive)
cathy.ClearURNs()

out, ch = models.NewMsgOut(oa, cathy, "hello", nil, nil, envs.Locale(`eng-US`))
assert.Equal(t, urns.NilURN, out.URN())
assert.Nil(t, out.Channel())
assert.Nil(t, ch)
assert.Equal(t, flows.UnsendableReasonNoDestination, out.UnsendableReason())
}

func insertTestSession(t *testing.T, ctx context.Context, rt *runtime.Runtime, org *testdata.Org, contact *testdata.Contact, flow *testdata.Flow) *models.Session {
testdata.InsertWaitingSession(rt, org, contact, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, nil)

Expand Down
17 changes: 17 additions & 0 deletions testsuite/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/textproto"
"os"
"regexp"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -123,6 +124,9 @@ func RunWebTests(t *testing.T, ctx context.Context, rt *runtime.Runtime, truthFi
tc.actualResponse, err = io.ReadAll(resp.Body)
assert.NoError(t, err, "%s: error reading body", tc.Label)

// some timestamps come from db NOW() which we can't mock, so we replace them with $recent_timestamp$
tc.actualResponse = overwriteRecentTimestamps(tc.actualResponse)

if !test.UpdateSnapshots {
assert.Equal(t, tc.Status, actual.Status, "%s: unexpected status", tc.Label)

Expand Down Expand Up @@ -172,6 +176,19 @@ func RunWebTests(t *testing.T, ctx context.Context, rt *runtime.Runtime, truthFi
}
}

var isoTimestampRegex = regexp.MustCompile(`\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,9}Z`)

func overwriteRecentTimestamps(resp []byte) []byte {
return isoTimestampRegex.ReplaceAllFunc(resp, func(b []byte) []byte {
fmt.Printf("found timestamp %s\n", b)
t, _ := time.Parse(time.RFC3339, string(b))
if time.Since(t) < time.Second*10 {
return []byte(`$recent_timestamp$`)
}
return b
})
}

// MultiPartPart is a single part in a multipart encoded request
type MultiPartPart struct {
Name string `json:"name"`
Expand Down
82 changes: 79 additions & 3 deletions web/msg/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,95 @@ import (
"context"
"net/http"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/msgio"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/web"

"github.com/pkg/errors"
)

func init() {
web.RegisterJSONRoute(http.MethodPost, "/mr/msg/send", web.RequireAuthToken(handleSend))
web.RegisterJSONRoute(http.MethodPost, "/mr/msg/resend", web.RequireAuthToken(handleResend))
}

// Request to send a message.
//
// {
// "org_id": 1,
// "contact_id": 123456,
// "user_id": 56,
// "text": "hi there"
// }
type sendRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
UserID models.UserID `json:"user_id" validate:"required"`
ContactID models.ContactID `json:"contact_id" validate:"required"`
Text string `json:"text"`
Attachments []utils.Attachment `json:"attachments"`
TicketID models.TicketID `json:"ticket_id"`
}

// handles a request to resend the given messages
func handleSend(ctx context.Context, rt *runtime.Runtime, r *http.Request) (interface{}, int, error) {
request := &sendRequest{}
if err := web.ReadAndValidateJSON(r, request); err != nil {
return errors.Wrap(err, "request failed validation"), http.StatusBadRequest, nil
}

// grab our org
oa, err := models.GetOrgAssets(ctx, rt, request.OrgID)
if err != nil {
return nil, 0, errors.Wrap(err, "unable to load org assets")
}

// load the contact and generate as a flow contact
c, err := models.LoadContact(ctx, rt.DB, oa, request.ContactID)
if err != nil {
return nil, 0, errors.Wrap(err, "error loading contact")
}

contact, err := c.FlowContact(oa)
if err != nil {
return nil, 0, errors.Wrap(err, "error creating flow contact")
}

out, ch := models.NewMsgOut(oa, contact, request.Text, request.Attachments, nil, contact.Locale(oa.Env()))
msg, err := models.NewOutgoingChatMsg(rt, oa.Org(), ch, contact, out, dates.Now())
if err != nil {
return nil, 0, errors.Wrap(err, "error creating outgoing message")
}

err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg})
if err != nil {
return nil, 0, errors.Wrap(err, "error inserting outgoing message")
}

// if message was a ticket reply, update the ticket
if request.TicketID != models.NilTicketID {
if err := models.RecordTicketReply(ctx, rt.DB, oa, request.TicketID, request.UserID); err != nil {
return nil, 0, errors.Wrap(err, "error recording ticket reply")
}
}

msgio.SendMessages(ctx, rt, rt.DB, nil, []*models.Msg{msg})

return map[string]any{
"id": msg.ID(),
"channel": out.Channel(),
"contact": contact.Reference(),
"urn": msg.URN(),
"text": msg.Text(),
"attachments": msg.Attachments(),
"status": msg.Status(),
"created_on": msg.CreatedOn(),
"modified_on": msg.ModifiedOn(),
}, http.StatusOK, nil
}

// Request to resend failed messages.
//
// {
Expand All @@ -32,13 +108,13 @@ type resendRequest struct {
func handleResend(ctx context.Context, rt *runtime.Runtime, r *http.Request) (interface{}, int, error) {
request := &resendRequest{}
if err := web.ReadAndValidateJSON(r, request); err != nil {
return errors.Wrapf(err, "request failed validation"), http.StatusBadRequest, nil
return errors.Wrap(err, "request failed validation"), http.StatusBadRequest, nil
}

// grab our org
oa, err := models.GetOrgAssets(ctx, rt, request.OrgID)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
return nil, http.StatusInternalServerError, errors.Wrap(err, "unable to load org assets")
}

msgs, err := models.GetMessagesByID(ctx, rt.DB, request.OrgID, models.DirectionOut, request.MsgIDs)
Expand Down
12 changes: 11 additions & 1 deletion web/msg/msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,17 @@ import (
"github.com/nyaruka/mailroom/testsuite/testdata"
)

func TestServer(t *testing.T) {
func TestSend(t *testing.T) {
ctx, rt := testsuite.Runtime()

defer testsuite.Reset(testsuite.ResetData | testsuite.ResetRedis)

testsuite.RunWebTests(t, ctx, rt, "testdata/send.json", nil)

testsuite.AssertCourierQueues(t, map[string][]int{"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {1, 1}})
}

func TestResend(t *testing.T) {
ctx, rt := testsuite.Runtime()

defer testsuite.Reset(testsuite.ResetData)
Expand Down
Loading

0 comments on commit c1101a3

Please sign in to comment.