diff --git a/dags/veda_data_pipeline/groups/transfer_group.py b/dags/veda_data_pipeline/groups/transfer_group.py
index 6eab6cc..2fbce3c 100644
--- a/dags/veda_data_pipeline/groups/transfer_group.py
+++ b/dags/veda_data_pipeline/groups/transfer_group.py
@@ -27,11 +27,8 @@ def cogify_copy_task(ti):
     return cogify_transfer_handler(event_src=config, external_role_arn=external_role_arn)
 
 @task
-def transfer_data(ti, payload):
+def transfer_data_task(ti=None, payload={}):
     """Transfer data from one S3 bucket to another; s3 copy, no need for docker"""
-    from veda_data_pipeline.utils.transfer import (
-        data_transfer_handler,
-    )
     # use task-provided payload if provided, otherwise fall back on ti values
     # payload will generally have the same values expected by discovery, so some renames are needed when combining the dicts
     config = {
@@ -42,12 +39,19 @@ def transfer_data(ti, payload):
         "target_bucket": payload.get("target_bucket", ti.dag_run.conf.get("target_bucket", "veda-data-store")),
         "dry_run": payload.get("dry_run", ti.dag_run.conf.get("dry_run", False)),
     }
+    transfer_data(config)
+
+# non-decorated function for use in other tasks
+def transfer_data(payload={}):
+    """Transfer data from one S3 bucket to another; s3 copy, no need for docker"""
+    from veda_data_pipeline.utils.transfer import (
+        data_transfer_handler,
+    )
     airflow_vars = Variable.get("aws_dags_variables")
     airflow_vars_json = json.loads(airflow_vars)
     external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN")
     # (event, chunk_size=2800, role_arn=None, bucket_output=None):
-    return data_transfer_handler(event=config, role_arn=external_role_arn)
-
+    return data_transfer_handler(event=payload, role_arn=external_role_arn)
 
 # TODO: cogify_transfer handler is missing arg parser so this subdag will not work
 def subdag_transfer():
diff --git a/dags/veda_data_pipeline/veda_promotion_pipeline.py b/dags/veda_data_pipeline/veda_promotion_pipeline.py
index 02c8a15..42e9ba4 100644
--- a/dags/veda_data_pipeline/veda_promotion_pipeline.py
+++ b/dags/veda_data_pipeline/veda_promotion_pipeline.py
@@ -69,8 +69,17 @@
 }
 
 @task(max_active_tis_per_dag=3)
-def transfer_assets_to_production_bucket(payload):
-    transfer_data(payload)
+def transfer_assets_to_production_bucket(ti=None, payload={}):
+    # merge collection id into payload, then transfer data
+    payload['collection'] = ti.dag_run.conf.get("collection")
+    config = {
+        **payload,
+        "origin_bucket": payload.get("bucket", ti.dag_run.conf.get("origin_bucket", "veda-data-store")),
+        "origin_prefix": payload.get("prefix", ti.dag_run.conf.get("origin_prefix", "s3-prefix/")),
+        "target_bucket": payload.get("target_bucket", ti.dag_run.conf.get("target_bucket", "veda-data-store")),
+        "dry_run": payload.get("dry_run", ti.dag_run.conf.get("dry_run", False)),
+    }
+    transfer_data(payload=config)
     # if transfer complete, update discovery payload to reflect new bucket
     payload.update({"bucket": "veda-data-store"})
     payload.update({"prefix": payload.get("collection")+"/"})
@@ -97,4 +106,5 @@ def transfer_assets_to_production_bucket(payload):
 
     collection_grp.set_upstream(start)
     mutate_payload_task.set_upstream(start)
+    extract_from_payload.set_upstream(start)
     submit_stac.set_downstream(end)
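
Note: the refactor above splits the S3 copy into a decorated transfer_data_task and a plain transfer_data helper, so other DAG code can reuse the copy logic without going through the task wrapper. The sketch below is illustrative only and is not part of the diff; the task name promote_collection_assets is hypothetical, and it assumes the veda_data_pipeline package is importable from the DAGs folder.

    # Minimal usage sketch (assumptions noted above); reuses the non-decorated helper.
    from airflow.decorators import task
    from veda_data_pipeline.groups.transfer_group import transfer_data


    @task
    def promote_collection_assets(ti=None):
        # Build the same config shape the handler expects, pulling values from the dag_run conf.
        config = {
            "collection": ti.dag_run.conf.get("collection"),
            "origin_bucket": ti.dag_run.conf.get("origin_bucket", "veda-data-store"),
            "origin_prefix": ti.dag_run.conf.get("origin_prefix", "s3-prefix/"),
            "target_bucket": ti.dag_run.conf.get("target_bucket", "veda-data-store"),
            "dry_run": ti.dag_run.conf.get("dry_run", False),
        }
        # Call the plain function directly; no extra task wrapper or docker needed.
        return transfer_data(payload=config)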