EX 1. Preconditions: Variables, Tables and Connection 🔝
Now that both Scheduler and Webserver are running, we can start getting familiar with the Airflow User Interface at http://localhost:8080 and create the preconditions for orchestrating our ML pipelines.
Let's start customising Airflow by adding two Variables that store the table names: `training` and `prediction`:
✅ Go to `Admin/Variables` and create a new Key `training_table` with the value `training`.
✅ Do the same for prediction: add the Key `prediction_table` with the value `prediction`.
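📌 Inside the DAG code these Variables are read back with `Variable.get`. A minimal sketch of how the table-name constants used further below are typically obtained (check `/dags/create_ml_tables.py` for the exact usage):

```python
from airflow.models import Variable

# Read the table names stored via Admin/Variables; default_var is the
# fallback used if the Variable has not been created yet.
TRAINING_TABLE = Variable.get("training_table", default_var="training")
PREDICTION_TABLE = Variable.get("prediction_table", default_var="prediction")
```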
✅ Go to `Admin/Connections` and search for the connection with Conn Id `sqlite_default`.
📌 The connections you see there are examples; they are not in use.
✏️ Edit the Conn Id value from `sqlite_default` to `sqlite_ml`.
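📌 If you want to sanity-check the renamed connection outside the UI, you can query it through a hook. A minimal sketch, assuming an Airflow 1.10-style import path (in Airflow 2 the hook lives in `airflow.providers.sqlite.hooks.sqlite`):

```python
from airflow.hooks.sqlite_hook import SqliteHook

# The hook looks up 'sqlite_ml' in Airflow's metadata DB
# and opens the file-based SQLite database it points to.
hook = SqliteHook(sqlite_conn_id="sqlite_ml")
print(hook.get_records("SELECT 1"))  # expected: [(1,)]
```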
Now that you have:
- saved the variables with the table names, one for training and one for prediction
- created the SQLite DB connection
let's activate the `create_ml_tables` DAG so the Scheduler picks it up and creates the database and the tables in it.
✅ Go to the DAGs section and toggle ON the `create_ml_tables` DAG.
The Scheduler will pick up the DAG and run it (there are no dependencies preventing its execution).
Click on the DAG name `create_ml_tables`: we are now in the Graph View.
With the Graph View you can visualise the task dependencies and the current status.
🕚 The `create_ml_tables` DAG is running. Refresh the status by clicking on the 🔁 REFRESH button.
✅ If you click on the Code button in the DAG menu, you can see (but not modify) the Python code:
📌 Note: to run the SQL that creates the tables and the index, we instantiate the `SqliteOperator`, which uses the `sqlite_ml` connection we created earlier.
```python
create_training_table = SqliteOperator(
    task_id="create_training_table",
    # a triple-quoted f-string lets the SQL span multiple lines
    sql=f"""
        CREATE TABLE IF NOT EXISTS {TRAINING_TABLE}(
            timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            mape_test REAL,
            rmse_test REAL,
            days_in_test REAL
        );
    """,
    # 'sqlite_ml' is the connection created from the Connection View UI
    sqlite_conn_id="sqlite_ml",
)
```
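📌 The companion `create_prediction_table` task follows the same pattern. A sketch for orientation only: apart from `date_to_predict`, the column names below are illustrative, not the repository's actual definition (see `/dags/create_ml_tables.py`):

```python
create_prediction_table = SqliteOperator(
    task_id="create_prediction_table",
    sql=f"""
        CREATE TABLE IF NOT EXISTS {PREDICTION_TABLE}(
            timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            date_to_predict DATE,  -- the day the forecast refers to
            yhat REAL              -- illustrative name for the predicted value
        );
    """,
    sqlite_conn_id="sqlite_ml",
)
```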
📌 Note: To define the execution order we use the bitshift operator `>>`. You can see it at the bottom of the code:
```python
[create_training_table, create_prediction_table] >> create_prediction_index
```
The `create_prediction_index` task is executed only after `create_training_table` and `create_prediction_table` have been successfully executed.
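The list form fans both tasks into the index task; it is shorthand for calling `set_downstream` on each upstream task:

```python
# Equivalent explicit form of the bitshift expression above:
create_training_table.set_downstream(create_prediction_index)
create_prediction_table.set_downstream(create_prediction_index)
```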
❗ Let's go back to the DAG View: something bad happened!
The task `create_prediction_index` is having some issues.
The previous two tasks, `create_training_table` and `create_prediction_table`, finished in `success`.
Let's DEBUG using Airflow: click on the task `create_prediction_index`; it will open a new window.
❗ Click on View Log to see the error message:
✏️ We need to fix the bug in the code (we CAN'T do it in the Airflow UI).
✅ Open the file `/dags/create_ml_tables.py` from the repository in your editor.
Go through the code and find the SQL that creates the index:
```sql
CREATE UNIQUE INDEX idx_date_to_predict
ON {PREDICTION_TABLE} (date_to_predict --!!! FIXME add a ) parenthesis
;
```
❗ The closing parenthesis `)` after `date_to_predict` is missing. Add it: `date_to_predict)`.
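After the fix, the statement reads:

```sql
CREATE UNIQUE INDEX idx_date_to_predict
ON {PREDICTION_TABLE} (date_to_predict);
```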
✅ Save the file and go back to the Web UI, to the Graph View of the `create_ml_tables` DAG.
✅ Click again on the task `create_prediction_index`, but this time, in the window that opens, click on the Clear button, then confirm the operation in the next window by clicking OK!.
📌 You are now resetting the task status. 🕚 Wait a few seconds to let the Scheduler pick up the task and re-run it.
🏆 Once all the tasks have run and finished in `success`, we'll have two tables: `training` and `prediction`.
🏆 We have also created a unique index on the `prediction` table's `date_to_predict` column: it guarantees that at most one prediction is saved per day we want to predict.
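📌 If you want to see the unique index at work outside Airflow, here is a small standalone `sqlite3` demo (the `yhat` column is hypothetical, for illustration only): a second row with the same `date_to_predict` is rejected.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prediction (date_to_predict DATE, yhat REAL)")
conn.execute("CREATE UNIQUE INDEX idx_date_to_predict ON prediction (date_to_predict)")

conn.execute("INSERT INTO prediction VALUES ('2021-01-01', 42.0)")
try:
    # A second prediction for the same day violates the unique index.
    conn.execute("INSERT INTO prediction VALUES ('2021-01-01', 43.0)")
except sqlite3.IntegrityError as err:
    print(err)  # UNIQUE constraint failed: prediction.date_to_predict
```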
Go to EX 2. Train the model.