Skip to content

Commit

Permalink
pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jklugherz committed Jan 21, 2025
1 parent c5ff011 commit 5a00ec8
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 59 deletions.
19 changes: 8 additions & 11 deletions seqr/utils/search/add_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,26 +117,23 @@ def format_loading_pipeline_variables(
variables['sample_type'] = sample_type
return variables

def prepare_data_loading_request(
projects: list[Project], sample_type: str, dataset_type: str, genome_version: str,
data_path: str, user: User, pedigree_dir: str, raise_pedigree_error: bool = False, individual_ids: list[int] = None,
skip_validation: bool = False, skip_check_sex_and_relatedness: bool = False, ignore_missing_samples_when_remapping: bool = False,
):
def prepare_data_loading_request(projects: list[Project], sample_type: str, dataset_type: str, genome_version: str,
data_path: str, user: User, pedigree_dir: str, raise_pedigree_error: bool = False,
individual_ids: list[int] = None, skip_validation: bool = False, skip_check_sex_and_relatedness: bool = False):
variables = format_loading_pipeline_variables(
projects,
genome_version,
dataset_type,
sample_type,
callset_path=data_path,
)
config_params = {
'skip_validation': skip_validation,
'skip_check_sex_and_relatedness': skip_check_sex_and_relatedness,
'ignore_missing_samples_when_remapping': ignore_missing_samples_when_remapping,
}
if skip_validation:
variables['skip_validation'] = True
if skip_check_sex_and_relatedness:
variables['skip_check_sex_and_relatedness'] = True
file_path = _get_pedigree_path(pedigree_dir, genome_version, sample_type, dataset_type)
_upload_data_loading_files(projects, user, file_path, individual_ids, raise_pedigree_error)
return variables, file_path, config_params
return variables, file_path


def _dag_dataset_type(sample_type: str, dataset_type: str):
Expand Down
7 changes: 2 additions & 5 deletions seqr/views/apis/anvil_workspace_api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,16 +767,13 @@ def _assert_valid_operation(self, project, test_add_data=True):
}}]})
self.assert_expected_airtable_headers(-1)

