diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 09f23e8..59338da 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -17,9 +17,12 @@ jobs: uses: actions/checkout@v4 - name: Docker compose up + env: + GX_CLOUD_ORGANIZATION_ID: ${{ secrets.GX_CLOUD_ORGANIZATION_ID }} + GX_CLOUD_ACCESS_TOKEN: ${{ secrets.GX_CLOUD_ACCESS_TOKEN }} run: | echo ---Starting compose setup--- - docker compose up --build --detach --wait --wait-timeout 120 + GX_CLOUD_ORGANIZATION_ID=${GX_CLOUD_ORGANIZATION_ID} GX_CLOUD_ACCESS_TOKEN=${GX_CLOUD_ACCESS_TOKEN} docker compose up --build --detach --wait --wait-timeout 120 echo ---Compose is running--- - name: Run tutorial integration tests @@ -36,7 +39,10 @@ jobs: - name: Docker compose down if: success() || failure() + env: + GX_CLOUD_ORGANIZATION_ID: ${{ secrets.GX_CLOUD_ORGANIZATION_ID }} + GX_CLOUD_ACCESS_TOKEN: ${{ secrets.GX_CLOUD_ACCESS_TOKEN }} run: | echo ---Spinning down compose--- - docker compose down --volumes + GX_CLOUD_ORGANIZATION_ID=${GX_CLOUD_ORGANIZATION_ID} GX_CLOUD_ACCESS_TOKEN=${GX_CLOUD_ACCESS_TOKEN} docker compose down --volumes echo ---Compose is down--- diff --git a/README.md b/README.md index 2651b70..3b67191 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ # tutorial-gx-in-the-data-pipeline -This repo hosts hands-on tutorials that guide you through working examples of GX data validation in an Airflow pipeline. +This repo hosts hands-on tutorials that guide you through working examples of GX data validation in a data pipeline. -If you are new to GX, these tutorials will introduce you to GX concepts and guide you through creating GX data validation workflows that can be triggered and run using Airflow. +While Airflow is used as the data pipeline orchestrator for the tutorials, these examples are meant to show how GX can be integrated into any orchestrator that supports Python code. -If you are an experienced GX user, these tutorials will provide code examples of GX and Airflow integration that can be used as a source of best practices and techniques that can enhance your current data validation pipeline implementations. +If you are new to GX, these tutorials will introduce you to GX concepts and guide you through creating GX data validation workflows that can be triggered and run using a Python-enabled orchestrator. + +If you are an experienced GX user, these tutorials will provide code examples of GX and orchestrator integration that can be used as a source of best practices and techniques that can enhance your current data validation pipeline implementations. ## README table of contents 1. [Prerequisites](#prerequisites) @@ -11,6 +13,7 @@ If you are an experienced GX user, these tutorials will provide code examples of 1. [Cookbooks](#cookbooks) 1. [Tutorial environment](#tutorial-environment) 1. [Tutorial data](#tutorial-data) +1. [Troubleshooting](#troubleshooting) 1. [Additional resources](#additional-resources) ## Prerequisites @@ -18,6 +21,9 @@ If you are an experienced GX user, these tutorials will provide code examples of * Git: You use Git to clone this repository and access the contents locally. Download Git [here](https://git-scm.com/downloads). +* GX Cloud [organization id and access token](https://docs.greatexpectations.io/docs/cloud/connect/connect_python#get-your-user-access-token-and-organization-id): Cookbook 3 uses GX Cloud to store and visualize data validation results. Sign up for a free GX Cloud account [here](https://hubs.ly/Q02TyCZS0). + + ## Quickstart 1. 
Clone this repo locally.
 ```
@@ -29,15 +35,24 @@ If you are an experienced GX user, these tutorials will provide code examples of
 cd tutorial-gx-in-the-data-pipeline
 ```
-3. Start the tutorial environment using Docker compose.
-   ```
-   docker compose up --build --detach --wait
-   ```
+3. Start the tutorial environment using Docker compose. **If you are running Cookbook 3, supply your GX Cloud credentials.**
+
+   * To run the environment for Cookbooks 1 or 2:
+   ```
+   docker compose up --build --detach --wait
+   ```
+
+   * To run the environment for Cookbooks 1, 2, or 3, replace `<YOUR_GX_CLOUD_ORGANIZATION_ID>` and `<YOUR_GX_CLOUD_ACCESS_TOKEN>` with your GX Cloud organization id and access token values, respectively:
+   ```
+   export GX_CLOUD_ORGANIZATION_ID="<YOUR_GX_CLOUD_ORGANIZATION_ID>"
+   export GX_CLOUD_ACCESS_TOKEN="<YOUR_GX_CLOUD_ACCESS_TOKEN>"
+   docker compose up --build --detach --wait
+   ```
 > [!IMPORTANT]
 > The first time that you start the Docker compose instance, the underlying Docker images need to be built. This process can take several minutes.
 >
-> **When environment is ready, you will see the following output in the terminal:**
+> **When the environment is ready, you will see the following output (or similar) in the terminal:**
 >
 >```
 >✔ Network tutorial-gx-in-the-data-pipeline_gxnet Created
@@ -70,12 +85,11 @@ Cookbooks will be progressively added to this repo; the table below lists the cu
 >
 > If the tutorial environment is not running when you try to access the cookbook, you will receive a connection error.
 
-| No. | Cookbook topic | Cookbook status | Path to running tutorial cookbook | Path to static render of cookbook |
-| :--: | :-- | :-- | :-- | :-- |
-| 1 | Data validation during ingestion of data into database (happy path) | Available | [Click to open and run Cookbook 1](http://localhost:8888/lab/tree/Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) | [View Cookbook 1 on GitHub](cookbooks/Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) |
-| 2 | Data validation during ingestion of data into database (pipeline fail + then take action) | Available | [Click to open and run Cookbook 2](http://localhost:8888/lab/tree/Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb) | [View Cookbook 2 on GitHub](cookbooks/Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb) |
-| 3 | Data validation of Postgres database tables \* | Coming soon | | |
- | 4 | Data validation and automated handling in a medallion data pipeline \* | Coming soon | | |
+| No. | Cookbook topic | Path to running tutorial cookbook | Path to static render of cookbook |
+| :--: | :-- | :-- | :-- |
+| 1 | Data validation during ingestion of data into database (happy path) | [Click to open and run Cookbook 1](http://localhost:8888/lab/tree/Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) | [View Cookbook 1 on GitHub](cookbooks/Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) |
+| 2 | Data validation during ingestion of data into database (pipeline fail + then take action) | [Click to open and run Cookbook 2](http://localhost:8888/lab/tree/Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb) | [View Cookbook 2 on GitHub](cookbooks/Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb) |
+| 3 | Data validation with GX Core and GX Cloud \* | [Click to open and run Cookbook 3](http://localhost:8888/lab/tree/Cookbook_3_Validate_data_with_GX_Core_and_Cloud.ipynb) | [View Cookbook 3 on GitHub](cookbooks/Cookbook_3_Validate_data_with_GX_Core_and_Cloud.ipynb) |
 
 \* Cookbook execution requires GX Cloud organization credentials. 
Sign up for a free GX Cloud account [here](https://hubs.ly/Q02TyCZS0).
@@ -88,7 +102,7 @@ Tutorials are hosted and executed within a containerized environment that is run
 * **Postgres**. The containerized Postgres database hosts the sample data used by the tutorial cookbooks and pipelines.
 
-Cookbooks that feature GX Cloud-based data validation workflows connect to your GX Cloud organization.
+Cookbook 3 features a GX Cloud-based data validation workflow that connects to your GX Cloud organization.
 
 ## Tutorial data
 
@@ -97,6 +111,30 @@ dataset](https://www.kaggle.com/datasets/bhavikjikadara/global-electronics-retai
 This dataset is used under the Creative Commons Attribution 4.0 International License. Appropriate credit is given to Bhavik Jikadara. The dataset has been modified to suit the requirements of this project. For more information about this license, please visit the [Creative Commons Attribution 4.0 International License](https://creativecommons.org/licenses/by/4.0/).
 
+## Troubleshooting
+
+This section provides guidance on how to resolve potential errors and unexpected behavior when running the tutorial.
+
+### Docker compose errors
+If you receive unexpected errors when running `docker compose up`, or do not get healthy containers, you can try recreating the tutorial Docker containers using the `--force-recreate` argument.
+```
+docker compose up --build --force-recreate --detach --wait
+```
+
+### GX Cloud environment variables warning
+The tutorial `docker-compose.yaml` is defined to capture the `GX_CLOUD_ORGANIZATION_ID` and `GX_CLOUD_ACCESS_TOKEN` environment variables to support Cookbook 3. If these variables are not provided when running `docker compose up`, you will see the following warnings:
+
+```
+WARN[0000] The "GX_CLOUD_ORGANIZATION_ID" variable is not set. Defaulting to a blank string.
+WARN[0000] The "GX_CLOUD_ACCESS_TOKEN" variable is not set. Defaulting to a blank string.
+WARN[0000] The "GX_CLOUD_ORGANIZATION_ID" variable is not set. Defaulting to a blank string.
+WARN[0000] The "GX_CLOUD_ACCESS_TOKEN" variable is not set. Defaulting to a blank string.
+```
+
+You can safely ignore these warnings if:
+* You are not trying to run Cookbook 3.
+* You are running `docker compose down --volumes` to stop the running Docker compose environment.
+
 ## Additional resources
 
 * To report a bug for any of the tutorials or code within this repo, [open an issue](https://github.com/greatexpectationslabs/tutorial-gx-in-the-data-pipeline/issues/new).
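To confirm that the GX Cloud credentials actually reached the running containers, you can inspect a service's environment with `docker compose exec`. This is an optional check; the service name `jupyterlab` below is an assumption and should be replaced with the service name defined in your `docker-compose.yaml`.
```
docker compose exec jupyterlab printenv | grep GX_CLOUD
```
Both `GX_CLOUD_ORGANIZATION_ID` and `GX_CLOUD_ACCESS_TOKEN` should appear with non-empty values if they were exported before running `docker compose up`.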
diff --git a/cookbooks/Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb b/cookbooks/Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb index 980c76e..7e10569 100644 --- a/cookbooks/Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb +++ b/cookbooks/Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb @@ -232,7 +232,7 @@ "metadata": {}, "outputs": [], "source": [ - "context = gx.get_context()" + "context = gx.get_context(mode=\"ephemeral\")" ] }, { @@ -300,7 +300,7 @@ "outputs": [], "source": [ "expectation = gxe.ExpectTableColumnsToMatchOrderedList(\n", - " column_list=[\"customer_id\", \"name\", \"dob\", \"city\", \"state\", \"zip\", \"country\"]\n", + " column_list=[\"customer_id\", \"name\", \"city\", \"state\", \"zip\", \"country\"]\n", ")" ] }, @@ -402,14 +402,13 @@ "source": [ "expectations = [\n", " gxe.ExpectTableColumnsToMatchOrderedList(\n", - " column_list=[\"customer_id\", \"name\", \"dob\", \"city\", \"state\", \"zip\", \"country\"]\n", + " column_list=[\"customer_id\", \"name\", \"city\", \"state\", \"zip\", \"country\"]\n", " ),\n", " gxe.ExpectColumnValuesToBeOfType(column=\"customer_id\", type_=\"int\"),\n", " *[\n", " gxe.ExpectColumnValuesToBeOfType(column=x, type_=\"str\")\n", " for x in [\"name\", \"city\", \"state\", \"zip\"]\n", " ],\n", - " gxe.ExpectColumnValuesToMatchRegex(column=\"dob\", regex=r\"^\\d{4}-\\d{2}-\\d{2}$\"),\n", " gxe.ExpectColumnValuesToBeInSet(\n", " column=\"country\", value_set=[\"AU\", \"CA\", \"DE\", \"FR\", \"GB\", \"IT\", \"NL\", \"US\"]\n", " ),\n", diff --git a/cookbooks/Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb b/cookbooks/Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb index 025f138..f3b0792 100644 --- a/cookbooks/Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb +++ b/cookbooks/Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb @@ -303,7 +303,7 @@ "outputs": [], "source": [ "# Create the Data Context.\n", - "context = gx.get_context()\n", + "context = gx.get_context(mode=\"ephemeral\")\n", "\n", "# Create the Data Source, Data Asset, and Batch Definition.\n", "try:\n", @@ -1002,7 +1002,7 @@ "source": [ "This cookbook has walked you through the process of validating data using GX, integrating the data validation workflow in an Airflow pipeline, and then programmatically handling invalid data in the pipeline when validation fails.\n", "\n", - "[Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) and Cookbook 2 (this notebook) have focused on usage of [GX Core](https://docs.greatexpectations.io/docs/core/introduction/) to implement data validation in a data pipeline. Subsequent cookbooks will explore integrating [GX Cloud](https://docs.greatexpectations.io/docs/cloud/overview/gx_cloud_overview), GX Core, and an Airflow data pipeline to achieve end-to-end data validation workflows that make validation results available and shareable in the GX Cloud web UI." + "[Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) and Cookbook 2 (this notebook) have focused on usage of [GX Core](https://docs.greatexpectations.io/docs/core/introduction/) to implement data validation in a data pipeline. 
[Cookbook 3](Cookbook_3_Validate_data_with_GX_Core_and_Cloud.ipynb) explores integrating [GX Cloud](https://docs.greatexpectations.io/docs/cloud/overview/gx_cloud_overview), GX Core, and an Airflow data pipeline to achieve end-to-end data validation workflows that make validation results available and shareable in the GX Cloud web UI." ] }, { diff --git a/cookbooks/Cookbook_3_Validate_data_with_GX_Core_and_Cloud.ipynb b/cookbooks/Cookbook_3_Validate_data_with_GX_Core_and_Cloud.ipynb new file mode 100644 index 0000000..1200a51 --- /dev/null +++ b/cookbooks/Cookbook_3_Validate_data_with_GX_Core_and_Cloud.ipynb @@ -0,0 +1,1115 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# Cookbook 3: Validate data with GX Core and GX Cloud" + ] + }, + { + "cell_type": "markdown", + "id": "1", + "metadata": {}, + "source": [ + "**Note:** ***The GX Cloud UI screenshots contained in this cookbook are current as of*** `2025-01-06`. ***As GX Cloud continues to evolve, it is possible that you will see a difference between the latest UI and the screenshots displayed here.***" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "This cookbook showcases a data validation workflow characteristic of vetting existing data in an organization's data stores. It could be representative of two groups within an organization enforcing a publisher-subscriber data contract, or representative of users ensuring that data meets the quality requirements for its intended use, such as analytics or modeling.\n", + "\n", + "[Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) and [Cookbook 2](Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb) explored GX Core workflows that were run within a data pipeline, orchestrated by Airflow. This cookbook introduces [GX Cloud](https://greatexpectations.io/gx-cloud) as an additional tool to store and visualize data validation results and features a hybrid workflow using GX Core, GX Cloud, and Airflow.\n", + "\n", + "This cookbook builds on [Cookbook 1: Validate data during ingestion (happy path)](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) and [Cookbook 2: Validate data during ingestion (take action on failures)](Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb) and focuses on how data validation failures can be programmatically handled in the pipeline based on GX Validation Results and how failures can be shared using GX Cloud. This cookbook assumes basic familiarity with GX Core workflows; for a step-by-step explanation of the GX data validation workflow, refer to [Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) and [Cookbook 2](Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb)." + ] + }, + { + "cell_type": "markdown", + "id": "3", + "metadata": {}, + "source": [ + "## Imports" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, + "source": [ + "The GX Core content of this cookbook uses the `great_expectations` library.\n", + "\n", + "The `tutorial_code` module contains helper functions used within this notebook and the associated Airflow pipeline.\n", + "\n", + "The `airflow_dags` submodule is included so that you can inspect the code used in the related Airflow DAG directly from this notebook." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "import inspect\n", + "import os\n", + "\n", + "import great_expectations as gx\n", + "import great_expectations.expectations as gxe\n", + "import IPython\n", + "import pandas as pd\n", + "\n", + "import tutorial_code as tutorial\n", + "import airflow_dags.cookbook3_validate_postgres_table_data as dag" + ] + }, + { + "cell_type": "markdown", + "id": "6", + "metadata": {}, + "source": [ + "## The GX data quality platform" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "The Great Expectations data quality platform is comprised by:\n", + "* [GX Cloud](https://greatexpectations.io/gx-cloud), a fully managed SaaS solution, with web portal, and\n", + "* [GX Core](https://github.com/great-expectations/great_expectations), the open source Python framework.\n", + "\n", + "GX Cloud and GX Core can be used separately for a cloud-only or programmatic-only approach ([Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) and [Cookbook 2](Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb) are an example of a Core-only workflow). However, using GX Core and GX Cloud *together* provides a solution in which GX Cloud serves as a single source of truth for data quality definition and application, and GX Core enables flexible integration of data validation into existing data stacks. Together, GX Cloud and GX Core enable you to achieve data quality definition, monitoring, and management using UI-based workflows, programmatic workflows, or hybrid workflows.\n", + "\n", + "The diagram below depicts different ways you might opt to use the platform (but is not exhaustive):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Image(\n", + " \"img/diagrams/gx_cloud_core_architecture.png\",\n", + " alt=\"Example modes of working together with GX Cloud and GX Core\",\n", + " width=900,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "## Cookbook workflow" + ] + }, + { + "cell_type": "markdown", + "id": "10", + "metadata": {}, + "source": [ + "In this cookbook, you will use GX Core, GX Cloud, and Airflow to define data quality for sample data, run data validation, and explore the results of data validation. The key steps are:\n", + "1. Define your Data Asset, Expectations, and Checkpoint programmatically with GX Core\n", + "2. Store the GX workflow configuration in your GX Cloud organization\n", + "3. Trigger data validation from an Airflow pipeline\n", + "4. Explore data validation results in GX Cloud\n", + "\n", + "The diagram below depicts, in more detail, the underlying interactions of GX Core, GX Cloud, Airflow, and the sample data Postgres database. As you work through this cookbook, you'll implement each of these interactions." 
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "11",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "IPython.display.Image(\n",
+ " \"img/diagrams/cookbook3_workflow.png\",\n",
+ " alt=\"GX Cloud and GX Core interactions in Cookbook3\",\n",
+ " width=900,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "12",
+ "metadata": {},
+ "source": [
+ "## Verify GX Cloud connectivity"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "13",
+ "metadata": {},
+ "source": [
+ "Before working through the tutorial, check that your GX Cloud organization credentials are available in this notebook environment, and log in to GX Cloud."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "14",
+ "metadata": {},
+ "source": [
+ "### Check that GX Cloud credentials are defined"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "15",
+ "metadata": {},
+ "source": [
+ "Valid GX Cloud organization credentials need to be provided for GX Core to persist workflow configuration and validation results to GX Cloud. Run the code below to check that your credentials are available in this notebook environment."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "16",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "if tutorial.cloud.gx_cloud_credentials_exist():\n",
+ " print(\n",
+ " \"Found stored credentials in the GX_CLOUD_ORGANIZATION_ID and GX_CLOUD_ACCESS_TOKEN environment variables.\"\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "17",
+ "metadata": {},
+ "source": [
+ "```{warning} GX Cloud credential error\n",
+ "If `tutorial.cloud.gx_cloud_credentials_exist()` raises a `ValueError` indicating that `GX_CLOUD_ORGANIZATION_ID` or `GX_CLOUD_ACCESS_TOKEN` is undefined, ensure that you have provided your GX Cloud organization id and access token when starting Docker compose.\n",
+ "```"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "18",
+ "metadata": {},
+ "source": [
+ "### Log into GX Cloud"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "19",
+ "metadata": {},
+ "source": [
+ "In a separate browser window or tab, log in to [GX Cloud](https://hubs.ly/Q02TyCZS0)."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "20",
+ "metadata": {},
+ "source": [
+ "## Connect to source data"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "21",
+ "metadata": {},
+ "source": [
+ "In this tutorial, you will validate customer profile information that is hosted in a publicly available Postgres database, provided by GX. The customer profile data extends the sample customer data used in [Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb). Data for each customer includes their age (in years) and annual income (in USD)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "22",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "pd.read_sql_query(\n",
+ " \"select count(*) from customer_profile\",\n",
+ " con=tutorial.db.get_cloud_postgres_engine(),\n",
+ ").iloc[0][\"count\"]"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "23",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "pd.read_sql_query(\n",
+ " \"select * from customer_profile limit 5\",\n",
+ " con=tutorial.db.get_cloud_postgres_engine(),\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "24",
+ "metadata": {},
+ "source": [
+ "## Profile source data and determine data quality checks"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "25",
+ "metadata": {},
+ "source": [
+ "The scenario explored in this cookbook assumes that the data has been vetted for schema adherence and completeness. Notably, all rows contain required fields and data is non-null and in a valid format.\n",
+ "\n",
+ "The Expectations that you create will assess the distribution of the customer profile dataset - representative of data testing performed before using data for analysis or machine learning purposes."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "26",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "tutorial.cookbook3.visualize_customer_age_distribution()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "27",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "tutorial.cookbook3.visualize_customer_income_distribution()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "28",
+ "metadata": {},
+ "source": [
+ "You will use the following Expectations in this cookbook to validate the distribution of the sample customer profile data:\n",
+ "* The minimum customer age is between 20 and 25 years\n",
+ "* The maximum customer age is 90 years or younger\n",
+ "* The median customer annual income is between 45k and 50k, with a standard deviation of 10k."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "29",
+ "metadata": {},
+ "source": [
+ "## GX validation workflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "30",
+ "metadata": {},
+ "source": [
+ "The GX data validation workflow was introduced in [Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) and [Cookbook 2](Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb); refer to these cookbooks for walkthroughs of the following GX components:\n",
+ "* Data definition: Data Source, Data Asset, Batch Definition, Batch\n",
+ "* Data quality definition: Expectation, Expectation Suite\n",
+ "* Data Validation: Validation Definition, Checkpoint, Validation Result\n",
+ "\n",
+ "This cookbook will provide additional detail on the Data Context and discuss the choice of Data Context when introducing GX Cloud into your data validation workflow."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "31",
+ "metadata": {},
+ "source": [
+ "### Ephemeral and Cloud Data Contexts"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "32",
+ "metadata": {},
+ "source": [
+ "All GX Core workflows start with the creation of a Data Context. A Data Context is the Python object that serves as an entrypoint for the GX Core Python library, and it also manages the settings and metadata for your GX workflow.\n",
+ "\n",
+ "* An **Ephemeral Data Context** stores the configuration of your GX workflow in memory. 
Workflow configurations do not persist beyond the current notebook or Python session.\n", + "\n", + " ```\n", + " context = gx.get_context(mode=\"ephemeral\")\n", + " ```\n", + "\n", + "* A **Cloud Data Context** stores the configuration of your GX workflow in GX Cloud. Configurations stored in GX Cloud are accessible by others in your organization and can be used across sessions and mediums - in Python notebooks, Python scripts, and orchestrators that support Python. When creating a Cloud Data Context, you need to provide credentials for the specific GX Cloud organization that you want to use.\n", + "\n", + " ```\n", + " context = gx.get_context(\n", + " mode=\"cloud\",\n", + " cloud_organization_id=\"\",\n", + " cloud_access_token=\"\"\n", + " )\n", + " ```\n", + "\n", + "For additional detail on Data Contexts, see [Create a Data Context](https://docs.greatexpectations.io/docs/core/set_up_a_gx_environment/create_a_data_context) in the GX Core documentation." + ] + }, + { + "cell_type": "markdown", + "id": "33", + "metadata": {}, + "source": [ + "The `gx.get_context()` method, when called with no arguments, will auto-discover your GX Cloud organization id and access token credentials if they are available as the `GX_CLOUD_ORGANIZATION_ID` and `GX_CLOUD_ACCESS_TOKEN` environment variables, respectively." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "34", + "metadata": {}, + "outputs": [], + "source": [ + "context = gx.get_context()\n", + "\n", + "if (os.getenv(\"GX_CLOUD_ORGANIZATION_ID\", None) is not None) and (\n", + " os.getenv(\"GX_CLOUD_ACCESS_TOKEN\", None) is not None\n", + "):\n", + " assert isinstance(context, gx.data_context.CloudDataContext)\n", + " print(\"GX Cloud credentials found, created CloudDataContext.\")" + ] + }, + { + "cell_type": "markdown", + "id": "35", + "metadata": {}, + "source": [ + "### Define validation workflow and persist configuration in GX Cloud" + ] + }, + { + "cell_type": "markdown", + "id": "36", + "metadata": {}, + "source": [ + "```{admonition} Reminder: Adding GX components to the Data Context\n", + "GX components are unique on name. Once a component is created with the Data Context, adding another component with the same name will cause an error. To enable repeated execution of cookbook cells that add GX workflow components, you will see the following pattern:\n", + "\n", + " try:\n", + " Add a new component(s) to the context\n", + " except:\n", + " Get component(s) from the context by name, or delete and recreate the component(s)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "37", + "metadata": {}, + "source": [ + "#### Create the GX Data Asset" + ] + }, + { + "cell_type": "markdown", + "id": "38", + "metadata": {}, + "source": [ + "Create the Cloud Data Context and the initial components that define a Data Asset for the sample customer profile data." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "39", + "metadata": {}, + "outputs": [], + "source": [ + "# Create the Cloud Data Context.\n", + "context = gx.get_context()\n", + "\n", + "# Create the Data Source, Data Asset, and Batch Definition.\n", + "try:\n", + " data_source = context.data_sources.add_postgres(\n", + " \"GX tutorial\", connection_string=tutorial.db.get_gx_postgres_connection_string()\n", + " )\n", + " data_asset = data_source.add_table_asset(\n", + " name=\"customer profiles\", table_name=\"customer_profile\"\n", + " )\n", + " batch_definition = data_asset.add_batch_definition_whole_table(\n", + " \"customer profiles batch definition\"\n", + " )\n", + "\n", + "except:\n", + " data_source = context.data_sources.get(\"GX tutorial\")\n", + " data_asset = data_source.get_asset(name=\"customer profiles\")\n", + " batch_definition = data_asset.get_batch_definition(\n", + " \"customer profiles batch definition\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "40", + "metadata": {}, + "source": [ + "#### Examine the Data Asset in GX Cloud" + ] + }, + { + "cell_type": "markdown", + "id": "41", + "metadata": {}, + "source": [ + "Since the Cloud Data Context was used to create the Data Source and Data Asset, you will now see these components in your GX Cloud organization. View the Data Asset in the [GX Cloud UI](https://hubs.ly/Q02TyCZS0)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "42", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Image(\n", + " \"img/screencaptures/cookbook3_new_data_asset.png\",\n", + " alt=\"Data Asset created in GX Cloud using a Cloud Data Context\",\n", + " width=900,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "43", + "metadata": {}, + "source": [ + "You will see that the newly created Data Asset does not contain any Expectations or Validation Results yet." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "44", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Image(\n", + " \"img/screencaptures/cookbook3_new_asset_no_expectations_validations.png\",\n", + " alt=\"A Data Asset newly created with GX Core does not yet have Expectations or Validation Results in GX Cloud\",\n", + " width=700,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "45", + "metadata": {}, + "source": [ + "#### Add Expectations and a Checkpoint to the workflow" + ] + }, + { + "cell_type": "markdown", + "id": "46", + "metadata": {}, + "source": [ + "Continue to build your GX validation workflow, adding the Expectation Suite, Expectations, Validation Definition, and Checkpoint." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47", + "metadata": {}, + "outputs": [], + "source": [ + "EXPECTATION_SUITE_NAME = \"Customer profile expectations\"\n", + "VALIDATION_DEFINTION_NAME = \"Customer profile validation definition\"\n", + "CHECKPOINT_NAME = \"Customer profile checkpoint\"\n", + "\n", + "\n", + "def create_gx_validation_workflow_components(\n", + " expectation_suite_name: str, validation_definition_name: str, checkpoint_name: str\n", + ") -> gx.Checkpoint:\n", + " \"\"\"Create the Expectation Suite, Validation Definition, and Checkpoint for the Cookbook 3 workflow.\n", + "\n", + " Returns:\n", + " GX Checkpoint object\n", + " \"\"\"\n", + "\n", + " # Create the Expectation Suite.\n", + " expectation_suite = context.suites.add(\n", + " gx.ExpectationSuite(name=EXPECTATION_SUITE_NAME)\n", + " )\n", + "\n", + " # Add Expectations to Expectation Suite.\n", + " expectations = [\n", + " gxe.ExpectColumnMinToBeBetween(column=\"age\", min_value=20, max_value=25),\n", + " gxe.ExpectColumnMaxToBeBetween(column=\"age\", max_value=90),\n", + " gxe.ExpectColumnMedianToBeBetween(\n", + " column=\"annual_income_usd\", min_value=45_000, max_value=50_000\n", + " ),\n", + " gxe.ExpectColumnStdevToBeBetween(\n", + " column=\"annual_income_usd\", min_value=10_000, max_value=10_000\n", + " ),\n", + " ]\n", + "\n", + " for expectation in expectations:\n", + " expectation_suite.add_expectation(expectation)\n", + "\n", + " expectation_suite.save()\n", + "\n", + " # Create the Validation Definition.\n", + " validation_definition = context.validation_definitions.add(\n", + " gx.ValidationDefinition(\n", + " name=VALIDATION_DEFINTION_NAME,\n", + " data=batch_definition,\n", + " suite=expectation_suite,\n", + " )\n", + " )\n", + "\n", + " # Create the Checkpoint.\n", + " checkpoint = context.checkpoints.add(\n", + " gx.Checkpoint(\n", + " name=CHECKPOINT_NAME,\n", + " validation_definitions=[validation_definition],\n", + " )\n", + " )\n", + "\n", + " return checkpoint\n", + "\n", + "\n", + "# Create (or recreate: delete & create) the cookbook workflow components.\n", + "try:\n", + " checkpoint = create_gx_validation_workflow_components(\n", + " expectation_suite_name=EXPECTATION_SUITE_NAME,\n", + " validation_definition_name=VALIDATION_DEFINTION_NAME,\n", + " checkpoint_name=CHECKPOINT_NAME,\n", + " )\n", + "\n", + "except:\n", + " context.checkpoints.delete(name=CHECKPOINT_NAME)\n", + " context.validation_definitions.delete(name=VALIDATION_DEFINTION_NAME)\n", + " expectation_suite = context.suites.delete(name=EXPECTATION_SUITE_NAME)\n", + "\n", + " checkpoint = create_gx_validation_workflow_components(\n", + " expectation_suite_name=EXPECTATION_SUITE_NAME,\n", + " validation_definition_name=VALIDATION_DEFINTION_NAME,\n", + " checkpoint_name=CHECKPOINT_NAME,\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "48", + "metadata": {}, + "source": [ + "#### Examine Expectations in GX Cloud" + ] + }, + { + "cell_type": "markdown", + "id": "49", + "metadata": {}, + "source": [ + "Examine the newly added Expectations in the [GX Cloud UI](https://hubs.ly/Q02TyCZS0). You will see the GX Core-created Expectations under the **Cloud API** section." 
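Beyond the GX Cloud UI, you can also confirm from Python that the configuration was persisted to your organization by re-fetching the components by name. This is a minimal sketch, assuming the `context`, `EXPECTATION_SUITE_NAME`, and `CHECKPOINT_NAME` objects defined above are still in scope:

```
# Optional check: re-fetch the stored components by name from the Cloud Data Context.
stored_suite = context.suites.get(EXPECTATION_SUITE_NAME)
stored_checkpoint = context.checkpoints.get(CHECKPOINT_NAME)

print(f"Suite '{stored_suite.name}' contains {len(stored_suite.expectations)} Expectations:")
for expectation in stored_suite.expectations:
    print(f"  - {type(expectation).__name__}")

print(
    f"Checkpoint '{stored_checkpoint.name}' references "
    f"{len(stored_checkpoint.validation_definitions)} Validation Definition(s)."
)
```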
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "50", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Image(\n", + " \"img/screencaptures/cookbook3_expectations_added.png\",\n", + " alt=\"GX Cloud display of Expectations added using GX Core\",\n", + " width=800,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "51", + "metadata": {}, + "source": [ + "### Validate the sample data" + ] + }, + { + "cell_type": "markdown", + "id": "52", + "metadata": {}, + "source": [ + "The GX workflow configuration is now persisted in your GX Cloud organization, accessible via the Cloud Data Context. Run the Checkpoint to validate the sample customer profile data, and then explore the Validation Results in GX Cloud." + ] + }, + { + "cell_type": "markdown", + "id": "53", + "metadata": {}, + "source": [ + "#### Run the Checkpoint" + ] + }, + { + "cell_type": "markdown", + "id": "54", + "metadata": {}, + "source": [ + "Run the Checkpoint to validate the sample data." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "55", + "metadata": {}, + "outputs": [], + "source": [ + "checkpoint_result = checkpoint.run()" + ] + }, + { + "cell_type": "markdown", + "id": "56", + "metadata": {}, + "source": [ + "#### View results in GX Cloud" + ] + }, + { + "cell_type": "markdown", + "id": "57", + "metadata": {}, + "source": [ + "The Validation Result object can be extracted from the returned Checkpoint Result object. When produced using a Cloud Data Context, the Validation Result object provides a `result_url` field that contains a direct link to your Validation Results in GX Cloud." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "58", + "metadata": {}, + "outputs": [], + "source": [ + "validation_result = checkpoint_result.run_results[\n", + " list(checkpoint_result.run_results.keys())[0]\n", + "]\n", + "\n", + "print(\n", + " f\"Click this link to view your Validation Results in GX Cloud:\\n\\n{validation_result.result_url}\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "59", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Image(\n", + " \"img/screencaptures/cookbook3_linked_validation_results.png\",\n", + " alt=\"Validation Results of Expectations against sample customer profile data\",\n", + " width=800,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "60", + "metadata": {}, + "source": [ + "The Validation Results for the `Customer profile expectations` suite inform you that three out of four Expectations passed. The Expectation that the standard deviation of customer annual income is 10k failed - the results indicate that the observed standard deviation is slightly lower, about $9.6k.\n", + "\n", + "For the purposes of this tutorial, it is important that an Expectation failed, rather than why it failed, so that you can experience the exploration of both passing and failing results in GX Cloud." + ] + }, + { + "cell_type": "markdown", + "id": "61", + "metadata": {}, + "source": [ + "## Integrate GX Cloud validation in the Airflow DAG" + ] + }, + { + "cell_type": "markdown", + "id": "62", + "metadata": {}, + "source": [ + "You have run data validation from this notebook, next, you will run data validation within an Airflow DAG." 
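Before moving to the pipeline, you can also drill into the individual Expectation outcomes from the Checkpoint run above without leaving Python. The sketch below assumes the `validation_result` object from the previous cells and uses attribute names from GX Core 1.x; adjust them if your installed version differs.

```
# Summarize each Expectation's outcome from the Validation Result obtained above.
for expectation_result in validation_result.results:
    status = "PASSED" if expectation_result.success else "FAILED"
    expectation_type = expectation_result.expectation_config.type
    observed_value = expectation_result.result.get("observed_value")
    print(f"{status}: {expectation_type} (observed value: {observed_value})")
```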
+ ] + }, + { + "cell_type": "markdown", + "id": "63", + "metadata": {}, + "source": [ + "### Inspect DAG code" + ] + }, + { + "cell_type": "markdown", + "id": "64", + "metadata": {}, + "source": [ + "Examine the DAG code below that defines the `cookbook3_validate_postgres_table_data` pipeline. The key actions of the code are:\n", + "* Fetch and run the GX Cloud Checkpoint.\n", + "\n", + " ```\n", + " context = gx.get_context()\n", + " \n", + " checkpoint = context.checkpoints.get(\"Customer profile checkpoint\")\n", + " \n", + " checkpoint_result = checkpoint.run()\n", + " ```\n", + "\n", + " * Note that the code assumes that the GX Cloud credentials have been made available in the Airflow environment so that `gx.get_context()` returns a Cloud Data Context.\n", + " * This code snippet, customized for your desired Checkpoint, can be retrieved from GX Cloud using the validation code snippet feature. See the next section of this cookbook for more detail.\n", + "\n", + "* Extract and log the result of validation and GX Cloud results url.\n", + "\n", + " ```\n", + " validation_result = checkpoint_result.run_results[\n", + " list(checkpoint_result.run_results.keys())[0]\n", + " ]\n", + "\n", + " if validation_result[\"success\"]:\n", + " log.info(f\"Validation succeeded: {validation_result.result_url}\")\n", + " else:\n", + " log.warning(f\"Validation failed: {validation_result.result_url}\")\n", + " ```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "65", + "metadata": {}, + "outputs": [], + "source": [ + "%pycat inspect.getsource(dag)" + ] + }, + { + "cell_type": "markdown", + "id": "66", + "metadata": {}, + "source": [ + "### GX Cloud validation code snippet" + ] + }, + { + "cell_type": "markdown", + "id": "67", + "metadata": {}, + "source": [ + "GX Cloud will generate a validation code snippet, which provides the code needed to run a GX Cloud Checkpoint using GX Core. The validation code snippet can be copy-pasted within an Airflow DAG to trigger a Checkpoint run. \n", + "\n", + "1. Navigate to the Validations tab of your Data Asset.\n", + "2. Click the **Use code snippet** button `` directly to the right of the **Validate** button.\n", + "3. Click **Generate Snippet**.\n", + "\n", + "This displays the Validation Expectations dialog box, which contains a GX Core 1.0.x code snippet that has been populated with the name of your Checkpoint. For instance, for the Checkpoint created by this cookbook, you'll see the following snippet:\n", + "```\n", + "import great_expectations as gx\n", + "\n", + "context = gx.get_context()\n", + "\n", + "checkpoint = context.checkpoints.get(\"Customer profile checkpoint\")\n", + "\n", + "checkpoint.run()\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "68", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Image(\n", + " \"img/screencaptures/cookbook3_validation_code_snippet.png\",\n", + " alt=\"Generate the validation code snippet in GX Cloud\",\n", + " width=800,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "69", + "metadata": {}, + "source": [ + "### View the Airflow pipeline" + ] + }, + { + "cell_type": "markdown", + "id": "70", + "metadata": {}, + "source": [ + "To view the `cookbook3_validate_postgres_table_data` pipeline in the Airflow UI, log into the locally running Airflow instance.\n", + "\n", + "1. Open [http://localhost:8080/](http://localhost:8080/) in a browser window.\n", + "2. 
Log in with these credentials:\n", + " * Username: `admin`\n", + " * Password: `gx`\n", + "\n", + "You will see the pipeline under **DAGs** on login." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "71", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Video(\"img/screencaptures/log_in_to_airflow.mp4\")" + ] + }, + { + "cell_type": "markdown", + "id": "72", + "metadata": {}, + "source": [ + "### Trigger the pipeline" + ] + }, + { + "cell_type": "markdown", + "id": "73", + "metadata": {}, + "source": [ + "You can trigger the DAG from this notebook, using the provided convenience function in the cell below, or you can trigger the DAG manually in the Airflow UI." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "74", + "metadata": {}, + "outputs": [], + "source": [ + "tutorial.airflow.trigger_airflow_dag_and_wait_for_run(\n", + " \"cookbook3_validate_postgres_table_data\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "75", + "metadata": {}, + "source": [ + "To trigger the `cookbook3_validate_postgres_table_data` DAG from the Airflow UI, click the **Trigger DAG** button (with a play icon) under *Actions*. This will queue the DAG and it will execute shortly. The successful run is indicated by the run count inside the green circle under **Runs**. The triggering of a similar DAG is shown in the clip below." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "76", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Video(\"img/screencaptures/trigger_airflow_dag.mp4\")" + ] + }, + { + "cell_type": "markdown", + "id": "77", + "metadata": {}, + "source": [ + "The `cookbook3_validate_postgres_table_data` DAG can be rerun multiple times; you can experiment with running it from this notebook or from the Airflow UI. " + ] + }, + { + "cell_type": "markdown", + "id": "78", + "metadata": {}, + "source": [ + "### View pipeline results" + ] + }, + { + "cell_type": "markdown", + "id": "79", + "metadata": {}, + "source": [ + "Once the pipeline has been run, Validation Results are available in GX Cloud. You can either go directly to the GX Cloud UI, or access the link from the pipeline logs. To access the pipeline run logs in the Airflow UI:\n", + "\n", + "1. On the DAGs screen, click on the run(s) of interest under Runs.\n", + "2. Click the name of the individual run you want to examine. This will load the DAG execution details.\n", + "3. Click the Graph tab, and then the `cookbook3_validate_postgres_table_data` task box on the visual rendering.\n", + "4. Click the Logs tab to load the DAG logs. The link to the GX Cloud results will be in the log output." 
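If you prefer to check on the DAG run without opening the Airflow UI, the Airflow REST API can be queried directly. The sketch below is illustrative only: it assumes the tutorial Airflow webserver exposes the stable REST API with basic authentication using the credentials listed above, that the `requests` package is available in the notebook environment, and that the webserver is reachable at `http://localhost:8080` from where the code runs (from inside the tutorial containers, the Airflow service hostname would be used instead).

```
import requests

AIRFLOW_API = "http://localhost:8080/api/v1"

# List recent runs of the cookbook DAG and print the state of each run.
response = requests.get(
    f"{AIRFLOW_API}/dags/cookbook3_validate_postgres_table_data/dagRuns",
    auth=("admin", "gx"),
)
response.raise_for_status()

for dag_run in response.json()["dag_runs"]:
    print(dag_run["dag_run_id"], dag_run["state"])
```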
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "80", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Video(\n", + " \"img/screencaptures/cookbook3_view_pipeline_results.mp4\", width=1000\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "81", + "metadata": {}, + "source": [ + "## Review and take action on validation results in GX Cloud" + ] + }, + { + "cell_type": "markdown", + "id": "82", + "metadata": {}, + "source": [ + "### Review validation results" + ] + }, + { + "cell_type": "markdown", + "id": "83", + "metadata": {}, + "source": [ + "When you integrate data validation into your pipeline using GX, GX Cloud provides a central UI to review and share validation results; result output is not limited to pipeline log messages.\n", + "\n", + "Data validation results are shown in GX Cloud on the **Validations** tab of a Data Asset. You can access these results using a direct link (as shown in this cookbook), or by navigating within the GX Cloud UI.\n", + "\n", + "In addition to the results of individual runs, the Validations tab provides a historical view of your data validation results over multiple runs. This consolidated view contributes to an improved understanding and monitoring of your Data Asset health and quality over time, rather than relying on point-in-time assessments of data quality." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "84", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Image(\n", + " \"img/screencaptures/cookbook3_gx_cloud_validations_tab_over_time.png\",\n", + " alt=\"GX Cloud Validations tab actions: Alert, Share, Validate\",\n", + " width=800,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "85", + "metadata": {}, + "source": [ + "### Take action on validation results" + ] + }, + { + "cell_type": "markdown", + "id": "86", + "metadata": {}, + "source": [ + "GX Cloud enables you to take action on results generated by validation in your data pipeline. The key capabilities are Alerting, Sharing, and in-app triggering of Validation." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "87", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Image(\n", + " \"img/screencaptures/cookbook3_gx_cloud_validations_tab_actions.png\",\n", + " alt=\"GX Cloud Validations tab actions: Alert, Share, Validate\",\n", + " width=400,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "88", + "metadata": {}, + "source": [ + "Alerting is enabled by default on newly created Data Assets. If any Validations fail, then you will receive an email that notifies you of the failure and provides a direct link to the failing validation run." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "89", + "metadata": {}, + "outputs": [], + "source": [ + "IPython.display.Image(\n", + " \"img/screencaptures/cookbook3_validation_failure_email_alert.png\",\n", + " alt=\"GX Cloud email alert for data validation failure\",\n", + " width=800,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "90", + "metadata": {}, + "source": [ + "Results can easily be shared with others in your organization. 
Once individuals have been [added to your GX Cloud organization](https://docs.greatexpectations.io/docs/cloud/users/manage_users#invite-a-user), then you can provide a Share link that takes them directly to the validation run of interest.\n", + "\n", + "Validation can be triggered manually from the GX Cloud UI, enabling data developers and other stakeholders to revalidate data without needing to modify the existing data pipeline operation." + ] + }, + { + "cell_type": "markdown", + "id": "91", + "metadata": {}, + "source": [ + "## Summary" + ] + }, + { + "cell_type": "markdown", + "id": "92", + "metadata": {}, + "source": [ + "This cookbook has walked you through the process of defining a validation workflow with GX Core, persisting the worfklow configuration in GX Cloud, integrating data validation in an Airflow pipeline, and then accessing and taking action on validation results in GX Cloud.\n", + "\n", + "[Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb), [Cookbook 2](Cookbook_2_Validate_data_during_ingestion_take_action_on_failures.ipynb), and Cookbook 3 (this cookbook) have demonstrated how you can integrate data validation in a Python-enabled orchestrator using GX. While the cookbook examples have used Airflow DAGs, the same principles will apply when using GX in other orchestrators, such as Dagster, Prefect, or any other orchestrator that supports Python code." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "93", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/cookbooks/airflow_pipeline_output/.gitkeep b/cookbooks/airflow_pipeline_output/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/cookbooks/img/diagrams/cookbook3_workflow.drawio b/cookbooks/img/diagrams/cookbook3_workflow.drawio new file mode 100644 index 0000000..25f204d --- /dev/null +++ b/cookbooks/img/diagrams/cookbook3_workflow.drawio @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cookbooks/img/diagrams/cookbook3_workflow.png b/cookbooks/img/diagrams/cookbook3_workflow.png new file mode 100644 index 0000000..22bba52 Binary files /dev/null and b/cookbooks/img/diagrams/cookbook3_workflow.png differ diff --git a/cookbooks/img/diagrams/gx_cloud_core_architecture.drawio b/cookbooks/img/diagrams/gx_cloud_core_architecture.drawio new file mode 100644 index 0000000..631a05f --- /dev/null +++ b/cookbooks/img/diagrams/gx_cloud_core_architecture.drawio @@ -0,0 +1,202 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git 
a/cookbooks/img/diagrams/gx_cloud_core_architecture.png b/cookbooks/img/diagrams/gx_cloud_core_architecture.png new file mode 100644 index 0000000..51452f0 Binary files /dev/null and b/cookbooks/img/diagrams/gx_cloud_core_architecture.png differ diff --git a/cookbooks/img/screencaptures/cookbook3_expectations_added.png b/cookbooks/img/screencaptures/cookbook3_expectations_added.png new file mode 100644 index 0000000..79ccdc9 Binary files /dev/null and b/cookbooks/img/screencaptures/cookbook3_expectations_added.png differ diff --git a/cookbooks/img/screencaptures/cookbook3_gx_cloud_validations_tab_actions.png b/cookbooks/img/screencaptures/cookbook3_gx_cloud_validations_tab_actions.png new file mode 100644 index 0000000..9258fbb Binary files /dev/null and b/cookbooks/img/screencaptures/cookbook3_gx_cloud_validations_tab_actions.png differ diff --git a/cookbooks/img/screencaptures/cookbook3_gx_cloud_validations_tab_over_time.png b/cookbooks/img/screencaptures/cookbook3_gx_cloud_validations_tab_over_time.png new file mode 100644 index 0000000..2b9bef4 Binary files /dev/null and b/cookbooks/img/screencaptures/cookbook3_gx_cloud_validations_tab_over_time.png differ diff --git a/cookbooks/img/screencaptures/cookbook3_linked_validation_results.png b/cookbooks/img/screencaptures/cookbook3_linked_validation_results.png new file mode 100644 index 0000000..a5bf210 Binary files /dev/null and b/cookbooks/img/screencaptures/cookbook3_linked_validation_results.png differ diff --git a/cookbooks/img/screencaptures/cookbook3_new_asset_no_expectations_validations.png b/cookbooks/img/screencaptures/cookbook3_new_asset_no_expectations_validations.png new file mode 100644 index 0000000..01b8028 Binary files /dev/null and b/cookbooks/img/screencaptures/cookbook3_new_asset_no_expectations_validations.png differ diff --git a/cookbooks/img/screencaptures/cookbook3_new_data_asset.png b/cookbooks/img/screencaptures/cookbook3_new_data_asset.png new file mode 100644 index 0000000..5d35c25 Binary files /dev/null and b/cookbooks/img/screencaptures/cookbook3_new_data_asset.png differ diff --git a/cookbooks/img/screencaptures/cookbook3_validation_code_snippet.png b/cookbooks/img/screencaptures/cookbook3_validation_code_snippet.png new file mode 100644 index 0000000..83151e2 Binary files /dev/null and b/cookbooks/img/screencaptures/cookbook3_validation_code_snippet.png differ diff --git a/cookbooks/img/screencaptures/cookbook3_validation_failure_email_alert.png b/cookbooks/img/screencaptures/cookbook3_validation_failure_email_alert.png new file mode 100644 index 0000000..e04a054 Binary files /dev/null and b/cookbooks/img/screencaptures/cookbook3_validation_failure_email_alert.png differ diff --git a/cookbooks/img/screencaptures/cookbook3_view_pipeline_results.mp4 b/cookbooks/img/screencaptures/cookbook3_view_pipeline_results.mp4 new file mode 100644 index 0000000..782ba44 Binary files /dev/null and b/cookbooks/img/screencaptures/cookbook3_view_pipeline_results.mp4 differ diff --git a/cookbooks/tutorial_code/__init__.py b/cookbooks/tutorial_code/__init__.py index 3022952..5260030 100644 --- a/cookbooks/tutorial_code/__init__.py +++ b/cookbooks/tutorial_code/__init__.py @@ -4,12 +4,15 @@ import warnings import tutorial_code.airflow as airflow +import tutorial_code.cloud as cloud import tutorial_code.cookbook1 as cookbook1 import tutorial_code.cookbook2 as cookbook2 +import tutorial_code.cookbook3 as cookbook3 import tutorial_code.db as db -# Filter DeprecationWarnings, some older libraries are intentionally pinned for Airflow 
compatibility. +# Filter Deprecation/FutureWarnings, some older libraries are intentionally pinned for Airflow and Altair compatibility. warnings.filterwarnings("ignore", category=DeprecationWarning) +warnings.filterwarnings("ignore", category=FutureWarning) # Set explicit logging levels, importing the airflow module code imported by in the # notebooks causes a change in logging levels for GX. diff --git a/cookbooks/tutorial_code/cloud.py b/cookbooks/tutorial_code/cloud.py new file mode 100644 index 0000000..e70e000 --- /dev/null +++ b/cookbooks/tutorial_code/cloud.py @@ -0,0 +1,33 @@ +"""Helper functions for tutorial notebooks to interact with GX Cloud.""" + +import os + + +def gx_cloud_credentials_exist() -> bool: + """Checks for the presence of the GX Cloud organization id and access token. + + * The organization id should be provided in the GX_CLOUD_ORGANIZATION_ID environment variable. + * The access token should be provided in the GX_CLOUD_ACCESS_TOKEN environment variable. + + Raises: + ValueError if either the GX_CLOUD_ORGANIZATION_ID or GX_CLOUD_ACCESS_TOKEN environment variable is undefined or contains a null/empty string value. + + Returns: + True if credentials are found + """ + + # Check for organization id. + organization_id = os.getenv("GX_CLOUD_ORGANIZATION_ID", None) + if (organization_id is None) or (organization_id == ""): + raise ValueError( + "GX_CLOUD_ORGANIZATION_ID environment variable is undefined. Use this environment variable to provide your GX Cloud organization id." + ) + + # Check for access token. + access_token = os.getenv("GX_CLOUD_ACCESS_TOKEN", None) + if (access_token is None) or (access_token == ""): + raise ValueError( + "GX_CLOUD_ACCESS_TOKEN environment variable is undefined. Use this environment variable to provide your GX Cloud access token." + ) + + return True diff --git a/cookbooks/tutorial_code/cookbook1.py b/cookbooks/tutorial_code/cookbook1.py index 1260cc1..f623612 100644 --- a/cookbooks/tutorial_code/cookbook1.py +++ b/cookbooks/tutorial_code/cookbook1.py @@ -39,11 +39,10 @@ def clean_customer_data(df_original: pd.DataFrame) -> pd.DataFrame: } df["country"] = df["country"].apply(lambda x: COUNTRY_NAME_TO_CODE[x]) - df["dob"] = pd.to_datetime(df["dob"]) df["city"] = df["city"].apply(lambda x: x.title()) # Format final dataframe. - RETAIN_COLUMNS = ["customer_id", "name", "dob", "city", "state", "zip", "country"] + RETAIN_COLUMNS = ["customer_id", "name", "city", "state", "zip", "country"] df = df[RETAIN_COLUMNS] return df @@ -55,7 +54,7 @@ def validate_customer_data( """Run GX data validation on sample customer data for Cookbook 1 and DAG, and return Validation Result.""" # Get GX context. - context = gx.get_context() + context = gx.get_context(mode="ephemeral") # Create Data Source, Data Asset, Batch Definition, and get Batch. 
data_source = context.data_sources.add_pandas("pandas") @@ -75,7 +74,6 @@ def validate_customer_data( column_list=[ "customer_id", "name", - "dob", "city", "state", "zip", @@ -87,7 +85,6 @@ def validate_customer_data( gxe.ExpectColumnValuesToBeOfType(column=x, type_="str") for x in ["name", "city", "state", "zip"] ], - gxe.ExpectColumnValuesToMatchRegex(column="dob", regex=r"^\d{4}-\d{2}-\d{2}$"), gxe.ExpectColumnValuesToBeInSet( column="country", value_set=["AU", "CA", "DE", "FR", "GB", "IT", "NL", "US"] ), diff --git a/cookbooks/tutorial_code/cookbook2.py b/cookbooks/tutorial_code/cookbook2.py index 372899f..1530abf 100644 --- a/cookbooks/tutorial_code/cookbook2.py +++ b/cookbooks/tutorial_code/cookbook2.py @@ -300,7 +300,7 @@ def validate_product_data( """ # Get GX context. - context = gx.get_context() + context = gx.get_context(mode="ephemeral") # Create the Data Source. data_source = context.data_sources.add_pandas("pandas") diff --git a/cookbooks/tutorial_code/cookbook3.py b/cookbooks/tutorial_code/cookbook3.py new file mode 100644 index 0000000..9d0b63d --- /dev/null +++ b/cookbooks/tutorial_code/cookbook3.py @@ -0,0 +1,130 @@ +"""Helper functions for Cookbook 3.""" + +import altair as alt +import pandas as pd +import tutorial_code as tutorial + +CUSTOMER_PROFILE_TABLE_NAME = "customer_profile" +BINS = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100] +CHART_WIDTH = 600 +CHART_HEIGHT = 300 + + +def _load_customer_profile_data() -> pd.DataFrame: + """Return sample customer profile data as pandas dataframe.""" + + return pd.read_sql_query( + "select * from customer_profile", + con=tutorial.db.get_cloud_postgres_engine(), + ) + + +def _format_chart(chart: alt.Chart, chart_title: str) -> alt.Chart: + """Standardize chart formatting.""" + + return ( + chart.properties(title=chart_title, height=300, width=600) + .configure_title(fontSize=18, anchor="start", color="gray") + .configure_axis(grid=False) + .configure_view(strokeWidth=0) + ) + + +def visualize_customer_age_distribution() -> alt.Chart: + """Return histogram visualization of customer age data.""" + + # Fetch data. + df = tutorial.cookbook3._load_customer_profile_data() + + # Prep data for display. + df_hist = pd.cut(df["age"], bins=BINS).value_counts().reset_index() + df_hist = df_hist.rename(columns={"age": "binterval"}) + df_hist["bin_lower"] = df_hist["binterval"].apply(lambda x: x.left) + df_hist["bin_upper"] = df_hist["binterval"].apply(lambda x: x.right) + df_hist["bin_mid"] = df_hist.apply( + lambda row: (row["bin_lower"] + row["bin_upper"]) / 2, axis=1 + ) + df_hist["binterval_str"] = df_hist.apply( + lambda row: f"{row['bin_lower']}-{row['bin_upper']} years", axis=1 + ) + df_hist = ( + df_hist[["binterval_str", "bin_lower", "bin_mid", "bin_upper", "count"]] + .sort_values("bin_lower") + .reset_index(drop=True) + ) + + # Assemble chart. + chart = ( + alt.Chart(df_hist) + .mark_bar() + .encode( + alt.X("bin_mid", bin=True, axis=alt.Axis(title="Customer age")), + alt.Y("count", axis=alt.Axis(title="Number of customers")), + tooltip=[ + alt.Tooltip("binterval_str", title="Age"), + alt.Tooltip("count", title="Customer count", format=","), + ], + ) + ) + + return _format_chart(chart, chart_title="Customer age distribution") + + +def visualize_customer_income_distribution() -> alt.Chart: + """Return histogram visualization of customer age data.""" + + # Fetch data. + df = tutorial.cookbook3._load_customer_profile_data() + + # Prep data for display. 
+    def convert_bin_to_tooltip(bin_lower: int, bin_upper: int) -> str:
+        if bin_lower == 0:
+            return "Less than $10k"
+        elif bin_upper == 100_000:
+            return "$90k+"
+        else:
+            return f"${int(bin_lower/1_000)}k-{int(bin_upper/1_000)}k"
+
+    df_hist = (
+        pd.cut(df["annual_income_usd"], bins=[x * 1_000 for x in BINS])
+        .value_counts()
+        .reset_index()
+    )
+    df_hist = df_hist.rename(columns={"annual_income_usd": "binterval"})
+    df_hist["bin_lower"] = df_hist["binterval"].apply(lambda x: x.left)
+    df_hist["bin_upper"] = df_hist["binterval"].apply(lambda x: x.right)
+    df_hist["bin_mid"] = df_hist.apply(
+        lambda row: (row["bin_lower"] + row["bin_upper"]) / 2, axis=1
+    )
+    df_hist["binterval_str"] = df_hist.apply(
+        lambda row: convert_bin_to_tooltip(row["bin_lower"], row["bin_upper"]), axis=1
+    )
+    df_hist = (
+        df_hist[["binterval_str", "bin_lower", "bin_mid", "bin_upper", "count"]]
+        .sort_values("bin_lower")
+        .reset_index(drop=True)
+    )
+
+    # Assemble chart.
+    chart = (
+        alt.Chart(df_hist)
+        .mark_bar()
+        .encode(
+            alt.X(
+                "bin_mid",
+                bin=True,
+                axis=alt.Axis(
+                    title="Customer annual income (USD)",
+                    labelExpr="'$' + format(datum.value, ',')",
+                    labelAngle=-25,
+                ),
+            ),
+            alt.Y("count", axis=alt.Axis(title="Number of customers")),
+            tooltip=[
+                alt.Tooltip("binterval_str", title="Annual income"),
+                alt.Tooltip("count", title="Customer count", format=","),
+            ],
+        )
+    )
+
+    return _format_chart(chart, chart_title="Customer annual income distribution (USD)")
diff --git a/cookbooks/tutorial_code/db.py b/cookbooks/tutorial_code/db.py
index e881df8..b99b85e 100644
--- a/cookbooks/tutorial_code/db.py
+++ b/cookbooks/tutorial_code/db.py
@@ -1,4 +1,4 @@
-"""Helper functions for tutorial notebooks and DAGs to interact with the postgres database."""
+"""Helper functions for tutorial notebooks and DAGs to interact with Postgres."""
 
 from typing import Dict, List
 
@@ -9,12 +9,24 @@
     "postgresql://gx_user:gx_user_password@postgres:5432/gx"
 )
 
+GX_PUBLIC_POSTGRES_CONNECTION_STRING = "postgresql+psycopg2://try_gx:try_gx@postgres.workshops.greatexpectations.io/gx_in_the_data_pipeline"
+
 
 def get_local_postgres_engine() -> sqlalchemy.engine.Engine:
     """Return a sqlalchemy Engine for the tutorial local postgres database."""
     return sqlalchemy.create_engine(TUTORIAL_POSTGRES_CONNECTION_STRING)
 
 
+def get_gx_postgres_connection_string() -> str:
+    """Return the connection string for the GX public Postgres instance used for the tutorial."""
+    return GX_PUBLIC_POSTGRES_CONNECTION_STRING
+
+
+def get_cloud_postgres_engine() -> sqlalchemy.engine.Engine:
+    """Return a sqlalchemy Engine for the GX public Postgres database."""
+    return sqlalchemy.create_engine(GX_PUBLIC_POSTGRES_CONNECTION_STRING)
+
+
 def insert_ignore_dataframe_to_postgres(
     table_name: str, dataframe: pd.DataFrame
 ) -> int:
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 515759a..6eae843 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -30,6 +30,8 @@ services:
       AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow_user:airflow_user_password@postgres:5432/airflow
       AIRFLOW__CORE__EXECUTOR: LocalExecutor
       AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
+      GX_CLOUD_ORGANIZATION_ID: ${GX_CLOUD_ORGANIZATION_ID}
+      GX_CLOUD_ACCESS_TOKEN: ${GX_CLOUD_ACCESS_TOKEN}
     volumes:
       - ./environment/airflow/dags:/usr/local/airflow/dags
       - ./cookbooks/tutorial_code:/usr/local/airflow/dags/tutorial_code
@@ -57,6 +59,9 @@ services:
     build:
       context: ./environment/jupyterlab
       dockerfile: jupyterlab.Dockerfile
+    environment:
+      GX_CLOUD_ORGANIZATION_ID: ${GX_CLOUD_ORGANIZATION_ID}
+      GX_CLOUD_ACCESS_TOKEN: ${GX_CLOUD_ACCESS_TOKEN}
     networks:
       - gxnet
     ports:
diff --git a/environment/airflow/airflow.Dockerfile b/environment/airflow/airflow.Dockerfile
index 76b1595..dd6487f 100644
--- a/environment/airflow/airflow.Dockerfile
+++ b/environment/airflow/airflow.Dockerfile
@@ -1,4 +1,4 @@
-FROM quay.io/astronomer/astro-runtime:12.5.0-python-3.11-slim
+FROM quay.io/astronomer/astro-runtime:12.6.0-python-3.11-slim
 
 USER root
 RUN apt-get update && apt-get -y install jq && rm -rf /var/lib/apt/lists/*
diff --git a/environment/airflow/dags/cookbook3_validate_postgres_table_data.py b/environment/airflow/dags/cookbook3_validate_postgres_table_data.py
new file mode 100644
index 0000000..1fc505b
--- /dev/null
+++ b/environment/airflow/dags/cookbook3_validate_postgres_table_data.py
@@ -0,0 +1,50 @@
+import datetime
+import logging
+
+import great_expectations as gx
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+
+log = logging.getLogger("GX validation")
+
+
+def cookbook3_validate_postgres_table_data():
+
+    # Fetch and run the GX Cloud Checkpoint.
+    context = gx.get_context()
+
+    checkpoint = context.checkpoints.get("Customer profile checkpoint")
+
+    checkpoint_result = checkpoint.run()
+
+    # Extract and log the validation result and results url.
+    validation_result = checkpoint_result.run_results[
+        list(checkpoint_result.run_results.keys())[0]
+    ]
+
+    if validation_result["success"]:
+        log.info(f"Validation succeeded: {validation_result.result_url}")
+    else:
+        log.warning(f"Validation failed: {validation_result.result_url}")
+
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": datetime.datetime.today(),
+}
+
+gx_dag = DAG(
+    "cookbook3_validate_postgres_table_data",
+    default_args=default_args,
+    schedule="0 0 * * *",
+    catchup=False,
+)
+
+run_gx_task = PythonOperator(
+    task_id="cookbook3_validate_postgres_table_data",
+    python_callable=cookbook3_validate_postgres_table_data,
+    dag=gx_dag,
+)
+
+run_gx_task
diff --git a/environment/airflow/requirements.txt b/environment/airflow/requirements.txt
index 533228a..53b055c 100644
--- a/environment/airflow/requirements.txt
+++ b/environment/airflow/requirements.txt
@@ -1,3 +1,3 @@
 apache-airflow-client==2.10.0
-great_expectations==1.0.6
+great_expectations==1.3.1
 pandas==2.1.4
diff --git a/environment/jupyterlab/requirements.txt b/environment/jupyterlab/requirements.txt
index 72e8f3a..cd0a167 100644
--- a/environment/jupyterlab/requirements.txt
+++ b/environment/jupyterlab/requirements.txt
@@ -1,7 +1,8 @@
+altair==4.2.2
 apache-airflow==2.10.1
 apache-airflow-client==2.10.0
 black[jupyter]==24.8.0
-great_expectations==1.2.4
+great_expectations==1.3.1
 isort==5.13.2
 jupyterlab==4.2.5
 jupyterlab_myst==2.4.2
diff --git a/environment/postgres/init_postgres.sql b/environment/postgres/init_postgres.sql
index 608bddc..6550f61 100644
--- a/environment/postgres/init_postgres.sql
+++ b/environment/postgres/init_postgres.sql
@@ -22,7 +22,6 @@ end $$;
 create table public.customers (
     customer_id bigint primary key,
     name text,
-    dob date,
     city text,
     state text,
     zip text,
diff --git a/tests/conftest.py b/tests/conftest.py
index 549dffb..7cf9814 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,59 +1,8 @@
-import os
-
-import great_expectations as gx
 import pytest
 
 
-TEST_PREFIX = "ci-test-"
-
-
-@pytest.fixture
-def tutorial_db_connection_string() -> str:
-    """Return connection string for the tutorial database."""
-    return "postgresql+psycopg2://gx_user:gx_user_password@postgres:5432/gx"
-
-
-@pytest.fixture
-def gx_cloud_context() -> (
-    gx.data_context.data_context.cloud_data_context.CloudDataContext
-):
-    """Return CloudDataContext connecting to the GX Cloud org defined by
-    GX_CLOUD_ORGANIZATION_ID and GX_CLOUD_ACCESS_TOKEN envs.
-    """
-    org_id = os.environ.get("GX_CLOUD_ORGANIZATION_ID")
-    if org_id is None:
-        raise Exception("GX_CLOUD_ORGANIZATION_ID environment variable is not defined")
-
-    access_token = os.environ.get("GX_CLOUD_ACCESS_TOKEN")
-    if access_token is None:
-        raise Exception("GX_CLOUD_ACCESS_TOKEN environment variable is not defined")
-
-    return gx.get_context(cloud_organization_id=org_id, cloud_access_token=access_token)
-
-
-@pytest.fixture
-def gx_ephemeral_context() -> (
-    gx.data_context.data_context.ephemeral_data_context.EphemeralDataContext
-):
-    """Return Ephemeral Data Context."""
-    return gx.get_context(mode="ephemeral")
-
-
-@pytest.fixture
-def tutorial_db_data_source(
-    gx_ephemeral_context, tutorial_db_connection_string
-) -> gx.datasource.fluent.sql_datasource.SQLDatasource:
-    """Return Data Source that connects to the tutorial database."""
-    context = gx_ephemeral_context
-
-    DATA_SOURCE_NAME = f"{TEST_PREFIX}tutorial-db"
-
-    try:
-        data_source = context.data_sources.add_postgres(
-            name=DATA_SOURCE_NAME,
-            connection_string=tutorial_db_connection_string,
-        )
-
-    except gx.exceptions.DataContextError:
-        data_source = context.get_datasource(DATA_SOURCE_NAME)
-    return data_source, context
+@pytest.fixture(autouse=True)
+def remove_gx_cloud_envvars(monkeypatch):
+    """Ensure the test environment does not contain GX Cloud credentials by default."""
+    monkeypatch.delenv("GX_CLOUD_ORGANIZATION_ID", raising=False)
+    monkeypatch.delenv("GX_CLOUD_ACCESS_TOKEN", raising=False)
diff --git a/tests/test_cloud.py b/tests/test_cloud.py
new file mode 100644
index 0000000..20f62dc
--- /dev/null
+++ b/tests/test_cloud.py
@@ -0,0 +1,26 @@
+"""Tests for GX Cloud helper functions."""
+
+import pytest
+import tutorial_code as tutorial
+
+
+def test_gx_cloud_credentials_exist(monkeypatch):
+    """Test that function raises an error when credentials are not found."""
+
+    with pytest.raises(
+        ValueError, match=r"GX_CLOUD_ORGANIZATION_ID environment variable is undefined"
+    ):
+        tutorial.cloud.gx_cloud_credentials_exist()
+
+    monkeypatch.setenv("GX_CLOUD_ORGANIZATION_ID", "fake-organization-id")
+
+    with pytest.raises(
+        ValueError, match=r"GX_CLOUD_ACCESS_TOKEN environment variable is undefined"
+    ):
+        tutorial.cloud.gx_cloud_credentials_exist()
+
+    monkeypatch.setenv("GX_CLOUD_ACCESS_TOKEN", "fake-access-token")
+
+    # GX Cloud credentials are found.
+    result = tutorial.cloud.gx_cloud_credentials_exist()
+    assert result is True
diff --git a/tests/test_cookbook1.py b/tests/test_cookbook1.py
index 9f17bdb..81fe94f 100644
--- a/tests/test_cookbook1.py
+++ b/tests/test_cookbook1.py
@@ -2,7 +2,6 @@
 import datetime
 import shutil
-import time
 
 import cookbooks.airflow_dags.cookbook1_ingest_customer_data as airflow_dag
 import great_expectations as gx
@@ -63,7 +62,6 @@ def valid_cleaned_customer_data() -> pd.DataFrame:
             {
                 "customer_id": 123,
                 "name": "Cookie Monster",
-                "dob": datetime.datetime(1966, 11, 2),
                 "city": "New York",
                 "state": "NY",
                 "zip": "10123",
@@ -80,11 +78,10 @@ def invalid_cleaned_customer_data() -> pd.DataFrame:
             {
                 "customer_id": 987,
                 "name": "Oscar the Grouch",
-                "dob": "June 1, 1969",
                 "city": "New York",
                 "state": "NY",
                 "zip": None,
-                "country": "US",
+                "country": "Sesame Street",
             }
         ]
     )
@@ -93,13 +90,12 @@ def invalid_cleaned_customer_data() -> pd.DataFrame:
 def test_clean_customer_data(raw_customer_data):
     df_cleaned = tutorial.cookbook1.clean_customer_data(raw_customer_data)
 
-    assert df_cleaned.shape == (2, 7)
+    assert df_cleaned.shape == (2, 6)
 
     customer0 = df_cleaned.to_dict(orient="records")[0]
     assert customer0 == {
         "customer_id": 1693133,
         "name": "Samuel Hall",
-        "dob": pd.Timestamp("1976-11-19"),
         "city": "Norcross",
         "state": "GA",
         "zip": "30091",
@@ -110,7 +106,6 @@
     assert customer1 == {
         "customer_id": 887837,
         "name": "Ileen van Dael",
-        "dob": pd.Timestamp("1983-09-15"),
         "city": "Utrecht",
         "state": "UT",
         "zip": "3532 XR",
@@ -149,7 +144,7 @@ def test_validate_customer_data_with_invalid_data(invalid_cleaned_customer_data)
         gx.core.expectation_validation_result.ExpectationSuiteValidationResult,
     )
     assert validation_result["success"] is False
-    assert failed_expectations == ["expect_column_values_to_match_regex"]
+    assert failed_expectations == ["expect_column_values_to_be_in_set"]
 
 
 def test_airflow_dag_trigger(tmp_path, monkeypatch):
diff --git a/tests/test_cookbook2.py b/tests/test_cookbook2.py
index b2c7deb..443c8d2 100644
--- a/tests/test_cookbook2.py
+++ b/tests/test_cookbook2.py
@@ -260,7 +260,7 @@ def test_separate_valid_and_invalid_product_rows(
         [df_products_valid, df_products_invalid], axis=0
     ).reset_index(drop=True)
 
-    context = gx.get_context()
+    context = gx.get_context(mode="ephemeral")
     context.data_sources.add_pandas("pandas")
 
     validation_result = tutorial.cookbook2._validate_products(context, df_products)
diff --git a/tests/test_cookbook3.py b/tests/test_cookbook3.py
new file mode 100644
index 0000000..450d295
--- /dev/null
+++ b/tests/test_cookbook3.py
@@ -0,0 +1,14 @@
+import altair as alt
+import tutorial_code as tutorial
+
+
+def test_visualize_customer_age_distribution():
+    """Test that an altair chart is returned for distribution visualizations."""
+    chart = tutorial.cookbook3.visualize_customer_age_distribution()
+    assert isinstance(chart, alt.Chart)
+
+
+def test_visualize_customer_income_distribution():
+    """Test that an altair chart is returned for distribution visualizations."""
+    chart = tutorial.cookbook3.visualize_customer_income_distribution()
+    assert isinstance(chart, alt.Chart)
diff --git a/utils/autoformat_tutorial_code.sh b/utils/autoformat_tutorial_code.sh
index 3f4a9c9..e0924e4 100755
--- a/utils/autoformat_tutorial_code.sh
+++ b/utils/autoformat_tutorial_code.sh
@@ -1,3 +1,5 @@
 #!/bin/bash
 
-docker exec -t tutorial-gx-in-the-data-pipeline-jupyterlab bash -c 'isort --profile=black ./ /tests && black ./ /tests'
+# docker exec -t tutorial-gx-in-the-data-pipeline-jupyterlab bash -c 'isort --profile=black ./ /tests && black ./ /tests'
+
+docker exec -t tutorial-gx-in-the-data-pipeline-jupyterlab bash -c 'black ./ /tests'