Skip to content

Commit

Permalink
Fix pre commit (#7)
Browse files Browse the repository at this point in the history
* commit inicial

* remove tabelas

* ordena imports

* corrige ordem de imports

* ajusta imports + run_dbt_model
  • Loading branch information
eng-rodrigocunha authored Jan 5, 2024
1 parent df34c52 commit ba5bec0
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 8 deletions.
4 changes: 1 addition & 3 deletions pipelines/br_rj_riodejaneiro_brt_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
get_raw,
parse_timestamp_to_string,
rename_current_flow_run_now_time,
)
from pipelines.tasks import run_dbt_model_task as run_dbt_model
from pipelines.tasks import ( # get_local_dbt_client,; setup_task,
run_dbt_model,
save_raw_local,
save_treated_local,
set_last_run_timestamp,
Expand Down
10 changes: 5 additions & 5 deletions pipelines/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from basedosdados import Storage, Table
from prefect import Client, task
from prefect.backend import FlowRunView
from prefeitura_rio.pipelines_utils.dbt import run_dbt_model
from prefeitura_rio.pipelines_utils.dbt import run_dbt_model as run_dbt_model_func
from prefeitura_rio.pipelines_utils.infisical import inject_bd_credentials
from prefeitura_rio.pipelines_utils.logging import log
from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client
Expand Down Expand Up @@ -75,7 +75,7 @@ def get_current_flow_labels() -> List[str]:


@task
def run_dbt_model_task(
def run_dbt_model(
dataset_id: str = None,
table_id: str = None,
dbt_alias: bool = False,
Expand All @@ -85,7 +85,7 @@ def run_dbt_model_task(
flags: str = None,
_vars: dict | List[Dict] = None,
):
return run_dbt_model(
return run_dbt_model_func(
dataset_id=dataset_id,
table_id=table_id,
dbt_alias=dbt_alias,
Expand Down Expand Up @@ -143,7 +143,7 @@ def build_incremental_model( # pylint: disable=too-many-arguments
if refresh:
log("Running in full refresh mode")
log(f"DBT will run the following command:\n{run_command+' --full-refresh'}")
run_dbt_model(dataset_id=dataset_id, table_id=mat_table_id, flags="--full-refresh")
run_dbt_model_func(dataset_id=dataset_id, table_id=mat_table_id, flags="--full-refresh")
last_mat_date = get_table_min_max_value(
query_project_id, dataset_id, mat_table_id, field_name, "max"
)
Expand All @@ -152,7 +152,7 @@ def build_incremental_model( # pylint: disable=too-many-arguments
log("Running interval step materialization")
log(f"DBT will run the following command:\n{run_command}")
while last_base_date > last_mat_date:
running = run_dbt_model(dataset_id=dataset_id, table_id=mat_table_id)
running = run_dbt_model_func(dataset_id=dataset_id, table_id=mat_table_id)
# running = dbt_client.cli(run_command, sync=True)
last_mat_date = get_table_min_max_value(
query_project_id,
Expand Down

0 comments on commit ba5bec0

Please sign in to comment.