Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Feature: Adds support for sqs message attributes #52

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ await sqsProducer.sendJSON({
});
```

The sendJSON method of sqs producer also supports an optional sqs message options argument.

```ts
SqsMessageOptions {
DelaySeconds?: number;
MessageDeduplicationId?: string;
MessageGroupId?: string;
MessageAttributes?: MessageBodyAttributeMap;
}
```

### SQS Consumer

```ts
Expand Down
2 changes: 1 addition & 1 deletion src/sqs-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export class SqsConsumer {
QueueUrl: this.queueUrl,
MaxNumberOfMessages: this.batchSize,
WaitTimeSeconds: this.waitTimeSeconds,
MessageAttributeNames: [SQS_LARGE_PAYLOAD_SIZE_ATTRIBUTE],
MessageAttributeNames: ['All'],
});
if (!this.started) return;
await this.handleSqsResponse(response);
Expand Down
15 changes: 11 additions & 4 deletions src/sqs-producer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as aws from 'aws-sdk';
import { PromiseResult } from 'aws-sdk/lib/request';
import { MessageBodyAttributeMap } from 'aws-sdk/clients/sqs';
import { v4 as uuid } from 'uuid';
import { S3PayloadMeta } from './types';
import {
Expand Down Expand Up @@ -31,6 +32,7 @@ export interface SqsMessageOptions {
DelaySeconds?: number;
MessageDeduplicationId?: string;
MessageGroupId?: string;
MessageAttributes?: MessageBodyAttributeMap
}

export class SqsProducer {
Expand Down Expand Up @@ -122,6 +124,7 @@ export class SqsProducer {
DelaySeconds: options.DelaySeconds,
MessageDeduplicationId: options.MessageDeduplicationId,
MessageGroupId: options.MessageGroupId,
MessageAttributes: options.MessageAttributes || {}
})
.promise();

Expand All @@ -138,9 +141,13 @@ export class SqsProducer {
msgSize?: number,
options: SqsMessageOptions = {}
): Promise<PromiseResult<aws.SQS.SendMessageResult, aws.AWSError>> {
const messageAttributes = this.extendedLibraryCompatibility
? createExtendedCompatibilityAttributeMap(msgSize)
: {};
const messageAttributes = {
...(options.MessageAttributes || {}),
...(this.extendedLibraryCompatibility
? createExtendedCompatibilityAttributeMap(msgSize)
: {}
)
};
return await this.sqs
.sendMessage({
QueueUrl: this.queueUrl,
Expand All @@ -150,7 +157,7 @@ export class SqsProducer {
DelaySeconds: options.DelaySeconds,
MessageDeduplicationId: options.MessageDeduplicationId,
MessageGroupId: options.MessageGroupId,
MessageAttributes: messageAttributes,
MessageAttributes: messageAttributes
})
.promise();
}
Expand Down
50 changes: 47 additions & 3 deletions tests/sns-sqs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import {
SnsProducerOptions,
SnsProducer,
SqsMessage,
SqsMessageOptions
} from '../src';

import { MessageAttributeMap } from 'aws-sdk/clients/sns';
import * as aws from 'aws-sdk';
import { v4 as uuid } from 'uuid';
import { S3PayloadMeta } from '../src/types';
Expand Down Expand Up @@ -89,12 +91,12 @@ const getSqsProducer = (options: Partial<SqsProducerOptions> = {}) => {
});
};

async function sendMessage(msg: any, options?: Partial<SqsProducerOptions>) {
async function sendMessage(msg: any, options?: Partial<SqsProducerOptions>, sqsMessageOptions?:SqsMessageOptions) {
const sqsProducer = getSqsProducer(options);
return await sqsProducer.sendJSON(msg);
return await sqsProducer.sendJSON(msg, sqsMessageOptions);
}

async function sendS3Payload(s3PayloadMeta: S3PayloadMeta, options: Partial<SqsProducerOptions>) {
async function sendS3Payload(s3PayloadMeta: S3PayloadMeta, options: Partial<SqsProducerOptions>, sqsMessageOptions?:SqsMessageOptions) {
const sqsProducer = getSqsProducer(options);
return await sqsProducer.sendS3Payload(s3PayloadMeta);
}
Expand Down Expand Up @@ -237,6 +239,27 @@ describe('sns-sqs-big-payload', () => {
});
});

describe('sending messages with MessageAttributes', () => {
it('should send and receive the message', async () => {
const message = { it: 'works' };
const sqsMessageOptions = {
MessageAttributes: {
testAttribute: {
DataType: 'String',
StringValue: 'test',
StringListValues:[],
BinaryListValues:[]
}
}
};

await sendMessage(message, {}, sqsMessageOptions);
const [receivedMessage] = await receiveMessages(1);
expect(receivedMessage.payload).toEqual(message);
expect(receivedMessage.message.MessageAttributes).toEqual(sqsMessageOptions.MessageAttributes);
});
});

describe('events', () => {
function getEventHandlers() {
const handlers = Object.keys(SqsConsumerEvents).reduce((acc, key) => {
Expand Down Expand Up @@ -446,6 +469,27 @@ describe('sns-sqs-big-payload', () => {
expect(receivedMessage.s3PayloadMeta.Key).toBeDefined();
});

it('should send message though s3 with MessageAttributes', async () => {
const message = { it: 'works' };
const sqsMessageOptions = {
MessageAttributes: {
testAttribute: {
DataType: 'String',
StringValue: 'test',
StringListValues:[],
BinaryListValues:[]
}
}
};

await sendMessage(message, { allPayloadThoughS3: true, s3Bucket: TEST_BUCKET_NAME }, sqsMessageOptions);
const [receivedMessage] = await receiveMessages(1, { getPayloadFromS3: true });
expect(receivedMessage.payload).toEqual(message);
expect(receivedMessage.message.MessageAttributes).toEqual(sqsMessageOptions.MessageAttributes);
expect(receivedMessage.s3PayloadMeta.Bucket).toEqual(TEST_BUCKET_NAME);
expect(receivedMessage.s3PayloadMeta.Key).toBeDefined();
});

it('should send large message through s3', async () => {
const message = 'x'.repeat(256 * 1024 + 1);
await sendMessage(message, { largePayloadThoughS3: true, s3Bucket: TEST_BUCKET_NAME });
Expand Down