Skip to content

Commit

Permalink
feat(cb2-14226) refactor and remove SQS snowball pattern (#84)
Browse files Browse the repository at this point in the history
* feat(cb2-14226): refactor to allow batch item failures

* feat(cb2-14226): refactor to allow batch item failures

* feat(cb2-14226): refactor to allow batch item failures

* feat(cb2-14226): refactor some more

* feat(cb2-14226): refactor some more

* feat(cb2-14226): resolve promise

* feat(cb2-14226): resolve promise

* feat(cb2-14226): resolve promise

* feat(cb2-14226): resolve promise

* feat(cb2-14226): resolve promise

* feat(cb2-14226): adjust SQS creation outside of loop

* feat(cb2-14226): pr feedback

* feat(cb2-14226): pr feedback
  • Loading branch information
naathanbrown authored Oct 16, 2024
1 parent 6cdfe04 commit b84af12
Show file tree
Hide file tree
Showing 8 changed files with 8,171 additions and 21,234 deletions.
29,166 changes: 8,030 additions & 21,136 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"reflect-metadata": "^0.1.13"
},
"devDependencies": {
"@commitlint/cli": "^12.1.4",
"@commitlint/cli": "^19.5.0",
"@commitlint/config-conventional": "^12.1.4",
"@types/aws-lambda": "^8.10.34",
"@types/jest": "^24.0.21",
Expand All @@ -72,7 +72,7 @@
"jest-plugin-context": "^2.9.0",
"jest-sonar-reporter": "^2.0.0",
"prettier": "^2.3.2",
"serverless": "^2.19.0",
"serverless": "^3.0.0",
"serverless-plugin-tracing": "^2.0.0",
"serverless-plugin-typescript": "^2.1.2",
"sonar-scanner": "^3.1.0",
Expand Down
94 changes: 57 additions & 37 deletions src/functions/certGenInit.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { Callback, Context, Handler } from "aws-lambda";
import { ServiceException } from "@smithy/smithy-client";
import { SendMessageCommandOutput, SQSClient } from "@aws-sdk/client-sqs";
import { SQSClient } from "@aws-sdk/client-sqs";
import {
Callback,
Context,
DynamoDBBatchItemFailure,
DynamoDBBatchResponse,
DynamoDBStreamEvent,
Handler,
} from "aws-lambda";
import { SQService } from "../services/SQService";
import { StreamService } from "../services/StreamService";
import { Utils } from "../utils/Utils";
Expand All @@ -12,47 +18,61 @@ import { Utils } from "../utils/Utils";
* @param callback - callback function
*/
const certGenInit: Handler = async (
event: any,
event: DynamoDBStreamEvent,
context?: Context,
callback?: Callback
): Promise<void | Array<SendMessageCommandOutput | any>> => {
): Promise<DynamoDBBatchResponse> => {
if (!event) {
console.error("ERROR: event is not defined.");
return;
throw new Error("ERROR: event is not defined");
}

// Convert the received event into a readable array of filtered test results
const expandedRecords: any[] = StreamService.getTestResultStream(event);
console.log(`Number of Retrieved records: ${expandedRecords.length}`);
const certGenFilteredRecords: any[] =
Utils.filterCertificateGenerationRecords(expandedRecords);

console.log(`Number of Filtered Retrieved Records: ${certGenFilteredRecords.length}`);

// Instantiate the Simple Queue Service
const sqService: SQService = new SQService(new SQSClient());
const sendMessagePromises: Array<
Promise<SendMessageCommandOutput | ServiceException>
> = [];

certGenFilteredRecords.forEach((record: any) => {
const stringifiedRecord = JSON.stringify(record);
console.log(stringifiedRecord);
sendMessagePromises.push(
sqService.sendCertGenMessage(stringifiedRecord)
);
});

return Promise.all(sendMessagePromises).catch((error) => {
console.error(error);
console.log("expandedRecords");
console.log(JSON.stringify(expandedRecords));
console.log("certGenFilteredRecords");
console.log(JSON.stringify(certGenFilteredRecords));
if (error.code !== "InvalidParameterValue") {
throw error;
const batchItemFailures: DynamoDBBatchItemFailure[] = [];
let expandedRecords: any[] = [];
let certGenFilteredRecords: any[] = [];
let sqService: SQService;

try {
// Instantiate the Simple Queue Service
sqService = new SQService(new SQSClient());
} catch (e) {
console.error(`Error creating SQS instance: ${e}`);
throw new Error("Failed to initialize SQS service");
}

for (const record of event.Records) {
try {
expandedRecords = StreamService.getTestResultStream(record);
console.log(`Number of Retrieved records: ${expandedRecords.length}`);

certGenFilteredRecords =
Utils.filterCertificateGenerationRecords(expandedRecords);
console.log(
`Number of Filtered Retrieved Records: ${certGenFilteredRecords.length}`
);

for (const record of certGenFilteredRecords) {
const stringifiedRecord = JSON.stringify(record);
console.log(stringifiedRecord);
await sqService.sendCertGenMessage(stringifiedRecord);
}

console.log(
`event ${record.dynamodb?.SequenceNumber} successfully processed`
);
} catch (err) {
console.error(err);
console.log("expandedRecords");
console.log(JSON.stringify(expandedRecords));
console.log("certGenFilteredRecords");
console.log(JSON.stringify(certGenFilteredRecords));
batchItemFailures.push({
itemIdentifier: record.dynamodb?.SequenceNumber ?? "",
});
}
});
}

return { batchItemFailures };
};

export { certGenInit };
6 changes: 2 additions & 4 deletions src/services/SQService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import {
SQSClient,
SendMessageCommand,
SendMessageCommandInput,
SendMessageCommandOutput,
SetQueueAttributesCommand,
} from "@aws-sdk/client-sqs";

import { Service } from "../models/injector/ServiceDecorator";
Expand Down Expand Up @@ -69,7 +67,7 @@ class SQService {
messageBody: string,
queueName: string,
messageAttributes?: Record<string, MessageAttributeValue>
): Promise<SendMessageCommandOutput | ServiceException> {
) {
// Get the queue URL for the provided queue name
const queueUrlResult: GetQueueUrlCommandOutput = await this.sqsClient.send(
new GetQueueUrlCommand({ QueueName: queueName })
Expand All @@ -86,7 +84,7 @@ class SQService {
}

// Send a message to the queue
return this.sqsClient.send(
await this.sqsClient.send(
new SendMessageCommand(params as SendMessageCommandInput)
);
}
Expand Down
60 changes: 31 additions & 29 deletions src/services/StreamService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,25 @@ class StreamService {
* └── test-type-3
* @param event
*/
public static getTestResultStream(event: any) {
console.log(event);
public static getTestResultStream(record: DynamoDBRecord) {
console.log(record);
let records = [];
// Create from a test result with multiple test types, multiple test result with one test type each
const records: any[] = event.Records.filter((record: DynamoDBRecord) => {
// Retrieve "INSERT" events
return (
record.eventName === "INSERT" ||
(record.eventName === "MODIFY" &&
StreamService.isProcessModifyEventsEnabled())
);
}).map((record: any) => {
// Convert to JS object
if (
record.eventName === "INSERT" ||
(record.eventName === "MODIFY" &&
StreamService.isProcessModifyEventsEnabled())
) {
if (record.dynamodb && record.dynamodb.NewImage) {
return unmarshall(record.dynamodb.NewImage);
const unmarshalledRecord = unmarshall(
(record as any).dynamodb.NewImage
);
records = StreamService.expandRecords([unmarshalledRecord]);
}
});

return StreamService.expandRecords(records);
} else {
console.log("event name was not of correct type");
}
return records;
}

/**
Expand Down Expand Up @@ -75,22 +76,23 @@ class StreamService {
Object.assign(templateRecord, {});
console.log("before for each");
if (record.testTypes instanceof Array) {
record.testTypes?.forEach((testType: any, i: number, array: any[]) => {
console.log("in for each");
const clonedRecord: any = Object.assign({}, templateRecord); // Create record from template
Object.assign(clonedRecord, { testTypes: testType }); // Assign it the test type
Object.assign(clonedRecord, {
// Assign certificate order number
order: {
current: i + 1,
total: array.length,
},
});

splittedRecords.push(clonedRecord);
record.testTypes?.forEach(
(testType: any, i: number, array: any[]) => {
console.log("in for each");
const clonedRecord: any = Object.assign({}, templateRecord); // Create record from template
Object.assign(clonedRecord, { testTypes: testType }); // Assign it the test type
Object.assign(clonedRecord, {
// Assign certificate order number
order: {
current: i + 1,
total: array.length,
},
});
}

splittedRecords.push(clonedRecord);
}
);
}

console.log("after for each");
return splittedRecords;
Expand Down
37 changes: 26 additions & 11 deletions tests/unit/certGenInit.unitTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ import {
GetQueueUrlCommand,
ReceiveMessageCommand,
ReceiveMessageCommandOutput,
SQSClient,
SendMessageCommand,
SendMessageCommandOutput,
SQSClient,
} from "@aws-sdk/client-sqs";
import { marshall, unmarshall } from "@aws-sdk/util-dynamodb";
import { DynamoDBRecord } from "aws-lambda";
import { mockClient } from "aws-sdk-client-mock";
import { Injector } from "../../src/models/injector/Injector";
import { SQService } from "../../src/services/SQService";
import { StreamService } from "../../src/services/StreamService";
import { Configuration } from "../../src/utils/Configuration";
import { SQMockClient } from "../models/SQMockClient";
import event from "../resources/stream-event.json";
import { mockClient } from "aws-sdk-client-mock";
import {marshall, unmarshall} from "@aws-sdk/util-dynamodb";

const record = {
testerStaffId: "1",
Expand Down Expand Up @@ -109,7 +110,9 @@ describe("cert-gen-init", () => {
"when fetching test result stream and the eventName is INSERT",
() => {
it("should result in an array of filtered js objects", () => {
processedEvent = StreamService.getTestResultStream(event);
processedEvent = StreamService.getTestResultStream(
event.Records[0] as DynamoDBRecord
);
expect(processedEvent).toEqual(expectedResult);
});
}
Expand All @@ -121,34 +124,46 @@ describe("cert-gen-init", () => {
it("shouldn't result in an array of filtered js objects when PROCESS_MODIFY_EVENTS is false", () => {
process.env.PROCESS_MODIFY_EVENTS = "false";
event.Records[0].eventName = "MODIFY";
processedEvent = StreamService.getTestResultStream(event);
processedEvent = StreamService.getTestResultStream(
event.Records[0] as DynamoDBRecord
);
expect(processedEvent).toHaveLength(0);
});

it("should result in an array of filtered js objects when PROCESS_MODIFY_EVENTS is true", () => {
process.env.PROCESS_MODIFY_EVENTS = "true";
event.Records[0].eventName = "MODIFY";
processedEvent = StreamService.getTestResultStream(event);
processedEvent = StreamService.getTestResultStream(
event.Records[0] as DynamoDBRecord
);
expect(processedEvent).toHaveLength(1);
expect(processedEvent).toEqual(expectedResult);
});

it("should result in an empty array when the test type is an object", () => {
process.env.PROCESS_MODIFY_EVENTS = "true";
const eventWithTestTypeObject = unmarshall({...event}.Records[0].dynamodb.NewImage);
const eventWithTestTypeObject = unmarshall(
{ ...event }.Records[0].dynamodb.NewImage
);
eventWithTestTypeObject.testTypes = {};
event.Records[0].eventName = "MODIFY";
const mainEvent = {...event};
mainEvent.Records[0].dynamodb.NewImage = marshall(eventWithTestTypeObject) as any;
processedEvent = StreamService.getTestResultStream(mainEvent);
const mainEvent = { ...event };
mainEvent.Records[0].dynamodb.NewImage = marshall(
eventWithTestTypeObject
) as any;
processedEvent = StreamService.getTestResultStream(
mainEvent.Records[0] as DynamoDBRecord
);
expect(processedEvent).toEqual([]);
});

it("should throw an error if PROCESS_MODIFY_EVENTS is not true or false", () => {
process.env.PROCESS_MODIFY_EVENTS = "";
event.Records[0].eventName = "MODIFY";
expect(() => {
StreamService.getTestResultStream(event);
StreamService.getTestResultStream(
event.Records[0] as DynamoDBRecord
);
}).toThrowError();
});
}
Expand Down
33 changes: 19 additions & 14 deletions tests/unit/certGenInitFunction.unitTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ describe("certGenInit Function", () => {
const result = await certGenInit(undefined, ctx, () => {
return;
});
expect(result).toBe(undefined);
} catch (e) {
} catch (e: any) {
expect(e.message).toBe("ERROR: event is not defined");
console.log(e);
}
});
Expand All @@ -36,7 +36,7 @@ describe("certGenInit Function", () => {
.mockReturnValue([{ TestRecord: "certGenMessage" }]);

try {
await certGenInit({}, ctx, () => {
await certGenInit({ Records: ["this is an event"] }, ctx, () => {
return;
});
} catch (e) {
Expand Down Expand Up @@ -64,15 +64,16 @@ describe("certGenInit Function", () => {
.fn()
.mockReturnValue([{ test: "thing" }]);

expect.assertions(2);
try {
await certGenInit({}, ctx, () => {
expect.assertions(1);

const returnedInfo = await certGenInit(
{ Records: ["this is an event"] },
ctx,
() => {
return;
});
} catch (e: any) {
expect(e.message).toEqual(myError.message);
expect(e.code).toEqual(myError.code);
}
}
);
expect(returnedInfo.batchItemFailures.length).toBe(1);
});
it("should not throw error if code is InvalidParameterValue", async () => {
StreamService.getTestResultStream = jest.fn().mockReturnValue([{}]);
Expand All @@ -90,9 +91,13 @@ describe("certGenInit Function", () => {

expect.assertions(1);
try {
const result = await certGenInit({}, ctx, () => {
return;
});
const result = await certGenInit(
{ Records: ["this is an event"] },
ctx,
() => {
return;
}
);
expect(result).toBe({});
} catch (e) {
console.log(e);
Expand Down
Loading

0 comments on commit b84af12

Please sign in to comment.