Skip to content

Commit

Permalink
Readable: Use pipes to connect DDB to Sfn
Browse files Browse the repository at this point in the history
  • Loading branch information
philipws committed May 23, 2024
1 parent 25beace commit 0fb22ae
Showing 1 changed file with 119 additions and 53 deletions.
172 changes: 119 additions & 53 deletions infrastructure/lib/features/readable/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { Construct } from "constructs";
import * as cdk from "aws-cdk-lib";
import { aws_pipes as pipes } from 'aws-cdk-lib';
import { NagSuppressions } from "cdk-nag";

import {
Expand All @@ -11,7 +12,6 @@ import {
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as tasks,
aws_lambda as lambda,
aws_lambda_event_sources as eventsources,
aws_appsync as appsync,
aws_s3 as s3,
} from "aws-cdk-lib";
Expand Down Expand Up @@ -82,6 +82,14 @@ export class dt_readableWorkflow extends Construct {
//
// STATE MACHINE
// STATE MACHINE | TASKS
// STATE MACHINE | TASKS | unNestJobDetails
const unNestJobDetails = new sfn.Pass(this, "unNestJobDetails", {
parameters: {
dynamodb: sfn.JsonPath.objectAt(
"$.[0].dynamodb",
),
},
});
// STATE MACHINE | TASKS | unmarshallDdb
const unmarshallDdbLambdaRole = new iam.Role(
this,
Expand Down Expand Up @@ -371,7 +379,8 @@ export class dt_readableWorkflow extends Construct {
{
nameSuffix: "ReadableMain",
removalPolicy: props.removalPolicy,
definition: unmarshallDdbStream
definition: unNestJobDetails
.next(unmarshallDdbStream)
.next(updateDbStatusToProcessing)
.next(getModelParams)
.next(unmarshallModelParams)
Expand Down Expand Up @@ -447,66 +456,123 @@ export class dt_readableWorkflow extends Construct {
true,
);

// INFRA | DYNAMODB | STREAM LAMBDA
const lambdaPassDynamoDBToStepFunctionRole = new iam.Role(
// STATE MACHINE | MAIN | DEF
const sfnMainRename = new dt_stepfunction(
this,
"lambdaPassDynamoDBToStepFunctionRole",
`${cdk.Stack.of(this).stackName}_ReadableMainRename`,
{
// ASM-L6 // ASM-L8
assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
description: "Lambda Role (Pass DynamoDB stream to StepFunction)",
nameSuffix: "ReadableMainRename",
removalPolicy: props.removalPolicy,
definition: new tasks.StepFunctionsStartExecution(this, "startSfnMain", {
stateMachine: sfnMain,
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
name: sfn.JsonPath.format(
"{}_{}",
sfn.JsonPath.stringAt("$.[0].dynamodb.Keys.itemId.S"),
sfn.JsonPath.stringAt("$.[0].eventID"),
),
})
},
);
const lambdaPassDynamoDbToStepFunction = new dt_lambda(
this,
"lambdaPassDynamoDbToStepFunction",
{
role: lambdaPassDynamoDBToStepFunctionRole,
path: "lambda/passDynamoDBToStepFunction",
description: "Pass DynamoDB stream to StepFunction",
runtime: lambda.Runtime.NODEJS_18_X,
environment: {
stateMachineArn: sfnMain.stateMachineArn,
skValue: dt_enums.JobTable.SK,
).StateMachine;
// Required for "RUN_JOB"
NagSuppressions.addResourceSuppressions(
sfnMainRename,
[
{
id: "AwsSolutions-IAM5",
reason: "Permissions scoped to dedicated resources.",
},
},
],
true,
);

lambdaPassDynamoDbToStepFunction.lambdaFunction.addEventSource(
new eventsources.DynamoEventSource(props.jobTable, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
retryAttempts: 3,
// PIPE | DDB JOB TO STEPFUNCTION
if (props.jobTable.tableStreamArn) { // tableStreamArn may be undefined
// PIPE | DDB JOB TO STEPFUNCTION | PERMISSIONS
const pipeJobToSfnRole = new iam.Role(this, "pipeJobToSfnRole",
{
// ASM-L6 // ASM-L8
assumedBy: new iam.ServicePrincipal("pipes.amazonaws.com"),
description: "Pipe Role (Pass DynamoDB job to StepFunction)",
},
);
pipeJobToSfnRole.attachInlinePolicy(
new iam.Policy(this, "permitStartExecutionOfMain",
{
policyName: "Start-Sfn-ReadablenMain",
statements: [
new iam.PolicyStatement({
// ASM-IAM
actions: ["states:StartExecution"],
resources: [sfnMainRename.stateMachineArn],
}),
],
},
)
);
pipeJobToSfnRole.attachInlinePolicy(
new iam.Policy(this, "permitReadDynamoDBStream",
{
policyName: "Read-DynamoDB-Stream",
statements: [
new iam.PolicyStatement({
// ASM-IAM
actions: ["dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator"],
resources: [props.jobTable.tableStreamArn],
}),
],
},
)
);

// PIPE | DDB JOB TO STEPFUNCTION | PIPE
// PIPE | DDB JOB TO STEPFUNCTION | PIPE | SOURCE
const pipeSourceDynamoDBStreamParametersProperty: pipes.CfnPipe.PipeSourceDynamoDBStreamParametersProperty = {
startingPosition: 'TRIM_HORIZON',
batchSize: 1,
};
const filterPattern = {
"eventName": ["INSERT", "MODIFY"],
"dynamodb": {
"NewImage": {
"status": {
"S": [{
"equals-ignore-case": dt_enums.ItemStatus.GENERATE
}]
}
}
}
}
const pipeSourceDynamoDBStreamFiltersProperty: pipes.CfnPipe.FilterCriteriaProperty = {
filters: [
lambda.FilterCriteria.filter({
dynamodb: {
NewImage: {
status: {
S: [{ "equals-ignore-case": dt_enums.ItemStatus.GENERATE }],
},
},
},
}),
{
pattern: JSON.stringify(filterPattern),
},
],
}),
);
}
const sourceParameters = {
dynamoDbStreamParameters: pipeSourceDynamoDBStreamParametersProperty,
filterCriteria: pipeSourceDynamoDBStreamFiltersProperty,
}

const permitStartExecutionOfMain = new iam.Policy(
this,
"permitStartExecutionOfMain",
{
policyName: "Start-Sfn-Readable",
statements: [
new iam.PolicyStatement({
// ASM-IAM
actions: ["states:StartExecution"],
resources: [sfnMain.stateMachineArn],
}),
],
},
);
lambdaPassDynamoDBToStepFunctionRole.attachInlinePolicy(
permitStartExecutionOfMain,
);
// PIPE | DDB JOB TO STEPFUNCTION | PIPE | TARGET
const pipeTargetStateMachineParametersProperty: pipes.CfnPipe.PipeTargetStateMachineParametersProperty = {
invocationType: 'FIRE_AND_FORGET',
};
const targetParameters = {
stepFunctionStateMachineParameters: pipeTargetStateMachineParametersProperty
}

// PIPE | DDB JOB TO STEPFUNCTION | PIPE | DEF
const pipeJobToSfn = new pipes.CfnPipe(this, "pipeJobToSfn", {
roleArn: pipeJobToSfnRole.roleArn,
source: props.jobTable.tableStreamArn,
target: sfnMainRename.stateMachineArn,
description: 'DocTran Readable Job to StepFunction',
sourceParameters,
targetParameters,
});
}

// END
}
Expand Down

0 comments on commit 0fb22ae

Please sign in to comment.