Airflow Summit Presentation Changes #1

Open · wants to merge 3 commits into main
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
FROM apache/airflow:2.8.2
FROM apache/airflow:2.10.1-python3.12

USER root

@@ -18,6 +18,6 @@ ADD Pipfile.lock Pipfile.lock
RUN pipenv install --system

# Install terraform versions
ENV TF_VERSIONS="1.6.6|1.7.4"
ENV TF_VERSIONS="1.6.6|1.7.4|1.9.5"
ADD scripts/install_terraform.py install_terraform.py
RUN python3 install_terraform.py --os linux --arch arm64 --versions ${TF_VERSIONS}
108 changes: 108 additions & 0 deletions dags/airflow_summit_demo.py
@@ -0,0 +1,108 @@
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.models.param import Param

from tasks.common import setup
from tasks.git import clone as git_clone
from tasks.terraform import init, apply as tf_apply
from tasks.tools.common import repo_name

import random

class BadLuckException(Exception):
pass

class ReallyBadLuckException(BadLuckException):
pass

@task(task_id='clone_kwargs')
def clone_kwargs(params=None, task_instance=None):
"""Map params to clone arguments, so that the clone task can be reused across many DAGs"""
return [
{
'url': params['url'],
'to_path': str(Path(task_instance.xcom_pull(task_ids='setup.workspace_init')) / repo_name(params['url'])),
'branch': params['branch'],
}
]

@task(task_id='skill_check')
def skill_check():
roll = random.randint(1,20)
if roll == 1:
print('Yikes!')
raise ReallyBadLuckException(f'You rolled a {roll} out of 20')
if roll <= 5:
print('Better luck next time!')
raise BadLuckException(f'You rolled a {roll} out of 20')

@task(task_id='init_kwargs')
def init_kwargs(params=None, task_instance=None):
"""Map params to terraform init/plan arguments, so that these tasks can be reused across many DAGs"""
return [
{
'version': params['version'],
'cwd': task_instance.xcom_pull(task_ids='clone.clone')[0],
'options': ['-no-color', f'-backend-config=./deployments/{params["deployment"]}/backend.hcl'],
'env': {'TF_PLUGIN_CACHE_DIR': str(Path.home() / '.terraform.d' / 'plugin-cache')},
}
]

@task(task_id='apply_kwargs')
def apply_kwargs(params=None, task_instance=None):
"""Map params to terraform init/plan arguments, so that these tasks can be reused across many DAGs"""
return [
{
'version': params['version'],
'cwd': task_instance.xcom_pull(task_ids='clone.clone')[0],
'options': ['-no-color', f'-var-file=./deployments/{params["deployment"]}/terraform.tfvars', '-auto-approve'],
}
]

@task_group(group_id='apply')
def apply():
skill_check() >> init.expand_kwargs(init_kwargs()) >> tf_apply.expand_kwargs(apply_kwargs())

@task_group(group_id='clone')
def clone():
git_clone.expand_kwargs(clone_kwargs())

with DAG(
'airflow_summit_demo',
description='A DAG to apply a terraform project',
params={
'url': Param(
default='https://github.com/jmorgancusick/airflow-summit-demo.git',
type='string',
description='The repo url to clone',
),
'branch': Param(
default='main',
type='string',
description='The repo branch to clone',
),
'version': Param(
default='1.9.5',
type='string',
description='Terraform version',
),
'deployment': Param(
default='0',
type='string',
description='Deployment Name',
),
},
default_args={
'execution_timeout': timedelta(minutes=180),
'trigger_rule': 'none_failed_min_one_success',
'weight_rule': 'upstream', # depth-first execution
},
schedule=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['terraform']
) as dag:
setup() >> clone() >> apply()
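
The kwargs-mapping tasks above can be sanity-checked without a scheduler by calling the wrapped callables directly. A minimal sketch, assuming TaskFlow-decorated tasks expose the original callable via .function, that the module is importable as dags.airflow_summit_demo, and using a mocked task instance with a hypothetical workspace path:

from unittest.mock import MagicMock

from dags.airflow_summit_demo import apply_kwargs, clone_kwargs  # import path assumed

params = {
    'url': 'https://github.com/jmorgancusick/airflow-summit-demo.git',
    'branch': 'main',
    'version': '1.9.5',
    'deployment': '0',
}

ti = MagicMock()

# clone_kwargs builds the single expand_kwargs entry for the git clone task
ti.xcom_pull.return_value = '/opt/airflow/workspace/run_1'  # hypothetical workspace dir
print(clone_kwargs.function(params=params, task_instance=ti))

# apply_kwargs builds the terraform apply invocation for the chosen deployment
ti.xcom_pull.return_value = ['/opt/airflow/workspace/run_1/airflow-summit-demo']
print(apply_kwargs.function(params=params, task_instance=ti))
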
125 changes: 125 additions & 0 deletions dags/blogpost.py
@@ -0,0 +1,125 @@
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.utils.edgemodifier import Label
from airflow.models.param import Param

from tasks.common import print_context, workspace_init
from tasks.git import clone as git_clone
from tasks.terraform import init, plan as tf_plan
from tasks.tools.common import repo_name

import random
import time

class BadLuckException(Exception):
pass

class ReallyBadLuckException(BadLuckException):
pass

@task(task_id='clone')
def clone():
time.sleep(random.randint(5, 15))

@task_group(group_id='setup')
def setup():
print_context() >> workspace_init() >> clone()

@task.branch(task_id='check_terraform_version')
def check_terraform_version():
options = ["init_0.14", "generate_tags", "generate_tags", "generate_tags", "generate_tags", "generate_tags"]
time.sleep(random.randint(1, 2))
return f'plan.{random.choice(options)}'

@task(task_id='generate_tags')
def generate_tags():
time.sleep(random.randint(5, 15))

@task(task_id='init_0.14')
def init_0_14():
roll = random.randint(1,20)
if roll == 1:
print('Yikes!')
raise ReallyBadLuckException(f'You rolled a {roll} out of 20')
if roll <= 5:
print('Better luck next time!')
raise BadLuckException(f'You rolled a {roll} out of 20')

time.sleep(random.randint(5, 15))

@task(task_id='plan_0.14')
def plan_0_14():
time.sleep(random.randint(5, 15))

@task(task_id='plan_1.7')
def plan_1_7():
time.sleep(random.randint(5, 15))

@task_group(group_id='plan')
def plan():
check_terraform_version_t = check_terraform_version()
init_0_14_t = init_0_14()

check_terraform_version_t >> Label('0.13') >> generate_tags() >> init_0_14_t
check_terraform_version_t >> Label('0.14') >> init_0_14_t

init_0_14_t >> plan_0_14() >> plan_1_7()

@task(task_id='open_pr')
def open_pr():
time.sleep(random.randint(5, 15))

@task(task_id='is_pr_merged')
def is_pr_merged():
time.sleep(random.randint(5, 15))

@task_group(group_id='quality_control')
def quality_control():
open_pr() >> is_pr_merged()

@task(task_id='jenkins_push')
def jenkins_push():
time.sleep(random.randint(5, 15))

@task(task_id='jenkins_apply')
def jenkins_apply():
time.sleep(random.randint(5, 15))

@task_group(group_id='apply')
def apply():
jenkins_push() >> jenkins_apply()

with DAG(
'blogpost',
description='A DAG to plan a terraform project',
params={
'url': Param(
default='https://github.com/rearc/sample-terraform.git',
type='string',
description='The repo url to clone',
),
'branch': Param(
default='main',
type='string',
description='The repo branch to clone',
),
'version': Param(
default='1.7.4',
type='string',
description='Terraform version',
)
},
default_args={
'execution_timeout': timedelta(minutes=180),
'trigger_rule': 'none_failed_min_one_success',
'weight_rule': 'upstream', # depth-first execution
},
schedule=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['terraform']
) as dag:
setup() >> plan() >> quality_control() >> apply()
20 changes: 15 additions & 5 deletions dags/tasks/terraform.py
@@ -3,18 +3,28 @@
from airflow.decorators import task
from airflow.hooks.subprocess import SubprocessHook

import os

@task(task_id='init')
def init(version, cwd):
def init(version, cwd, options=[], env={}):
tf_bin = Path('/opt/airflow/terraform') / version / 'terraform'

result = SubprocessHook().run_command(command=[tf_bin, 'init'], cwd=cwd)
result = SubprocessHook().run_command(command=[tf_bin, 'init', *options], cwd=cwd, env=(env|dict(os.environ)))
if result.exit_code:
raise RuntimeError(f'Terraform init failed with exit code: {result.exit_code}')

@task(task_id='plan')
def plan(version, cwd):
def plan(version, cwd, options=[], env={}):
tf_bin = Path('/opt/airflow/terraform') / version / 'terraform'

result = SubprocessHook().run_command(command=[tf_bin, 'plan', *options], cwd=cwd, env=(env|dict(os.environ)))
if result.exit_code:
raise RuntimeError(f'Terraform plan failed with exit code: {result.exit_code}')

@task(task_id='apply')
def apply(version, cwd, options=[], env={}):
tf_bin = Path('/opt/airflow/terraform') / version / 'terraform'

result = SubprocessHook().run_command(command=[tf_bin, 'plan'], cwd=cwd)
result = SubprocessHook().run_command(command=[tf_bin, 'apply', *options], cwd=cwd, env=(env|dict(os.environ)))
if result.exit_code:
raise RuntimeError(f'Terraform plan failed with exit code: {result.exit_code}')
raise RuntimeError(f'Terraform apply failed with exit code: {result.exit_code}')
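
One behavioral detail in the new env handling: with env | dict(os.environ), the right-hand operand wins on duplicate keys, so inherited process variables take precedence over the per-task env values. A small standalone sketch of that merge rule (the cache paths here are illustrative):

import os

os.environ['TF_PLUGIN_CACHE_DIR'] = '/tmp/existing-cache'  # pretend it is already set in the process
task_env = {'TF_PLUGIN_CACHE_DIR': '/home/airflow/.terraform.d/plugin-cache'}

merged = task_env | dict(os.environ)
print(merged['TF_PLUGIN_CACHE_DIR'])  # '/tmp/existing-cache' -- the process value wins
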
2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -77,6 +77,8 @@ x-airflow-common:
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
- ${AIRFLOW_PROJ_DIR:-.}/workspace:/opt/airflow/workspace
- ~/.aws:/home/airflow/.aws
# Terraform plugin caching
- ~/.terraform.d/plugin-cache:/home/airflow/.terraform.d/plugin-cache
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
38 changes: 38 additions & 0 deletions scripts/dag_driver.py
@@ -0,0 +1,38 @@
import airflow_client.client

from airflow_client.client.api import dag_run_api
from airflow_client.client.model.dag_run import DAGRun

from datetime import datetime

num_runs = 10

dag_id = 'blogpost'

client_config = airflow_client.client.Configuration(
host="http://localhost:8080/api/v1",
username='airflow',
password='airflow'
)

run_config = {
'url': 'https://github.com/rearc/sample-terraform.git',
'branch': 'main',
'version': '1.7.4',
}

dt_str = datetime.now().strftime("%m-%d-%Y_%H-%M-%S")

for i in range(num_runs):
run_id = f'{dt_str}_{i}'

with airflow_client.client.ApiClient(client_config) as api_client:
dag_run_api_instance = dag_run_api.DAGRunApi(api_client)
try:
dag_run = DAGRun(
dag_run_id=run_id,
conf=run_config
)
api_response = dag_run_api_instance.post_dag_run(dag_id, dag_run)
except Exception as e:
print(f'Ran into exception: {e}')
39 changes: 39 additions & 0 deletions scripts/summit_demo_dag_driver.py
@@ -0,0 +1,39 @@
import airflow_client.client

from airflow_client.client.api import dag_run_api
from airflow_client.client.model.dag_run import DAGRun

from datetime import datetime

num_runs = 10

dag_id = 'airflow_summit_demo'

client_config = airflow_client.client.Configuration(
host="http://localhost:8080/api/v1",
username='airflow',
password='airflow'
)

dt_str = datetime.now().strftime("%m-%d-%Y_%H-%M-%S")

for i in range(num_runs):
run_id = f'deployment_{i}_{dt_str}'

run_config = {
'url': 'https://github.com/jmorgancusick/airflow-summit-demo.git',
'branch': '1.2.1',
'version': '1.9.5',
'deployment': str(i),
}

with airflow_client.client.ApiClient(client_config) as api_client:
dag_run_api_instance = dag_run_api.DAGRunApi(api_client)
try:
dag_run = DAGRun(
dag_run_id=run_id,
conf=run_config
)
api_response = dag_run_api_instance.post_dag_run(dag_id, dag_run)
except Exception as e:
print(f'Ran into exception: {e}')
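
After the loop queues the runs, their state can be polled with the same generated client. A minimal sketch, assuming DAGRunApi exposes get_dag_run and reusing client_config, dag_id, num_runs, and dt_str from the script above:

import time

with airflow_client.client.ApiClient(client_config) as api_client:
    api = dag_run_api.DAGRunApi(api_client)
    for i in range(num_runs):
        run = api.get_dag_run(dag_id, f'deployment_{i}_{dt_str}')
        print(run.dag_run_id, run.state)
        time.sleep(1)  # avoid hammering the API while runs progress
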