diff --git a/isacc_messaging/api/isacc_record_creator.py b/isacc_messaging/api/isacc_record_creator.py index 7b3388b..12912a4 100644 --- a/isacc_messaging/api/isacc_record_creator.py +++ b/isacc_messaging/api/isacc_record_creator.py @@ -3,13 +3,13 @@ import re import requests from typing import List, Tuple -import json from fhirclient.models.communication import Communication from twilio.base.exceptions import TwilioRestException from isacc_messaging.api.email_notifications import send_message_received_notification from isacc_messaging.audit import audit_entry +from isacc_messaging.exceptions import IsaccTwilioSIDnotFound from isacc_messaging.models.fhir import ( HAPI_request, first_in_bundle, @@ -48,11 +48,6 @@ def case_insensitive_replace(text, old, new): return c -class IsaccTwilioError(Exception): - """Raised when Twilio SMS are not functioning as required for ISACC""" - pass - - class IsaccRecordCreator: def __init__(self): pass @@ -211,7 +206,7 @@ def on_twilio_message_status_update(self, values): extra={"message_sid": message_sid}, level='error' ) - raise IsaccTwilioError(f"ERROR! {error}: {message_sid}") + raise IsaccTwilioSIDnotFound(f"ERROR! {error}: {message_sid}") cr = CommunicationRequest(cr) patient = resolve_reference(cr.recipient[0].reference) diff --git a/isacc_messaging/api/views.py b/isacc_messaging/api/views.py index f0d294e..3f63b9b 100644 --- a/isacc_messaging/api/views.py +++ b/isacc_messaging/api/views.py @@ -2,12 +2,16 @@ import click import logging from datetime import datetime +from time import sleep from flask import Blueprint, jsonify, request from flask import current_app +from flask.cli import with_appcontext +from twilio.request_validator import RequestValidator from isacc_messaging.api.isacc_record_creator import IsaccRecordCreator from isacc_messaging.audit import audit_entry -from twilio.request_validator import RequestValidator +from isacc_messaging.exceptions import IsaccTwilioSIDnotFound +from isacc_messaging.robust_request import serialize_request, queue_request, pop_request base_blueprint = Blueprint('base', __name__, cli_group=None) @@ -88,22 +92,43 @@ def auditlog_addevent(): @base_blueprint.route("/MessageStatus", methods=['POST']) -def message_status_update(): +def message_status_update(callback_req=None, attempt_count=0): + """Registered callback for Twilio to transmit updates + + As Twilio occasionally hits this callback prior to local data being + available, it is also called subsequently from a job queue. The + parameters are only defined in the retry state. + + :param req: request from a job queue + :param attempt_count: the number of failed attemts thus far, only + defined from job queue + """ + use_request = request + if callback_req: + use_request = callback_req + audit_entry( f"Call to /MessageStatus webhook", - extra={'request.values': dict(request.values)}, + extra={'use_request.values': dict(use_request.values)}, level='debug' ) record_creator = IsaccRecordCreator() try: - record_creator.on_twilio_message_status_update(request.values) + record_creator.on_twilio_message_status_update(use_request.values) except Exception as ex: audit_entry( f"on_twilio_message_status_update generated error {ex}", level='error' ) - return ex, 200 + # Couldn't locate the message, most likely means twilio was quicker + # to call back, than HAPI could persist and find. Push to REDIS + # for another attempt later + if isinstance(ex, IsaccTwilioSIDnotFound): + req = serialize_request(use_request, attempt_count=attempt_count) + queue_request(req) + + return str(ex), 200 return '', 204 @@ -213,6 +238,25 @@ def execute_requests(): ]) +@base_blueprint.cli.command("retry_requests") +@with_appcontext +def retry_requests(): + """Look for any failed requests and retry now""" + while True: + failed_request = pop_request() + if not failed_request: + break + + # Only expecting one route at this time + if ( + failed_request.url.endswith("/MessageStatus") and + failed_request.method.upper() == 'POST'): + with current_app.test_request_context(): + response, response_code = message_status_update( + failed_request, failed_request.attempt_count + 1) + if response_code != 204: + sleep(1) # give system a moment to catch up before retry + @base_blueprint.cli.command("send-system-emails") @click.argument("category", required=True) @click.option("--dry-run", is_flag=True, default=False, help="Simulate execution; generate but don't send email") diff --git a/isacc_messaging/exceptions.py b/isacc_messaging/exceptions.py new file mode 100644 index 0000000..e6da630 --- /dev/null +++ b/isacc_messaging/exceptions.py @@ -0,0 +1,10 @@ +"""Module to define exceptions used by isacc_messaging.""" + +class IsaccTwilioSIDnotFound(Exception): + """Raised when Twilio calls with SID that can't be found""" + pass + + +class IsaccRequestRetriesExhausted(Exception): + """Raised when max retries have been tried w/o success""" + pass diff --git a/isacc_messaging/robust_request.py b/isacc_messaging/robust_request.py new file mode 100644 index 0000000..4acdf8d --- /dev/null +++ b/isacc_messaging/robust_request.py @@ -0,0 +1,44 @@ +"""Functions used for robust handling of failed requests + +Basic model: a request fails. Rather than give up, push the request +to a job queue and try again from a consumer. +""" +import json +import redis +from flask import current_app + +from isacc_messaging.exceptions import IsaccRequestRetriesExhausted + + +def serialize_request(req, attempt_count=1, max_retries=3): + """Given a request object, returns a serialized form + + :param req: The request object + :param attempt_count: Increment from previous failure on each call + :param max_retries: Maximum number of retries before giving up + + Need a serialized form of the request to push into a job queue. + This also maintains and enforces the number of attempts doesn't + exceed the maximum. + """ + serialized_form = json.dumps({ + "method": req.method, + "url": req.url, + "headers": dict(req.headers), + "body": req.get_data(as_text=True), + "attempt_count": attempt_count, + "max_retries": max_retries + }) + if attempt_count > max_retries: + raise IsaccRequestRetriesExhausted(serialized_form) + return serialized_form + +def queue_request(serialized_request): + redis_client = redis.StrictRedis.from_url(current_app.config.get("REQUEST_CACHE_URL")) + redis_client.lpush("http_request_queue", serialized_request) + + +def pop_request(): + redis_client = redis.StrictRedis.from_url(current_app.config.get("REQUEST_CACHE_URL")) + return redis_client.rpop("http_request_queue") +