diff --git a/pipelines/br_rj_riodejaneiro_brt_gps/flows.py b/pipelines/br_rj_riodejaneiro_brt_gps/flows.py index 0e612087..521e3c9a 100644 --- a/pipelines/br_rj_riodejaneiro_brt_gps/flows.py +++ b/pipelines/br_rj_riodejaneiro_brt_gps/flows.py @@ -39,9 +39,7 @@ get_raw, parse_timestamp_to_string, rename_current_flow_run_now_time, -) -from pipelines.tasks import run_dbt_model_task as run_dbt_model -from pipelines.tasks import ( # get_local_dbt_client,; setup_task, + run_dbt_model, save_raw_local, save_treated_local, set_last_run_timestamp, diff --git a/pipelines/tasks.py b/pipelines/tasks.py index 3e8bfc68..78355251 100644 --- a/pipelines/tasks.py +++ b/pipelines/tasks.py @@ -19,7 +19,7 @@ from basedosdados import Storage, Table from prefect import Client, task from prefect.backend import FlowRunView -from prefeitura_rio.pipelines_utils.dbt import run_dbt_model +from prefeitura_rio.pipelines_utils.dbt import run_dbt_model as run_dbt_model_func from prefeitura_rio.pipelines_utils.infisical import inject_bd_credentials from prefeitura_rio.pipelines_utils.logging import log from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client @@ -75,7 +75,7 @@ def get_current_flow_labels() -> List[str]: @task -def run_dbt_model_task( +def run_dbt_model( dataset_id: str = None, table_id: str = None, dbt_alias: bool = False, @@ -85,7 +85,7 @@ def run_dbt_model_task( flags: str = None, _vars: dict | List[Dict] = None, ): - return run_dbt_model( + return run_dbt_model_func( dataset_id=dataset_id, table_id=table_id, dbt_alias=dbt_alias, @@ -143,7 +143,7 @@ def build_incremental_model( # pylint: disable=too-many-arguments if refresh: log("Running in full refresh mode") log(f"DBT will run the following command:\n{run_command+' --full-refresh'}") - run_dbt_model(dataset_id=dataset_id, table_id=mat_table_id, flags="--full-refresh") + run_dbt_model_func(dataset_id=dataset_id, table_id=mat_table_id, flags="--full-refresh") last_mat_date = get_table_min_max_value( query_project_id, dataset_id, mat_table_id, field_name, "max" ) @@ -152,7 +152,7 @@ def build_incremental_model( # pylint: disable=too-many-arguments log("Running interval step materialization") log(f"DBT will run the following command:\n{run_command}") while last_base_date > last_mat_date: - running = run_dbt_model(dataset_id=dataset_id, table_id=mat_table_id) + running = run_dbt_model_func(dataset_id=dataset_id, table_id=mat_table_id) # running = dbt_client.cli(run_command, sync=True) last_mat_date = get_table_min_max_value( query_project_id,