Skip to content

Commit

Permalink
see if pipeline file write is the problem
Browse files Browse the repository at this point in the history
  • Loading branch information
rachhouse committed Dec 5, 2024
1 parent ccdfc4d commit 9ba6def
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 13 deletions.
3 changes: 2 additions & 1 deletion environment/airflow/dags/cookbook1_ingest_customer_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import pathlib

import pandas as pd
-import tutorial_code as tutorial
from airflow import DAG
from airflow.operators.python import PythonOperator

+import tutorial_code as tutorial
+
log = logging.getLogger("GX validation")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import pathlib

import pandas as pd
-import tutorial_code as tutorial
from airflow import DAG
from airflow.operators.python import PythonOperator

+import tutorial_code as tutorial
+
log = logging.getLogger("GX validation")


-def cookbook2_validate_and_ingest_to_postgres_handle_invalid_data():
+def cookbook2_validate_and_handle_invalid_data():

DATA_DIR = pathlib.Path(os.getenv("AIRFLOW_HOME")) / "data"

Expand Down Expand Up @@ -64,12 +65,12 @@ def cookbook2_validate_and_ingest_to_postgres_handle_invalid_data():
)
)

-df_products_invalid.to_csv(
-DATA_DIR / "invalid_rows/bad_product_rows.csv", index=False
-)
-log.warning(
-f"{df_products_invalid.shape[0]} invalid product rows written to error file."
-)
+# df_products_invalid.to_csv(
+# DATA_DIR / "invalid_rows/bad_product_rows.csv", index=False
+# )
+# log.warning(
+# f"{df_products_invalid.shape[0]} invalid product rows written to error file."
+# )

else:
df_products_valid = df_products
Expand All @@ -88,15 +89,15 @@ def cookbook2_validate_and_ingest_to_postgres_handle_invalid_data():
}

gx_dag = DAG(
-"cookbook2_validate_and_ingest_to_postgres_handle_invalid_data",
+"cookbook2_validate_and_handle_invalid_data",
default_args=default_args,
schedule="0 0 * * *",
catchup=False,
)

run_gx_task = PythonOperator(
-task_id="cookbook2_validate_and_ingest_to_postgres_handle_invalid_data",
-python_callable=cookbook2_validate_and_ingest_to_postgres_handle_invalid_data,
+task_id="cookbook2_validate_and_handle_invalid_data",
+python_callable=cookbook2_validate_and_handle_invalid_data,
dag=gx_dag,
)

Expand Down
1 change: 1 addition & 0 deletions tests/test_cookbook1.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import great_expectations as gx
import pandas as pd
import pytest
+
import tutorial_code as tutorial


Expand Down
3 changes: 2 additions & 1 deletion tests/test_cookbook2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import great_expectations as gx
import pandas as pd
import pytest
+
import tutorial_code as tutorial


Expand Down Expand Up @@ -287,7 +288,7 @@ def test_airflow_dag_trigger(wait_on_airflow_api_healthcheck):
tutorial.db.drop_all_table_rows(table_name)
assert tutorial.db.get_table_row_count(table_name) == 0

-dag_id = "cookbook2_validate_and_ingest_to_postgres_handle_invalid_data"
+dag_id = "cookbook2_validate_and_handle_invalid_data"
dag_run_id, _ = tutorial.airflow.trigger_airflow_dag(dag_id)

dag_run_completed = tutorial.airflow.dag_run_completed(dag_id, dag_run_id)
Expand Down

0 comments on commit 9ba6def

Please sign in to comment.