dag_args = {
dag_json = {
'projects_to_run': [project.guid],
'dataset_type': 'SNV_INDEL',
'reference_genome': genome_version,
'callset_path': 'gs://test_bucket/test_path.vcf',
'sample_type': 'WES',
'sample_source': 'AnVIL',
'skip_validation': False,
'skip_check_sex_and_relatedness': False,
'ignore_missing_samples_when_remapping': False,
}
sample_summary = '3 new'
if test_add_data:
Expand All @@ -791,7 +788,7 @@ def _assert_valid_operation(self, project, test_add_data=True):
```{dag}```
""".format(guid=project.guid, version=genome_version, workspace_name=project.workspace_name,
project_name=project.name, sample_summary=sample_summary,
dag=json.dumps(dag_args, indent=4),
dag=json.dumps(dag_json, indent=4),
)
self.mock_slack.assert_called_with(
SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, slack_message,
Expand Down
4 changes: 1 addition & 3 deletions seqr/views/apis/data_manager_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,6 @@ def load_data(request):
'user': request.user,
'skip_validation': request_json.get('skipValidation', False),
'skip_check_sex_and_relatedness': request_json.get('skipSRChecks', False),
'ignore_missing_samples_when_remapping': request_json.get('ignoreMissingRemapSamples', False),
}
if AirtableSession.is_airtable_enabled():
individual_ids = _get_valid_project_samples(project_samples, dataset_type, sample_type, request.user)
Expand All @@ -551,10 +550,9 @@ def load_data(request):
success_slack_channel=SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, is_internal=True, individual_ids=individual_ids,
)
else:
request_json, _, config_params = prepare_data_loading_request(
request_json, _ = prepare_data_loading_request(
*loading_args, **loading_kwargs, pedigree_dir=LOADING_DATASETS_DIR, raise_pedigree_error=True,
)
request_json.update(config_params)
response = requests.post(f'{PIPELINE_RUNNER_SERVER}/loading_pipeline_enqueue', json=request_json, timeout=60)
if response.status_code == 409:
raise ErrorsWarningsException(['Loading pipeline is already running. Wait for it to complete and resubmit'])
Expand Down
20 changes: 8 additions & 12 deletions seqr/views/apis/data_manager_api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1561,13 +1561,9 @@ def test_load_data(self, mock_temp_dir, mock_open, mock_mkdir):
'reference_genome': 'GRCh38',
'callset_path': f'{self.TRIGGER_CALLSET_DIR}/mito_callset.mt',
'sample_type': 'WES',
}
config_params = {
'skip_validation': True,
'skip_check_sex_and_relatedness': False,
'ignore_missing_samples_when_remapping': False,
}
self._assert_success_notification(dag_json, config_params)
self._assert_success_notification(dag_json)

# Test loading trigger error
self._set_loading_trigger_error()
Expand All @@ -1577,6 +1573,7 @@ def test_load_data(self, mock_temp_dir, mock_open, mock_mkdir):
self.reset_logs()

del body['skipValidation']
del dag_json['skip_validation']
body.update({'datasetType': 'SV', 'filePath': f'{self.CALLSET_DIR}/sv_callset.vcf'})
self._trigger_error(url, body, dag_json, mock_open, mock_mkdir)

Expand Down Expand Up @@ -1698,10 +1695,9 @@ def _assert_expected_load_data_requests(self, dataset_type='MITO', sample_type='
'sample_type': sample_type,
'dataset_type': dataset_type,
'reference_genome': 'GRCh38',
'skip_validation': skip_validation,
'skip_check_sex_and_relatedness': False,
'ignore_missing_samples_when_remapping': False,
}
if skip_validation:
body['skip_validation'] = True
self.assertDictEqual(json.loads(responses.calls[0].request.body), body)

@staticmethod
Expand All @@ -1712,9 +1708,9 @@ def _has_expected_ped_files(self, mock_open, mock_mkdir, dataset_type, *args, sa
super()._has_expected_ped_files(mock_open, mock_mkdir, dataset_type, *args, sample_type, **kwargs)
mock_mkdir.assert_called_once_with(self._local_pedigree_path(dataset_type, sample_type), exist_ok=True)

def _assert_success_notification(self, dag_json, config_params):
def _assert_success_notification(self, dag_json):
self.maxDiff = None
self.assert_json_logs(self.pm_user, [('Triggered loading pipeline', {'detail': {**dag_json, **config_params}})])
self.assert_json_logs(self.pm_user, [('Triggered loading pipeline', {'detail': dag_json})])

def _set_loading_trigger_error(self):
responses.add(responses.POST, PIPELINE_RUNNER_URL, status=400)
Expand Down Expand Up @@ -1858,15 +1854,15 @@ def _set_loading_trigger_error(self):
self.set_dag_trigger_error_response(status=400)
self.mock_authorized_session.reset_mock()

def _assert_success_notification(self, dag_json, config_params):
def _assert_success_notification(self, dag_json):
dag_json['sample_source'] = 'Broad_Internal'

message = f"""*test_data_manager@broadinstitute.org* triggered loading internal WES MITO data for 2 projects
Pedigree files have been uploaded to gs://seqr-loading-temp/v3.1/GRCh38/MITO/pedigrees/WES
DAG LOADING_PIPELINE is triggered with following:
```{json.dumps({**dag_json, **config_params}, indent=4)}```
```{json.dumps(dag_json, indent=4)}```
"""
self.mock_slack.assert_called_once_with(SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, message)
self.mock_slack.reset_mock()
Expand Down
11 changes: 5 additions & 6 deletions seqr/views/utils/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,24 @@ class DagRunningException(Exception):
def trigger_airflow_data_loading(*args, user: User, individual_ids: list[int], success_message: str, success_slack_channel: str,
error_message: str, is_internal: bool = False, **kwargs):
success = True
updated_variables, gs_path, config_params = prepare_data_loading_request(
updated_variables, gs_path = prepare_data_loading_request(
*args, user, individual_ids=individual_ids, pedigree_dir=SEQR_V3_PEDIGREE_GS_PATH, **kwargs,
)
updated_variables['sample_source'] = 'Broad_Internal' if is_internal else 'AnVIL'
upload_info = [f'Pedigree files have been uploaded to {gs_path}']

try:
_check_dag_running_state(LOADING_PIPELINE_DAG_NAME)
_update_variables(updated_variables, LOADING_PIPELINE_DAG_NAME)
_wait_for_dag_variable_update_via_tasks(updated_variables['projects_to_run'], LOADING_PIPELINE_DAG_NAME)
_trigger_dag(LOADING_PIPELINE_DAG_NAME, config_params)
_trigger_dag(LOADING_PIPELINE_DAG_NAME)
except Exception as e:
logger_call = logger.warning if isinstance(e, DagRunningException) else logger.error
logger_call(str(e), user)
_send_slack_msg_on_failure_trigger(e, updated_variables, error_message)
success = False

if success or success_slack_channel != SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL:
_send_load_data_slack_msg([success_message] + upload_info, success_slack_channel, {**updated_variables, **config_params})
_send_load_data_slack_msg([success_message] + upload_info, success_slack_channel, updated_variables)
return success


Expand Down Expand Up @@ -124,9 +123,9 @@ def _get_variables(dag_name: str):
return json.loads(airflow_response['value'])


def _trigger_dag(dag_name: str, config_params: dict = None):
def _trigger_dag(dag_name: str):
endpoint = f'dags/{dag_name}/dagRuns'
_make_airflow_api_request(endpoint, method='POST', json={'conf': config_params or {}})
_make_airflow_api_request(endpoint, method='POST', json={})


def _make_airflow_api_request(endpoint, method='GET', timeout=90, **kwargs):
Expand Down
16 changes: 7 additions & 9 deletions seqr/views/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,21 +665,19 @@ def assert_airflow_loading_calls(self, trigger_error=False, additional_tasks_che
'reference_genome': dag_variable_overrides.get('reference_genome', 'GRCh38'),
'callset_path': f'gs://test_bucket/{dag_variable_overrides["callset_path"]}',
'sample_type': dag_variable_overrides['sample_type'],
'sample_source': dag_variable_overrides['sample_source']
'sample_source': dag_variable_overrides['sample_source'],
}
config_params = {
'ignore_missing_samples_when_remapping': False,
'skip_check_sex_and_relatedness': False,
'skip_validation': skip_validation
}
self._assert_airflow_calls(dag_variables, call_count, config_params, offset=offset)
if dag_variable_overrides.get('skip_validation'):
dag_variables['skip_validation'] = True
dag_variables['sample_source'] = dag_variable_overrides['sample_source']
self._assert_airflow_calls(dag_variables, call_count, offset=offset)

def _assert_call_counts(self, call_count):
self.mock_airflow_logger.info.assert_not_called()
self.assertEqual(len(responses.calls), call_count + self.ADDITIONAL_REQUEST_COUNT)
self.assertEqual(self.mock_authorized_session.call_count, call_count)

def _assert_airflow_calls(self, dag_variables, call_count, config_params, offset=0):
def _assert_airflow_calls(self, dag_variables, call_count, offset=0):
self._assert_dag_running_state_calls(offset)

if call_count < 2:
Expand All @@ -692,7 +690,7 @@ def _assert_airflow_calls(self, dag_variables, call_count, config_params, offset
# trigger dag
self.assertEqual(responses.calls[offset+call_cnt].request.url, f'{self._dag_url}/dagRuns')
self.assertEqual(responses.calls[offset+call_cnt].request.method, 'POST')
self.assertDictEqual(json.loads(responses.calls[offset+call_cnt].request.body), {'conf': config_params})
self.assertDictEqual(json.loads(responses.calls[offset+call_cnt].request.body), {})

self.mock_airflow_logger.warning.assert_not_called()
self.mock_airflow_logger.error.assert_not_called()
Expand Down
21 changes: 8 additions & 13 deletions ui/pages/DataManagement/components/LoadData.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,6 @@ const CALLSET_PAGE_FIELDS = [
component: InlineToggle,
asFormInput: true,
},
{
name: 'skipSRChecks',
label: 'Skip Sex and Relatedness Checks',
component: InlineToggle,
asFormInput: true,
},
{
name: 'ignoreMissingRemapSamples',
label: 'Ignore Missing Samples When Remapping',
component: InlineToggle,
asFormInput: true,
},
{
...GENOME_VERSION_FIELD,
component: ButtonRadioGroup,
Expand Down Expand Up @@ -109,7 +97,14 @@ const MULTI_DATA_TYPE_CALLSET_PAGE = {
placeholder: 'gs://',
...FILE_PATH_FIELD,
},
...CALLSET_PAGE_FIELDS,
CALLSET_PAGE_FIELDS[0],
{
name: 'skipSRChecks',
label: 'Skip Sex and Relatedness Checks',
component: InlineToggle,
asFormInput: true,
},
...CALLSET_PAGE_FIELDS.slice(1),
{
name: 'datasetType',
label: 'Dataset Type',
Expand Down

0 comments on commit 5a00ec8

Please sign in to comment.