Skip to content

Commit

Permalink
report pipeline validation errors from seqr (#4581)
Browse files Browse the repository at this point in the history
* first pass report errors

* kwargs

* make one ls call

* write files

* test cases cover most new code

* mock file stuff

* review comments
  • Loading branch information
jklugherz authored Jan 17, 2025
1 parent 1b2e036 commit a5a8c9c
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 36 deletions.
92 changes: 74 additions & 18 deletions seqr/management/commands/check_for_new_samples_from_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
from collections import defaultdict

from django.contrib.postgres.aggregates import ArrayAgg
from django.core.management.base import BaseCommand, CommandError
from django.db.models import Q
Expand All @@ -10,21 +12,25 @@
from reference_data.models import GENOME_VERSION_LOOKUP
from seqr.models import Family, Sample, SavedVariant
from seqr.utils.communication_utils import safe_post_to_slack
from seqr.utils.file_utils import file_iter, list_files
from seqr.utils.file_utils import file_iter, list_files, is_google_bucket_file_path
from seqr.utils.search.add_data_utils import notify_search_data_loaded
from seqr.utils.search.utils import parse_valid_variant_id
from seqr.utils.search.hail_search_utils import hail_variant_multi_lookup, search_data_type
from seqr.views.utils.airtable_utils import AirtableSession, LOADABLE_PDO_STATUSES, AVAILABLE_PDO_STATUS, LOADING_PDO_STATUS
from seqr.views.utils.dataset_utils import match_and_update_search_samples
from seqr.views.utils.export_utils import write_multiple_files
from seqr.views.utils.permissions_utils import is_internal_anvil_project, project_has_anvil
from seqr.views.utils.variant_utils import reset_cached_search_results, update_projects_saved_variant_json, \
get_saved_variants
from settings import SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, HAIL_SEARCH_DATA_DIR

logger = logging.getLogger(__name__)

RUN_SUCCESS_PATH_TEMPLATE = '{data_dir}/{genome_version}/{dataset_type}/runs/{run_version}/_SUCCESS'
RUN_PATH_FIELDS = ['genome_version', 'dataset_type', 'run_version']
RUN_FILE_PATH_TEMPLATE = '{data_dir}/{genome_version}/{dataset_type}/runs/{run_version}/{file_name}'
SUCCESS_FILE_NAME = '_SUCCESS'
VALIDATION_ERRORS_FILE_NAME = 'validation_errors.json'
ERRORS_REPORTED_FILE_NAME = '_ERRORS_REPORTED'
RUN_PATH_FIELDS = ['genome_version', 'dataset_type', 'run_version', 'file_name']

DATASET_TYPE_MAP = {'GCNV': Sample.DATASET_TYPE_SV_CALLS}
USER_EMAIL = 'manage_command'
Expand All @@ -47,35 +53,40 @@ def add_arguments(self, parser):
parser.add_argument('--run-version')

def handle(self, *args, **options):
path = self._run_success_path(lambda field: options[field] or '*')
path_regex = self._run_success_path(lambda field: f'(?P<{field}>[^/]+)')
success_runs = {path: re.match(path_regex, path).groupdict() for path in list_files(path, user=None)}
if not success_runs:
user_args = [f'{k}={options[k]}' for k in RUN_PATH_FIELDS if options[k]]
runs = self._get_runs(**options)
self._report_validation_errors(runs)

success_run_dirs = [run_dir for run_dir, run_details in runs.items() if SUCCESS_FILE_NAME in run_details['files']]
if not success_run_dirs:
user_args = [f'{k}={options.get(k)}' for k in RUN_PATH_FIELDS if options.get(k)]
if user_args:
raise CommandError(f'No successful runs found for {", ".join(user_args)}')
else:
logger.info('No loaded data available')
return

loaded_runs = set(Sample.objects.filter(data_source__isnull=False).values_list('data_source', flat=True))
new_runs = {path: run for path, run in success_runs.items() if run['run_version'] not in loaded_runs}
new_runs = {
run_dir: run_details for run_dir, run_details in runs.items()
if run_dir in success_run_dirs and run_details['run_version'] not in loaded_runs
}
if not new_runs:
logger.info(f'Data already loaded for all {len(success_runs)} runs')
logger.info(f'Data already loaded for all {len(success_run_dirs)} runs')
return

logger.info(f'Loading new samples from {len(success_runs)} run(s)')
logger.info(f'Loading new samples from {len(success_run_dirs)} run(s)')
updated_families_by_data_type = defaultdict(set)
updated_variants_by_data_type = defaultdict(dict)
for path, run in new_runs.items():
for run_dir, run_details in new_runs.items():
try:
metadata_path = path.replace('_SUCCESS', 'metadata.json')
data_type, updated_families, updated_variants_by_id = self._load_new_samples(metadata_path, **run)
data_type_key = (data_type, run['genome_version'])
metadata_path = os.path.join(run_dir, 'metadata.json')
del run_details['files']
data_type, updated_families, updated_variants_by_id = self._load_new_samples(metadata_path, **run_details)
data_type_key = (data_type, run_details['genome_version'])
updated_families_by_data_type[data_type_key].update(updated_families)
updated_variants_by_data_type[data_type_key].update(updated_variants_by_id)
except Exception as e:
logger.error(f'Error loading {run["run_version"]}: {e}')
logger.error(f'Error loading {run_details["run_version"]}: {e}')

# Reset cached results for all projects, as seqr AFs will have changed for all projects when new data is added
reset_cached_search_results(project=None)
Expand All @@ -90,13 +101,58 @@ def handle(self, *args, **options):

logger.info('DONE')

def _get_runs(self, **kwargs):
    """Discover pipeline run directories and the files each contains.

    Builds a glob from any user-supplied run-path fields (genome_version,
    dataset_type, run_version; missing fields become '*'), lists all matching
    files, and parses each path back into its component fields with a regex
    built from the same template.

    Returns a dict mapping run directory -> {'files': set of file names in
    that directory, plus the parsed path fields (genome_version,
    dataset_type, run_version)}.
    """
    glob_path = self._run_path(lambda field: kwargs.get(field, '*') or '*')
    # Same template, but with a named capture group per path field so each
    # listed path can be parsed back into its components
    path_regex = self._run_path(lambda field: f'(?P<{field}>[^/]+)')

    runs = defaultdict(lambda: {'files': set()})
    for file_path in list_files(glob_path, user=None):
        parsed_fields = re.match(path_regex, file_path).groupdict()
        file_name = parsed_fields.pop('file_name')
        run_dir = os.path.dirname(file_path)
        runs[run_dir]['files'].add(file_name)
        runs[run_dir].update(parsed_fields)

    return runs

@staticmethod
def _run_success_path(get_field_format):
return RUN_SUCCESS_PATH_TEMPLATE.format(
def _run_path(get_field_format):
return RUN_FILE_PATH_TEMPLATE.format(
data_dir=HAIL_SEARCH_DATA_DIR,
**{field: get_field_format(field) for field in RUN_PATH_FIELDS}
)

@staticmethod
def _report_validation_errors(runs) -> None:
    """Post a slack notification for each run with unreported validation errors.

    For every run directory whose file set contains a validation errors file
    but no errors-reported marker, reads the JSON error summary, formats a
    failure message (with a cloud-console link for google bucket paths), posts
    all messages to the loading notification channel in a single slack call,
    and then writes an empty marker file to each reported run directory so the
    same errors are not reported again on subsequent invocations.
    """
    slack_messages = []
    runs_to_mark = set()
    for run_dir, run_details in runs.items():
        run_files = run_details['files']
        if ERRORS_REPORTED_FILE_NAME in run_files:
            # Errors for this run were already reported by a previous invocation
            continue
        if VALIDATION_ERRORS_FILE_NAME not in run_files:
            continue
        file_path = os.path.join(run_dir, VALIDATION_ERRORS_FILE_NAME)
        # The errors file is expected to be a single-line JSON summary
        error_summary = json.loads(next(iter(file_iter(file_path))))
        message_lines = [
            'Callset Validation Failed',
            f'Projects: {error_summary["project_guids"]}',
            f'Reference Genome: {run_details["genome_version"]}',
            f'Dataset Type: {run_details["dataset_type"]}',
            f'Run ID: {run_details["run_version"]}',
            f'Validation Errors: {error_summary["error_messages"]}',
        ]
        if is_google_bucket_file_path(file_path):
            # Strip the 'gs://' scheme to build a cloud-console browse link
            message_lines.append(f'See more at https://storage.cloud.google.com/{file_path[5:]}')
        slack_messages.append('\n'.join(message_lines))
        runs_to_mark.add(run_dir)

    if not slack_messages:
        return
    safe_post_to_slack(
        SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, '\n\n'.join(slack_messages),
    )
    # Marker files are only written after the slack post succeeds, so a failed
    # post will be retried on the next run
    for run_dir in runs_to_mark:
        write_multiple_files([(ERRORS_REPORTED_FILE_NAME, [], [])], run_dir, user=None, file_format=None)

@classmethod
def _load_new_samples(cls, metadata_path, genome_version, dataset_type, run_version):
dataset_type = DATASET_TYPE_MAP.get(dataset_type, dataset_type)
Expand Down
68 changes: 53 additions & 15 deletions seqr/management/tests/check_for_new_samples_from_pipeline_tests.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import defaultdict
from datetime import datetime
from django.core.management import call_command
from django.core.management.base import CommandError
Expand Down Expand Up @@ -141,12 +142,18 @@
}

RUN_PATHS = [
b'gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-13/_ERRORS_REPORTED',
b'gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-13/validation_errors.json',
b'gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json',
b'gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/_SUCCESS',
b'gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/_SUCCESS',
b'gs://seqr-hail-search-data/v3.1/GRCh38/MITO/runs/auto__2024-08-12/_SUCCESS',
b'gs://seqr-hail-search-data/v3.1/GRCh38/GCNV/runs/auto__2024-09-14/_SUCCESS',
]
METADATA_FILES = [{
OPENED_RUN_JSON_FILES = [{
'project_guids': ['R0003_test'],
'error_messages': ['Missing the following expected contigs:chr17'],
}, {
'callsets': ['1kg.vcf.gz', 'new_samples.vcf.gz'],
'sample_type': 'WES',
'family_samples': {
Expand Down Expand Up @@ -185,10 +192,9 @@
'family_samples': {'F000004_4': ['NA20872'], 'F000012_12': ['NA20889']},
}]


def mock_metadata_file(index):
def mock_opened_file(index):
m = mock.MagicMock()
m.stdout = [json.dumps(METADATA_FILES[index]).encode()]
m.stdout = [json.dumps(OPENED_RUN_JSON_FILES[index]).encode()]
return m


Expand Down Expand Up @@ -230,19 +236,25 @@ def setUp(self):
self.addCleanup(patcher.stop)
self.mock_ls_process = mock.MagicMock()
self.mock_ls_process.communicate.return_value = b'\n'.join(RUN_PATHS), b''
self.mock_mv_process = mock.MagicMock()
self.mock_mv_process.wait.return_value = 0
patcher = mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.HAIL_SEARCH_DATA_DIR')
self.mock_data_dir = patcher.start()
self.addCleanup(patcher.stop)
super().setUp()

def _test_call(self, error_logs, reload_annotations_logs=None, run_loading_logs=None, reload_calls=None):
self.mock_subprocess.reset_mock()
self.mock_subprocess.side_effect = [self.mock_ls_process] + [mock_metadata_file(i) for i in range(len(RUN_PATHS))]
self.mock_subprocess.side_effect = [self.mock_ls_process, mock_opened_file(0), self.mock_mv_process] + [
mock_opened_file(i+1) for i in range(len(RUN_PATHS[3:]))
]

call_command('check_for_new_samples_from_pipeline')

self.mock_subprocess.assert_has_calls([mock.call(command, stdout=-1, stderr=stderr, shell=True) for (command, stderr) in [
('gsutil ls gs://seqr-hail-search-data/v3.1/*/*/runs/*/_SUCCESS', -1),
('gsutil ls gs://seqr-hail-search-data/v3.1/*/*/runs/*/*', -1),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json', -2),
('gsutil mv /mock/tmp/* gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/', -2),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/auto__2023-08-09/metadata.json', -2),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh37/SNV_INDEL/runs/manual__2023-11-02/metadata.json', -2),
('gsutil cat gs://seqr-hail-search-data/v3.1/GRCh38/MITO/runs/auto__2024-08-12/metadata.json', -2),
Expand Down Expand Up @@ -282,10 +294,13 @@ def _test_call(self, error_logs, reload_annotations_logs=None, run_loading_logs=
@mock.patch('seqr.management.commands.check_for_new_samples_from_pipeline.MAX_LOOKUP_VARIANTS', 1)
@mock.patch('seqr.views.utils.airtable_utils.BASE_URL', 'https://test-seqr.org/')
@mock.patch('seqr.views.utils.airtable_utils.MAX_UPDATE_RECORDS', 2)
@mock.patch('seqr.views.utils.export_utils.os.makedirs')
@mock.patch('seqr.views.utils.export_utils.TemporaryDirectory')
@mock.patch('seqr.views.utils.export_utils.open')
@mock.patch('seqr.views.utils.airtable_utils.logger')
@mock.patch('seqr.utils.communication_utils.EmailMultiAlternatives')
@responses.activate
def test_command(self, mock_email, mock_airtable_utils):
def test_command(self, mock_email, mock_airtable_utils, mock_open_write_file, mock_temp_dir, mock_mkdir):
responses.add(
responses.GET,
"http://testairtable/appUelDNM3BnWaR7M/AnVIL%20Seqr%20Loading%20Requests%20Tracking?fields[]=Status&pageSize=2&filterByFormula=AND({AnVIL Project URL}='https://seqr.broadinstitute.org/project/R0004_non_analyst_project/project_page',OR(Status='Loading',Status='Loading Requested'))",
Expand Down Expand Up @@ -324,7 +339,7 @@ def test_command(self, mock_email, mock_airtable_utils):
with self.assertRaises(CommandError) as ce:
call_command('check_for_new_samples_from_pipeline', '--genome_version=GRCh37', '--dataset_type=MITO')
self.assertEqual(str(ce.exception), 'No successful runs found for genome_version=GRCh37, dataset_type=MITO')
self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/GRCh37/MITO/runs/*/_SUCCESS', recursive=False)
self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/GRCh37/MITO/runs/*/*', recursive=False)
self.mock_subprocess.assert_not_called()

call_command('check_for_new_samples_from_pipeline')
Expand All @@ -334,19 +349,29 @@ def test_command(self, mock_email, mock_airtable_utils):
self.mock_send_slack.assert_not_called()

local_files = [
'/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL/runs/manual__2025-01-13/_ERRORS_REPORTED',
'/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL/runs/manual__2025-01-13/validation_errors.json',
'/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json',
'/seqr/seqr-hail-search-data/GRCh38/SNV_INDEL/runs/auto__2023-08-09/_SUCCESS',
'/seqr/seqr-hail-search-data/GRCh37/SNV_INDEL/runs/manual__2023-11-02/_SUCCESS',
'/seqr/seqr-hail-search-data/GRCh38/MITO/runs/auto__2024-08-12/_SUCCESS',
'/seqr/seqr-hail-search-data/GRCh38/GCNV/runs/auto__2024-09-14/_SUCCESS',
]
self.mock_glob.return_value = local_files
self.mock_open.return_value.__enter__.return_value.__iter__.side_effect = [
iter([json.dumps(METADATA_FILES[i])]) for i in range(len(local_files))
iter([json.dumps(OPENED_RUN_JSON_FILES[i])]) for i in range(len(local_files[2:]))
]
mock_written_files = defaultdict(mock.MagicMock)
mock_open_write_file.side_effect = lambda file_name, *args: mock_written_files[file_name]

call_command('check_for_new_samples_from_pipeline')
self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/*/*/runs/*/_SUCCESS', recursive=False)
self.mock_open.assert_has_calls(
[mock.call(path.replace('_SUCCESS', 'metadata.json'), 'r') for path in local_files], any_order=True)
self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/*/*/runs/*/*', recursive=False)
self.mock_open.assert_has_calls([
mock.call(local_files[2], 'r'),
*[mock.call(path.replace('_SUCCESS', 'metadata.json'), 'r') for path in local_files[3:]]
], any_order=True)
mock_mkdir.assert_called_once()
self.assertEqual(list(mock_written_files.keys()), [local_files[2].replace('validation_errors.json', '_ERRORS_REPORTED')])
self.mock_subprocess.assert_not_called()
error_logs = [
'Error loading auto__2023-08-09: Data has genome version GRCh38 but the following projects have conflicting versions: R0003_test (GRCh37)',
Expand All @@ -357,18 +382,21 @@ def test_command(self, mock_email, mock_airtable_utils):
self.mock_logger.error.assert_has_calls([mock.call(error) for error in error_logs])

self.mock_glob.reset_mock()
mock_mkdir.reset_mock()
self.mock_subprocess.return_value.communicate.return_value = b'', b'One or more URLs matched no objects'
self.mock_data_dir.__str__.return_value = 'gs://seqr-hail-search-data/v3.1'
with self.assertRaises(CommandError) as ce:
call_command('check_for_new_samples_from_pipeline', '--genome_version=GRCh37', '--dataset_type=MITO')
self.assertEqual(str(ce.exception), 'No successful runs found for genome_version=GRCh37, dataset_type=MITO')
self.mock_subprocess.assert_called_with(
'gsutil ls gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/*/_SUCCESS', stdout=-1, stderr=-1, shell=True
'gsutil ls gs://seqr-hail-search-data/v3.1/GRCh37/MITO/runs/*/*', stdout=-1, stderr=-1, shell=True
)
self.mock_glob.assert_not_called()

mock_temp_dir.return_value.__enter__.return_value = '/mock/tmp'
self._test_call(error_logs=error_logs)
self.assertEqual(Sample.objects.filter(guid__in=SAMPLE_GUIDS + GCNV_SAMPLE_GUIDS).count(), 0)
mock_mkdir.assert_not_called()

# Update fixture data to allow testing edge cases
Project.objects.filter(id__in=[1, 3]).update(genome_version=38)
Expand All @@ -378,6 +406,7 @@ def test_command(self, mock_email, mock_airtable_utils):
sv.save()

# Test success
self.mock_send_slack.reset_mock()
self.mock_logger.reset_mock()
search_body = {
'genome_version': 'GRCh38', 'num_results': 1, 'variant_ids': [['1', 248367227, 'TC', 'T']], 'variant_keys': [],
Expand Down Expand Up @@ -544,8 +573,17 @@ def test_command(self, mock_email, mock_airtable_utils):
])

# Test notifications
self.assertEqual(self.mock_send_slack.call_count, 7)
self.assertEqual(self.mock_send_slack.call_count, 8)
self.mock_send_slack.assert_has_calls([
mock.call('seqr_loading_notifications',
f"""Callset Validation Failed
Projects: ['{PROJECT_GUID}']
Reference Genome: GRCh38
Dataset Type: SNV_INDEL
Run ID: manual__2025-01-14
Validation Errors: ['Missing the following expected contigs:chr17']
See more at https://storage.cloud.google.com/seqr-hail-search-data/v3.1/GRCh38/SNV_INDEL/runs/manual__2025-01-14/validation_errors.json"""
),
mock.call(
'seqr-data-loading',
f'2 new WES samples are loaded in <{SEQR_URL}project/{PROJECT_GUID}/project_page|Test Reprocessed Project>\n```NA20888, NA20889```',
Expand Down Expand Up @@ -625,7 +663,7 @@ def test_command(self, mock_email, mock_airtable_utils):
str(self.collaborator_user.notifications.first()), 'Non-Analyst Project Loaded 1 new WES samples 0 minutes ago')

# Test reloading has no effect
self.mock_ls_process.communicate.return_value = b'\n'.join([RUN_PATHS[0], RUN_PATHS[3]]), b''
self.mock_ls_process.communicate.return_value = b'\n'.join([RUN_PATHS[3], RUN_PATHS[6]]), b''
self.mock_subprocess.side_effect = [self.mock_ls_process]
self.mock_logger.reset_mock()
mock_email.reset_mock()
Expand Down
7 changes: 4 additions & 3 deletions seqr/views/utils/export_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def export_table(filename_prefix, header, rows, file_format='tsv', titlecase_hea
raise ValueError("Invalid file_format: %s" % file_format)


def _format_files_content(files, file_format='csv', add_header_prefix=False, blank_value='', file_suffixes=None):
if file_format not in DELIMITERS:
def _format_files_content(files, file_format='csv', add_header_prefix=False, blank_value='', file_suffixes=None):
if file_format and file_format not in DELIMITERS:
raise ValueError('Invalid file_format: {}'.format(file_format))
parsed_files = []
for filename, header, rows in files:
Expand All @@ -83,7 +83,8 @@ def _format_files_content(files, file_format='csv', add_header_prefix=False, bl
if any(val != blank_value for val in row)
])
content = str(content.encode('utf-8'), 'ascii', errors='ignore') # Strip unicode chars in the content
parsed_files.append(('{}.{}'.format(filename, (file_suffixes or {}).get(filename, file_format)), content))
file_name = '{}.{}'.format(filename, (file_suffixes or {}).get(filename, file_format)) if file_format else filename
parsed_files.append((file_name, content))
return parsed_files


Expand Down

0 comments on commit a5a8c9c

Please sign in to comment.