Skip to content

Commit

Permalink
fix flows and tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellcassius committed Dec 13, 2023
1 parent eb27a53 commit a9c72e4
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pipelines/br_rj_riodejaneiro_brt_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pipelines.constants import constants as emd_constants
from prefeitura_rio.pipelines_utils.custom import Flow
# from prefeitura_rio.pipelines_utils.prefect import get_k8s_dbt_client
from prefeitura_rio.pipelines_utils.tasks import (
from pipelines.tasks import (
get_current_flow_labels,
)

Expand Down Expand Up @@ -136,7 +136,7 @@

with Flow(
"SMTR: GPS BRT - Captura",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
# code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as captura_brt:
timestamp = get_current_timestamp()
setup = setup_task()
Expand Down
10 changes: 10 additions & 0 deletions pipelines/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import pendulum
import prefect
from prefect import task, Client
from prefect.backend import FlowRunView
from pytz import timezone
import requests

Expand Down Expand Up @@ -52,6 +53,15 @@
def setup_task():
return inject_bd_credentials()

@task
def get_current_flow_labels() -> List[str]:
"""
Get the labels of the current flow.
"""
flow_run_id = prefect.context.get("flow_run_id")
flow_run_view = FlowRunView.from_flow_run_id(flow_run_id)
return flow_run_view.labels

###############
#
# DBT
Expand Down

0 comments on commit a9c72e4

Please sign in to comment.