Merge pull request #4576 from broadinstitute/local-vcf-option-dropdown
Local vcf option dropdown
hanars authored Jan 8, 2025
2 parents 117ed33 + aeaba01 commit 1f0bc5b
Showing 11 changed files with 136 additions and 98 deletions.
2 changes: 1 addition & 1 deletion deploy/LOCAL_INSTALL_HELM.md
@@ -76,7 +76,7 @@ loading_pipeline_queue test.vcf.gz
```
- In the top header of *seqr*, click on the **Data Management** button.
- In the subheader, click on **Load Data**.
- Type the name of the callset path into the **Callset File Path** text box (without the directory prefix), and select the appropriate Sample Type (WES/WGS) and Genome Version (GRCh37/GRCh38) for your project. The pipeline includes a sequence of validation steps to ensure the validity of your VCF, but these may be skipped by enabling the **Skip Callset Validation** option. We strongly recommend leaving validation enabled to ensure the quality of your analysis.
- Select your VCF from the dropdown and select the appropriate Sample Type (WES/WGS) and Genome Version (GRCh37/GRCh38) for your project. The pipeline includes a sequence of validation steps to ensure the validity of your VCF, but these may be skipped by enabling the **Skip Callset Validation** option. We strongly recommend leaving validation enabled to ensure the quality of your analysis.
- Click through to the next page and select your project from the **Projects to Load** dropdown, then click **Submit**.
- If you wish to check the status of the loading request, you can click through to the **Pipeline Status** tab to view the loading pipeline interface.
- Data should be loaded into the search backend automatically, usually within a few hours.
@@ -324,7 +324,7 @@ def test_command(self, mock_email, mock_airtable_utils):
with self.assertRaises(CommandError) as ce:
call_command('check_for_new_samples_from_pipeline', '--genome_version=GRCh37', '--dataset_type=MITO')
self.assertEqual(str(ce.exception), 'No successful runs found for genome_version=GRCh37, dataset_type=MITO')
self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/GRCh37/MITO/runs/*/_SUCCESS')
self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/GRCh37/MITO/runs/*/_SUCCESS', recursive=False)
self.mock_subprocess.assert_not_called()

call_command('check_for_new_samples_from_pipeline')
@@ -344,7 +344,7 @@ def test_command(self, mock_email, mock_airtable_utils):
iter([json.dumps(METADATA_FILES[i])]) for i in range(len(local_files))
]
call_command('check_for_new_samples_from_pipeline')
self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/*/*/runs/*/_SUCCESS')
self.mock_glob.assert_called_with('/seqr/seqr-hail-search-data/*/*/runs/*/_SUCCESS', recursive=False)
self.mock_open.assert_has_calls(
[mock.call(path.replace('_SUCCESS', 'metadata.json'), 'r') for path in local_files], any_order=True)
self.mock_subprocess.assert_not_called()
3 changes: 2 additions & 1 deletion seqr/urls.py
@@ -122,7 +122,7 @@

from seqr.views.apis.data_manager_api import elasticsearch_status, upload_qc_pipeline_output, delete_index, \
update_rna_seq, load_rna_seq_sample_data, proxy_to_kibana, load_phenotype_prioritization_data, \
validate_callset, get_loaded_projects, load_data, proxy_to_luigi
validate_callset, get_loaded_projects, load_data, loading_vcfs, proxy_to_luigi
from seqr.views.apis.report_api import \
anvil_export, \
family_metadata, \
@@ -332,6 +332,7 @@
'data_management/update_rna_seq': update_rna_seq,
'data_management/load_rna_seq_sample/(?P<sample_guid>[^/]+)': load_rna_seq_sample_data,
'data_management/load_phenotype_prioritization_data': load_phenotype_prioritization_data,
'data_management/loading_vcfs': loading_vcfs,
'data_management/validate_callset': validate_callset,
'data_management/loaded_projects/(?P<genome_version>[^/]+)/(?P<sample_type>[^/]+)/(?P<dataset_type>[^/]+)': get_loaded_projects,
'data_management/load_data': load_data,
13 changes: 7 additions & 6 deletions seqr/utils/file_utils.py
@@ -48,10 +48,12 @@ def does_file_exist(file_path, user=None):
return os.path.isfile(file_path)


def list_files(wildcard_path, user):
def list_files(wildcard_path, user, check_subfolders=False, allow_missing=True):
if check_subfolders:
wildcard_path = f'{wildcard_path.rstrip("/")}/**'
if is_google_bucket_file_path(wildcard_path):
return get_gs_file_list(wildcard_path, user, check_subfolders=False, allow_missing=True)
return [file_path for file_path in glob.glob(wildcard_path) if os.path.isfile(file_path)]
return _get_gs_file_list(wildcard_path, user, check_subfolders, allow_missing)
return [file_path for file_path in glob.glob(wildcard_path, recursive=check_subfolders) if os.path.isfile(file_path)]


def file_iter(file_path, byte_range=None, raw_content=False, user=None, **kwargs):
@@ -91,16 +93,15 @@ def mv_file_to_gs(local_path, gs_path, user=None):
run_gsutil_with_wait(command, gs_path, user)


def get_gs_file_list(gs_path, user=None, check_subfolders=True, allow_missing=False):
def _get_gs_file_list(gs_path, user, check_subfolders, allow_missing):
gs_path = gs_path.rstrip('/')
command = 'ls'

if check_subfolders:
# If a bucket is empty gsutil throws an error when running ls with ** instead of returning an empty list
subfolders = _run_gsutil_with_stdout(command, gs_path, user)
subfolders = _run_gsutil_with_stdout(command, gs_path.replace('/**', ''), user)
if not subfolders:
return []
gs_path = f'{gs_path}/**'

all_lines = _run_gsutil_with_stdout(command, gs_path, user, allow_missing=allow_missing)
return [line for line in all_lines if is_google_bucket_file_path(line)]
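The behavior of the reworked `list_files` is easiest to see in a usage sketch. This is a minimal illustration, assuming the signature in the diff above; the paths are hypothetical, and `user` is only used for logging, so `None` suffices outside a request:

```python
from seqr.utils.file_utils import list_files

# check_subfolders=True appends '/**' to the path and globs recursively,
# i.e. glob.glob('/local_datasets/**', recursive=True), keeping only files.
local_files = list_files('/local_datasets', None, check_subfolders=True)

# For a Google bucket the same '/**' suffix is applied, but _get_gs_file_list
# first runs a plain `gsutil ls` on the bucket path (gsutil errors on '**'
# against an empty bucket instead of returning an empty list) and only then
# lists with the '**' wildcard.
bucket_files = list_files('gs://bucket/target_path', None, check_subfolders=True)

# The default is unchanged: a non-recursive glob over the given wildcard path,
# which is why the tests above now assert recursive=False.
success_files = list_files('/seqr/seqr-hail-search-data/*/*/runs/*/_SUCCESS', None)
```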
31 changes: 1 addition & 30 deletions seqr/utils/file_utils_tests.py
@@ -1,7 +1,7 @@
import mock

from unittest import TestCase
from seqr.utils.file_utils import mv_file_to_gs, get_gs_file_list
from seqr.utils.file_utils import mv_file_to_gs


class FileUtilsTest(TestCase):
@@ -30,32 +30,3 @@ def test_mv_file_to_gs(self, mock_logger, mock_subproc):
mock_subproc.Popen.assert_called_with('gsutil mv /temp_path gs://bucket/target_path', stdout=mock_subproc.PIPE, stderr=mock_subproc.STDOUT, shell=True) # nosec
mock_logger.info.assert_called_with('==> gsutil mv /temp_path gs://bucket/target_path', None)
process.wait.assert_called_with()

@mock.patch('seqr.utils.file_utils.subprocess')
@mock.patch('seqr.utils.file_utils.logger')
def test_get_gs_file_list(self, mock_logger, mock_subproc):
with self.assertRaises(Exception) as ee:
get_gs_file_list('/temp_path')
self.assertEqual(str(ee.exception), 'A Google Storage path is expected.')

process = mock_subproc.Popen.return_value
process.communicate.return_value = b'', b'-bash: gsutil: command not found.\nPlease check the path.\n'
with self.assertRaises(Exception) as ee:
get_gs_file_list('gs://bucket/target_path/', user=None)
self.assertEqual(str(ee.exception), 'Run command failed: -bash: gsutil: command not found. Please check the path.')
mock_subproc.Popen.assert_called_with('gsutil ls gs://bucket/target_path', stdout=mock_subproc.PIPE,
stderr=mock_subproc.PIPE, shell=True) # nosec
mock_logger.info.assert_called_with('==> gsutil ls gs://bucket/target_path', None)
process.communicate.assert_called_with()

mock_subproc.reset_mock()
mock_logger.reset_mock()
process.communicate.return_value = b'\n\nUpdates are available for some Cloud SDK components. To install them,\n' \
b'please run:\n $ gcloud components update\ngs://bucket/target_path/id_file.txt\n' \
b'gs://bucket/target_path/data.vcf.gz\n', b''
file_list = get_gs_file_list('gs://bucket/target_path', user=None)
mock_subproc.Popen.assert_called_with('gsutil ls gs://bucket/target_path/**', stdout=mock_subproc.PIPE,
stderr=mock_subproc.PIPE, shell=True) # nosec
mock_logger.info.assert_called_with('==> gsutil ls gs://bucket/target_path/**', None)
process.communicate.assert_called_with()
self.assertEqual(file_list, ['gs://bucket/target_path/id_file.txt', 'gs://bucket/target_path/data.vcf.gz'])
26 changes: 26 additions & 0 deletions seqr/utils/vcf_utils.py
@@ -1,3 +1,4 @@
import os
import re

from collections import defaultdict
@@ -107,3 +108,28 @@ def validate_vcf_exists(data_path, user, path_name=None, allowed_exts=None):
raise ErrorsWarningsException(['Data file or path {} is not found.'.format(path_name or data_path)])

return file_to_check


def get_vcf_list(data_path, user):
file_list = list_files(data_path, user, check_subfolders=True, allow_missing=False)
data_path_list = [path.replace(data_path, '') for path in file_list if path.endswith(VCF_FILE_EXTENSIONS)]
return _merge_sharded_vcf(data_path_list)


def _merge_sharded_vcf(vcf_files):
files_by_path = defaultdict(list)

for vcf_file in vcf_files:
subfolder_path, file = vcf_file.rsplit('/', 1)
files_by_path[subfolder_path].append(file)

# discover the sharded VCF files in each folder, replace the sharded VCF files with a single path with '*'
for subfolder_path, files in files_by_path.items():
if len(files) < 2:
continue
prefix = os.path.commonprefix(files)
suffix = re.fullmatch(r'{}\d*(?P<suffix>\D.*)'.format(prefix), files[0]).groupdict()['suffix']
if all([re.fullmatch(r'{}\d+{}'.format(prefix, suffix), file) for file in files]):
files_by_path[subfolder_path] = [f'{prefix}*{suffix}']

return [f'{path}/{file}' for path, files in files_by_path.items() for file in files]
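The shard-merging logic is clearer with a worked example. A minimal sketch that calls the private helper directly, purely for illustration:

```python
from seqr.utils.vcf_utils import _merge_sharded_vcf

# 'part001.vcf' and 'part002.vcf' share the common prefix 'part00' and the
# non-digit suffix '.vcf', and both match prefix + digits + suffix, so the
# folder collapses to a single wildcard entry; lone files pass through as-is.
vcfs = _merge_sharded_vcf([
    '/sharded_vcf/part001.vcf',
    '/sharded_vcf/part002.vcf',
    '/test.vcf.gz',
])
# vcfs == ['/sharded_vcf/part00*.vcf', '/test.vcf.gz']
```

The same input/output pair appears in the `test_loading_vcfs` case added below.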
43 changes: 10 additions & 33 deletions seqr/views/apis/anvil_workspace_api.py
@@ -1,8 +1,6 @@
"""APIs for management of projects related to AnVIL workspaces."""
import json
import time
import os
import re
from datetime import datetime
from functools import wraps
from collections import defaultdict
@@ -16,7 +14,6 @@
from seqr.models import Project, CAN_EDIT, Sample, Individual, IgvSample
from seqr.views.react_app import render_app_html
from seqr.views.utils.airtable_utils import AirtableSession, ANVIL_REQUEST_TRACKING_TABLE
from seqr.utils.search.constants import VCF_FILE_EXTENSIONS
from seqr.utils.search.utils import get_search_samples
from seqr.views.utils.airflow_utils import trigger_airflow_data_loading
from seqr.views.utils.json_to_orm_utils import create_model_from_json
@@ -27,8 +24,8 @@
from seqr.views.utils.pedigree_info_utils import parse_basic_pedigree_table, JsonConstants
from seqr.views.utils.individual_utils import add_or_update_individuals_and_families
from seqr.utils.communication_utils import send_html_email
from seqr.utils.file_utils import get_gs_file_list
from seqr.utils.vcf_utils import validate_vcf_and_get_samples, validate_vcf_exists
from seqr.utils.file_utils import list_files
from seqr.utils.vcf_utils import validate_vcf_and_get_samples, validate_vcf_exists, get_vcf_list
from seqr.utils.logging_utils import SeqrLogger
from seqr.utils.middleware import ErrorsWarningsException
from seqr.views.utils.permissions_utils import is_anvil_authenticated, check_workspace_perm, login_and_policies_required
@@ -109,24 +106,23 @@ def grant_workspace_access(request, namespace, name):
return create_json_response({'success': True})


def _get_workspace_files(request, namespace, name, workspace_meta):
def _get_workspace_bucket(namespace, name, workspace_meta):
bucket_name = workspace_meta['workspace']['bucketName']
bucket_path = 'gs://{bucket}'.format(bucket=bucket_name.rstrip('/'))
return bucket_path, get_gs_file_list(bucket_path, request.user)
return 'gs://{bucket}'.format(bucket=bucket_name.rstrip('/'))


@anvil_workspace_access_required(meta_fields=['workspace.bucketName'])
def get_anvil_vcf_list(*args):
bucket_path, file_list = _get_workspace_files(*args)
data_path_list = [path.replace(bucket_path, '') for path in file_list if path.endswith(VCF_FILE_EXTENSIONS)]
data_path_list = _merge_sharded_vcf(data_path_list)
def get_anvil_vcf_list(request, *args):
bucket_path = _get_workspace_bucket(*args)
data_path_list = get_vcf_list(bucket_path, request.user)

return create_json_response({'dataPathList': data_path_list})


@anvil_workspace_access_required(meta_fields=['workspace.bucketName'])
def get_anvil_igv_options(*args):
bucket_path, file_list = _get_workspace_files(*args)
def get_anvil_igv_options(request, *args):
bucket_path = _get_workspace_bucket(*args)
file_list = list_files(bucket_path, request.user, check_subfolders=True, allow_missing=False)
igv_options = [
{'name': path.replace(bucket_path, ''), 'value': path} for path in file_list
if path.endswith(IgvSample.SAMPLE_TYPE_FILE_EXTENSIONS[IgvSample.SAMPLE_TYPE_ALIGNMENT])
@@ -340,22 +336,3 @@ def _wait_for_service_account_access(user, namespace, name):

def _get_seqr_project_url(project):
return f'{BASE_URL}project/{project.guid}/project_page'


def _merge_sharded_vcf(vcf_files):
files_by_path = defaultdict(list)

for vcf_file in vcf_files:
subfolder_path, file = vcf_file.rsplit('/', 1)
files_by_path[subfolder_path].append(file)

# discover the sharded VCF files in each folder, replace the sharded VCF files with a single path with '*'
for subfolder_path, files in files_by_path.items():
if len(files) < 2:
continue
prefix = os.path.commonprefix(files)
suffix = re.fullmatch(r'{}\d*(?P<suffix>\D.*)'.format(prefix), files[0]).groupdict()['suffix']
if all([re.fullmatch(r'{}\d+{}'.format(prefix, suffix), file) for file in files]):
files_by_path[subfolder_path] = [f'{prefix}*{suffix}']

return [f'{path}/{file}' for path, files in files_by_path.items() for file in files]
6 changes: 6 additions & 0 deletions seqr/views/apis/anvil_workspace_api_tests.py
@@ -443,6 +443,12 @@ def _test_get_workspace_files(self, url, response_key, expected_files, mock_subp
.format(TEST_WORKSPACE_NAMESPACE, TEST_WORKSPACE_NAME1),
self.collaborator_user)

# Test gsutil error
mock_subprocess.return_value.communicate.return_value = b'', b'-bash: gsutil: command not found.\nPlease check the path.\n'
response = self.client.get(url, content_type='application/json')
self.assertEqual(response.status_code, 500)
self.assertEqual(response.json()['error'], 'Run command failed: -bash: gsutil: command not found. Please check the path.')

# Test empty bucket
mock_subprocess.return_value.communicate.return_value = b'', None
response = self.client.get(url, content_type='application/json')
13 changes: 12 additions & 1 deletion seqr/views/apis/data_manager_api.py
@@ -9,6 +9,7 @@
import urllib3

from django.contrib.postgres.aggregates import ArrayAgg
from django.core.exceptions import PermissionDenied
from django.db.models import Max, F, Q, Count
from django.http.response import HttpResponse
from django.views.decorators.csrf import csrf_exempt
@@ -20,7 +21,7 @@
from seqr.utils.file_utils import file_iter, does_file_exist
from seqr.utils.logging_utils import SeqrLogger
from seqr.utils.middleware import ErrorsWarningsException
from seqr.utils.vcf_utils import validate_vcf_exists
from seqr.utils.vcf_utils import validate_vcf_exists, get_vcf_list

from seqr.views.utils.airflow_utils import trigger_airflow_data_loading
from seqr.views.utils.airtable_utils import AirtableSession, LOADABLE_PDO_STATUSES, AVAILABLE_PDO_STATUS
Expand All @@ -30,6 +31,7 @@
from seqr.views.utils.json_utils import create_json_response
from seqr.views.utils.json_to_orm_utils import update_model_from_json
from seqr.views.utils.permissions_utils import data_manager_required, pm_or_data_manager_required, get_internal_projects
from seqr.views.utils.terra_api_utils import anvil_enabled

from seqr.models import Sample, RnaSample, Individual, Project, PhenotypePrioritization

@@ -445,6 +447,15 @@ def load_phenotype_prioritization_data(request):
}


@pm_or_data_manager_required
def loading_vcfs(request):
if anvil_enabled():
raise PermissionDenied()
return create_json_response({
'vcfs': get_vcf_list(LOADING_DATASETS_DIR, request.user),
})


@pm_or_data_manager_required
def validate_callset(request):
request_json = json.loads(request.body)
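End to end, the new endpoint behaves as in the following sketch; the client setup is hypothetical (the real coverage is in the tests below), and `reverse()` avoids hard-coding whatever URL prefix seqr's routing adds around `data_management/loading_vcfs`:

```python
from django.test import Client
from django.urls import reverse

from seqr.views.apis.data_manager_api import loading_vcfs

client = Client()
# ... authenticate a pm or data-manager session here (hypothetical setup) ...
response = client.get(reverse(loading_vcfs), content_type='application/json')

# Local installs: HTTP 200 with the merged VCF list found recursively under
# LOADING_DATASETS_DIR, e.g. {'vcfs': ['/sharded_vcf/part00*.vcf', '/test.vcf.gz']}.
# AnVIL deployments: anvil_enabled() is true, so the view raises
# PermissionDenied and the client receives HTTP 403.
print(response.status_code)
```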
31 changes: 30 additions & 1 deletion seqr/views/apis/data_manager_api_tests.py
@@ -8,7 +8,7 @@

from seqr.utils.communication_utils import _set_bulk_notification_stream
from seqr.views.apis.data_manager_api import elasticsearch_status, upload_qc_pipeline_output, delete_index, \
update_rna_seq, load_rna_seq_sample_data, load_phenotype_prioritization_data, validate_callset, \
update_rna_seq, load_rna_seq_sample_data, load_phenotype_prioritization_data, validate_callset, loading_vcfs, \
get_loaded_projects, load_data
from seqr.views.utils.orm_to_json_utils import _get_json_for_models
from seqr.views.utils.test_utils import AuthenticationTestCase, AirflowTestCase, AirtableTest
@@ -1401,6 +1401,32 @@ def _assert_expected_notifications(mock_send_email, expected_notifs: list[dict],
)
mock_send_email.assert_has_calls(calls)

@mock.patch('seqr.utils.file_utils.os.path.isfile', lambda *args: True)
@mock.patch('seqr.utils.file_utils.glob.glob')
def test_loading_vcfs(self, mock_glob):
url = reverse(loading_vcfs)
self.check_pm_login(url)

mock_glob.return_value = []
response = self.client.get(url, content_type='application/json')
self._test_expected_vcf_responses(response, mock_glob, url)

def _test_expected_vcf_responses(self, response, mock_glob, url):
self.assertEqual(response.status_code, 200)
self.assertDictEqual(response.json(), {'vcfs': []})
mock_glob.assert_called_with('/local_datasets/**', recursive=True)

mock_glob.return_value = ['/local_datasets/sharded_vcf/part001.vcf', '/local_datasets/sharded_vcf/part002.vcf', '/local_datasets/test.vcf.gz']
response = self.client.get(url, content_type='application/json')
self.assertEqual(response.status_code, 200)
self.assertDictEqual(response.json(), {'vcfs': ['/sharded_vcf/part00*.vcf', '/test.vcf.gz']})
mock_glob.assert_called_with('/local_datasets/**', recursive=True)

# test data manager access
self.login_data_manager_user()
response = self.client.get(url, content_type='application/json')
self.assertEqual(response.status_code, 200)

@mock.patch('seqr.utils.file_utils.os.path.isfile')
@mock.patch('seqr.utils.file_utils.glob.glob')
@mock.patch('seqr.utils.file_utils.subprocess.Popen')
@@ -1928,3 +1954,6 @@ def _assert_write_pedigree_error(self, response):
def _test_no_affected_family(self, url, body):
# Sample ID filtering skips the unaffected family
pass

def _test_expected_vcf_responses(self, response, mock_glob, url):
self.assertEqual(response.status_code, 403)