Skip to content

Commit

Permalink
Merge pull request #671 from nyaruka/fetch_attachments
Browse files Browse the repository at this point in the history
Call courier endpoint to fetch attachments
  • Loading branch information
rowanseymour authored Oct 12, 2022
2 parents 30f9e9b + 9075aa5 commit 738ca23
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 69 deletions.
8 changes: 5 additions & 3 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ RETURNING
`

// UpdateMessage updates a message after handling
func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status MsgStatus, visibility MsgVisibility, msgType MsgType, flow FlowID, topup TopupID) error {
func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status MsgStatus, visibility MsgVisibility, msgType MsgType, flow FlowID, topup TopupID, attachments []utils.Attachment, logUUIDs []ChannelLogUUID) error {
_, err := tx.ExecContext(ctx,
`UPDATE
msgs_msg
Expand All @@ -645,10 +645,12 @@ func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status Ms
visibility = $3,
msg_type = $4,
flow_id = $5,
topup_id = $6
topup_id = $6,
attachments = $7,
log_uuids = array_cat(log_uuids, $8)
WHERE
id = $1`,
msgID, status, visibility, msgType, flow, topup)
msgID, status, visibility, msgType, flow, topup, pq.Array(attachments), pq.Array(logUUIDs))

if err != nil {
return errors.Wrapf(err, "error updating msg: %d", msgID)
Expand Down
55 changes: 53 additions & 2 deletions core/msgio/courier.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
package msgio

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom/core/models"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/mailroom/runtime"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

var courierHttpClient = &http.Client{
Timeout: 5 * time.Second,
}

const (
bulkPriority = 0
highPriority = 1
Expand Down Expand Up @@ -137,3 +150,41 @@ func ClearCourierQueues(rc redis.Conn, ch *models.Channel) error {
_, err := queueClearScript.Do(rc, "msgs", ch.UUID(), ch.TPS())
return err
}

// see https://github.com/nyaruka/courier/blob/main/attachments.go#L23
type fetchAttachmentRequest struct {
ChannelType models.ChannelType `json:"channel_type"`
ChannelUUID assets.ChannelUUID `json:"channel_uuid"`
URL string `json:"url"`
}

type fetchAttachmentResponse struct {
Attachment struct {
ContentType string `json:"content_type"`
URL string `json:"url"`
Size int `json:"size"`
} `json:"attachment"`
LogUUID string `json:"log_uuid"`
}

// FetchAttachment calls courier to fetch the given attachment
func FetchAttachment(ctx context.Context, rt *runtime.Runtime, ch *models.Channel, attURL string) (utils.Attachment, models.ChannelLogUUID, error) {
payload := jsonx.MustMarshal(&fetchAttachmentRequest{
ChannelType: ch.Type(),
ChannelUUID: ch.UUID(),
URL: attURL,
})
req, _ := http.NewRequest("POST", fmt.Sprintf("https://%s/c/_fetch-attachment", rt.Config.Domain), bytes.NewReader(payload))
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", rt.Config.CourierAuthToken))

resp, err := httpx.DoTrace(courierHttpClient, req, nil, nil, -1)
if err != nil || resp.Response.StatusCode != 200 {
return "", "", errors.New("error calling courier endpoint")
}
fa := &fetchAttachmentResponse{}
if err := json.Unmarshal(resp.ResponseBody, fa); err != nil {
return "", "", errors.Wrap(err, "error unmarshaling courier response")
}

return utils.Attachment(fmt.Sprintf("%s:%s", fa.Attachment.ContentType, fa.Attachment.URL)), models.ChannelLogUUID(fa.LogUUID), nil
}
34 changes: 16 additions & 18 deletions core/tasks/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,14 @@ func TestMsgEvents(t *testing.T) {

makeMsgTask := func(org *testdata.Org, channel *testdata.Channel, contact *testdata.Contact, text string) *queue.Task {
return &queue.Task{Type: handler.MsgEventType, OrgID: int(org.ID), Task: jsonx.MustMarshal(&handler.MsgEvent{
ContactID: contact.ID,
OrgID: org.ID,
ChannelUUID: channel.UUID,
ChannelType: channel.Type,
MsgID: dbMsg.ID(),
MsgUUID: dbMsg.UUID(),
URN: contact.URN,
URNID: contact.URNID,
Text: text,
ContactID: contact.ID,
OrgID: org.ID,
ChannelID: channel.ID,
MsgID: dbMsg.ID(),
MsgUUID: dbMsg.UUID(),
URN: contact.URN,
URNID: contact.URNID,
Text: text,
})}
}

Expand Down Expand Up @@ -645,15 +644,14 @@ func TestTimedEvents(t *testing.T) {
Type: tc.EventType,
OrgID: int(tc.Org.ID),
Task: jsonx.MustMarshal(&handler.MsgEvent{
ContactID: tc.Contact.ID,
OrgID: tc.Org.ID,
ChannelUUID: tc.Channel.UUID,
ChannelType: tc.Channel.Type,
MsgID: flows.MsgID(1),
MsgUUID: flows.MsgUUID(uuids.New()),
URN: tc.Contact.URN,
URNID: tc.Contact.URNID,
Text: tc.Message,
ContactID: tc.Contact.ID,
OrgID: tc.Org.ID,
ChannelID: tc.Channel.ID,
MsgID: flows.MsgID(1),
MsgUUID: flows.MsgUUID(uuids.New()),
URN: tc.Contact.URN,
URNID: tc.Contact.URNID,
Text: tc.Message,
}),
}
} else if tc.EventType == handler.ExpirationEventType {
Expand Down
96 changes: 53 additions & 43 deletions core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/engine"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/msgio"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/core/runner"
"github.com/nyaruka/mailroom/runtime"
Expand Down Expand Up @@ -478,35 +478,55 @@ func handleStopEvent(ctx context.Context, rt *runtime.Runtime, event *StopEvent)
func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) error {
oa, err := models.GetOrgAssets(ctx, rt, event.OrgID)
if err != nil {
return errors.Wrapf(err, "error loading org")
return errors.Wrap(err, "error loading org")
}

// allocate a topup for this message if org uses topups
topupID, err := models.AllocateTopups(ctx, rt.DB, rt.RP, oa.Org(), 1)
if err != nil {
return errors.Wrapf(err, "error allocating topup for incoming message")
return errors.Wrap(err, "error allocating topup for incoming message")
}

// load the channel for this message
channel := oa.ChannelByID(event.ChannelID)

// fetch the attachments on the message (i.e. ask courier to fetch them)
attachments := make([]utils.Attachment, 0, len(event.Attachments))
logUUIDs := make([]models.ChannelLogUUID, 0, len(event.Attachments))

// no channel, no attachments
if channel != nil {
for _, attURL := range event.Attachments {
// if courier has already fetched this attachment, use it as is
if utils.Attachment(attURL).ContentType() != "" {
attachments = append(attachments, utils.Attachment(attURL))
} else {
attachment, logUUID, err := msgio.FetchAttachment(ctx, rt, channel, attURL)
if err != nil {
return errors.Wrapf(err, "error fetching attachment '%s'", attURL)
}

attachments = append(attachments, attachment)
logUUIDs = append(logUUIDs, logUUID)
}
}
}

// load our contact
contacts, err := models.LoadContacts(ctx, rt.ReadonlyDB, oa, []models.ContactID{event.ContactID})
modelContact, err := models.LoadContact(ctx, rt.ReadonlyDB, oa, event.ContactID)
if err != nil {
return errors.Wrapf(err, "error loading contact")
}

// contact has been deleted, ignore this message but mark it as handled
if len(contacts) == 0 {
err := models.UpdateMessage(ctx, rt.DB, event.MsgID, models.MsgStatusHandled, models.VisibilityArchived, models.MsgTypeInbox, models.NilFlowID, topupID)
// contact has been deleted, or is blocked, or channel no longer exists, ignore this message but mark it as handled
if modelContact == nil || modelContact.Status() == models.ContactStatusBlocked || channel == nil {
err := models.UpdateMessage(ctx, rt.DB, event.MsgID, models.MsgStatusHandled, models.VisibilityArchived, models.MsgTypeInbox, models.NilFlowID, topupID, attachments, logUUIDs)
if err != nil {
return errors.Wrapf(err, "error updating message for deleted contact")
}
return nil
}

modelContact := contacts[0]

// load the channel for this message
channel := oa.ChannelByUUID(event.ChannelUUID)

// if we have URNs make sure the message URN is our highest priority (this is usually a noop)
if len(modelContact.URNs()) > 0 {
err = modelContact.UpdatePreferredURN(ctx, rt.DB, oa, event.URNID, channel)
Expand All @@ -515,15 +535,6 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e
}
}

// if this channel is no longer active or this contact is blocked, ignore this message (mark it as handled)
if channel == nil || modelContact.Status() == models.ContactStatusBlocked {
err := models.UpdateMessage(ctx, rt.DB, event.MsgID, models.MsgStatusHandled, models.VisibilityArchived, models.MsgTypeInbox, models.NilFlowID, topupID)
if err != nil {
return errors.Wrapf(err, "error marking blocked or nil channel message as handled")
}
return nil
}

// stopped contact? they are unstopped if they send us an incoming message
newContact := event.NewContact
if modelContact.Status() == models.ContactStatusStopped {
Expand Down Expand Up @@ -555,7 +566,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e
return errors.Wrapf(err, "unable to look up open tickets for contact")
}
for _, ticket := range tickets {
ticket.ForwardIncoming(ctx, rt, oa, event.MsgUUID, event.Text, event.Attachments)
ticket.ForwardIncoming(ctx, rt, oa, event.MsgUUID, event.Text, attachments)
}

// find any matching triggers
Expand Down Expand Up @@ -583,7 +594,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e
}
}

msgIn := flows.NewMsgIn(event.MsgUUID, event.URN, channel.ChannelReference(), event.Text, event.Attachments)
msgIn := flows.NewMsgIn(event.MsgUUID, event.URN, channel.ChannelReference(), event.Text, attachments)
msgIn.SetExternalID(string(event.MsgExternalID))
msgIn.SetID(event.MsgID)

Expand All @@ -595,7 +606,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e
}
sessions[0].SetIncomingMsg(event.MsgID, event.MsgExternalID)

return markMsgHandled(ctx, tx, contact, msgIn, flow, topupID, tickets)
return markMsgHandled(ctx, tx, contact, msgIn, flow, topupID, attachments, tickets, logUUIDs)
}

// we found a trigger and their session is nil or doesn't ignore keywords
Expand All @@ -612,7 +623,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e
// if this is an IVR flow, we need to trigger that start (which happens in a different queue)
if flow.FlowType() == models.FlowTypeVoice {
ivrMsgHook := func(ctx context.Context, tx *sqlx.Tx) error {
return markMsgHandled(ctx, tx, contact, msgIn, flow, topupID, tickets)
return markMsgHandled(ctx, tx, contact, msgIn, flow, topupID, attachments, tickets, logUUIDs)
}
err = runner.TriggerIVRFlow(ctx, rt, oa.OrgID(), flow.ID(), []models.ContactID{modelContact.ID()}, ivrMsgHook)
if err != nil {
Expand Down Expand Up @@ -642,7 +653,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e
}

// this message didn't trigger and new sessions or resume any existing ones, so handle as inbox
err = handleAsInbox(ctx, rt, oa, contact, msgIn, topupID, tickets)
err = handleAsInbox(ctx, rt, oa, contact, msgIn, topupID, attachments, logUUIDs, tickets)
if err != nil {
return errors.Wrapf(err, "error handling inbox message")
}
Expand Down Expand Up @@ -746,7 +757,7 @@ func handleTicketEvent(ctx context.Context, rt *runtime.Runtime, event *models.T
}

// handles a message as an inbox message
func handleAsInbox(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, contact *flows.Contact, msg *flows.MsgIn, topupID models.TopupID, tickets []*models.Ticket) error {
func handleAsInbox(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, contact *flows.Contact, msg *flows.MsgIn, topupID models.TopupID, attachments []utils.Attachment, logUUIDs []models.ChannelLogUUID, tickets []*models.Ticket) error {
// usually last_seen_on is updated by handling the msg_received event in the engine sprint, but since this is an inbox
// message we manually create that event and handle it
msgEvent := events.NewMsgReceived(msg)
Expand All @@ -758,19 +769,19 @@ func handleAsInbox(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAsset
return errors.Wrap(err, "error handling inbox message events")
}

return markMsgHandled(ctx, rt.DB, contact, msg, nil, topupID, tickets)
return markMsgHandled(ctx, rt.DB, contact, msg, nil, topupID, attachments, tickets, logUUIDs)
}

// utility to mark as message as handled and update any open contact tickets
func markMsgHandled(ctx context.Context, db models.Queryer, contact *flows.Contact, msg *flows.MsgIn, flow *models.Flow, topupID models.TopupID, tickets []*models.Ticket) error {
func markMsgHandled(ctx context.Context, db models.Queryer, contact *flows.Contact, msg *flows.MsgIn, flow *models.Flow, topupID models.TopupID, attachments []utils.Attachment, tickets []*models.Ticket, logUUIDs []models.ChannelLogUUID) error {
msgType := models.MsgTypeInbox
flowID := models.NilFlowID
if flow != nil {
msgType = models.MsgTypeFlow
flowID = flow.ID()
}

err := models.UpdateMessage(ctx, db, msg.ID(), models.MsgStatusHandled, models.VisibilityVisible, msgType, flowID, topupID)
err := models.UpdateMessage(ctx, db, msg.ID(), models.MsgStatusHandled, models.VisibilityVisible, msgType, flowID, topupID, attachments, logUUIDs)
if err != nil {
return errors.Wrapf(err, "error marking message as handled")
}
Expand All @@ -797,19 +808,18 @@ type TimedEvent struct {
}

type MsgEvent struct {
ContactID models.ContactID `json:"contact_id"`
OrgID models.OrgID `json:"org_id"`
ChannelUUID assets.ChannelUUID `json:"channel_uuid"`
ChannelType models.ChannelType `json:"channel_type"`
MsgID flows.MsgID `json:"msg_id"`
MsgUUID flows.MsgUUID `json:"msg_uuid"`
MsgExternalID null.String `json:"msg_external_id"`
URN urns.URN `json:"urn"`
URNID models.URNID `json:"urn_id"`
Text string `json:"text"`
Attachments []utils.Attachment `json:"attachments"`
NewContact bool `json:"new_contact"`
CreatedOn time.Time `json:"created_on"`
ContactID models.ContactID `json:"contact_id"`
OrgID models.OrgID `json:"org_id"`
ChannelID models.ChannelID `json:"channel_id"`
MsgID flows.MsgID `json:"msg_id"`
MsgUUID flows.MsgUUID `json:"msg_uuid"`
MsgExternalID null.String `json:"msg_external_id"`
URN urns.URN `json:"urn"`
URNID models.URNID `json:"urn_id"`
Text string `json:"text"`
Attachments []string `json:"attachments"`
NewContact bool `json:"new_contact"`
CreatedOn time.Time `json:"created_on"`
}

type StopEvent struct {
Expand Down
6 changes: 3 additions & 3 deletions runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type Config struct {
AWSSecretAccessKey string `help:"the secret access key id to use when authenticating S3"`
AWSUseCredChain bool `help:"whether to use the AWS credentials chain. Defaults to false."`

LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`

CourierAuthToken string `help:"the authentication token used for requests to Courier"`
LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
FCMKey string `help:"the FCM API key used to notify Android relayers to sync"`
MailgunSigningKey string `help:"the signing key used to validate requests from mailgun"`

Expand Down

0 comments on commit 738ca23

Please sign in to comment.