From d9c71013469e48039a0bceee8a7f482e2d7c0e57 Mon Sep 17 00:00:00 2001 From: Jack Cusick Date: Fri, 6 Sep 2024 10:32:36 -0600 Subject: [PATCH 1/3] Sample blogpost DAG and driver --- dags/blogpost.py | 119 ++++++++++++++++++++++++++++++++++++++++++ scripts/dag_driver.py | 38 ++++++++++++++ 2 files changed, 157 insertions(+) create mode 100644 dags/blogpost.py create mode 100644 scripts/dag_driver.py diff --git a/dags/blogpost.py b/dags/blogpost.py new file mode 100644 index 0000000..a83e280 --- /dev/null +++ b/dags/blogpost.py @@ -0,0 +1,119 @@ +from datetime import datetime, timedelta +from pathlib import Path + +from airflow import DAG +from airflow.decorators import task, task_group +from airflow.utils.edgemodifier import Label +from airflow.models.param import Param + +from tasks.common import print_context, workspace_init +from tasks.git import clone as git_clone +from tasks.terraform import init, plan as tf_plan +from tasks.tools.common import repo_name + +import random +import time + +class BadLuckException(Exception): + pass + +@task(task_id='clone') +def clone(): + time.sleep(random.randint(5, 15)) + +@task_group(group_id='setup') +def setup(): + print_context() >> workspace_init() >> clone() + +@task.branch(task_id='check_terraform_version') +def check_terraform_version(): + options = ["init_0.14", "generate_tags", "generate_tags", "generate_tags", "generate_tags", "generate_tags"] + time.sleep(random.randint(1, 2)) + return f'plan.{random.choice(options)}' + +@task(task_id='generate_tags') +def generate_tags(): + time.sleep(random.randint(5, 15)) + +@task(task_id='init_0.14') +def init_0_14(): + roll = random.randint(1,20) + if roll <= 3: + print('Better luck next time!') + raise BadLuckException(f'You rolled a {roll}') + + time.sleep(random.randint(5, 15)) + +@task(task_id='plan_0.14') +def plan_0_14(): + time.sleep(random.randint(5, 15)) + +@task(task_id='plan_1.7') +def plan_1_7(): + time.sleep(random.randint(5, 15)) + +@task_group(group_id='plan') +def plan(): + check_terraform_version_t = check_terraform_version() + init_0_14_t = init_0_14() + + check_terraform_version_t >> Label('0.13') >> generate_tags() >> init_0_14_t + check_terraform_version_t >> Label('0.14') >> init_0_14_t + + init_0_14_t >> plan_0_14() >> plan_1_7() + +@task(task_id='open_pr') +def open_pr(): + time.sleep(random.randint(5, 15)) + +@task(task_id='is_pr_merged') +def is_pr_merged(): + time.sleep(random.randint(5, 15)) + +@task_group(group_id='quality_control') +def quality_control(): + open_pr() >> is_pr_merged() + +@task(task_id='jenkins_push') +def jenkins_push(): + time.sleep(random.randint(5, 15)) + +@task(task_id='jenkins_apply') +def jenkins_apply(): + time.sleep(random.randint(5, 15)) + +@task_group(group_id='apply') +def apply(): + jenkins_push() >> jenkins_apply() + +with DAG( + 'blogpost', + description='A DAG to plan a terraform project', + params={ + 'url': Param( + default='https://github.com/rearc/sample-terraform.git', + type='string', + description='The repo url to clone', + ), + 'branch': Param( + default='main', + type='string', + description='The repo branch to clone', + ), + 'version': Param( + default='1.7.4', + type='string', + description='Terraform version', + ) + }, + default_args={ + 'execution_timeout': timedelta(minutes=180), + 'trigger_rule': 'none_failed_min_one_success', + 'weight_rule': 'upstream', # depth-first execution + }, + schedule=None, + start_date=datetime(2021, 1, 1), + catchup=False, + tags=['terraform'] +) as dag: + setup() >> plan() >> quality_control() >> apply() \ No newline at end of file diff --git a/scripts/dag_driver.py b/scripts/dag_driver.py new file mode 100644 index 0000000..b528fda --- /dev/null +++ b/scripts/dag_driver.py @@ -0,0 +1,38 @@ +import airflow_client.client + +from airflow_client.client.api import dag_run_api +from airflow_client.client.model.dag_run import DAGRun + +from datetime import datetime + +num_runs = 10 + +dag_id = 'blogpost' + +client_config = airflow_client.client.Configuration( + host="http://localhost:8080/api/v1", + username='airflow', + password='airflow' +) + +run_config = { + 'url': 'https://github.com/rearc/sample-terraform.git', + 'branch': 'main', + 'version': '1.7.4', +} + +dt_str = datetime.now().strftime("%m-%d-%Y_%H-%M-%S") + +for i in range(num_runs): + run_id = f'{dt_str}_{i}' + + with airflow_client.client.ApiClient(client_config) as api_client: + dag_run_api_instance = dag_run_api.DAGRunApi(api_client) + try: + dag_run = DAGRun( + dag_run_id=run_id, + conf=run_config + ) + api_response = dag_run_api_instance.post_dag_run(dag_id, dag_run) + except Exception as e: + print(f'Ran into exception: {e}') \ No newline at end of file From ec3d05e8efadf324e50a4bbd0c66839fd68963a9 Mon Sep 17 00:00:00 2001 From: Jack Cusick Date: Wed, 11 Sep 2024 01:19:41 -0700 Subject: [PATCH 2/3] modifications for airflow summit demo --- Dockerfile | 4 +- dags/airflow_summit_demo.py | 108 ++++++++++++++++++++++++++++++ dags/blogpost.py | 10 ++- dags/tasks/terraform.py | 20 ++++-- docker-compose.yml | 2 + scripts/summit_demo_dag_driver.py | 39 +++++++++++ 6 files changed, 174 insertions(+), 9 deletions(-) create mode 100644 dags/airflow_summit_demo.py create mode 100644 scripts/summit_demo_dag_driver.py diff --git a/Dockerfile b/Dockerfile index 8f34f0c..a7bab43 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM apache/airflow:2.8.2 +FROM apache/airflow:2.10.1-python3.12 USER root @@ -18,6 +18,6 @@ ADD Pipfile.lock Pipfile.lock RUN pipenv install --system # Install terraform versions -ENV TF_VERSIONS="1.6.6|1.7.4" +ENV TF_VERSIONS="1.6.6|1.7.4|1.9.5" ADD scripts/install_terraform.py install_terraform.py RUN python3 install_terraform.py --os linux --arch arm64 --versions ${TF_VERSIONS} \ No newline at end of file diff --git a/dags/airflow_summit_demo.py b/dags/airflow_summit_demo.py new file mode 100644 index 0000000..7f919d8 --- /dev/null +++ b/dags/airflow_summit_demo.py @@ -0,0 +1,108 @@ +from datetime import datetime, timedelta +from pathlib import Path + +from airflow import DAG +from airflow.decorators import task, task_group +from airflow.models.param import Param + +from tasks.common import setup +from tasks.git import clone as git_clone +from tasks.terraform import init, apply as tf_apply +from tasks.tools.common import repo_name + +import random + +class BadLuckException(Exception): + pass + +class ReallyBadLuckException(BadLuckException): + pass + +@task(task_id='clone_kwargs') +def clone_kwargs(params=None, task_instance=None): + """Map params to clone arguments, so that the clone task can be reused across many DAGs""" + return [ + { + 'url': params['url'], + 'to_path': str(Path(task_instance.xcom_pull(task_ids='setup.workspace_init')) / repo_name(params['url'])), + 'branch': params['branch'], + } + ] + +@task(task_id='skill_check') +def skill_check(): + roll = random.randint(1,20) + if roll == 1: + print('Yikes!') + raise ReallyBadLuckException(f'You rolled a {roll} out of 20') + if roll <= 5: + print('Better luck next time!') + raise BadLuckException(f'You rolled a {roll} out of 20') + +@task(task_id='init_kwargs') +def init_kwargs(params=None, task_instance=None): + """Map params to terraform init/plan arguments, so that these tasks can be reused across many DAGs""" + return [ + { + 'version': params['version'], + 'cwd': task_instance.xcom_pull(task_ids='clone.clone')[0], + 'options': ['-no-color', f'-backend-config=./deployments/{params["deployment"]}/backend.hcl'], + 'env': {'TF_PLUGIN_CACHE_DIR': str(Path.home() / '.terraform.d' / 'plugin-cache')}, + } + ] + +@task(task_id='apply_kwargs') +def apply_kwargs(params=None, task_instance=None): + """Map params to terraform init/plan arguments, so that these tasks can be reused across many DAGs""" + return [ + { + 'version': params['version'], + 'cwd': task_instance.xcom_pull(task_ids='clone.clone')[0], + 'options': ['-no-color', f'-var-file=./deployments/{params["deployment"]}/terraform.tfvars', '-auto-approve'], + } + ] + +@task_group(group_id='apply') +def apply(): + skill_check() >> init.expand_kwargs(init_kwargs()) >> tf_apply.expand_kwargs(apply_kwargs()) + +@task_group(group_id='clone') +def clone(): + git_clone.expand_kwargs(clone_kwargs()) + +with DAG( + 'airflow_summit_demo', + description='A DAG to apply a terraform project', + params={ + 'url': Param( + default='https://github.com/jmorgancusick/airflow-summit-demo.git', + type='string', + description='The repo url to clone', + ), + 'branch': Param( + default='main', + type='string', + description='The repo branch to clone', + ), + 'version': Param( + default='1.9.5', + type='string', + description='Terraform version', + ), + 'deployment': Param( + default='0', + type='string', + description='Deployment Name', + ), + }, + default_args={ + 'execution_timeout': timedelta(minutes=180), + 'trigger_rule': 'none_failed_min_one_success', + 'weight_rule': 'upstream', # depth-first execution + }, + schedule=None, + start_date=datetime(2021, 1, 1), + catchup=False, + tags=['terraform'] +) as dag: + setup() >> clone() >> apply() \ No newline at end of file diff --git a/dags/blogpost.py b/dags/blogpost.py index a83e280..dbbbe7b 100644 --- a/dags/blogpost.py +++ b/dags/blogpost.py @@ -17,6 +17,9 @@ class BadLuckException(Exception): pass +class ReallyBadLuckException(BadLuckException): + pass + @task(task_id='clone') def clone(): time.sleep(random.randint(5, 15)) @@ -38,9 +41,12 @@ def generate_tags(): @task(task_id='init_0.14') def init_0_14(): roll = random.randint(1,20) - if roll <= 3: + if roll == 1: + print('Yikes!') + raise ReallyBadLuckException(f'You rolled a {roll} out of 20') + if roll <= 5: print('Better luck next time!') - raise BadLuckException(f'You rolled a {roll}') + raise BadLuckException(f'You rolled a {roll} out of 20') time.sleep(random.randint(5, 15)) diff --git a/dags/tasks/terraform.py b/dags/tasks/terraform.py index f7f9f3c..19eef80 100644 --- a/dags/tasks/terraform.py +++ b/dags/tasks/terraform.py @@ -3,18 +3,28 @@ from airflow.decorators import task from airflow.hooks.subprocess import SubprocessHook +import os + @task(task_id='init') -def init(version, cwd): +def init(version, cwd, options=[], env={}): tf_bin = Path('/opt/airflow/terraform') / version / 'terraform' - result = SubprocessHook().run_command(command=[tf_bin, 'init'], cwd=cwd) + result = SubprocessHook().run_command(command=[tf_bin, 'init', *options], cwd=cwd, env=(env|dict(os.environ))) if result.exit_code: raise RuntimeError(f'Terraform init failed with exit code: {result.exit_code}') @task(task_id='plan') -def plan(version, cwd): +def plan(version, cwd, options=[], env={}): + tf_bin = Path('/opt/airflow/terraform') / version / 'terraform' + + result = SubprocessHook().run_command(command=[tf_bin, 'plan', *options], cwd=cwd, env=(env|dict(os.environ))) + if result.exit_code: + raise RuntimeError(f'Terraform plan failed with exit code: {result.exit_code}') + +@task(task_id='apply') +def apply(version, cwd, options=[], env={}): tf_bin = Path('/opt/airflow/terraform') / version / 'terraform' - result = SubprocessHook().run_command(command=[tf_bin, 'plan'], cwd=cwd) + result = SubprocessHook().run_command(command=[tf_bin, 'apply', *options], cwd=cwd, env=(env|dict(os.environ))) if result.exit_code: - raise RuntimeError(f'Terraform plan failed with exit code: {result.exit_code}') \ No newline at end of file + raise RuntimeError(f'Terraform apply failed with exit code: {result.exit_code}') \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 6302f5c..fc6ce57 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -77,6 +77,8 @@ x-airflow-common: - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins - ${AIRFLOW_PROJ_DIR:-.}/workspace:/opt/airflow/workspace - ~/.aws:/home/airflow/.aws + # Terraform plugin caching + - ~/.terraform.d/plugin-cache:/home/airflow/.terraform.d/plugin-cache user: "${AIRFLOW_UID:-50000}:0" depends_on: &airflow-common-depends-on diff --git a/scripts/summit_demo_dag_driver.py b/scripts/summit_demo_dag_driver.py new file mode 100644 index 0000000..3fd24b5 --- /dev/null +++ b/scripts/summit_demo_dag_driver.py @@ -0,0 +1,39 @@ +import airflow_client.client + +from airflow_client.client.api import dag_run_api +from airflow_client.client.model.dag_run import DAGRun + +from datetime import datetime + +num_runs = 10 + +dag_id = 'airflow_summit_demo' + +client_config = airflow_client.client.Configuration( + host="http://localhost:8080/api/v1", + username='airflow', + password='airflow' +) + +dt_str = datetime.now().strftime("%m-%d-%Y_%H-%M-%S") + +for i in range(num_runs): + run_id = f'deployment_{i}_{dt_str}' + + run_config = { + 'url': 'https://github.com/jmorgancusick/airflow-summit-demo.git', + 'branch': '0.0.2', + 'version': '1.9.5', + 'deployment': str(i), + } + + with airflow_client.client.ApiClient(client_config) as api_client: + dag_run_api_instance = dag_run_api.DAGRunApi(api_client) + try: + dag_run = DAGRun( + dag_run_id=run_id, + conf=run_config + ) + api_response = dag_run_api_instance.post_dag_run(dag_id, dag_run) + except Exception as e: + print(f'Ran into exception: {e}') \ No newline at end of file From 4d93e9127d6077084cb1ac713e4caffd60c851e1 Mon Sep 17 00:00:00 2001 From: Jack Cusick Date: Wed, 11 Sep 2024 01:34:34 -0700 Subject: [PATCH 3/3] with the latest branch / tag --- scripts/summit_demo_dag_driver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/summit_demo_dag_driver.py b/scripts/summit_demo_dag_driver.py index 3fd24b5..56c1270 100644 --- a/scripts/summit_demo_dag_driver.py +++ b/scripts/summit_demo_dag_driver.py @@ -22,7 +22,7 @@ run_config = { 'url': 'https://github.com/jmorgancusick/airflow-summit-demo.git', - 'branch': '0.0.2', + 'branch': '1.2.1', 'version': '1.9.5', 'deployment': str(i), }