Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Feature/sns producer message attributes #48

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions src/sns-producer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as aws from 'aws-sdk';
import { PromiseResult } from 'aws-sdk/lib/request';
import { MessageAttributeMap } from 'aws-sdk/clients/sns';
import { v4 as uuid } from 'uuid';
import { S3PayloadMeta } from './types';
import {
Expand Down Expand Up @@ -81,7 +82,7 @@ export class SnsProducer {
return new SnsProducer(options);
}

async publishJSON(message: unknown): Promise<PublishResult> {
async publishJSON(message: unknown, snsMessageAttributes?: MessageAttributeMap): Promise<PublishResult> {
const messageBody = JSON.stringify(message);
const msgSize = Buffer.byteLength(messageBody, 'utf-8');

Expand All @@ -104,7 +105,8 @@ export class SnsProducer {
Key: s3Response.Key,
Location: s3Response.Location,
},
msgSize
msgSize,
snsMessageAttributes
);

return {
Expand All @@ -121,6 +123,7 @@ export class SnsProducer {
.publish({
Message: messageBody,
TopicArn: this.topicArn,
MessageAttributes: snsMessageAttributes || {},
})
.promise();

Expand All @@ -131,11 +134,17 @@ export class SnsProducer {

async publishS3Payload(
s3PayloadMeta: S3PayloadMeta,
msgSize?: number
msgSize?: number,
snsMessageAttributes?: MessageAttributeMap
): Promise<PromiseResult<aws.SNS.PublishResponse, aws.AWSError>> {
const messageAttributes = this.extendedLibraryCompatibility
? createExtendedCompatibilityAttributeMap(msgSize)
: {};
const messageAttributes = {
...(snsMessageAttributes || {}),
...(this.extendedLibraryCompatibility
? createExtendedCompatibilityAttributeMap(msgSize)
: {}
)
};

return await this.sns
.publish({
Message: this.extendedLibraryCompatibility
Expand Down
45 changes: 42 additions & 3 deletions tests/sns-sqs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
} from '../src';

import * as aws from 'aws-sdk';
import { MessageAttributeMap } from 'aws-sdk/clients/sns';
import { v4 as uuid } from 'uuid';
import { S3PayloadMeta } from '../src/types';

Expand Down Expand Up @@ -138,9 +139,9 @@ const getSnsProducer = (options: Partial<SnsProducerOptions> = {}) => {
});
};

async function publishMessage(msg: any, options?: Partial<SnsProducerOptions>) {
async function publishMessage(msg: any, options?: Partial<SnsProducerOptions>, attributes?: MessageAttributeMap) {
const snsProducer = getSnsProducer(options);
await snsProducer.publishJSON(msg);
await snsProducer.publishJSON(msg, attributes);
}

async function publishS3Payload(s3PayloadMeta: S3PayloadMeta, options?: Partial<SnsProducerOptions>) {
Expand All @@ -152,7 +153,7 @@ async function receiveMessages(
expectedMsgsCount: number,
options: Partial<SqsConsumerOptions> = {},
eventHandlers?: Record<string | symbol, (...args) => void>
): Promise<SqsMessage[]> {
): Promise<SqsMessage[] | void> {
const { s3 } = getClients();
return new Promise((resolve, rej) => {
const messages: SqsMessage[] = [];
Expand Down Expand Up @@ -603,6 +604,24 @@ describe('sns-sqs-big-payload', () => {
});
expect(receivedMessage.payload).toEqual(message);
});

it('should publish and receive the message with SNS message attributes', async () => {
const message = { it: 'works' };
const attributes = {
testAttribute: {
DataType: 'String',
StringValue: 'AttrubuteValue',
}
};
await publishMessage(message, {}, attributes);
const [receivedMessage] = await receiveMessages(1, {
transformMessageBody: (body) => {
const snsMessage = JSON.parse(body);
return snsMessage.Message;
},
});
expect(receivedMessage.payload).toEqual(message);
});
});

describe('publishing message through s3', () => {
Expand Down Expand Up @@ -643,6 +662,26 @@ describe('sns-sqs-big-payload', () => {
expect(reReceivedMessage.payload).toEqual(message);
expect(reReceivedMessage.s3PayloadMeta).toEqual(receivedMessage.s3PayloadMeta);
});

it('should send payload though s3 with SNS message attributes', async () => {
const message = { it: 'works' };
const attributes = {
testAttribute: {
DataType: 'String',
StringValue: 'AttrubuteValue',
}
};
await publishMessage(message, { allPayloadThoughS3: true, s3Bucket: TEST_BUCKET_NAME }, attributes);
const [receivedMessage] = await receiveMessages(1, {
getPayloadFromS3: true,
// since it's SNS message we need to unwrap sns envelope first
transformMessageBody: (body) => {
const snsMessage = JSON.parse(body);
return snsMessage.Message;
},
});
expect(receivedMessage.payload).toEqual(message);
});
});
});
});