Using Airflow as an orchestrator and dbt for transformations.
- Overview
- Airflow
- Data Build Tool
- Folder Structure
- Program Flow
- Program Execution
- Data Model
- Level Up
- Documentation and Material
- Tools and Technologies
- The purpose of this project was to learn how the transformation step can be made easy using data build tool (dbt) and how Airflow can be used to orchestrate it.
- Check what different functionalities dbt supports that can save development time.
- Snowflake was used as the warehouse; it offers a free 30-day trial when you sign up.
- Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows.
- Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology.
- A web interface helps manage the state of your workflows.
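- As a quick illustration of what an Airflow workflow looks like, here is a minimal sketch of a DAG with a single BashOperator task; the DAG id, schedule, and command are placeholders, not part of this project.

```python
# Minimal illustrative DAG: one BashOperator task on a daily schedule.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="hello_airflow",           # placeholder DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    say_hello = BashOperator(
        task_id="say_hello",
        bash_command="echo 'Hello from Airflow'",
    )
```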
- Follow these steps to set up Airflow and execute dbt jobs with it.
- Python and git should be installed.
- Create a virtual environment:
python3 -m venv airflow_env
- Activate the environment (remember to be in the folder where it was created):
source airflow_env/bin/activate
- Update and install some packages
sudo apt-get update
sudo apt-add-repository universe
sudo apt-get update
sudo apt-get install python-setuptools
sudo apt install python3-pip
sudo apt-get install libmysqlclient-dev
sudo apt-get install libssl-dev
- Set AIRFLOW_HOME; if your Airflow folder has a different name, use that name instead.
export AIRFLOW_HOME=~/<airflow-folder-name>
- Install airflow
pip3 install apache-airflow
pip3 install typing_extensions
- Installing this package lets you use the Dbt operators to execute the dbt commands; otherwise you can use the BashOperator as well. Documentation is linked in the documentation section.
pip install airflow-dbt
- Initialize the airflow database
airflow db init
- In case you face an issue with the above command, use the following one:
airflow db reset
- Create a user; you will be prompted for a password, which will be used to log in to the UI.
airflow users create --username admin --firstname admin --lastname testing --role Admin --email admin@domain.com
- Start the scheduler and webserver
airflow scheduler
airflow webserver
- A DAG named dbt was created that uses the Dbt operators to run the tasks; a sketch of it is shown below.
- The other DAG, dbt_dag_bash, uses the BashOperator to orchestrate and execute the same steps.
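- A rough sketch of what the dbt DAG could look like with the airflow-dbt operators follows; the project path, schedule, and task layout are assumptions for illustration, so check the airflow-dbt documentation for the exact operator arguments.

```python
# Sketch of a DAG that chains dbt steps using the airflow-dbt operators.
# DBT_PROJECT_DIR and the schedule are placeholders.
from datetime import datetime

from airflow import DAG
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtRunOperator,
    DbtSnapshotOperator,
    DbtTestOperator,
)

DBT_PROJECT_DIR = "/path/to/<dbt-project-name>"  # placeholder path

with DAG(
    dag_id="dbt",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,  # trigger manually from the UI
    catchup=False,
) as dag:
    dbt_seed = DbtSeedOperator(task_id="dbt_seed", dir=DBT_PROJECT_DIR)
    dbt_run = DbtRunOperator(task_id="dbt_run", dir=DBT_PROJECT_DIR)
    dbt_snapshot = DbtSnapshotOperator(task_id="dbt_snapshot", dir=DBT_PROJECT_DIR)
    dbt_test = DbtTestOperator(task_id="dbt_test", dir=DBT_PROJECT_DIR)

    dbt_seed >> dbt_run >> dbt_snapshot >> dbt_test
```

- The dbt_dag_bash variant would do the same thing with BashOperator tasks whose bash_command changes into the dbt project folder and calls the dbt CLI, for example `cd <dbt-project-name> && dbt run`.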
- dbt enables analytics engineers to transform data in their warehouses by simply writing select statements.
- dbt handles turning these select statements into tables and views. Models can also be materialized incrementally, and dbt handles that as well.
- dbt does the T in ELT (Extract, Load, Transform) processes.
- dbt provides a web UI that can be used to visualize the lineage and dependency of models.
- The dbt web UI also provides details about each model, what it depends on, and the test cases defined on certain models.
- Install git and python.
- Installing dbt on Linux has some issues, so we install the dbt-core package instead.
- All of this will be done inside a virtual environment.
- Activate the environment (remember to be in the correct folder):
source airflow_env/bin/activate
- Install a required dependency:
pip install pyOpenSSL==22.0.0
- Install Dbt
pip install dbt-core==1.1.0
- Install the adapter for the DB/DWH of your choice; Snowflake is used here.
pip install dbt-snowflake==1.1.0
- Verify that dbt is installed:
dbt --version
- Install the Snowflake plugin if it is not specified in the requirements.txt file:
pip install dbt-snowflake
- Open your editor of choice; VS Code is used in this demo. Running the following command will populate the project with different folders (seeds, models, etc.):
dbt init <project-name>
- Snowflake provides a 30-day free trial.
- Once dbt is installed and configured, a .dbt folder will be visible in the home directory.
- Two files will be present:
- profiles.yml
- .user.yml
- In profiles.yml we provide our Snowflake credentials. Refer to the dbt documentation:
- https://docs.getdbt.com/reference/warehouse-profiles/snowflake-profile
dbt_model:
  outputs:
    dev:
      account: ap12345.ap-south-1.aws
      database: <database>
      password: <password>
      role: <role-from-snowflake>
      schema: <schema>
      threads: 2
      type: snowflake
      user: <username>
      warehouse: <warehouse-name-from-snowflake>
  target: dev
- Once the Snowflake profile has been set, run the following command to check the connection:
dbt debug
- dags - Write/Paste dag code here.
- dbt_project - python virtual env related
- analyses
- macros - create macros here and refer later
- models - tables, views, incremental load, merge
- seeds - flat files, in case you want to load them into staging tables using dbt
- snapshots - SCD tables
- tests - tests on different models
- dbt_project.yml - one place to configure all
- packages.yml - dbt has many packages which can be downloaded
- Before executing any of the commands remember to be in the correct folder.
cd <airflow-folder>
- Start the scheduler so that any changes you make to the DAG code are reflected:
airflow scheduler
- Start the webserver (port 8080) so that the changes are also reflected in the UI:
airflow webserver -p 8080
- Once the DAG file is in the dags folder you will be able to see it in the Airflow UI and can trigger it manually or schedule it as you like.
- If you would like to execute the same steps using dbt only (without Airflow), follow these steps.
- Open a terminal and activate the virtual environment.
- Before executing any of the commands, remember to be in the correct folder:
cd <dbt-project-name>
- To load the files from the seeds folder into the Stage Tables in Snowflake:
dbt seed
- The data will now be in the Stage Tables; next we load it into the Core/Dim tables.
- City, Country, and Transactions will be loaded, as they need no history handling.
dbt run
- To run a specific folder inside the models folder:
dbt run -m <folder-name>
- The snapshots folder has all the models on which SCD Type 2 is used:
dbt snapshot
- We can also run the test cases that are defined on the different models, snapshots, and seeds:
dbt test
- dbt provides a web UI that can be accessed using the commands below.
- Internally, all metadata is saved as JSON and used by the web UI:
dbt docs generate
dbt docs serve
- You can check different things in the UI, including the lineage of the models.
- The source system provides the Full Dump (all of the data every time).
- The Transactions table is append only data.
- Most of the tables in Core/Dim are SCD Type 1 or Type 2.
- Country and City do not change much, so they can be loaded manually whenever needed.
- Right now we are only loading data to Core/Dim.
- We are using the Dbt operators as is; we can also use DbtParser to pick up each model so that Airflow displays them as individual tasks, giving more insight into how many models we have (see the sketch after this list).
- Generate data, place it in the seeds folder, and trigger the DAG based on that.
- Build a fact table and visualizations on top of it.
- Use dbt test for testing purposes, along with the different packages that can aid development.
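- As mentioned above, one way to get per-model visibility is to read the manifest.json that dbt writes to the target folder and create one Airflow task per model. The sketch below assumes that approach; the paths and DAG settings are placeholders, and task dependencies could additionally be wired up from each node's depends_on entry.

```python
# Sketch: create one BashOperator per dbt model by reading manifest.json.
# DBT_PROJECT_DIR and the DAG settings are placeholders.
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

DBT_PROJECT_DIR = "/path/to/<dbt-project-name>"  # placeholder path
MANIFEST_PATH = f"{DBT_PROJECT_DIR}/target/manifest.json"

with open(MANIFEST_PATH) as f:
    manifest = json.load(f)

with DAG(
    dag_id="dbt_per_model",          # placeholder DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    for node_id, node in manifest["nodes"].items():
        if node["resource_type"] != "model":
            continue
        # One task per dbt model, each running `dbt run --select <model>`.
        BashOperator(
            task_id=f"run_{node['name']}",
            bash_command=f"cd {DBT_PROJECT_DIR} && dbt run --select {node['name']}",
        )
```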
- dbt documentation
- dbt profile setup
- dbt Youtube Playlist
- Snowflake Youtube Playlist
- Airflow Documentation
- Airflow with Dbt Operators
- Thanks to Kahan Data Solutions for the demo videos.
- Dbt
- Snowflake
- Git
- Airflow