Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TN-3291 implement caching of research data, for performance improvement #4382

Merged
merged 21 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8eab1c1
TN-3291 implement caching of research data, for dramatic performance …
pbugni Jun 4, 2024
1ffc472
pep8 clean-up only
pbugni Jun 11, 2024
d66e85f
pep8 clean-up only
pbugni Jun 11, 2024
a74591e
patched failing tests: bundled assessment queries require QB association
pbugni Jun 12, 2024
8aa0000
TN-3250 and research_data caching removed the ability to toggle quest…
pbugni Jun 12, 2024
0216f5c
rename variable for fewer source code changes.
pbugni Jun 12, 2024
4f7faf5
pep8
pbugni Jun 12, 2024
47c73ec
pep8
pbugni Jun 12, 2024
0f7e661
added missing `unique=True` detail to `research_data.questionnaire_re…
pbugni Jun 13, 2024
6163c51
patch failing tests: mocking a questionnaire response needs to land i…
pbugni Jun 13, 2024
73c091f
rename variable to reduce # of code changes in branch
pbugni Jul 15, 2024
36d604e
pep issue.
pbugni Jul 15, 2024
4fbfe02
eliminate unused imports.
pbugni Jul 15, 2024
93c4df3
remove redundant unique constraint; questionnaire_response can only be…
pbugni Jul 15, 2024
a9e13c4
stick with `user_id` pattern, rather than `patient_id` as db column n…
pbugni Jul 15, 2024
bde05f9
research_data subject_id aligns best with questionnaire_response sub…
pbugni Jul 15, 2024
f8a03dc
as `invalidate_users_QBT()` also manages adherence and research data …
pbugni Jul 15, 2024
494dc33
when `invalidate_users_QBT()` is called, also clear respective rows f…
pbugni Jul 15, 2024
5c8e291
typo: overlooked `invalidate_existing` param from refactor.
pbugni Jul 15, 2024
3d87a3b
correct failing tests: work around testing scenario with mock QRs and…
pbugni Jul 17, 2024
b2c9a92
Shifted version number in migration following rebase with develop
pbugni Aug 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,8 @@ def find_overlaps(correct_overlaps, reprocess_qnrs):
acting_user_id=admin.id,
)

update_users_QBT(
patient.id, research_study_id=0, invalidate_existing=True)
invalidate_users_QBT(user_id=patient.id, research_study_id=0)
update_users_QBT(user_id=patient.id, research_study_id=0)
present_before_after_state(
patient.id, patient.external_study_id, b4)

Expand Down Expand Up @@ -838,8 +838,8 @@ def preview_site_update(org_id, retired):
research_study_id=0,
acting_user_id=admin.id,
)
update_users_QBT(
patient.id, research_study_id=0, invalidate_existing=True)
invalidate_users_QBT(user_id=patient.id, research_study_id=0)
update_users_QBT(user_id=patient.id, research_study_id=0)
after_qnrs, after_timeline, qnrs_lost_reference, _ = present_before_after_state(
patient.id, patient.external_study_id, patient_state[patient.id])
total_qnrs += len(patient_state[patient.id]['qnrs'])
Expand Down
9 changes: 9 additions & 0 deletions portal/config/eproms/ScheduledJob.json
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,15 @@
"resourceType": "ScheduledJob",
"schedule": "30 14 * * *",
"task": "cache_adherence_data_task"
},
{
"active": true,
"args": null,
"kwargs": null,
"name": "Cache Research Report Data",
"resourceType": "ScheduledJob",
"schedule": "30 23 * * *",
"task": "cache_research_data_task"
}
],
"id": "SitePersistence v0.2",
Expand Down
57 changes: 57 additions & 0 deletions portal/migrations/versions/daee63f50d35_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Add research_data table, to hold questionnaire response research data in a cache

Revision ID: daee63f50d35
Revises: 6120fcfc474a
Create Date: 2024-05-21 17:00:58.204998

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = 'daee63f50d35'
down_revision = '6120fcfc474a'


def upgrade():
    """Create the research_data cache table and its supporting indexes."""
    op.create_table(
        'research_data',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('subject_id', sa.Integer(), nullable=False),
        sa.Column('questionnaire_response_id', sa.Integer(), nullable=False),
        sa.Column('instrument', sa.Text(), nullable=False),
        sa.Column('research_study_id', sa.Integer(), nullable=False),
        sa.Column('authored', sa.DateTime(), nullable=False),
        sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
        sa.ForeignKeyConstraint(['subject_id'], ['users.id'], ),
        sa.ForeignKeyConstraint(
            ['questionnaire_response_id'], ['questionnaire_responses.id'], ),
        sa.PrimaryKeyConstraint('id'),
    )

    # One index per frequently filtered column.  questionnaire_response_id
    # is unique: a questionnaire response maps to at most one cached row.
    indexed_columns = (
        ('authored', False),
        ('instrument', False),
        ('subject_id', False),
        ('questionnaire_response_id', True),
        ('research_study_id', False),
    )
    for column_name, is_unique in indexed_columns:
        op.create_index(
            op.f('ix_research_data_%s' % column_name),
            'research_data', [column_name], unique=is_unique)


def downgrade():
    """Drop the research_data table and its indexes (reverse of upgrade)."""
    # Drop indexes first (reverse of creation order), then the table itself.
    for index_name in (
            'ix_research_data_research_study_id',
            'ix_research_data_questionnaire_response_id',
            'ix_research_data_subject_id',
            'ix_research_data_instrument',
            'ix_research_data_authored'):
        op.drop_index(op.f(index_name), table_name='research_data')
    op.drop_table('research_data')
34 changes: 14 additions & 20 deletions portal/models/qb_timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
visit_name,
)
from .questionnaire_response import QNR_results, QuestionnaireResponse
from .research_data import ResearchData
from .research_protocol import ResearchProtocol
from .role import ROLE
from .user import User
Expand Down Expand Up @@ -741,17 +742,23 @@ def ordered_qbs(user, research_study_id, classification=None):


def invalidate_users_QBT(user_id, research_study_id):
"""Mark the given user's QBT rows and adherence_data invalid (by deletion)
"""invalidate the given user's QBT rows and related cached data, by deletion

This also clears a users cached adherence and research data rows from their
respective caches.

:param user_id: user for whom to purge all QBT rows
:param research_study_id: set to limit invalidation to research study or
use string 'all' to invalidate all QBT rows for a user

"""
if research_study_id is None:
raise ValueError('research_study_id must be defined or use "all"')
if research_study_id == 'all':
QBT.query.filter(QBT.user_id == user_id).delete()
AdherenceData.query.filter(
AdherenceData.patient_id == user_id).delete()
ResearchData.query.filter(ResearchData.subject_id == user_id).delete()
else:
QBT.query.filter(QBT.user_id == user_id).filter(
QBT.research_study_id == research_study_id).delete()
Expand All @@ -761,6 +768,8 @@ def invalidate_users_QBT(user_id, research_study_id):
# SQL alchemy can't combine `like` expression with delete op.
for ad in adh_data:
db.session.delete(ad)
ResearchData.query.filter(ResearchData.subject_id == user_id).filter(
ResearchData.research_study_id == research_study_id).delete()

if not current_app.config.get("TESTING", False):
# clear the timeout lock as well, since we need a refresh
Expand All @@ -773,6 +782,7 @@ def invalidate_users_QBT(user_id, research_study_id):
cache_moderation.reset()


# clear cached qb_status_visit_name() using current as_of value
# args have to match order and values - no wild carding avail
as_of = QB_StatusCacheKey().current()
if research_study_id != 'all':
Expand Down Expand Up @@ -863,18 +873,17 @@ def int_or_none(value):
return True


def update_users_QBT(user_id, research_study_id, invalidate_existing=False):
def update_users_QBT(user_id, research_study_id):
"""Populate the QBT rows for given user, research_study

:param user: the user to add QBT rows for
:param research_study_id: the research study being processed
:param invalidate_existing: set true to wipe any current rows first

A user may be eligible for any number of research studies. QBT treats
each (user, research_study) independently, as should clients.

"""
def attempt_update(user_id, research_study_id, invalidate_existing):
def attempt_update(user_id, research_study_id):
"""Updates user's QBT or raises if lock is unattainable"""
from .qb_status import patient_research_study_status
from ..tasks import LOW_PRIORITY, cache_single_patient_adherence_data
Expand All @@ -886,18 +895,6 @@ def attempt_update(user_id, research_study_id, invalidate_existing):
user_id, research_study_id)

with TimeoutLock(key=key, timeout=timeout):
if invalidate_existing:
QBT.query.filter(QBT.user_id == user_id).filter(
QBT.research_study_id == research_study_id).delete()
adh_data = AdherenceData.query.filter(
AdherenceData.patient_id == user_id).filter(
AdherenceData.rs_id_visit.like(f"{research_study_id}:%")
)
# SQL alchemy can't combine `like` expression with delete op.
for ad in adh_data:
db.session.delete(ad)
db.session.commit()

# if any rows are found, assume this user/study is current
if QBT.query.filter(QBT.user_id == user_id).filter(
QBT.research_study_id == research_study_id).count():
Expand Down Expand Up @@ -1206,10 +1203,7 @@ def attempt_update(user_id, research_study_id, invalidate_existing):
success = False
for attempt in range(1, 6):
try:
attempt_update(
user_id=user_id,
research_study_id=research_study_id,
invalidate_existing=invalidate_existing)
attempt_update(user_id=user_id, research_study_id=research_study_id)
success = True
break
except ConnectionError as ce:
Expand Down
105 changes: 26 additions & 79 deletions portal/models/questionnaire_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from flask import current_app, has_request_context, url_for
from flask_swagger import swagger
import jsonschema
from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import ENUM, JSONB
from sqlalchemy.orm.exc import MultipleResultsFound

Expand All @@ -31,8 +30,8 @@
trigger_date,
visit_name,
)
from .research_data import ResearchData
from .research_study import EMPRO_RS_ID, research_study_id_from_questionnaire
from .reference import Reference
from .user import User, current_user, patients_query
from .user_consent import consent_withdrawal_dates

Expand Down Expand Up @@ -356,8 +355,7 @@ def quote_double_quote(value):
questionnaire_map = questionnaire.questionnaire_code_map()

for question in document.get('group', {}).get('question', ()):

combined_answers = consolidate_answer_pairs(question['answer'])
combined_answers = consolidate_answer_pairs(question.get('answer', ()))

# Separate out text and coded answer, then override text
text_and_coded_answers = []
Expand All @@ -372,7 +370,8 @@ def quote_double_quote(value):
answer['valueCoding'].get('text')
)

text_and_coded_answers.append({'valueString': text_answer})
if text_answer is not None:
Copy link
Collaborator Author

@pbugni pbugni Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivan-c this change was necessary to get tests to pass, as some results after posting an update to a questionnaire response, would show valueString: None. i can't imagine it would ever be worth saving a valueString with a value of None in a questionnaire response, would it?

those were showing up as differences in the following test:

 py.test tests/test_assessment_engine.py::TestAssessmentEngine::test_update_assessment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can't imagine it would ever be worth saving a valueString with a value of None in a questionnaire response, would it?

probably not- None/null isn't the same type as valueString anyway

text_and_coded_answers.append({'valueString': text_answer})
elif 'valueString' in answer and '"' in answer['valueString']:
answer['valueString'] = quote_double_quote(answer['valueString'])

Expand Down Expand Up @@ -844,15 +843,14 @@ def required_qs(self, qb_id):

def aggregate_responses(
instrument_ids, current_user, research_study_id, patch_dstu2=False,
ignore_qb_requirement=False, celery_task=None, patient_ids=None):
celery_task=None, patient_ids=None):
"""Build a bundle of QuestionnaireResponses

:param instrument_ids: list of instrument_ids to restrict results to
:param current_user: user making request, necessary to restrict results
to list of patients the current_user has permission to see
:param research_study_id: study being processed
:param patch_dstu2: set to make bundle DSTU2 compliant
:param ignore_qb_requirement: set to include all questionnaire responses
:param celery_task: if defined, send occasional progress updates
:param patient_ids: if defined, limit result set to given patient list

Expand All @@ -861,89 +859,38 @@ def aggregate_responses(
"""
from .qb_timeline import qb_status_visit_name # avoid cycle

if celery_task:
celery_task.update_state(
state='PROGRESS',
meta={'current': 1, 'total': 100})

# Gather up the patient IDs for whom current user has 'view' permission
user_ids = patients_query(
current_user,
include_test_role=False,
filter_by_ids=patient_ids,
).with_entities(User.id)

annotated_questionnaire_responses = []
questionnaire_responses = QuestionnaireResponse.query.filter(
QuestionnaireResponse.subject_id.in_(user_ids)).order_by(
QuestionnaireResponse.document['authored'].desc())

# TN-3250, don't include QNRs without assigned visits, i.e. qb_id > 0
if not ignore_qb_requirement:
questionnaire_responses = questionnaire_responses.filter(
QuestionnaireResponse.questionnaire_bank_id > 0)

if instrument_ids:
instrument_filters = (
QuestionnaireResponse.document[
("questionnaire", "reference")
].astext.endswith(instrument_id)
for instrument_id in instrument_ids
)
questionnaire_responses = questionnaire_responses.filter(
or_(*instrument_filters))

patient_fields = ("careProvider", "identifier")
system_filter = current_app.config.get('REPORTING_IDENTIFIER_SYSTEMS')
if celery_task:
current, total = 0, questionnaire_responses.count()

for questionnaire_response in questionnaire_responses:
document = questionnaire_response.document_answered.copy()
subject = questionnaire_response.subject
encounter = questionnaire_response.encounter
encounter_fhir = encounter.as_fhir()
document["encounter"] = encounter_fhir
celery_task.update_state(
state='PROGRESS',
meta={'current': 10, 'total': 100})

document["subject"] = {
k: v for k, v in subject.as_fhir().items() if k in patient_fields
}
query = ResearchData.query.filter(
ResearchData.subject_id.in_(user_ids)).order_by(
ResearchData.authored.desc(), ResearchData.subject_id).with_entities(
ResearchData.data)

if subject.organizations:
providers = []
for org in subject.organizations:
org_ref = Reference.organization(org.id).as_fhir()
identifiers = [i.as_fhir() for i in org.identifiers if
i.system in system_filter]
if identifiers:
org_ref['identifier'] = identifiers
providers.append(org_ref)
document["subject"]["careProvider"] = providers

qb_status = qb_status_visit_name(
subject.id,
research_study_id,
FHIR_datetime.parse(questionnaire_response.document['authored']))
document["timepoint"] = qb_status['visit_name']

# Hack: add missing "resource" wrapper for DTSU2 compliance
# Remove when all interventions compliant
if patch_dstu2:
document = {
'resource': document,
# Todo: return URL to individual QuestionnaireResponse resource
'fullUrl': url_for(
'assessment_engine_api.assessment',
patient_id=subject.id,
_external=True,
),
}

annotated_questionnaire_responses.append(document)

if celery_task:
current += 1
if current % 25 == 0:
celery_task.update_state(
state='PROGRESS',
meta={'current': current, 'total': total})

return bundle_results(elements=annotated_questionnaire_responses)
if instrument_ids:
query = query.filter(ResearchData.instrument.in_(tuple(instrument_ids)))
if celery_task:
# the delay is now a single big query, and then bundling - mark near done
celery_task.update_state(
state='PROGRESS',
meta={'current': 70, 'total': 100})
elements = [i.data for i in query.all()]
return bundle_results(elements=elements)


def qnr_document_id(
Expand Down
5 changes: 1 addition & 4 deletions portal/models/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,7 @@ def patient_generator():

def research_report(
instrument_ids, research_study_id, acting_user_id, patch_dstu2,
request_url, response_format, lock_key, ignore_qb_requirement,
celery_task):
request_url, response_format, lock_key, celery_task):
"""Generates the research report

Designed to be executed in a background task - all inputs and outputs are
Expand All @@ -479,7 +478,6 @@ def research_report(
:param request_url: original request url, for inclusion in FHIR bundle
:param response_format: 'json' or 'csv'
:param lock_key: name of TimeoutLock key used to throttle requests
:param ignore_qb_requirement: Set to include all questionnaire responses
:param celery_task: used to update status when run as a celery task
:return: dictionary of results, easily stored as a task output, including
any details needed to assist the view method
Expand All @@ -494,7 +492,6 @@ def research_report(
research_study_id=research_study_id,
current_user=acting_user,
patch_dstu2=patch_dstu2,
ignore_qb_requirement=ignore_qb_requirement,
celery_task=celery_task
)
bundle.update({
Expand Down
Loading