Reload saved variants for families with new sample only #4602

Merged · 13 commits · Jan 22, 2025
7 changes: 6 additions & 1 deletion hail_search/queries/base.py
@@ -738,7 +738,12 @@ def _parse_intervals(self, intervals, gene_ids=None, variant_keys=None, variant_
         return intervals
 
         if variant_ids:
-            intervals = [(chrom, pos, pos+1) for chrom, pos, _, _ in variant_ids]
+            first_chrom = variant_ids[0][0]
+            if all(first_chrom == v[0] for v in variant_ids):
+                positions = [pos for _, pos, _, _ in variant_ids]
+                intervals = [(first_chrom, min(positions), max(positions) + 1)]
+            else:
+                intervals = [(chrom, pos, pos+1) for chrom, pos, _, _ in variant_ids]
 
         is_x_linked = self._inheritance_mode == X_LINKED_RECESSIVE
         if not (intervals or is_x_linked):
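For readers skimming the diff: the new branch collapses same-chromosome variant lookups into a single spanning interval instead of one single-position interval per variant. A minimal standalone sketch (hypothetical helper name; `variant_ids` assumed to be `(chrom, pos, ref, alt)` tuples):

```python
def variant_id_intervals(variant_ids):
    # All variants on one chromosome: one interval spanning min..max position.
    first_chrom = variant_ids[0][0]
    if all(first_chrom == v[0] for v in variant_ids):
        positions = [pos for _, pos, _, _ in variant_ids]
        return [(first_chrom, min(positions), max(positions) + 1)]
    # Mixed chromosomes: keep one single-position interval per variant.
    return [(chrom, pos, pos + 1) for chrom, pos, _, _ in variant_ids]

# Example: two chromosome-1 variants collapse into one interval.
assert variant_id_intervals([('1', 100, 'A', 'T'), ('1', 500, 'G', 'C')]) == [('1', 100, 501)]
```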
42 changes: 24 additions & 18 deletions seqr/management/commands/check_for_new_samples_from_pipeline.py
@@ -3,8 +3,6 @@

 from django.contrib.postgres.aggregates import ArrayAgg
 from django.core.management.base import BaseCommand, CommandError
-from django.db.models import Q
-from django.db.models.functions import JSONObject
 import json
 import logging
 import re
@@ -186,7 +184,7 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers

         sample_type = metadata['sample_type']
         logger.info(f'Loading {len(sample_project_tuples)} {sample_type} {dataset_type} samples in {len(samples_by_project)} projects')
-        updated_samples, inactivated_sample_guids, *args = match_and_update_search_samples(
+        updated_samples, new_samples, *args = match_and_update_search_samples(
             projects=samples_by_project.keys(),
             sample_project_tuples=sample_project_tuples,
             sample_data={'data_source': run_version, 'elasticsearch_index': ';'.join(metadata['callsets'])},
@@ -196,9 +194,9 @@
         )
 
         # Send loading notifications and update Airtable PDOs
-        update_sample_data_by_project = {
-            s['individual__family__project']: s for s in updated_samples.values('individual__family__project').annotate(
-                samples=ArrayAgg(JSONObject(sample_id='sample_id', individual_id='individual_id')),
+        new_sample_data_by_project = {
+            s['individual__family__project']: s for s in updated_samples.filter(id__in=new_samples).values('individual__family__project').annotate(
+                samples=ArrayAgg('sample_id', distinct=True),
                 family_guids=ArrayAgg('individual__family__guid', distinct=True),
             )
         }
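Aside on the `distinct=True` flags above: each aggregated row is one sample, and samples in the same family repeat that family's GUID across the sample→individual→family join, so without `distinct=True` the arrays could carry duplicates. A pure-Python equivalent with made-up rows:

```python
# Hypothetical rows as the values()/annotate() query would see them:
rows = [
    {'project': 1, 'sample_id': 'NA19675', 'family_guid': 'F000001_1'},
    {'project': 1, 'sample_id': 'NA19678', 'family_guid': 'F000001_1'},  # same family
]
samples = sorted({r['sample_id'] for r in rows})         # ~ ArrayAgg('sample_id', distinct=True)
family_guids = sorted({r['family_guid'] for r in rows})  # ~ ArrayAgg('individual__family__guid', distinct=True)
assert family_guids == ['F000001_1']  # deduplicated across the two samples
```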
@@ -207,15 +205,16 @@
         split_project_pdos = {}
         session = AirtableSession(user=None, no_auth=True)
         for project, sample_ids in samples_by_project.items():
-            project_sample_data = update_sample_data_by_project[project.id]
+            project_sample_data = new_sample_data_by_project[project.id]
             is_internal = not project_has_anvil(project) or is_internal_anvil_project(project)
             notify_search_data_loaded(
-                project, is_internal, dataset_type, sample_type, inactivated_sample_guids,
-                updated_samples=project_sample_data['samples'], num_samples=len(sample_ids),
+                project, is_internal, dataset_type, sample_type, project_sample_data['samples'],
+                num_samples=len(sample_ids),
             )
             project_families = project_sample_data['family_guids']
-            updated_families.update(project_families)
-            updated_project_families.append((project.id, project.name, project.genome_version, project_families))
+            if project_families:
+                updated_families.update(project_families)
+                updated_project_families.append((project.id, project.name, project.genome_version, project_families))
             if is_internal and dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS:
                 split_project_pdos[project.name] = cls._update_pdos(session, project.guid, sample_ids)
 
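The gating added above in miniature, with plain dicts standing in for the ORM aggregation (all values hypothetical): only projects whose row carries newly loaded families are queued for the downstream saved-variant reload.

```python
new_sample_data_by_project = {
    1: {'samples': ['NA12878'], 'family_guids': ['F000001_1']},
    2: {'samples': ['NA19675'], 'family_guids': []},
}
updated_families, updated_project_families = set(), []
for project_id, data in new_sample_data_by_project.items():
    project_families = data['family_guids']
    if project_families:  # the new guard: skip projects with no new families
        updated_families.update(project_families)
        updated_project_families.append((project_id, project_families))

assert updated_project_families == [(1, ['F000001_1'])]
```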
@@ -337,14 +336,21 @@ def _reload_shared_variant_annotations(data_type, genome_version, updated_varian
         variant_id: {k: v for k, v in variant.items() if k not in {'familyGuids', 'genotypes'}}
         for variant_id, variant in (updated_variants_by_id or {}).items()
     }
-    fetch_variant_ids = sorted(set(variants_by_id.keys()) - set(updated_variants_by_id.keys()))
+    fetch_variant_ids = set(variants_by_id.keys()) - set(updated_variants_by_id.keys())
     if fetch_variant_ids:
-        if not is_sv:
-            fetch_variant_ids = [parse_valid_variant_id(variant_id) for variant_id in fetch_variant_ids]
-        for i in range(0, len(fetch_variant_ids), MAX_LOOKUP_VARIANTS):
-            updated_variants = hail_variant_multi_lookup(USER_EMAIL, fetch_variant_ids[i:i+MAX_LOOKUP_VARIANTS], data_type, genome_version)
-            logger.info(f'Fetched {len(updated_variants)} additional variants')
-            updated_variants_by_id.update({variant['variantId']: variant for variant in updated_variants})
+        if is_sv:
+            variant_ids_by_chrom = {'all': fetch_variant_ids}
+        else:
+            variant_ids_by_chrom = defaultdict(list)
+            for variant_id in fetch_variant_ids:
+                parsed_id = parse_valid_variant_id(variant_id)
+                variant_ids_by_chrom[parsed_id[0]].append(parsed_id)
+        for chrom, variant_ids in sorted(variant_ids_by_chrom.items()):
+            variant_ids = sorted(variant_ids)
+            for i in range(0, len(variant_ids), MAX_LOOKUP_VARIANTS):
+                updated_variants = hail_variant_multi_lookup(USER_EMAIL, variant_ids[i:i+MAX_LOOKUP_VARIANTS], data_type, genome_version)
+                logger.info(f'Fetched {len(updated_variants)} additional variants in chromosome {chrom}')
+                updated_variants_by_id.update({variant['variantId']: variant for variant in updated_variants})
 
     updated_variant_models = []
     for variant_id, variant in updated_variants_by_id.items():
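A self-contained sketch of the batching introduced above (hedged: the `MAX_LOOKUP_VARIANTS` value and helper names here are placeholders). Grouping parsed IDs per chromosome keeps each `multi_lookup` request within one chromosome, which pairs with the single-spanning-interval change in `hail_search/queries/base.py`:

```python
from collections import defaultdict

MAX_LOOKUP_VARIANTS = 1000  # placeholder batch size

def iter_lookup_batches(fetch_variant_ids, is_sv, parse_valid_variant_id):
    # SV ids are opaque strings and stay in a single group; SNV/indel ids are
    # parsed into (chrom, pos, ref, alt) tuples and grouped by chromosome.
    if is_sv:
        variant_ids_by_chrom = {'all': fetch_variant_ids}
    else:
        variant_ids_by_chrom = defaultdict(list)
        for variant_id in fetch_variant_ids:
            parsed_id = parse_valid_variant_id(variant_id)
            variant_ids_by_chrom[parsed_id[0]].append(parsed_id)
    for chrom, variant_ids in sorted(variant_ids_by_chrom.items()):
        variant_ids = sorted(variant_ids)
        for i in range(0, len(variant_ids), MAX_LOOKUP_VARIANTS):
            yield chrom, variant_ids[i:i + MAX_LOOKUP_VARIANTS]

def parse_id(vid):
    chrom, pos, ref, alt = vid.split('-')
    return chrom, int(pos), ref, alt

# IDs on chromosomes 1 and 21 produce two lookup requests, which is exactly
# what the updated tests below assert.
batches = list(iter_lookup_batches({'1-100-A-T', '21-200-G-C'}, False, parse_id))
assert [chrom for chrom, _ in batches] == ['1', '21']
```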
@@ -423,7 +423,7 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo
             {'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'},
         ]}},
     ], reload_annotations_logs=[
-        'Reloading shared annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants', 'Fetched 1 additional variants', 'Updated 2 SNV_INDEL GRCh38 saved variants',
+        'Reloading shared annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants in chromosome 1', 'Fetched 1 additional variants in chromosome 1', 'Updated 2 SNV_INDEL GRCh38 saved variants',
         'No additional SV_WES GRCh38 saved variants to update',
     ], run_loading_logs={
         'GRCh38/SNV_INDEL': 'Loading 4 WES SNV_INDEL samples in 2 projects',
@@ -559,10 +559,10 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo
         self.assertEqual(annotation_updated_json['mainTranscriptId'], 'ENST00000505820')
         self.assertEqual(len(annotation_updated_json['genotypes']), 3)
 
-        self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request')
+        self.mock_utils_logger.error.assert_called_with('Error reloading variants in Test Reprocessed Project: Bad Request')
         self.mock_utils_logger.info.assert_has_calls([
-            mock.call('Updated 0 variants for project Test Reprocessed Project'),
-            mock.call('Updated 1 variants for project Non-Analyst Project'),
+            mock.call('Updated 0 variants in 1 families for project Test Reprocessed Project'),
+            mock.call('Updated 1 variants in 1 families for project Non-Analyst Project'),
             mock.call('Reload Summary: '),
             mock.call(' Non-Analyst Project: Updated 1 variants'),
             mock.call('Reloading saved variants in 2 projects'),
25 changes: 16 additions & 9 deletions seqr/management/tests/reload_saved_variant_annotations_tests.py
@@ -39,18 +39,24 @@ def test_command(self, mock_logger):

         mock_logger.info.assert_has_calls([mock.call(log) for log in [
             'Reloading shared annotations for 3 SNV_INDEL GRCh37 saved variants (3 unique)',
-            'Fetched 2 additional variants',
+            'Fetched 2 additional variants in chromosome 1',
+            'Fetched 2 additional variants in chromosome 21',
             'Updated 2 SNV_INDEL GRCh37 saved variants',
         ]])
 
-        self.assertEqual(len(responses.calls), 1)
-        multi_lookup_request = responses.calls[0].request
-        self.assertEqual(multi_lookup_request.url, f'{MOCK_HAIL_ORIGIN}:5000/multi_lookup')
-        self.assertEqual(multi_lookup_request.headers.get('From'), 'manage_command')
-        self.assertDictEqual(json.loads(multi_lookup_request.body), {
+        self.assertEqual(len(responses.calls), 2)
+        for call in responses.calls:
+            self.assertEqual(call.request.url, f'{MOCK_HAIL_ORIGIN}:5000/multi_lookup')
+            self.assertEqual(call.request.headers.get('From'), 'manage_command')
+        self.assertDictEqual(json.loads(responses.calls[0].request.body), {
             'genome_version': 'GRCh37',
             'data_type': 'SNV_INDEL',
-            'variant_ids': [['1', 248367227, 'TC', 'T'], ['1', 46859832, 'G', 'A'], ['21', 3343353, 'GAGA', 'G']],
+            'variant_ids': [['1', 46859832, 'G', 'A'], ['1', 248367227, 'TC', 'T']],
         })
+        self.assertDictEqual(json.loads(responses.calls[1].request.body), {
+            'genome_version': 'GRCh37',
+            'data_type': 'SNV_INDEL',
+            'variant_ids': [['21', 3343353, 'GAGA', 'G']],
+        })
 
         annotation_updated_json_1 = SavedVariant.objects.get(guid='SV0000002_1248367227_r0390_100').saved_variant_json
@@ -66,11 +72,12 @@
         self.assertEqual(len(annotation_updated_json_2['genotypes']), 3)
 
         # Test SVs
+        responses.calls.reset()
         Sample.objects.filter(guid='S000147_na21234').update(individual_id=20)
         call_command('reload_saved_variant_annotations', 'SV_WGS', 'GRCh37')
 
-        self.assertEqual(len(responses.calls), 2)
-        self.assertDictEqual(json.loads(responses.calls[1].request.body), {
+        self.assertEqual(len(responses.calls), 1)
+        self.assertDictEqual(json.loads(responses.calls[0].request.body), {
             'genome_version': 'GRCh37',
             'data_type': 'SV_WGS',
             'variant_ids': ['prefix_19107_DEL'],
4 changes: 2 additions & 2 deletions seqr/management/tests/reload_saved_variant_json_tests.py
@@ -30,7 +30,7 @@ def test_with_param_command(self, mock_get_variants, mock_logger):
             [family_1], ['1-46859832-G-A','21-3343353-GAGA-G'], user=None, user_email='manage_command')
 
         logger_info_calls = [
-            mock.call('Updated 2 variants for project 1kg project n\xe5me with uni\xe7\xf8de'),
+            mock.call('Updated 2 variants in 1 families for project 1kg project n\xe5me with uni\xe7\xf8de'),
             mock.call('Reload Summary: '),
             mock.call(' 1kg project n\xe5me with uni\xe7\xf8de: Updated 2 variants')
         ]
@@ -81,4 +81,4 @@ def test_with_param_command(self, mock_get_variants, mock_logger):
         ]
         mock_logger.info.assert_has_calls(logger_info_calls)
 
-        mock_logger.error.assert_called_with('Error in project 1kg project n\xe5me with uni\xe7\xf8de: Database error.')
+        mock_logger.error.assert_called_with('Error reloading variants in 1kg project n\xe5me with uni\xe7\xf8de: Database error.')
36 changes: 16 additions & 20 deletions seqr/utils/search/add_data_utils.py
@@ -39,7 +39,7 @@ def add_new_es_search_samples(request_json, project, user, notify=False, expecte
         request_json['mappingFilePath'], user) if request_json.get('mappingFilePath') else {}
     ignore_extra_samples = request_json.get('ignoreExtraSamplesInCallset')
     sample_project_tuples = [(sample_id, project.name) for sample_id in sample_ids]
-    updated_samples, inactivated_sample_guids, num_skipped, updated_family_guids = match_and_update_search_samples(
+    updated_samples, new_samples, inactivated_sample_guids, num_skipped, updated_family_guids = match_and_update_search_samples(
         projects=[project],
         user=user,
         sample_project_tuples=sample_project_tuples,
@@ -52,45 +52,41 @@
     )
 
     if notify:
-        updated_sample_data = updated_samples.values('sample_id', 'individual_id')
-        _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_sample_data)
+        _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples.values())
 
     return inactivated_sample_guids, updated_family_guids, updated_samples
 
 
-def _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=None, slack_channel=None, include_slack_detail=False):
-    previous_loaded_individuals = set(Sample.objects.filter(guid__in=inactivated_sample_guids).values_list('individual_id', flat=True))
-    new_sample_ids = [sample['sample_id'] for sample in updated_samples if sample['individual_id'] not in previous_loaded_individuals]
-
+def _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples, email_template=None, slack_channel=None, include_slack_detail=False):
     msg_dataset_type = '' if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS else f' {dataset_type}'
-    num_new_samples = len(new_sample_ids)
+    num_new_samples = len(new_samples)
     sample_summary = f'{num_new_samples} new {sample_type}{msg_dataset_type} samples'
 
     return send_project_notification(
         project,
         notification=sample_summary,
-        email_template=format_email(num_new_samples) if format_email else None,
+        email_template=email_template,
         subject='New data available in seqr',
         slack_channel=slack_channel,
-        slack_detail=', '.join(sorted(new_sample_ids)) if include_slack_detail else None,
+        slack_detail=', '.join(sorted(new_samples)) if include_slack_detail else None,
     )
 
 
-def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, inactivated_sample_guids, updated_samples, num_samples):
+def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, new_samples, num_samples):
     if is_internal:
-        format_email = None
+        email_template = None
     else:
         workspace_name = f'{project.workspace_namespace}/{project.workspace_name}'
-        def format_email(num_new_samples):
-            reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else ''
-            return '\n'.join([
-                f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.',
-                f'We have loaded {{notification}}{reload_summary} from the AnVIL workspace <a href={ANVIL_UI_URL}#workspaces/{workspace_name}>{workspace_name}</a> to the corresponding seqr project {{project_link}}.',
-                'Let us know if you have any questions.',
-            ])
+        num_new_samples = len(new_samples)
+        reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else ''
+        email_template = '\n'.join([
+            f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.',
+            f'We have loaded {{notification}}{reload_summary} from the AnVIL workspace <a href={ANVIL_UI_URL}#workspaces/{workspace_name}>{workspace_name}</a> to the corresponding seqr project {{project_link}}.',
+            'Let us know if you have any questions.',
+        ])
 
     url = _basic_notify_search_data_loaded(
-        project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=format_email,
+        project, dataset_type, sample_type, new_samples, email_template=email_template,
         slack_channel=SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL if is_internal else SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL,
         include_slack_detail=is_internal,
     )
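Design note on the `format_email` → `email_template` swap: `num_new_samples` is now known before the notification call, so the email body can be prebuilt as a plain string (with `{notification}` and `{project_link}` presumably filled in later by `send_project_notification`). A sketch with made-up numbers:

```python
num_samples = 10                    # hypothetical callset total
new_samples = {1: 'S1', 2: 'S2'}    # hypothetical id -> sample_id map
num_new_samples = len(new_samples)
reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else ''
email_template = '\n'.join([
    'We have loaded {notification}' + reload_summary + ' to the corresponding seqr project {project_link}.',
    'Let us know if you have any questions.',
])
assert ' and 8 re-loaded samples' in email_template
```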
5 changes: 4 additions & 1 deletion seqr/views/utils/dataset_utils.py
@@ -222,7 +222,10 @@ def match_and_update_search_samples(
     Family.bulk_update(
         user, {'analysis_status': Family.ANALYSIS_STATUS_ANALYSIS_IN_PROGRESS}, guid__in=family_guids_to_update)
 
-    return updated_samples, inactivated_sample_guids, len(remaining_sample_keys), family_guids_to_update
+    previous_loaded_individuals = set(Sample.objects.filter(guid__in=inactivated_sample_guids).values_list('individual_id', flat=True))
+    new_samples = dict(updated_samples.exclude(individual_id__in=previous_loaded_individuals).values_list('id', 'sample_id'))
+
+    return updated_samples, new_samples, inactivated_sample_guids, len(remaining_sample_keys), family_guids_to_update
 
 
 def _parse_tsv_row(row):
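The new/re-loaded split above, restated with plain data in place of the querysets: a sample whose individual already had a sample inactivated by this load counts as a re-load, not a new sample.

```python
inactivated = [{'guid': 'S_old', 'individual_id': 7}]  # hypothetical rows
updated = [
    {'id': 1, 'sample_id': 'NA12878', 'individual_id': 7},  # re-load
    {'id': 2, 'sample_id': 'NA20889', 'individual_id': 8},  # genuinely new
]
previous_loaded_individuals = {s['individual_id'] for s in inactivated}
new_samples = {
    s['id']: s['sample_id']
    for s in updated if s['individual_id'] not in previous_loaded_individuals
}
assert new_samples == {2: 'NA20889'}  # only the new individual's sample
```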
5 changes: 3 additions & 2 deletions seqr/views/utils/variant_utils.py
@@ -45,12 +45,13 @@ def update_projects_saved_variant_json(projects, user_email, **kwargs):
                 skipped[project_name] = True
             else:
                 success[project_name] = len(updated_saved_variants)
-                logger.info(f'Updated {len(updated_saved_variants)} variants for project {project_name}')
+                family_summary = f' in {len(family_guids)} families' if family_guids else ''
+                logger.info(f'Updated {len(updated_saved_variants)} variants{family_summary} for project {project_name}')
                 updated_variants_by_id.update({v.variant_id: v.saved_variant_json for v in updated_saved_variants.values()})
         except Exception as e:
             traceback_message = traceback.format_exc()
             logger.error(traceback_message)
-            logger.error(f'Error in project {project_name}: {e}')
+            logger.error(f'Error reloading variants in {project_name}: {e}')
             error[project_name] = e
 
     logger.info('Reload Summary: ')
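The reworded log lines in miniature (values hypothetical):

```python
updated_saved_variants = {'v1': None, 'v2': None}
family_guids = ['F000001_1']
project_name = 'Demo Project'
family_summary = f' in {len(family_guids)} families' if family_guids else ''
print(f'Updated {len(updated_saved_variants)} variants{family_summary} for project {project_name}')
# -> Updated 2 variants in 1 families for project Demo Project
```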