diff --git a/environment/airflow/dags/cookbook1_ingest_customer_data.py b/environment/airflow/dags/cookbook1_ingest_customer_data.py index 714d042..2e184c4 100644 --- a/environment/airflow/dags/cookbook1_ingest_customer_data.py +++ b/environment/airflow/dags/cookbook1_ingest_customer_data.py @@ -4,10 +4,11 @@ import pathlib import pandas as pd -import tutorial_code as tutorial from airflow import DAG from airflow.operators.python import PythonOperator +import tutorial_code as tutorial + log = logging.getLogger("GX validation") diff --git a/environment/airflow/dags/cookbook2_ingest_product_data_handle_invalid_data.py b/environment/airflow/dags/cookbook2_validate_and_handle_invalid_data.py similarity index 83% rename from environment/airflow/dags/cookbook2_ingest_product_data_handle_invalid_data.py rename to environment/airflow/dags/cookbook2_validate_and_handle_invalid_data.py index 744bc46..833c650 100644 --- a/environment/airflow/dags/cookbook2_ingest_product_data_handle_invalid_data.py +++ b/environment/airflow/dags/cookbook2_validate_and_handle_invalid_data.py @@ -4,14 +4,15 @@ import pathlib import pandas as pd -import tutorial_code as tutorial from airflow import DAG from airflow.operators.python import PythonOperator +import tutorial_code as tutorial + log = logging.getLogger("GX validation") -def cookbook2_validate_and_ingest_to_postgres_handle_invalid_data(): +def cookbook2_validate_and_handle_invalid_data(): DATA_DIR = pathlib.Path(os.getenv("AIRFLOW_HOME")) / "data" @@ -64,12 +65,12 @@ def cookbook2_validate_and_ingest_to_postgres_handle_invalid_data(): ) ) - df_products_invalid.to_csv( - DATA_DIR / "invalid_rows/bad_product_rows.csv", index=False - ) - log.warning( - f"{df_products_invalid.shape[0]} invalid product rows written to error file." - ) + # df_products_invalid.to_csv( + # DATA_DIR / "invalid_rows/bad_product_rows.csv", index=False + # ) + # log.warning( + # f"{df_products_invalid.shape[0]} invalid product rows written to error file." + # ) else: df_products_valid = df_products @@ -88,15 +89,15 @@ def cookbook2_validate_and_ingest_to_postgres_handle_invalid_data(): } gx_dag = DAG( - "cookbook2_validate_and_ingest_to_postgres_handle_invalid_data", + "cookbook2_validate_and_handle_invalid_data", default_args=default_args, schedule="0 0 * * *", catchup=False, ) run_gx_task = PythonOperator( - task_id="cookbook2_validate_and_ingest_to_postgres_handle_invalid_data", - python_callable=cookbook2_validate_and_ingest_to_postgres_handle_invalid_data, + task_id="cookbook2_validate_and_handle_invalid_data", + python_callable=cookbook2_validate_and_handle_invalid_data, dag=gx_dag, ) diff --git a/tests/test_cookbook1.py b/tests/test_cookbook1.py index 45868ce..0284034 100644 --- a/tests/test_cookbook1.py +++ b/tests/test_cookbook1.py @@ -6,6 +6,7 @@ import great_expectations as gx import pandas as pd import pytest + import tutorial_code as tutorial diff --git a/tests/test_cookbook2.py b/tests/test_cookbook2.py index fd8d59e..81f61e6 100644 --- a/tests/test_cookbook2.py +++ b/tests/test_cookbook2.py @@ -6,6 +6,7 @@ import great_expectations as gx import pandas as pd import pytest + import tutorial_code as tutorial @@ -287,7 +288,7 @@ def test_airflow_dag_trigger(wait_on_airflow_api_healthcheck): tutorial.db.drop_all_table_rows(table_name) assert tutorial.db.get_table_row_count(table_name) == 0 - dag_id = "cookbook2_validate_and_ingest_to_postgres_handle_invalid_data" + dag_id = "cookbook2_validate_and_handle_invalid_data" dag_run_id, _ = tutorial.airflow.trigger_airflow_dag(dag_id) dag_run_completed = tutorial.airflow.dag_run_completed(dag_id, dag_run_id)