Skip to content

Commit

Permalink
robustly handle failed callbacks from Twilio (#73)
Browse files Browse the repository at this point in the history
* refactor custom exceptions for use throughout the project.

* on failed twilio SID lookup, push the callback request to REDIS for a delayed 2nd and 3rd attempt.

* Update isacc_messaging/api/views.py

Co-authored-by: Ivan Cvitkovic <ivanc@uw.edu>

---------

Co-authored-by: Ivan Cvitkovic <ivanc@uw.edu>
  • Loading branch information
pbugni and ivan-c authored Nov 21, 2024
1 parent 0ca25dd commit dcd1f37
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 12 deletions.
9 changes: 2 additions & 7 deletions isacc_messaging/api/isacc_record_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import re
import requests
from typing import List, Tuple
import json

from fhirclient.models.communication import Communication
from twilio.base.exceptions import TwilioRestException

from isacc_messaging.api.email_notifications import send_message_received_notification
from isacc_messaging.audit import audit_entry
from isacc_messaging.exceptions import IsaccTwilioSIDnotFound
from isacc_messaging.models.fhir import (
HAPI_request,
first_in_bundle,
Expand Down Expand Up @@ -48,11 +48,6 @@ def case_insensitive_replace(text, old, new):
return c


class IsaccTwilioError(Exception):
    """Signals that Twilio SMS is not behaving the way ISACC requires."""


class IsaccRecordCreator:
def __init__(self):
pass
Expand Down Expand Up @@ -211,7 +206,7 @@ def on_twilio_message_status_update(self, values):
extra={"message_sid": message_sid},
level='error'
)
raise IsaccTwilioError(f"ERROR! {error}: {message_sid}")
raise IsaccTwilioSIDnotFound(f"ERROR! {error}: {message_sid}")

cr = CommunicationRequest(cr)
patient = resolve_reference(cr.recipient[0].reference)
Expand Down
54 changes: 49 additions & 5 deletions isacc_messaging/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
import click
import logging
from datetime import datetime
from time import sleep
from flask import Blueprint, jsonify, request
from flask import current_app
from flask.cli import with_appcontext
from twilio.request_validator import RequestValidator

from isacc_messaging.api.isacc_record_creator import IsaccRecordCreator
from isacc_messaging.audit import audit_entry
from twilio.request_validator import RequestValidator
from isacc_messaging.exceptions import IsaccTwilioSIDnotFound
from isacc_messaging.robust_request import serialize_request, queue_request, pop_request

base_blueprint = Blueprint('base', __name__, cli_group=None)

Expand Down Expand Up @@ -88,22 +92,43 @@ def auditlog_addevent():


@base_blueprint.route("/MessageStatus", methods=['POST'])
def message_status_update(callback_req=None, attempt_count=0):
    """Registered callback for Twilio to transmit message status updates

    As Twilio occasionally hits this callback prior to local data being
    available, it is also called subsequently from a job queue.  The
    parameters are only defined in the retry state.

    :param callback_req: request replayed from the job queue; when not
      given, the live flask ``request`` is used
    :param attempt_count: the number of failed attempts thus far, only
      defined from job queue
    :returns: ``('', 204)`` on success, otherwise ``(error text, 200)``
    :raises IsaccRequestRetriesExhausted: propagated from
      ``serialize_request`` when ``attempt_count`` exceeds its maximum
    """
    use_request = request if callback_req is None else callback_req

    audit_entry(
        "Call to /MessageStatus webhook",
        extra={'use_request.values': dict(use_request.values)},
        level='debug'
    )

    record_creator = IsaccRecordCreator()
    try:
        record_creator.on_twilio_message_status_update(use_request.values)
    except Exception as ex:
        audit_entry(
            f"on_twilio_message_status_update generated error {ex}",
            level='error'
        )
        # Couldn't locate the message, most likely means twilio was quicker
        # to call back, than HAPI could persist and find.  Push to REDIS
        # for another attempt later
        if isinstance(ex, IsaccTwilioSIDnotFound):
            req = serialize_request(use_request, attempt_count=attempt_count)
            queue_request(req)

        # Return the error text, never the exception object itself: a
        # flask view must return a str/bytes/Response body.
        return str(ex), 200
    return '', 204


Expand Down Expand Up @@ -213,6 +238,25 @@ def execute_requests():
])


