Skip to content

Commit

Permalink
Merge pull request #76 from uwcirg/dev
Browse files Browse the repository at this point in the history
bring all dev changes into main for RC
  • Loading branch information
pbugni authored Dec 4, 2024
2 parents daee18a + 45b737b commit 0be1c41
Show file tree
Hide file tree
Showing 14 changed files with 438 additions and 176 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on: [push]
jobs:
test:
runs-on: ubuntu-latest
container: python:3.7
container: python:3.11
# Service containers to run with `container-job`
services:
# Label used to access the service container
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.7
FROM python:3.11

WORKDIR /opt/app

Expand Down
42 changes: 27 additions & 15 deletions isacc_messaging/api/isacc_record_creator.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from datetime import datetime, timedelta
from flask import current_app
import re
import requests
from typing import List, Tuple

from fhirclient.models.communication import Communication
from flask import current_app
from twilio.base.exceptions import TwilioRestException

from isacc_messaging.api.email_notifications import send_message_received_notification
from isacc_messaging.api.ml_utils import predict_score
from isacc_messaging.audit import audit_entry
from isacc_messaging.exceptions import IsaccTwilioSIDnotFound
from isacc_messaging.models.fhir import (
HAPI_request,
first_in_bundle,
Expand Down Expand Up @@ -47,11 +48,6 @@ def case_insensitive_replace(text, old, new):
return c


class IsaccTwilioError(Exception):
"""Raised when Twilio SMS are not functioning as required for ISACC"""
pass


class IsaccRecordCreator:
def __init__(self):
pass
Expand Down Expand Up @@ -210,7 +206,7 @@ def on_twilio_message_status_update(self, values):
extra={"message_sid": message_sid},
level='error'
)
raise IsaccTwilioError(f"ERROR! {error}: {message_sid}")
raise IsaccTwilioSIDnotFound(f"ERROR! {error}: {message_sid}")

cr = CommunicationRequest(cr)
patient = resolve_reference(cr.recipient[0].reference)
Expand Down Expand Up @@ -303,12 +299,20 @@ def on_twilio_message_received(self, values):
)

def score_message(self, message):
model_path = current_app.config.get('TORCH_MODEL_PATH')
if not model_path:
ml_service_address = current_app.config.get('ML_SERVICE_ADDRESS')
if not ml_service_address:
return "routine"

try:
score = predict_score(message, model_path)
url = f'{ml_service_address}/predict_score'
response = requests.post(url, json={"message": message})
response.raise_for_status()
audit_entry(
f"predict_score call response: {response.json()}",
level='info'
)

score = response.json().get('score')
if score == 1:
return "stat"
except Exception as e:
Expand All @@ -321,11 +325,11 @@ def score_message(self, message):

def execute_requests(self) -> Tuple[List[dict], List[dict]]:
"""
For all due CommunicationRequests, generate SMS, create Communication resource, and update CommunicationRequest
For all due CommunicationRequests (up to throttle limit), generate SMS, create Communication resource, and update CommunicationRequest
"""
successes = []
errors = []

throttle_limit = 30 # conservative value based on heuristics from logs
now = datetime.now().astimezone()
cutoff = now - timedelta(days=2)

Expand All @@ -335,6 +339,7 @@ def execute_requests(self) -> Tuple[List[dict], List[dict]]:
"occurrence": f"le{now.isoformat()}",
})

sent = 0
for cr_json in next_in_bundle(result):
cr = CommunicationRequest(cr_json)
# as that message was likely the next-outgoing for the patient,
Expand Down Expand Up @@ -385,7 +390,7 @@ def execute_requests(self) -> Tuple[List[dict], List[dict]]:
try:
cr.status = "completed"
cr.persist()
comm_status, comm_statusReason = self.process_cr(cr, successes)
comm_status, comm_statusReason = self.process_cr(cr)
dispatched_comm = comm.change_status(status=comm_status)
audit_entry(
f"Updated status of Communication/{comm.id} to {comm_status}",
Expand Down Expand Up @@ -415,8 +420,15 @@ def execute_requests(self) -> Tuple[List[dict], List[dict]]:
level='exception'
)

# Flooding system on occasions such as a holiday message to all,
# leads to an overwhelmed system. Restrict the flood by processing
# only throttle_limit per run.
sent += 1
if sent > throttle_limit:
break

return successes, errors

def process_cr(self, cr: CommunicationRequest, successes: list):
def process_cr(self, cr: CommunicationRequest):
status, statusReason = self.dispatch_cr(cr=cr)
return status, statusReason
84 changes: 0 additions & 84 deletions isacc_messaging/api/ml_utils.py

This file was deleted.

59 changes: 51 additions & 8 deletions isacc_messaging/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
import click
import logging
from datetime import datetime
from time import sleep
from flask import Blueprint, jsonify, request
from flask import current_app
from flask.cli import with_appcontext
from twilio.request_validator import RequestValidator

from isacc_messaging.api.isacc_record_creator import IsaccRecordCreator
from isacc_messaging.audit import audit_entry
from twilio.request_validator import RequestValidator
from isacc_messaging.exceptions import IsaccTwilioSIDnotFound
from isacc_messaging.robust_request import serialize_request, queue_request, pop_request

base_blueprint = Blueprint('base', __name__, cli_group=None)

Expand Down Expand Up @@ -88,22 +92,43 @@ def auditlog_addevent():


