Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pre commit #7

Merged
merged 7 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pipelines/br_rj_riodejaneiro_brt_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
get_raw,
parse_timestamp_to_string,
rename_current_flow_run_now_time,
)
from pipelines.tasks import run_dbt_model_task as run_dbt_model
from pipelines.tasks import ( # get_local_dbt_client,; setup_task,
run_dbt_model,
save_raw_local,
save_treated_local,
set_last_run_timestamp,
Expand Down
10 changes: 5 additions & 5 deletions pipelines/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from basedosdados import Storage, Table
from prefect import Client, task
from prefect.backend import FlowRunView
from prefeitura_rio.pipelines_utils.dbt import run_dbt_model
from prefeitura_rio.pipelines_utils.dbt import run_dbt_model as run_dbt_model_func
from prefeitura_rio.pipelines_utils.infisical import inject_bd_credentials
from prefeitura_rio.pipelines_utils.logging import log
from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client
Expand Down Expand Up @@ -75,7 +75,7 @@ def get_current_flow_labels() -> List[str]:


@task
def run_dbt_model_task(
def run_dbt_model(
dataset_id: str = None,
table_id: str = None,
dbt_alias: bool = False,
Expand All @@ -85,7 +85,7 @@ def run_dbt_model_task(
flags: str = None,
_vars: dict | List[Dict] = None,
):
return run_dbt_model(
return run_dbt_model_func(
dataset_id=dataset_id,
table_id=table_id,
dbt_alias=dbt_alias,
Expand Down Expand Up @@ -143,7 +143,7 @@ def build_incremental_model( # pylint: disable=too-many-arguments
if refresh:
log("Running in full refresh mode")
log(f"DBT will run the following command:\n{run_command+' --full-refresh'}")
run_dbt_model(dataset_id=dataset_id, table_id=mat_table_id, flags="--full-refresh")
run_dbt_model_func(dataset_id=dataset_id, table_id=mat_table_id, flags="--full-refresh")
last_mat_date = get_table_min_max_value(
query_project_id, dataset_id, mat_table_id, field_name, "max"
)
Expand All @@ -152,7 +152,7 @@ def build_incremental_model( # pylint: disable=too-many-arguments
log("Running interval step materialization")
log(f"DBT will run the following command:\n{run_command}")
while last_base_date > last_mat_date:
running = run_dbt_model(dataset_id=dataset_id, table_id=mat_table_id)
running = run_dbt_model_func(dataset_id=dataset_id, table_id=mat_table_id)
# running = dbt_client.cli(run_command, sync=True)
last_mat_date = get_table_min_max_value(
query_project_id,
Expand Down
Loading