@base_blueprint.cli.command("retry_requests")
@with_appcontext
def retry_requests():
    """Look for any failed requests and retry now

    Pops queued requests until the queue is empty, replaying each one
    through the view that originally failed.  A request that has used up
    its retries is logged and skipped, so it does not prevent the
    remaining queued requests from being processed.
    """
    # Local import: this module only imports IsaccTwilioSIDnotFound at
    # file level.
    from isacc_messaging.exceptions import IsaccRequestRetriesExhausted

    while True:
        # NOTE(review): relies on pop_request() returning an object that
        # exposes ``url``, ``method`` and ``attempt_count`` - confirm.
        failed_request = pop_request()
        if not failed_request:
            break

        # Only expecting one route at this time
        is_message_status = (
            failed_request.url.endswith("/MessageStatus") and
            failed_request.method.upper() == 'POST')
        if not is_message_status:
            # The request has already been popped; leave a trace rather
            # than dropping it silently.
            audit_entry(
                "retry_requests dropped unsupported queued request",
                extra={
                    'url': failed_request.url,
                    'method': failed_request.method},
                level='error'
            )
            continue

        try:
            with current_app.test_request_context():
                _, response_code = message_status_update(
                    failed_request, failed_request.attempt_count + 1)
        except IsaccRequestRetriesExhausted:
            # Raised while the view tried to re-queue the request; give
            # up on this one only and keep draining the queue.
            audit_entry(
                "retry_requests giving up, retries exhausted",
                extra={
                    'url': failed_request.url,
                    'attempt_count': failed_request.attempt_count},
                level='error'
            )
            continue

        if response_code != 204:
            sleep(1)  # give system a moment to catch up before retry

@base_blueprint.cli.command("send-system-emails")
@click.argument("category", required=True)
@click.option("--dry-run", is_flag=True, default=False, help="Simulate execution; generate but don't send email")
Expand Down
10 changes: 10 additions & 0 deletions isacc_messaging/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Module to define exceptions used by isacc_messaging."""

class IsaccTwilioSIDnotFound(Exception):
    """Signals that Twilio called back with a SID that could not be found."""


class IsaccRequestRetriesExhausted(Exception):
    """Signals that the maximum number of retries was used without success."""
44 changes: 44 additions & 0 deletions isacc_messaging/robust_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Functions used for robust handling of failed requests
Basic model: a request fails. Rather than give up, push the request
to a job queue and try again from a consumer.
"""
import json
import redis
from flask import current_app

from isacc_messaging.exceptions import IsaccRequestRetriesExhausted


def serialize_request(req, attempt_count=1, max_retries=3):
    """Given a request object, returns a serialized form

    Need a serialized form of the request to push into a job queue.
    This also maintains and enforces the number of attempts doesn't
    exceed the maximum.

    :param req: The request object; must expose ``method``, ``url``,
      ``headers`` and ``get_data(as_text=True)``
    :param attempt_count: Increment from previous failure on each call
    :param max_retries: Maximum number of retries before giving up
    :returns: JSON string holding the request and retry bookkeeping
    :raises IsaccRequestRetriesExhausted: when ``attempt_count`` exceeds
      ``max_retries``; the serialized form is the exception message
    """
    from urllib.parse import urlencode

    body = req.get_data(as_text=True)
    if not body:
        # Once a request's form has been parsed, werkzeug's get_data()
        # returns nothing.  Rebuild the form-encoded body from the parsed
        # form so the queued copy keeps its payload.
        form = getattr(req, "form", None)
        if form:
            body = urlencode(list(form.items()))

    serialized_form = json.dumps({
        "method": req.method,
        "url": req.url,
        "headers": dict(req.headers),
        "body": body,
        "attempt_count": attempt_count,
        "max_retries": max_retries,
    })
    if attempt_count > max_retries:
        raise IsaccRequestRetriesExhausted(serialized_form)
    return serialized_form

def queue_request(serialized_request):
    """Push a serialized request onto the ``http_request_queue`` Redis list.

    :param serialized_request: the serialized request to enqueue
    """
    cache_url = current_app.config.get("REQUEST_CACHE_URL")
    client = redis.StrictRedis.from_url(cache_url)
    client.lpush("http_request_queue", serialized_request)


class _QueuedRequest:
    """Minimal stand-in for a request rebuilt from its queued JSON form.

    Exposes ``method``, ``url``, ``headers``, ``attempt_count``,
    ``max_retries``, ``values`` and ``get_data()`` so a queued request can
    be inspected, replayed and serialized again.
    """

    def __init__(self, payload):
        from urllib.parse import parse_qsl

        self.method = payload.get("method", "")
        self.url = payload.get("url", "")
        self.headers = payload.get("headers") or {}
        self.body = payload.get("body") or ""
        self.attempt_count = payload.get("attempt_count", 1)
        self.max_retries = payload.get("max_retries", 3)
        # NOTE(review): assumes the stored body is form-encoded - confirm
        # for any route other than the Twilio status callback.
        self.values = dict(parse_qsl(self.body, keep_blank_values=True))

    def get_data(self, as_text=False):
        """Return the stored body, as ``str`` or UTF-8 encoded ``bytes``."""
        return self.body if as_text else self.body.encode("utf-8")


def pop_request():
    """Pop the oldest queued request from the ``http_request_queue`` list.

    The queue stores JSON payloads, so the raw value popped from Redis is
    decoded into an object with attribute access rather than handed back
    as an opaque payload.

    :returns: a ``_QueuedRequest``, or ``None`` when the queue is empty
    """
    redis_client = redis.StrictRedis.from_url(
        current_app.config.get("REQUEST_CACHE_URL"))
    raw = redis_client.rpop("http_request_queue")
    if raw is None:
        return None
    return _QueuedRequest(json.loads(raw))

0 comments on commit dcd1f37

Please sign in to comment.