@base_blueprint.route("/MessageStatus", methods=['POST'])
def message_status_update():
def message_status_update(callback_req=None, attempt_count=0):
"""Registered callback for Twilio to transmit updates
As Twilio occasionally hits this callback prior to local data being
available, it is also called subsequently from a job queue. The
parameters are only defined in the retry state.
:param req: request from a job queue
:param attempt_count: the number of failed attemts thus far, only
defined from job queue
"""
use_request = request
if callback_req:
use_request = callback_req

audit_entry(
f"Call to /MessageStatus webhook",
extra={'request.values': dict(request.values)},
extra={'use_request.values': dict(use_request.values)},
level='debug'
)

record_creator = IsaccRecordCreator()
try:
record_creator.on_twilio_message_status_update(request.values)
record_creator.on_twilio_message_status_update(use_request.values)
except Exception as ex:
audit_entry(
f"on_twilio_message_status_update generated error {ex}",
level='error'
)
return ex, 200
# Couldn't locate the message, most likely means twilio was quicker
# to call back, than HAPI could persist and find. Push to REDIS
# for another attempt later
if isinstance(ex, IsaccTwilioSIDnotFound):
req = serialize_request(use_request, attempt_count=attempt_count)
queue_request(req)

return str(ex), 200
return '', 204


Expand Down Expand Up @@ -155,8 +180,8 @@ def incoming_sms():
level="error")
return stackstr, 200
if result is not None:
# Occurs when message is incoming from unknown phone
# or request is coming from a subscribed phone number, but
# Occurs when message is incoming from unknown phone
# or request is coming from a subscribed phone number, but
# internal logic renders it invalid
audit_entry(
f"on_twilio_message_received generated error {result}",
Expand Down Expand Up @@ -213,6 +238,25 @@ def execute_requests():
])


@base_blueprint.cli.command("retry_requests")
@with_appcontext
def retry_requests():
"""Look for any failed requests and retry now"""
while True:
failed_request = pop_request()
if not failed_request:
break

# Only expecting one route at this time
if (
failed_request.url.endswith("/MessageStatus") and
failed_request.method.upper() == 'POST'):
with current_app.test_request_context():
response, response_code = message_status_update(
failed_request, failed_request.attempt_count + 1)
if response_code != 204:
sleep(1) # give system a moment to catch up before retry

@base_blueprint.cli.command("send-system-emails")
@click.argument("category", required=True)
@click.option("--dry-run", is_flag=True, default=False, help="Simulate execution; generate but don't send email")
Expand Down Expand Up @@ -301,4 +345,3 @@ def deactivate_patient(patient_id):
f"Patient {patient_id} active set to false",
level='info'
)

3 changes: 1 addition & 2 deletions isacc_messaging/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")
TWILIO_PHONE_NUMBER = os.getenv("TWILIO_PHONE_NUMBER")
TORCH_MODEL_PATH = os.getenv("TORCH_MODEL_PATH")
ML_SERVICE_ADDRESS = os.getenv("ML_SERVICE_ADDRESS")

ISACC_NOTIFICATION_EMAIL_SENDER_ADDRESS = os.getenv("ISACC_NOTIFICATION_EMAIL_SENDER_ADDRESS")
ISACC_NOTIFICATION_EMAIL_PASSWORD = os.getenv("ISACC_NOTIFICATION_EMAIL_PASSWORD")
Expand All @@ -40,4 +40,3 @@
ISACC_APP_URL = os.getenv("ISACC_APP_URL")
EMAIL_PORT = os.getenv("EMAIL_PORT", 465)
EMAIL_SERVER = os.getenv("EMAIL_SERVER", "smtp.gmail.com")

10 changes: 10 additions & 0 deletions isacc_messaging/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Module to define exceptions used by isacc_messaging."""

class IsaccTwilioSIDnotFound(Exception):
"""Raised when Twilio calls with SID that can't be found"""
pass


class IsaccRequestRetriesExhausted(Exception):
"""Raised when max retries have been tried w/o success"""
pass
5 changes: 4 additions & 1 deletion isacc_messaging/models/isacc_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ def change_status(self, status):
def about_patient(patient):
"""Query for "outside" Communications about the patient
This includes the dummy Communications added when staff resolve
a message without a response (category:isacc-message-resolved-no-send)
NB: only `sent` or `received` will have a valueDateTime depending on
direction of outside communication. `sent` implies communication from
practitioner, `received` implies communication from patient.
"""
return HAPI_request("GET", "Communication", params={
"category": "isacc-non-sms-message",
"category": "isacc-non-sms-message,isacc-message-resolved-no-send",
"subject": f"Patient/{patient.id}",
"_sort": "-sent",
})
Expand Down
2 changes: 1 addition & 1 deletion isacc_messaging/models/isacc_fhirdate.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ def __repr__(self):


DEEP_PAST = IsaccFHIRDate("1975-01-01T00:00:00Z")
DEEP_FUTURE = IsaccFHIRDate("2025-01-01T00:00:00Z")
DEEP_FUTURE = IsaccFHIRDate("2025-01-01T00:00:00Z")
2 changes: 1 addition & 1 deletion isacc_messaging/models/isacc_patient.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def mark_followup_extension(self, persist_on_change=True):
for c in next_in_bundle(Communication.for_patient(self, category="isacc-manually-sent-message")):
most_recent_followup = FHIRDate(c["sent"])
break
# also possible a followup was recorded as `outside communication`
# also possible a followup was recorded as `outside communication` or resolved
for c in next_in_bundle(Communication.about_patient(self)):
# only consider outside communications reported to have been `sent`
if "sent" in c:
Expand Down
Loading

0 comments on commit 0be1c41

Please sign in to comment.