Skip to content

Commit

Permalink
split into two SQS queues, one for glue and the other for the traditi…
Browse files Browse the repository at this point in the history
…onal loader.
  • Loading branch information
woodhull committed Jul 22, 2020
1 parent 05b0902 commit dcc6ce3
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 9 deletions.
9 changes: 5 additions & 4 deletions iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ data "aws_iam_policy_document" "receiver_execution_policy" {
resources = ["arn:aws:logs:*:*:*"]
}

# allow the lambda to enqueue work
# allow the receiver lambda to enqueue work
statement {
effect = "Allow"
actions = ["sqs:SendMessage"]
resources = ["arn:aws:sqs:${var.aws_region}:*:${aws_sqs_queue.receiver_queue.name}"]
resources = ["arn:aws:sqs:${var.aws_region}:*:${aws_sqs_queue.receiver_queue.name}",
"arn:aws:sqs:${var.aws_region}:*:${aws_sqs_queue.receiver_queue_glue.name}"]
}
}

Expand Down Expand Up @@ -221,11 +222,11 @@ data "aws_iam_policy_document" "run_glue_crawler_execution_policy" {
resources = [ aws_glue_crawler.signatures_crawler.arn ]
}

# allow lambda to be wired up to the queue. These are the minimum permissions for the SQS Lambda Executor.
# allow glue lambda to be wired up to the queue. These are the minimum permissions for the SQS Lambda Executor.
statement {
effect = "Allow"
actions = ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"]
resources = ["arn:aws:sqs:${var.aws_region}:*:${aws_sqs_queue.receiver_queue.name}"]
resources = ["arn:aws:sqs:${var.aws_region}:*:${aws_sqs_queue.receiver_queue_glue.name}"]
}
}

Expand Down
16 changes: 12 additions & 4 deletions lambdas/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@ function enqueueTask(receivedData, kind) {

messageBody['kind'] = kind;

const params = {
MessageBody: JSON.stringify(messageBody),
const jsonMessageBody = JSON.stringify(messageBody)

const loaderQueueParams = {
MessageBody: jsonMessageBody,
QueueUrl: process.env.SQS_QUEUE_URL
};

const glueQueueParams = {
MessageBody: jsonMessageBody,
QueueUrl: process.env.GLUE_SQS_QUEUE_URL
};

console.log('Enqueueing ' + JSON.stringify(messageBody) + ' on ' + process.env.SQS_QUEUE_URL);

let resp = sqs.sendMessage(params).promise();
let loaderResp = sqs.sendMessage(loaderQueueParams).promise();
let glueResp = sqs.sendMessage(glueQueueParams).promise();

resp.then(
Promise.all([loaderResp, glueResp]).then(
function(data) {
console.log("Success " + data.MessageId);
},
Expand Down
6 changes: 6 additions & 0 deletions receiver.tf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ resource "aws_lambda_function" "receiver_lambda" {
environment {
variables = {
SQS_QUEUE_URL = aws_sqs_queue.receiver_queue.id
GLUE_SQS_QUEUE_URL = aws_sqs_queue.receiver_queue_glue.id
}
}
}
Expand Down Expand Up @@ -97,6 +98,11 @@ resource "aws_sqs_queue" "receiver_queue" {
visibility_timeout_seconds = 900
}

resource "aws_sqs_queue" "receiver_queue_glue" {
name = "controlshift-received-webhooks-glue"
visibility_timeout_seconds = 900
}

# there is no formal way to associate this resource, beyond ensuring the name matches.
resource "aws_cloudwatch_log_group" "api_gateway_log_retention" {
name = "API-Gateway-Execution-Logs_${aws_api_gateway_rest_api.receiver.id}/${aws_api_gateway_deployment.deployment.stage_name}"
Expand Down
2 changes: 1 addition & 1 deletion run_glue_crawler.tf
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ resource "aws_lambda_function" "glue_crawler_lambda" {
}

resource "aws_lambda_event_source_mapping" "run_crawler_on_new_data_export_task" {
event_source_arn = aws_sqs_queue.receiver_queue.arn
event_source_arn = aws_sqs_queue.receiver_queue_glue.arn
function_name = aws_lambda_function.glue_crawler_lambda.arn
batch_size = 1
}

0 comments on commit dcc6ce3

Please sign in to comment.