Merge pull request #4602 from broadinstitute/reload-new-sample-families-only

Reload saved variants for families with new sample only
hanars authored Jan 22, 2025
2 parents a5a8c9c + 0037889 commit cc9e7f8
Showing 8 changed files with 75 additions and 57 deletions.
7 changes: 6 additions & 1 deletion hail_search/queries/base.py
@@ -738,7 +738,12 @@ def _parse_intervals(self, intervals, gene_ids=None, variant_keys=None, variant_
             return intervals

         if variant_ids:
-            intervals = [(chrom, pos, pos+1) for chrom, pos, _, _ in variant_ids]
+            first_chrom = variant_ids[0][0]
+            if all(first_chrom == v[0] for v in variant_ids):
+                positions = [pos for _, pos, _, _ in variant_ids]
+                intervals = [(first_chrom, min(positions), max(positions) + 1)]
+            else:
+                intervals = [(chrom, pos, pos+1) for chrom, pos, _, _ in variant_ids]

         is_x_linked = self._inheritance_mode == X_LINKED_RECESSIVE
         if not (intervals or is_x_linked):
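
A minimal standalone sketch of the interval logic this hunk introduces (the helper name and example values are illustrative, not part of the PR): when every requested variant falls on the same chromosome, the per-variant point intervals collapse into a single interval spanning the minimum and maximum positions, so the query scans one contiguous region instead of many scattered ones.

def collapse_variant_intervals(variant_ids):
    # variant_ids are (chrom, pos, ref, alt) tuples
    first_chrom = variant_ids[0][0]
    if all(first_chrom == v[0] for v in variant_ids):
        # All variants share a chromosome: one spanning interval
        positions = [pos for _, pos, _, _ in variant_ids]
        return [(first_chrom, min(positions), max(positions) + 1)]
    # Mixed chromosomes: fall back to one point interval per variant
    return [(chrom, pos, pos + 1) for chrom, pos, _, _ in variant_ids]

# Three chr1 variants collapse to a single spanning interval
assert collapse_variant_intervals(
    [('1', 100, 'A', 'T'), ('1', 500, 'G', 'C'), ('1', 250, 'T', 'G')]
) == [('1', 100, 501)]
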
42 changes: 24 additions & 18 deletions seqr/management/commands/check_for_new_samples_from_pipeline.py
@@ -3,8 +3,6 @@

 from django.contrib.postgres.aggregates import ArrayAgg
 from django.core.management.base import BaseCommand, CommandError
-from django.db.models import Q
-from django.db.models.functions import JSONObject
 import json
 import logging
 import re
@@ -186,7 +184,7 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers

         sample_type = metadata['sample_type']
         logger.info(f'Loading {len(sample_project_tuples)} {sample_type} {dataset_type} samples in {len(samples_by_project)} projects')
-        updated_samples, inactivated_sample_guids, *args = match_and_update_search_samples(
+        updated_samples, new_samples, *args = match_and_update_search_samples(
             projects=samples_by_project.keys(),
             sample_project_tuples=sample_project_tuples,
             sample_data={'data_source': run_version, 'elasticsearch_index': ';'.join(metadata['callsets'])},
@@ -196,9 +194,9 @@
         )

         # Send loading notifications and update Airtable PDOs
-        update_sample_data_by_project = {
-            s['individual__family__project']: s for s in updated_samples.values('individual__family__project').annotate(
-                samples=ArrayAgg(JSONObject(sample_id='sample_id', individual_id='individual_id')),
+        new_sample_data_by_project = {
+            s['individual__family__project']: s for s in updated_samples.filter(id__in=new_samples).values('individual__family__project').annotate(
+                samples=ArrayAgg('sample_id', distinct=True),
                 family_guids=ArrayAgg('individual__family__guid', distinct=True),
             )
         }
@@ -207,15 +205,16 @@
         split_project_pdos = {}
         session = AirtableSession(user=None, no_auth=True)
         for project, sample_ids in samples_by_project.items():
-            project_sample_data = update_sample_data_by_project[project.id]
+            project_sample_data = new_sample_data_by_project[project.id]
             is_internal = not project_has_anvil(project) or is_internal_anvil_project(project)
             notify_search_data_loaded(
-                project, is_internal, dataset_type, sample_type, inactivated_sample_guids,
-                updated_samples=project_sample_data['samples'], num_samples=len(sample_ids),
+                project, is_internal, dataset_type, sample_type, project_sample_data['samples'],
+                num_samples=len(sample_ids),
             )
             project_families = project_sample_data['family_guids']
-            updated_families.update(project_families)
-            updated_project_families.append((project.id, project.name, project.genome_version, project_families))
+            if project_families:
+                updated_families.update(project_families)
+                updated_project_families.append((project.id, project.name, project.genome_version, project_families))
             if is_internal and dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS:
                 split_project_pdos[project.name] = cls._update_pdos(session, project.guid, sample_ids)

@@ -337,14 +336,21 @@ def _reload_shared_variant_annotations(data_type, genome_version, updated_varian
         variant_id: {k: v for k, v in variant.items() if k not in {'familyGuids', 'genotypes'}}
         for variant_id, variant in (updated_variants_by_id or {}).items()
     }
-    fetch_variant_ids = sorted(set(variants_by_id.keys()) - set(updated_variants_by_id.keys()))
+    fetch_variant_ids = set(variants_by_id.keys()) - set(updated_variants_by_id.keys())
     if fetch_variant_ids:
-        if not is_sv:
-            fetch_variant_ids = [parse_valid_variant_id(variant_id) for variant_id in fetch_variant_ids]
-        for i in range(0, len(fetch_variant_ids), MAX_LOOKUP_VARIANTS):
-            updated_variants = hail_variant_multi_lookup(USER_EMAIL, fetch_variant_ids[i:i+MAX_LOOKUP_VARIANTS], data_type, genome_version)
-            logger.info(f'Fetched {len(updated_variants)} additional variants')
-            updated_variants_by_id.update({variant['variantId']: variant for variant in updated_variants})
+        if is_sv:
+            variant_ids_by_chrom = {'all': fetch_variant_ids}
+        else:
+            variant_ids_by_chrom = defaultdict(list)
+            for variant_id in fetch_variant_ids:
+                parsed_id = parse_valid_variant_id(variant_id)
+                variant_ids_by_chrom[parsed_id[0]].append(parsed_id)
+        for chrom, variant_ids in sorted(variant_ids_by_chrom.items()):
+            variant_ids = sorted(variant_ids)
+            for i in range(0, len(variant_ids), MAX_LOOKUP_VARIANTS):
+                updated_variants = hail_variant_multi_lookup(USER_EMAIL, variant_ids[i:i+MAX_LOOKUP_VARIANTS], data_type, genome_version)
+                logger.info(f'Fetched {len(updated_variants)} additional variants in chromosome {chrom}')
+                updated_variants_by_id.update({variant['variantId']: variant for variant in updated_variants})

     updated_variant_models = []
     for variant_id, variant in updated_variants_by_id.items():
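
The `_reload_shared_variant_annotations` hunk above pairs with the base.py change: variant IDs are now bucketed by chromosome before the batched multi-lookup calls, so each request stays on a single chromosome (SV IDs are not parsed, so they keep a single catch-all bucket). A standalone sketch of that batching shape, assuming SNV/indel IDs are already parsed into (chrom, pos, ref, alt) tuples, with a stand-in `lookup` callable and a small illustrative batch size in place of `hail_variant_multi_lookup` and the real `MAX_LOOKUP_VARIANTS`:

from collections import defaultdict

MAX_LOOKUP_VARIANTS = 2  # illustrative; the real constant is larger

def fetch_by_chromosome(fetch_variant_ids, is_sv, lookup):
    if is_sv:
        # SV ids (e.g. 'prefix_19107_DEL') carry no parseable chromosome
        variant_ids_by_chrom = {'all': fetch_variant_ids}
    else:
        # Bucket parsed (chrom, pos, ref, alt) tuples by chromosome
        variant_ids_by_chrom = defaultdict(list)
        for variant_id in fetch_variant_ids:
            variant_ids_by_chrom[variant_id[0]].append(variant_id)
    results = {}
    for chrom, variant_ids in sorted(variant_ids_by_chrom.items()):
        variant_ids = sorted(variant_ids)
        # One chromosome per request, chunked to the lookup size limit
        for i in range(0, len(variant_ids), MAX_LOOKUP_VARIANTS):
            for variant in lookup(variant_ids[i:i + MAX_LOOKUP_VARIANTS]):
                results[variant['variantId']] = variant
    return results
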
8 changes: 4 additions & 4 deletions seqr/management/tests/check_for_new_samples_from_pipeline_tests.py
@@ -423,7 +423,7 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo
                 {'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'},
             ]}},
         ], reload_annotations_logs=[
-            'Reloading shared annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants', 'Fetched 1 additional variants', 'Updated 2 SNV_INDEL GRCh38 saved variants',
+            'Reloading shared annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants in chromosome 1', 'Fetched 1 additional variants in chromosome 1', 'Updated 2 SNV_INDEL GRCh38 saved variants',
             'No additional SV_WES GRCh38 saved variants to update',
         ], run_loading_logs={
             'GRCh38/SNV_INDEL': 'Loading 4 WES SNV_INDEL samples in 2 projects',
@@ -559,10 +559,10 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo
         self.assertEqual(annotation_updated_json['mainTranscriptId'], 'ENST00000505820')
         self.assertEqual(len(annotation_updated_json['genotypes']), 3)

-        self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request')
+        self.mock_utils_logger.error.assert_called_with('Error reloading variants in Test Reprocessed Project: Bad Request')
         self.mock_utils_logger.info.assert_has_calls([
-            mock.call('Updated 0 variants for project Test Reprocessed Project'),
-            mock.call('Updated 1 variants for project Non-Analyst Project'),
+            mock.call('Updated 0 variants in 1 families for project Test Reprocessed Project'),
+            mock.call('Updated 1 variants in 1 families for project Non-Analyst Project'),
             mock.call('Reload Summary: '),
             mock.call(' Non-Analyst Project: Updated 1 variants'),
             mock.call('Reloading saved variants in 2 projects'),
25 changes: 16 additions & 9 deletions seqr/management/tests/reload_saved_variant_annotations_tests.py
@@ -39,18 +39,24 @@ def test_command(self, mock_logger):

         mock_logger.info.assert_has_calls([mock.call(log) for log in [
             'Reloading shared annotations for 3 SNV_INDEL GRCh37 saved variants (3 unique)',
-            'Fetched 2 additional variants',
+            'Fetched 2 additional variants in chromosome 1',
+            'Fetched 2 additional variants in chromosome 21',
             'Updated 2 SNV_INDEL GRCh37 saved variants',
         ]])

-        self.assertEqual(len(responses.calls), 1)
-        multi_lookup_request = responses.calls[0].request
-        self.assertEqual(multi_lookup_request.url, f'{MOCK_HAIL_ORIGIN}:5000/multi_lookup')
-        self.assertEqual(multi_lookup_request.headers.get('From'), 'manage_command')
-        self.assertDictEqual(json.loads(multi_lookup_request.body), {
+        self.assertEqual(len(responses.calls), 2)
+        for call in responses.calls:
+            self.assertEqual(call.request.url, f'{MOCK_HAIL_ORIGIN}:5000/multi_lookup')
+            self.assertEqual(call.request.headers.get('From'), 'manage_command')
+        self.assertDictEqual(json.loads(responses.calls[0].request.body), {
             'genome_version': 'GRCh37',
             'data_type': 'SNV_INDEL',
-            'variant_ids': [['1', 248367227, 'TC', 'T'], ['1', 46859832, 'G', 'A'], ['21', 3343353, 'GAGA', 'G']],
+            'variant_ids': [['1', 46859832, 'G', 'A'], ['1', 248367227, 'TC', 'T']],
+        })
+        self.assertDictEqual(json.loads(responses.calls[1].request.body), {
+            'genome_version': 'GRCh37',
+            'data_type': 'SNV_INDEL',
+            'variant_ids': [['21', 3343353, 'GAGA', 'G']],
         })

         annotation_updated_json_1 = SavedVariant.objects.get(guid='SV0000002_1248367227_r0390_100').saved_variant_json
@@ -66,11 +72,12 @@
         self.assertEqual(len(annotation_updated_json_2['genotypes']), 3)

         # Test SVs
+        responses.calls.reset()
         Sample.objects.filter(guid='S000147_na21234').update(individual_id=20)
         call_command('reload_saved_variant_annotations', 'SV_WGS', 'GRCh37')

-        self.assertEqual(len(responses.calls), 2)
-        self.assertDictEqual(json.loads(responses.calls[1].request.body), {
+        self.assertEqual(len(responses.calls), 1)
+        self.assertDictEqual(json.loads(responses.calls[0].request.body), {
             'genome_version': 'GRCh37',
             'data_type': 'SV_WGS',
             'variant_ids': ['prefix_19107_DEL'],
4 changes: 2 additions & 2 deletions seqr/management/tests/reload_saved_variant_json_tests.py
@@ -30,7 +30,7 @@ def test_with_param_command(self, mock_get_variants, mock_logger):
             [family_1], ['1-46859832-G-A','21-3343353-GAGA-G'], user=None, user_email='manage_command')

         logger_info_calls = [
-            mock.call('Updated 2 variants for project 1kg project n\xe5me with uni\xe7\xf8de'),
+            mock.call('Updated 2 variants in 1 families for project 1kg project n\xe5me with uni\xe7\xf8de'),
             mock.call('Reload Summary: '),
             mock.call(' 1kg project n\xe5me with uni\xe7\xf8de: Updated 2 variants')
         ]
@@ -81,4 +81,4 @@ def test_with_param_command(self, mock_get_variants, mock_logger):
         ]
         mock_logger.info.assert_has_calls(logger_info_calls)

-        mock_logger.error.assert_called_with('Error in project 1kg project n\xe5me with uni\xe7\xf8de: Database error.')
+        mock_logger.error.assert_called_with('Error reloading variants in 1kg project n\xe5me with uni\xe7\xf8de: Database error.')
36 changes: 16 additions & 20 deletions seqr/utils/search/add_data_utils.py
@@ -39,7 +39,7 @@ def add_new_es_search_samples(request_json, project, user, notify=False, expecte
         request_json['mappingFilePath'], user) if request_json.get('mappingFilePath') else {}
     ignore_extra_samples = request_json.get('ignoreExtraSamplesInCallset')
    sample_project_tuples = [(sample_id, project.name) for sample_id in sample_ids]
-    updated_samples, inactivated_sample_guids, num_skipped, updated_family_guids = match_and_update_search_samples(
+    updated_samples, new_samples, inactivated_sample_guids, num_skipped, updated_family_guids = match_and_update_search_samples(
         projects=[project],
         user=user,
         sample_project_tuples=sample_project_tuples,
@@ -52,45 +52,41 @@
     )

     if notify:
-        updated_sample_data = updated_samples.values('sample_id', 'individual_id')
-        _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_sample_data)
+        _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples.values())

     return inactivated_sample_guids, updated_family_guids, updated_samples


-def _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=None, slack_channel=None, include_slack_detail=False):
-    previous_loaded_individuals = set(Sample.objects.filter(guid__in=inactivated_sample_guids).values_list('individual_id', flat=True))
-    new_sample_ids = [sample['sample_id'] for sample in updated_samples if sample['individual_id'] not in previous_loaded_individuals]
-
+def _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples, email_template=None, slack_channel=None, include_slack_detail=False):
     msg_dataset_type = '' if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS else f' {dataset_type}'
-    num_new_samples = len(new_sample_ids)
+    num_new_samples = len(new_samples)
     sample_summary = f'{num_new_samples} new {sample_type}{msg_dataset_type} samples'

     return send_project_notification(
         project,
         notification=sample_summary,
-        email_template=format_email(num_new_samples) if format_email else None,
+        email_template=email_template,
         subject='New data available in seqr',
         slack_channel=slack_channel,
-        slack_detail=', '.join(sorted(new_sample_ids)) if include_slack_detail else None,
+        slack_detail=', '.join(sorted(new_samples)) if include_slack_detail else None,
     )


-def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, inactivated_sample_guids, updated_samples, num_samples):
+def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, new_samples, num_samples):
     if is_internal:
-        format_email = None
+        email_template = None
     else:
         workspace_name = f'{project.workspace_namespace}/{project.workspace_name}'
-        def format_email(num_new_samples):
-            reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else ''
-            return '\n'.join([
-                f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.',
-                f'We have loaded {{notification}}{reload_summary} from the AnVIL workspace <a href={ANVIL_UI_URL}#workspaces/{workspace_name}>{workspace_name}</a> to the corresponding seqr project {{project_link}}.',
-                'Let us know if you have any questions.',
-            ])
+        num_new_samples = len(new_samples)
+        reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else ''
+        email_template = '\n'.join([
+            f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.',
+            f'We have loaded {{notification}}{reload_summary} from the AnVIL workspace <a href={ANVIL_UI_URL}#workspaces/{workspace_name}>{workspace_name}</a> to the corresponding seqr project {{project_link}}.',
+            'Let us know if you have any questions.',
+        ])

     url = _basic_notify_search_data_loaded(
-        project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=format_email,
+        project, dataset_type, sample_type, new_samples, email_template=email_template,
         slack_channel=SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL if is_internal else SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL,
         include_slack_detail=is_internal,
     )
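
Behavior note on the add_data_utils.py hunk: because the new-sample count is now known up front, the email body is built eagerly as a template string rather than through the old `format_email` callback. The f-string doubles the braces around `notification` and `project_link`, leaving literal placeholders for later substitution (presumably inside `send_project_notification`). A minimal sketch of that escaping, with illustrative values:

num_samples, num_new_samples = 5, 3
reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else ''
email_template = '\n'.join([
    'We are following up on the request to load data from AnVIL.',
    f'We have loaded {{notification}}{reload_summary} to the corresponding seqr project {{project_link}}.',
])
# {notification} and {project_link} survive the f-string as literal placeholders
print(email_template.format(notification='3 new WES samples', project_link='My Project'))
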
5 changes: 4 additions & 1 deletion seqr/views/utils/dataset_utils.py
@@ -222,7 +222,10 @@ def match_and_update_search_samples(
     Family.bulk_update(
         user, {'analysis_status': Family.ANALYSIS_STATUS_ANALYSIS_IN_PROGRESS}, guid__in=family_guids_to_update)

-    return updated_samples, inactivated_sample_guids, len(remaining_sample_keys), family_guids_to_update
+    previous_loaded_individuals = set(Sample.objects.filter(guid__in=inactivated_sample_guids).values_list('individual_id', flat=True))
+    new_samples = dict(updated_samples.exclude(individual_id__in=previous_loaded_individuals).values_list('id', 'sample_id'))
+
+    return updated_samples, new_samples, inactivated_sample_guids, len(remaining_sample_keys), family_guids_to_update


 def _parse_tsv_row(row):
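
The dataset_utils.py hunk centralizes the "new sample" computation that add_data_utils.py previously did inline: a matched sample counts as new only if its individual had no previously loaded sample among those just inactivated. A plain-dict sketch of the same filter (field names mirror the queryset values; the data is illustrative):

def get_new_samples(updated_samples, inactivated_samples):
    previous_loaded_individuals = {s['individual_id'] for s in inactivated_samples}
    # Map sample primary key to sample_id, keeping first-time individuals only
    return {
        s['id']: s['sample_id']
        for s in updated_samples
        if s['individual_id'] not in previous_loaded_individuals
    }

# Individual 10 was re-loaded (its prior sample was inactivated), so only
# first-time individual 20 contributes a new sample
new_samples = get_new_samples(
    updated_samples=[
        {'id': 1, 'sample_id': 'NA1', 'individual_id': 10},
        {'id': 2, 'sample_id': 'NA2', 'individual_id': 20},
    ],
    inactivated_samples=[{'individual_id': 10}],
)
assert new_samples == {2: 'NA2'}
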
5 changes: 3 additions & 2 deletions seqr/views/utils/variant_utils.py
@@ -45,12 +45,13 @@ def update_projects_saved_variant_json(projects, user_email, **kwargs):
                 skipped[project_name] = True
             else:
                 success[project_name] = len(updated_saved_variants)
-                logger.info(f'Updated {len(updated_saved_variants)} variants for project {project_name}')
+                family_summary = f' in {len(family_guids)} families' if family_guids else ''
+                logger.info(f'Updated {len(updated_saved_variants)} variants{family_summary} for project {project_name}')
                 updated_variants_by_id.update({v.variant_id: v.saved_variant_json for v in updated_saved_variants.values()})
         except Exception as e:
             traceback_message = traceback.format_exc()
             logger.error(traceback_message)
-            logger.error(f'Error in project {project_name}: {e}')
+            logger.error(f'Error reloading variants in {project_name}: {e}')
             error[project_name] = e

     logger.info('Reload Summary: ')
