fix(transfer): add non-decorated shared transfer function #275

Merged · 2 commits · Jan 15, 2025
16 changes: 10 additions & 6 deletions dags/veda_data_pipeline/groups/transfer_group.py
@@ -27,11 +27,8 @@ def cogify_copy_task(ti):
     return cogify_transfer_handler(event_src=config, external_role_arn=external_role_arn)
 
 @task
-def transfer_data(ti, payload):
+def transfer_data_task(ti=None, payload={}):
     """Transfer data from one S3 bucket to another; s3 copy, no need for docker"""
-    from veda_data_pipeline.utils.transfer import (
-        data_transfer_handler,
-    )
     # use task-provided payload if provided, otherwise fall back on ti values
     # payload will generally have the same values expected by discovery, so some renames are needed when combining the dicts
     config = {
@@ -42,12 +39,19 @@ def transfer_data(ti, payload):
         "target_bucket": payload.get("target_bucket", ti.dag_run.conf.get("target_bucket", "veda-data-store")),
         "dry_run": payload.get("dry_run", ti.dag_run.conf.get("dry_run", False)),
     }
+    transfer_data(config)
+
+# non-decorated function for use in other tasks
+def transfer_data(payload={}):
+    """Transfer data from one S3 bucket to another; s3 copy, no need for docker"""
+    from veda_data_pipeline.utils.transfer import (
+        data_transfer_handler,
+    )
     airflow_vars = Variable.get("aws_dags_variables")
     airflow_vars_json = json.loads(airflow_vars)
     external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN")
-    # (event, chunk_size=2800, role_arn=None, bucket_output=None):
-    return data_transfer_handler(event=config, role_arn=external_role_arn)
+    return data_transfer_handler(event=payload, role_arn=external_role_arn)
 
 # TODO: cogify_transfer handler is missing arg parser so this subdag will not work
 def subdag_transfer():
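The pattern this change introduces is a decorated Airflow task wrapper that builds the config and then delegates to a plain, non-decorated function other tasks can call directly. Below is a minimal standalone sketch of that pattern; the data_transfer_handler stub and the hard-coded role ARN are illustrative stand-ins, not the actual VEDA helper or Airflow Variables lookup.

# Minimal sketch: reusable non-decorated transfer function.
# Stand-ins only; the real code imports data_transfer_handler from
# veda_data_pipeline.utils.transfer and reads the role ARN from Airflow Variables.

def data_transfer_handler(event, chunk_size=2800, role_arn=None, bucket_output=None):
    # Pretend to copy objects between buckets; the real handler performs the S3 copy.
    print(f"copy {event['origin_bucket']}/{event['origin_prefix']} -> "
          f"{event['target_bucket']} (dry_run={event.get('dry_run', False)})")
    return event

def transfer_data(payload={}):
    # Non-decorated entry point: callable from any task or from plain Python code.
    external_role_arn = "arn:aws:iam::000000000000:role/example-write-role"  # assumption
    return data_transfer_handler(event=payload, role_arn=external_role_arn)

if __name__ == "__main__":
    transfer_data(payload={
        "origin_bucket": "veda-data-store-staging",
        "origin_prefix": "example-collection/",
        "target_bucket": "veda-data-store",
        "dry_run": True,
    })

In the actual DAG, the @task-decorated transfer_data_task builds the config from the payload and ti.dag_run.conf, then delegates to transfer_data, so the S3 copy logic lives in one place.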
14 changes: 12 additions & 2 deletions dags/veda_data_pipeline/veda_promotion_pipeline.py
@@ -69,8 +69,17 @@
     }
 
     @task(max_active_tis_per_dag=3)
-    def transfer_assets_to_production_bucket(payload):
-        transfer_data(payload)
+    def transfer_assets_to_production_bucket(ti=None, payload={}):
+        # merge collection id into payload, then transfer data
+        payload['collection'] = ti.dag_run.conf.get("collection")
+        config = {
+            **payload,
+            "origin_bucket": payload.get("bucket", ti.dag_run.conf.get("origin_bucket", "veda-data-store")),
+            "origin_prefix": payload.get("prefix", ti.dag_run.conf.get("origin_prefix", "s3-prefix/")),
+            "target_bucket": payload.get("target_bucket", ti.dag_run.conf.get("target_bucket", "veda-data-store")),
+            "dry_run": payload.get("dry_run", ti.dag_run.conf.get("dry_run", False)),
+        }
+        transfer_data(payload=config)
         # if transfer complete, update discovery payload to reflect new bucket
         payload.update({"bucket": "veda-data-store"})
         payload.update({"prefix": payload.get("collection")+"/"})
@@ -97,4 +106,5 @@ def transfer_assets_to_production_bucket(payload):
 
     collection_grp.set_upstream(start)
     mutate_payload_task.set_upstream(start)
+    extract_from_payload.set_upstream(start)
     submit_stac.set_downstream(end)
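The promotion task above builds its transfer config by letting task-supplied payload values win, falling back to dag_run.conf, and then to hard-coded defaults (with collection always taken from dag_run.conf). A small illustration of that merge order follows; build_transfer_config is a hypothetical helper and the example dicts are made up, not part of the pipeline code.

# Illustration of the payload -> dag_run.conf -> default fallback order used above.

def build_transfer_config(payload: dict, dag_run_conf: dict) -> dict:
    return {
        **payload,
        # collection comes from the DAG run config, mirroring the diff above
        "collection": dag_run_conf.get("collection"),
        "origin_bucket": payload.get("bucket", dag_run_conf.get("origin_bucket", "veda-data-store")),
        "origin_prefix": payload.get("prefix", dag_run_conf.get("origin_prefix", "s3-prefix/")),
        "target_bucket": payload.get("target_bucket", dag_run_conf.get("target_bucket", "veda-data-store")),
        "dry_run": payload.get("dry_run", dag_run_conf.get("dry_run", False)),
    }

if __name__ == "__main__":
    conf = {"collection": "example-collection", "dry_run": True}
    payload = {"bucket": "veda-data-store-staging", "prefix": "example-collection/"}
    # payload values take precedence, conf fills the gaps, defaults apply last
    print(build_transfer_config(payload, conf))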