I used to schedule cron tasks and send the results to IM applications such as Slack, Discord, etc.
What I did before was deploying a gigantic, stable, famous workflow platform like Apache Airflow or Prefect. They are still good choices, but it's time for me to adopt a lighter, event-driven architecture.
On AWS, we can combine Amazon EventBridge, AWS Lambda, and Amazon SQS to build an architecture that fulfills the same job:
Hopefully this costs less money and effort. Let's get our hands dirty 🛠️ and get the party started! 🎉
- Components
- File Structure
- Deployment
- Step 1: Configure AWS Credentials
- Step 2: Set the stacks' prefix
- Step 3: Reset the scheduler
- Step 4: Create Task Definitions
- (Optional) Step 5: Install Python Lambda Layer (Python dependencies)
- Step 6: Add Business Logic Code
- Step 7: Configure timeout duration of Lambda functions
- (Optional) Step 8: Change time related parameters of SQS Queue
- Step 9: Deploy with CDK toolkit (`cdk` command)
- (Optional) Step 10: Clean all resources
- Some Development Tips & Explanations
- Example References
All we need are:
- EventBridge Scheduler (Event Scheduler): Invokes a Lambda function on schedule, with a Role attached
  - Service Role: Has permission to invoke a Lambda function
- Lambda function (Send Tasks): Sends messages to the queue, with a Role attached
  - Service Role: Has permission to send messages to SQS and manage CloudWatch logs
- Queue: Stores the tasks
- Lambda function (Run Task): Receives messages from the queue, with a Role attached
  - Service Role: Has permission to receive messages from the queue and manage CloudWatch logs
```
.
├── README.md
├── app.py                   # Entrypoint
├── cdk.context.json         # Set this stack's prefix
├── cdk.json
├── diagram-detailed.jpg
├── diagram-services.jpg
├── diagram-story.jpg
├── lambda
│   ├── run_task
│   │   ├── __init__.py
│   │   └── index.py         # Task processing logic
│   └── send_task
│       ├── __init__.py
│       └── index.py         # Task sending logic
├── layer
│   └── python               # Lambda's Python dependencies
├── poetry.lock
├── pyproject.toml
├── requirements-layer.txt   # Lambda's Python dependency list
├── requirements.txt
├── stacks
│   ├── __init__.py
│   ├── __pycache__
│   ├── lambda_stack.py      # Lambdas' configurations
│   ├── scheduler_stack.py   # EventBridge Scheduler's configurations
│   └── sqs_stack.py         # SQS Queue's configurations
└── tests                    # Tests (incomplete)
    ├── __init__.py
    └── unit
```
Install the AWS CLI first, then create the config file by executing:

```shell
aws configure
```
The deployed stacks have a prefix; the default value is `cdklab`. Edit the value in the runtime context file (`cdk.context.json`):

```json
{
  "prefix": "cdklab"
}
```
The default schedule is set at 23:59:59 on 2037-12-31; refer to `stacks.scheduler_stack.scheduler_lambda`.
Choose to set it as a rate-based, cron-based, or one-time job (Reference), and replace the value string of the `schedule_expression` parameter following the syntax below:
- For a rate-based job, the syntax is `rate(value unit)`
- For a cron-based job, the syntax is `cron(minutes hours day-of-month month day-of-week year)`
- For a one-time job, the syntax is `at(yyyy-mm-ddThh:mm:ss)`

Then set the timezone by replacing the value of the `schedule_expression_timezone` parameter with a TZ identifier like `Asia/Taipei`.
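For example, the one-time syntax can be produced from a `datetime` with a tiny helper (a sketch for illustration; `one_time_expression` is a made-up name, not part of this project):

```python
from datetime import datetime

def one_time_expression(when: datetime) -> str:
    """Build an EventBridge Scheduler one-time expression, e.g. at(2037-12-31T23:59:59)."""
    return when.strftime("at(%Y-%m-%dT%H:%M:%S)")

print(one_time_expression(datetime(2037, 12, 31, 23, 59, 59)))
# at(2037-12-31T23:59:59)
```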
By default, the tasks are stored in the `tasks` list object in `lambda/send_task/index.py`, such as:

```python
tasks = [
    "Hello from Lambda #1!",
    "Hello from Lambda #2!",
]
```

Simply modify the content; each string object will be the task that the next (run-task) Lambda function executes.
If the task definitions are much larger than this example, try storing them in another text file and reading it in:

```python
# e.g. the tasks are now in tasks.txt; each line represents a unique task definition
with open("tasks.txt", "r") as file:
    tasks = file.read().splitlines()
```
All Python dependencies must be stored under `layer/python/` as a Lambda Layer, then packed and sent to AWS Lambda.
First add the dependencies to `requirements-layer.txt`, then install them with the commands below:

```shell
mkdir -p ./layer/python/
pip install --target ./layer/python -r requirements-layer.txt
```

[NOTE] Remember to manually add dependencies to `requirements-layer.txt`.
Insert the code into `lambda/run_task/index.py`.
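As a starting point, the handler could look like this (a minimal sketch, not the project's actual code; the SQS trigger delivers a batch of records, and each record body is one task string produced by the send-task function):

```python
def handler(event, context):
    """Sketch of lambda/run_task/index.py: process each queued task string."""
    results = []
    for record in event["Records"]:   # SQS event payloads arrive under "Records"
        task = record["body"]         # the body is the task string sent earlier
        # Replace this with your real business logic (e.g. post to Slack).
        results.append(f"processed: {task}")
    return results
```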
Find `lambda_stack.py` and change the timeout duration. These are configured by default:
- For the send-task Lambda function: `30` seconds
- For the run-task Lambda function: `1` minute

Feel free to change the duration from `1` second to `15` minutes. Estimate the duration by planning a dry run of your business logic.
There are 3 parameters:
- `visibility_timeout`: How long does a Lambda function have to process the task?
  - Default value: `1` minute
- `retention_period`: How long does the task remain in the queue?
  - Default value: `15` minutes
- `receive_message_wait_time`: How long does the "ReceiveMessage" (polling) action wait?
  - Default value: `0` minutes (immediate)

Estimate the settings based on the program you are going to run, and override them by editing the `stacks/sqs_stack.py` file.
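These parameters map onto the `aws_sqs.Queue` construct roughly like this (a fragment sketch, not the project's exact code; the construct id `TaskQueue` is assumed, and `self` refers to the enclosing stack):

```python
from aws_cdk import Duration, aws_sqs as sqs

queue = sqs.Queue(
    self, "TaskQueue",
    visibility_timeout=Duration.minutes(1),         # time a consumer has to process a task
    retention_period=Duration.minutes(15),          # how long an unconsumed task survives
    receive_message_wait_time=Duration.seconds(0),  # 0 = short polling (immediate)
)
```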
Install the CDK Toolkit, then deploy by executing:

```shell
cdk deploy --all --require-approval=never
```
Not going to use these anymore? Remove them with:

```shell
cdk destroy --all
```
CDK will try to create roles and add the necessary permission policies by evaluating the resources that are going to be created. So, if we create a Lambda function and set an SQS Queue to trigger the function, CDK will automatically create the policies below:
- CloudWatch log related policies
- Read SQS Queue message related policies
But there are exceptions in this project:
- If the action only appears inside the Lambda function's code, CDK won't understand what permissions the function needs (refer to `stacks.lambda_stack.lambda_sendtask`)
- If we use an L1 construct to create the resources, everything in the CloudFormation template has to be created by ourselves (refer to `stacks.scheduler_stack.scheduler_lambda`)

If these situations exist, try to create the Roles and Policies manually first.
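For the send-task case, one manual fix could be the queue's grant helper (a sketch; `queue` and `send_task_fn` are assumed names for constructs created elsewhere in the stacks):

```python
# CDK cannot see the sqs.send_message() call inside the function's code,
# so grant the permission explicitly: grant_send_messages attaches an
# sqs:SendMessage policy to the function's execution role.
queue.grant_send_messages(send_task_fn)
```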
We can use the standard `json` library and call the `json.dumps()` function to accomplish the serialization process. But try running the code below:

```python
import json
json.dumps(json)  # TypeError: Object of type module is not JSON serializable
```

An error occurs: a module object cannot be JSON serialized. Sometimes it's annoying because we can't tell at a glance whether an object is serializable or not.
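One way to avoid the surprise is to probe an object before serializing it (an illustration only; `is_json_serializable` is a made-up helper, not part of this project):

```python
import json

def is_json_serializable(obj) -> bool:
    """Check up front whether json.dumps would succeed on this object."""
    try:
        json.dumps(obj)
        return True
    except TypeError:
        return False

print(is_json_serializable({"task": "hello"}))  # True
print(is_json_serializable(json))               # False: a module is not serializable
```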
jsonpickle is an amazing project for JSON serialization. The reason jsonpickle works better than calling `json.dumps` is found in the project's own description:
"It can take almost any Python object and turn the object into JSON"
We added it into the Python Lambda Layer by default and already use it in the Lambda function. Try it and enjoy! 🍻