From deb5298879e715135dc1f9304460fb746ef4f433 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Fri, 17 Jan 2025 14:47:27 -0500 Subject: [PATCH 01/10] move up new sample logic --- .../check_for_new_samples_from_pipeline.py | 13 ++++++------ seqr/utils/search/add_data_utils.py | 20 ++++++++----------- seqr/views/utils/dataset_utils.py | 5 ++++- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index 8e045ebaa8..55546c64fd 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -1,8 +1,6 @@ from collections import defaultdict from django.contrib.postgres.aggregates import ArrayAgg from django.core.management.base import BaseCommand, CommandError -from django.db.models import Q -from django.db.models.functions import JSONObject import json import logging import re @@ -130,7 +128,7 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers sample_type = metadata['sample_type'] logger.info(f'Loading {len(sample_project_tuples)} {sample_type} {dataset_type} samples in {len(samples_by_project)} projects') - updated_samples, inactivated_sample_guids, *args = match_and_update_search_samples( + updated_samples, new_samples, *args = match_and_update_search_samples( projects=samples_by_project.keys(), sample_project_tuples=sample_project_tuples, sample_data={'data_source': run_version, 'elasticsearch_index': ';'.join(metadata['callsets'])}, @@ -142,7 +140,7 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers # Send loading notifications and update Airtable PDOs update_sample_data_by_project = { s['individual__family__project']: s for s in updated_samples.values('individual__family__project').annotate( - samples=ArrayAgg(JSONObject(sample_id='sample_id', individual_id='individual_id')), + 
sample_db_ids=ArrayAgg('id', distinct=True), family_guids=ArrayAgg('individual__family__guid', distinct=True), ) } @@ -152,10 +150,13 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers session = AirtableSession(user=None, no_auth=True) for project, sample_ids in samples_by_project.items(): project_sample_data = update_sample_data_by_project[project.id] + new_project_samples = [ + sample_id for db_id, sample_id in new_samples.items() if db_id in project_sample_data['sample_db_ids'] + ] is_internal = not project_has_anvil(project) or is_internal_anvil_project(project) notify_search_data_loaded( - project, is_internal, dataset_type, sample_type, inactivated_sample_guids, - updated_samples=project_sample_data['samples'], num_samples=len(sample_ids), + project, is_internal, dataset_type, sample_type, new_project_samples, + num_samples=len(sample_ids), ) project_families = project_sample_data['family_guids'] updated_families.update(project_families) diff --git a/seqr/utils/search/add_data_utils.py b/seqr/utils/search/add_data_utils.py index 805f176239..c1f1630558 100644 --- a/seqr/utils/search/add_data_utils.py +++ b/seqr/utils/search/add_data_utils.py @@ -39,7 +39,7 @@ def add_new_es_search_samples(request_json, project, user, notify=False, expecte request_json['mappingFilePath'], user) if request_json.get('mappingFilePath') else {} ignore_extra_samples = request_json.get('ignoreExtraSamplesInCallset') sample_project_tuples = [(sample_id, project.name) for sample_id in sample_ids] - updated_samples, inactivated_sample_guids, num_skipped, updated_family_guids = match_and_update_search_samples( + updated_samples, new_samples, inactivated_sample_guids, num_skipped, updated_family_guids = match_and_update_search_samples( projects=[project], user=user, sample_project_tuples=sample_project_tuples, @@ -52,31 +52,27 @@ def add_new_es_search_samples(request_json, project, user, notify=False, expecte ) if notify: - updated_sample_data = 
updated_samples.values('sample_id', 'individual_id') - _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_sample_data) + _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples.values()) return inactivated_sample_guids, updated_family_guids, updated_samples -def _basic_notify_search_data_loaded(project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=None, slack_channel=None, include_slack_detail=False): - previous_loaded_individuals = set(Sample.objects.filter(guid__in=inactivated_sample_guids).values_list('individual_id', flat=True)) - new_sample_ids = [sample['sample_id'] for sample in updated_samples if sample['individual_id'] not in previous_loaded_individuals] - +def _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples, format_email=None, slack_channel=None, include_slack_detail=False): msg_dataset_type = '' if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS else f' {dataset_type}' - num_new_samples = len(new_sample_ids) + num_new_samples = len(new_samples) sample_summary = f'{num_new_samples} new {sample_type}{msg_dataset_type} samples' return send_project_notification( project, notification=sample_summary, - email_template=format_email(num_new_samples) if format_email else None, + email_template=format_email(num_new_samples) if format_email else None, # TODO cleanup format email subject='New data available in seqr', slack_channel=slack_channel, - slack_detail=', '.join(sorted(new_sample_ids)) if include_slack_detail else None, + slack_detail=', '.join(sorted(new_samples)) if include_slack_detail else None, ) -def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, inactivated_sample_guids, updated_samples, num_samples): +def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, new_samples, num_samples): if is_internal: format_email = None else: @@ -90,7 +86,7 @@ def 
format_email(num_new_samples): ]) url = _basic_notify_search_data_loaded( - project, dataset_type, sample_type, inactivated_sample_guids, updated_samples, format_email=format_email, + project, dataset_type, sample_type, new_samples, format_email=format_email, slack_channel=SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL if is_internal else SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, include_slack_detail=is_internal, ) diff --git a/seqr/views/utils/dataset_utils.py b/seqr/views/utils/dataset_utils.py index f75e93eb00..42882f5dd4 100644 --- a/seqr/views/utils/dataset_utils.py +++ b/seqr/views/utils/dataset_utils.py @@ -222,7 +222,10 @@ def match_and_update_search_samples( Family.bulk_update( user, {'analysis_status': Family.ANALYSIS_STATUS_ANALYSIS_IN_PROGRESS}, guid__in=family_guids_to_update) - return updated_samples, inactivated_sample_guids, len(remaining_sample_keys), family_guids_to_update + previous_loaded_individuals = set(Sample.objects.filter(guid__in=inactivated_sample_guids).values_list('individual_id', flat=True)) + new_samples = dict(updated_samples.exclude(individual_id__in=previous_loaded_individuals).values_list('id', 'sample_id')) + + return updated_samples, new_samples, inactivated_sample_guids, len(remaining_sample_keys), family_guids_to_update def _parse_tsv_row(row): From 8f9e2872b6bfe303e3f9240c6756159b71458c60 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Fri, 17 Jan 2025 14:49:59 -0500 Subject: [PATCH 02/10] clean up email template --- seqr/utils/search/add_data_utils.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/seqr/utils/search/add_data_utils.py b/seqr/utils/search/add_data_utils.py index c1f1630558..d910cb23bf 100644 --- a/seqr/utils/search/add_data_utils.py +++ b/seqr/utils/search/add_data_utils.py @@ -57,7 +57,7 @@ def add_new_es_search_samples(request_json, project, user, notify=False, expecte return inactivated_sample_guids, updated_family_guids, updated_samples -def 
_basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples, format_email=None, slack_channel=None, include_slack_detail=False): +def _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_samples, email_template=None, slack_channel=None, include_slack_detail=False): msg_dataset_type = '' if dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS else f' {dataset_type}' num_new_samples = len(new_samples) sample_summary = f'{num_new_samples} new {sample_type}{msg_dataset_type} samples' @@ -65,7 +65,7 @@ def _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_sam return send_project_notification( project, notification=sample_summary, - email_template=format_email(num_new_samples) if format_email else None, # TODO cleanup format email + email_template=email_template, subject='New data available in seqr', slack_channel=slack_channel, slack_detail=', '.join(sorted(new_samples)) if include_slack_detail else None, @@ -74,19 +74,19 @@ def _basic_notify_search_data_loaded(project, dataset_type, sample_type, new_sam def notify_search_data_loaded(project, is_internal, dataset_type, sample_type, new_samples, num_samples): if is_internal: - format_email = None + email_template = None else: workspace_name = f'{project.workspace_namespace}/{project.workspace_name}' - def format_email(num_new_samples): - reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else '' - return '\n'.join([ - f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.', - f'We have loaded {{notification}}{reload_summary} from the AnVIL workspace {workspace_name} to the corresponding seqr project {{project_link}}.', - 'Let us know if you have any questions.', - ]) + num_new_samples = len(new_samples) + reload_summary = f' and {num_samples - num_new_samples} re-loaded samples' if num_samples > num_new_samples else '' + email_template = 
'\n'.join([ + f'We are following up on the request to load data from AnVIL on {project.created_date.date().strftime("%B %d, %Y")}.', + f'We have loaded {{notification}}{reload_summary} from the AnVIL workspace {workspace_name} to the corresponding seqr project {{project_link}}.', + 'Let us know if you have any questions.', + ]) url = _basic_notify_search_data_loaded( - project, dataset_type, sample_type, new_samples, format_email=format_email, + project, dataset_type, sample_type, new_samples, email_template=email_template, slack_channel=SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL if is_internal else SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, include_slack_detail=is_internal, ) From 9d3c9fc9d6e349e8ac546aaedfd95255ddd52dc5 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Fri, 17 Jan 2025 15:09:45 -0500 Subject: [PATCH 03/10] better logging for family reloading --- .../tests/check_for_new_samples_from_pipeline_tests.py | 4 ++-- seqr/management/tests/reload_saved_variant_json_tests.py | 2 +- seqr/views/utils/variant_utils.py | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index 4b162f9d27..9ba4a68580 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -561,8 +561,8 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') self.mock_utils_logger.info.assert_has_calls([ - mock.call('Updated 0 variants for project Test Reprocessed Project'), - mock.call('Updated 1 variants for project Non-Analyst Project'), + mock.call('Updated 0 variants in 2 families for project Test Reprocessed Project'), + mock.call('Updated 1 variants in 1 families for project Non-Analyst Project'), mock.call('Reload 
Summary: '), mock.call(' Non-Analyst Project: Updated 1 variants'), mock.call('Reloading saved variants in 2 projects'), diff --git a/seqr/management/tests/reload_saved_variant_json_tests.py b/seqr/management/tests/reload_saved_variant_json_tests.py index 4ceb4314b6..104db53f40 100644 --- a/seqr/management/tests/reload_saved_variant_json_tests.py +++ b/seqr/management/tests/reload_saved_variant_json_tests.py @@ -30,7 +30,7 @@ def test_with_param_command(self, mock_get_variants, mock_logger): [family_1], ['1-46859832-G-A','21-3343353-GAGA-G'], user=None, user_email='manage_command') logger_info_calls = [ - mock.call('Updated 2 variants for project 1kg project n\xe5me with uni\xe7\xf8de'), + mock.call('Updated 2 variants in 1 families for project 1kg project n\xe5me with uni\xe7\xf8de'), mock.call('Reload Summary: '), mock.call(' 1kg project n\xe5me with uni\xe7\xf8de: Updated 2 variants') ] diff --git a/seqr/views/utils/variant_utils.py b/seqr/views/utils/variant_utils.py index aaf65a129b..8407052685 100644 --- a/seqr/views/utils/variant_utils.py +++ b/seqr/views/utils/variant_utils.py @@ -45,7 +45,8 @@ def update_projects_saved_variant_json(projects, user_email, **kwargs): skipped[project_name] = True else: success[project_name] = len(updated_saved_variants) - logger.info(f'Updated {len(updated_saved_variants)} variants for project {project_name}') + family_summary = f' in {len(family_guids)} families' if family_guids else '' + logger.info(f'Updated {len(updated_saved_variants)} variants{family_summary} for project {project_name}') updated_variants_by_id.update({v.variant_id: v.saved_variant_json for v in updated_saved_variants.values()}) except Exception as e: traceback_message = traceback.format_exc() From 427284db383eb5963a881665e9543bf0df601dd5 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Fri, 17 Jan 2025 15:12:24 -0500 Subject: [PATCH 04/10] only reload families with new samples --- .../check_for_new_samples_from_pipeline.py | 18 ++++++++---------- 
...heck_for_new_samples_from_pipeline_tests.py | 2 +- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index a94a0a4bba..883498c309 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -194,9 +194,9 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers ) # Send loading notifications and update Airtable PDOs - update_sample_data_by_project = { - s['individual__family__project']: s for s in updated_samples.values('individual__family__project').annotate( - sample_db_ids=ArrayAgg('id', distinct=True), + new_sample_data_by_project = { + s['individual__family__project']: s for s in updated_samples.filter(id__in=new_samples).values('individual__family__project').annotate( + samples=ArrayAgg('sample_id', distinct=True), family_guids=ArrayAgg('individual__family__guid', distinct=True), ) } @@ -205,18 +205,16 @@ def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_vers split_project_pdos = {} session = AirtableSession(user=None, no_auth=True) for project, sample_ids in samples_by_project.items(): - project_sample_data = update_sample_data_by_project[project.id] - new_project_samples = [ - sample_id for db_id, sample_id in new_samples.items() if db_id in project_sample_data['sample_db_ids'] - ] + project_sample_data = new_sample_data_by_project[project.id] is_internal = not project_has_anvil(project) or is_internal_anvil_project(project) notify_search_data_loaded( - project, is_internal, dataset_type, sample_type, new_project_samples, + project, is_internal, dataset_type, sample_type, project_sample_data['samples'], num_samples=len(sample_ids), ) project_families = project_sample_data['family_guids'] - updated_families.update(project_families) - 
updated_project_families.append((project.id, project.name, project.genome_version, project_families)) + if project_families: + updated_families.update(project_families) + updated_project_families.append((project.id, project.name, project.genome_version, project_families)) if is_internal and dataset_type == Sample.DATASET_TYPE_VARIANT_CALLS: split_project_pdos[project.name] = cls._update_pdos(session, project.guid, sample_ids) diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index 9ba4a68580..6a88c51760 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -561,7 +561,7 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') self.mock_utils_logger.info.assert_has_calls([ - mock.call('Updated 0 variants in 2 families for project Test Reprocessed Project'), + mock.call('Updated 0 variants in 1 families for project Test Reprocessed Project'), mock.call('Updated 1 variants in 1 families for project Non-Analyst Project'), mock.call('Reload Summary: '), mock.call(' Non-Analyst Project: Updated 1 variants'), From 20760740c6d1dbdef075c929012ba9ce7489c3c3 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 21 Jan 2025 12:04:57 -0500 Subject: [PATCH 05/10] clearer error log --- .../tests/check_for_new_samples_from_pipeline_tests.py | 2 +- seqr/management/tests/reload_saved_variant_json_tests.py | 2 +- seqr/views/utils/variant_utils.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index 6a88c51760..5d92f0f28c 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ 
b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -559,7 +559,7 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo self.assertEqual(annotation_updated_json['mainTranscriptId'], 'ENST00000505820') self.assertEqual(len(annotation_updated_json['genotypes']), 3) - self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') + self.mock_utils_logger.error.assert_called_with('Error reloading variants in Test Reprocessed Project: Bad Request') self.mock_utils_logger.info.assert_has_calls([ mock.call('Updated 0 variants in 1 families for project Test Reprocessed Project'), mock.call('Updated 1 variants in 1 families for project Non-Analyst Project'), diff --git a/seqr/management/tests/reload_saved_variant_json_tests.py b/seqr/management/tests/reload_saved_variant_json_tests.py index 104db53f40..abe15b58d0 100644 --- a/seqr/management/tests/reload_saved_variant_json_tests.py +++ b/seqr/management/tests/reload_saved_variant_json_tests.py @@ -81,4 +81,4 @@ def test_with_param_command(self, mock_get_variants, mock_logger): ] mock_logger.info.assert_has_calls(logger_info_calls) - mock_logger.error.assert_called_with('Error in project 1kg project n\xe5me with uni\xe7\xf8de: Database error.') + mock_logger.error.assert_called_with('Error reloading variants in 1kg project n\xe5me with uni\xe7\xf8de: Database error.') diff --git a/seqr/views/utils/variant_utils.py b/seqr/views/utils/variant_utils.py index 8407052685..52b2fe500a 100644 --- a/seqr/views/utils/variant_utils.py +++ b/seqr/views/utils/variant_utils.py @@ -51,7 +51,7 @@ def update_projects_saved_variant_json(projects, user_email, **kwargs): except Exception as e: traceback_message = traceback.format_exc() logger.error(traceback_message) - logger.error(f'Error in project {project_name}: {e}') + logger.error(f'Error reloading variants in {project_name}: {e}') error[project_name] = e logger.info('Reload Summary: ') From 
8e0e635eeadd28e22a8d757ea0ad75106a29ce77 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 21 Jan 2025 12:04:57 -0500 Subject: [PATCH 06/10] clearer error log --- .../tests/check_for_new_samples_from_pipeline_tests.py | 2 +- seqr/management/tests/reload_saved_variant_json_tests.py | 2 +- seqr/views/utils/variant_utils.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index 6a88c51760..5d92f0f28c 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -559,7 +559,7 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo self.assertEqual(annotation_updated_json['mainTranscriptId'], 'ENST00000505820') self.assertEqual(len(annotation_updated_json['genotypes']), 3) - self.mock_utils_logger.error.assert_called_with('Error in project Test Reprocessed Project: Bad Request') + self.mock_utils_logger.error.assert_called_with('Error reloading variants in Test Reprocessed Project: Bad Request') self.mock_utils_logger.info.assert_has_calls([ mock.call('Updated 0 variants in 1 families for project Test Reprocessed Project'), mock.call('Updated 1 variants in 1 families for project Non-Analyst Project'), diff --git a/seqr/management/tests/reload_saved_variant_json_tests.py b/seqr/management/tests/reload_saved_variant_json_tests.py index 104db53f40..abe15b58d0 100644 --- a/seqr/management/tests/reload_saved_variant_json_tests.py +++ b/seqr/management/tests/reload_saved_variant_json_tests.py @@ -81,4 +81,4 @@ def test_with_param_command(self, mock_get_variants, mock_logger): ] mock_logger.info.assert_has_calls(logger_info_calls) - mock_logger.error.assert_called_with('Error in project 1kg project n\xe5me with uni\xe7\xf8de: Database error.') + mock_logger.error.assert_called_with('Error reloading 
variants in 1kg project n\xe5me with uni\xe7\xf8de: Database error.') diff --git a/seqr/views/utils/variant_utils.py b/seqr/views/utils/variant_utils.py index 8407052685..52b2fe500a 100644 --- a/seqr/views/utils/variant_utils.py +++ b/seqr/views/utils/variant_utils.py @@ -51,7 +51,7 @@ def update_projects_saved_variant_json(projects, user_email, **kwargs): except Exception as e: traceback_message = traceback.format_exc() logger.error(traceback_message) - logger.error(f'Error in project {project_name}: {e}') + logger.error(f'Error reloading variants in {project_name}: {e}') error[project_name] = e logger.info('Reload Summary: ') From cc31818b96b75e53c83f83c03df27780d5f50f22 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 21 Jan 2025 12:38:50 -0500 Subject: [PATCH 07/10] accurately sort variant ids --- .../management/commands/check_for_new_samples_from_pipeline.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index 883498c309..a05c36cee6 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -336,10 +336,11 @@ def _reload_shared_variant_annotations(data_type, genome_version, updated_varian variant_id: {k: v for k, v in variant.items() if k not in {'familyGuids', 'genotypes'}} for variant_id, variant in (updated_variants_by_id or {}).items() } - fetch_variant_ids = sorted(set(variants_by_id.keys()) - set(updated_variants_by_id.keys())) + fetch_variant_ids = set(variants_by_id.keys()) - set(updated_variants_by_id.keys()) if fetch_variant_ids: if not is_sv: fetch_variant_ids = [parse_valid_variant_id(variant_id) for variant_id in fetch_variant_ids] + fetch_variant_ids.sort() for i in range(0, len(fetch_variant_ids), MAX_LOOKUP_VARIANTS): updated_variants = hail_variant_multi_lookup(USER_EMAIL, 
fetch_variant_ids[i:i+MAX_LOOKUP_VARIANTS], data_type, genome_version) logger.info(f'Fetched {len(updated_variants)} additional variants') From d01b54dbba75ad9eabfa111d07ef85c0d8b34cbc Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 21 Jan 2025 12:52:39 -0500 Subject: [PATCH 08/10] group variant id intervals --- hail_search/queries/base.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hail_search/queries/base.py b/hail_search/queries/base.py index 0d284c6a59..84f2cab757 100644 --- a/hail_search/queries/base.py +++ b/hail_search/queries/base.py @@ -738,7 +738,12 @@ def _parse_intervals(self, intervals, gene_ids=None, variant_keys=None, variant_ return intervals if variant_ids: - intervals = [(chrom, pos, pos+1) for chrom, pos, _, _ in variant_ids] + first_chrom = variant_ids[0][0] + if all(first_chrom == v[0] for v in variant_ids): + positions = [pos for _, pos, _, _ in variant_ids] + intervals = [(first_chrom, min(positions), max(positions) + 1)] + else: + intervals = [(chrom, pos, pos+1) for chrom, pos, _, _ in variant_ids] is_x_linked = self._inheritance_mode == X_LINKED_RECESSIVE if not (intervals or is_x_linked): From 3530f06ac635d2c89d0a67ec00126a7407f99ed8 Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 21 Jan 2025 13:11:21 -0500 Subject: [PATCH 09/10] group lookup calls by chromosome --- .../check_for_new_samples_from_pipeline.py | 20 ++++++++++++------- ...eck_for_new_samples_from_pipeline_tests.py | 2 +- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index a05c36cee6..531aec0f76 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -338,13 +338,19 @@ def _reload_shared_variant_annotations(data_type, genome_version, updated_varian } fetch_variant_ids = set(variants_by_id.keys()) - 
set(updated_variants_by_id.keys()) if fetch_variant_ids: - if not is_sv: - fetch_variant_ids = [parse_valid_variant_id(variant_id) for variant_id in fetch_variant_ids] - fetch_variant_ids.sort() - for i in range(0, len(fetch_variant_ids), MAX_LOOKUP_VARIANTS): - updated_variants = hail_variant_multi_lookup(USER_EMAIL, fetch_variant_ids[i:i+MAX_LOOKUP_VARIANTS], data_type, genome_version) - logger.info(f'Fetched {len(updated_variants)} additional variants') - updated_variants_by_id.update({variant['variantId']: variant for variant in updated_variants}) + if is_sv: + variant_ids_by_chrom = {'all': fetch_variant_ids} + else: + variant_ids_by_chrom = defaultdict(list) + for variant_id in fetch_variant_ids: + parsed_id = parse_valid_variant_id(variant_id) + variant_ids_by_chrom[parsed_id[0]].append(parsed_id) + for chrom, variant_ids in variant_ids_by_chrom.items(): + variant_ids = sorted(variant_ids) + for i in range(0, len(variant_ids), MAX_LOOKUP_VARIANTS): + updated_variants = hail_variant_multi_lookup(USER_EMAIL, variant_ids[i:i+MAX_LOOKUP_VARIANTS], data_type, genome_version) + logger.info(f'Fetched {len(updated_variants)} additional variants in chromosome {chrom}') + updated_variants_by_id.update({variant['variantId']: variant for variant in updated_variants}) updated_variant_models = [] for variant_id, variant in updated_variants_by_id.items(): diff --git a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py index 5d92f0f28c..60a336f82b 100644 --- a/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py +++ b/seqr/management/tests/check_for_new_samples_from_pipeline_tests.py @@ -423,7 +423,7 @@ def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mo {'individual_guid': 'I000017_na20889', 'family_guid': 'F000012_12', 'project_guid': 'R0003_test', 'affected': 'A', 'sample_id': 'NA20889', 'sample_type': 'WES'}, ]}}, ], reload_annotations_logs=[ - 
'Reloading shared annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants', 'Fetched 1 additional variants', 'Updated 2 SNV_INDEL GRCh38 saved variants', + 'Reloading shared annotations for 3 SNV_INDEL GRCh38 saved variants (3 unique)', 'Fetched 1 additional variants in chromosome 1', 'Fetched 1 additional variants in chromosome 1', 'Updated 2 SNV_INDEL GRCh38 saved variants', 'No additional SV_WES GRCh38 saved variants to update', ], run_loading_logs={ 'GRCh38/SNV_INDEL': 'Loading 4 WES SNV_INDEL samples in 2 projects', From 0c250ceb9f19d5acda03c3dc9caddd18516f0dbe Mon Sep 17 00:00:00 2001 From: Hana Snow Date: Tue, 21 Jan 2025 13:19:35 -0500 Subject: [PATCH 10/10] update tests --- .../check_for_new_samples_from_pipeline.py | 2 +- .../reload_saved_variant_annotations_tests.py | 25 ++++++++++++------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/seqr/management/commands/check_for_new_samples_from_pipeline.py b/seqr/management/commands/check_for_new_samples_from_pipeline.py index 531aec0f76..5a7551a4a2 100644 --- a/seqr/management/commands/check_for_new_samples_from_pipeline.py +++ b/seqr/management/commands/check_for_new_samples_from_pipeline.py @@ -345,7 +345,7 @@ def _reload_shared_variant_annotations(data_type, genome_version, updated_varian for variant_id in fetch_variant_ids: parsed_id = parse_valid_variant_id(variant_id) variant_ids_by_chrom[parsed_id[0]].append(parsed_id) - for chrom, variant_ids in variant_ids_by_chrom.items(): + for chrom, variant_ids in sorted(variant_ids_by_chrom.items()): variant_ids = sorted(variant_ids) for i in range(0, len(variant_ids), MAX_LOOKUP_VARIANTS): updated_variants = hail_variant_multi_lookup(USER_EMAIL, variant_ids[i:i+MAX_LOOKUP_VARIANTS], data_type, genome_version) diff --git a/seqr/management/tests/reload_saved_variant_annotations_tests.py b/seqr/management/tests/reload_saved_variant_annotations_tests.py index 2301759b16..95227e3761 100644 --- 
a/seqr/management/tests/reload_saved_variant_annotations_tests.py +++ b/seqr/management/tests/reload_saved_variant_annotations_tests.py @@ -39,18 +39,24 @@ def test_command(self, mock_logger): mock_logger.info.assert_has_calls([mock.call(log) for log in [ 'Reloading shared annotations for 3 SNV_INDEL GRCh37 saved variants (3 unique)', - 'Fetched 2 additional variants', + 'Fetched 2 additional variants in chromosome 1', + 'Fetched 2 additional variants in chromosome 21', 'Updated 2 SNV_INDEL GRCh37 saved variants', ]]) - self.assertEqual(len(responses.calls), 1) - multi_lookup_request = responses.calls[0].request - self.assertEqual(multi_lookup_request.url, f'{MOCK_HAIL_ORIGIN}:5000/multi_lookup') - self.assertEqual(multi_lookup_request.headers.get('From'), 'manage_command') - self.assertDictEqual(json.loads(multi_lookup_request.body), { + self.assertEqual(len(responses.calls), 2) + for call in responses.calls: + self.assertEqual(call.request.url, f'{MOCK_HAIL_ORIGIN}:5000/multi_lookup') + self.assertEqual(call.request.headers.get('From'), 'manage_command') + self.assertDictEqual(json.loads(responses.calls[0].request.body), { 'genome_version': 'GRCh37', 'data_type': 'SNV_INDEL', - 'variant_ids': [['1', 248367227, 'TC', 'T'], ['1', 46859832, 'G', 'A'], ['21', 3343353, 'GAGA', 'G']], + 'variant_ids': [['1', 46859832, 'G', 'A'], ['1', 248367227, 'TC', 'T']], + }) + self.assertDictEqual(json.loads(responses.calls[1].request.body), { + 'genome_version': 'GRCh37', + 'data_type': 'SNV_INDEL', + 'variant_ids': [['21', 3343353, 'GAGA', 'G']], }) annotation_updated_json_1 = SavedVariant.objects.get(guid='SV0000002_1248367227_r0390_100').saved_variant_json @@ -66,11 +72,12 @@ def test_command(self, mock_logger): self.assertEqual(len(annotation_updated_json_2['genotypes']), 3) # Test SVs + responses.calls.reset() Sample.objects.filter(guid='S000147_na21234').update(individual_id=20) call_command('reload_saved_variant_annotations', 'SV_WGS', 'GRCh37') - 
self.assertEqual(len(responses.calls), 2) - self.assertDictEqual(json.loads(responses.calls[1].request.body), { + self.assertEqual(len(responses.calls), 1) + self.assertDictEqual(json.loads(responses.calls[0].request.body), { 'genome_version': 'GRCh37', 'data_type': 'SV_WGS', 'variant_ids': ['prefix_19107_DEL'],