Skip to content

Commit

Permalink
feat: retry on signature expired and log sqs client errors
Browse files Browse the repository at this point in the history
  • Loading branch information
stephhuynh18 committed May 20, 2024
1 parent 5e12509 commit 7c3a551
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions src/services/queue/sqs-queue-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type { Config } from 'node-config-ts'
import { AnchorBatchQMessage, RequestQMessage } from '../../models/queue-message.js'
import { Codec, decode } from 'codeco'
import { AbortOptions } from '@ceramicnetwork/common'
import { logger } from '../../logger/index.js'

const DEFAULT_MAX_TIME_TO_HOLD_MESSAGES_S = 21600
const DEFAULT_WAIT_TIME_FOR_MESSAGE_S = 10
Expand Down Expand Up @@ -78,10 +79,23 @@ export class SqsQueueService<TValue extends QueueMessageData>
private readonly sqsQueueUrl: string,
private readonly messageType: Codec<TValue, TValue>
) {
const awsLogger = {
// eslint-disable-next-line @typescript-eslint/no-empty-function
info: () => {},
error: (msg: any) => {
logger.err(msg)
},
// eslint-disable-next-line @typescript-eslint/no-empty-function
debug: () => {},
// eslint-disable-next-line @typescript-eslint/no-empty-function
warn: () => {},
}

// Set the AWS Region.
this.sqsClient = new SQSClient({
region: config.queue.awsRegion,
endpoint: this.sqsQueueUrl,
logger: awsLogger,
})
this.maxTimeToHoldMessageSec =
config.queue.maxTimeToHoldMessageSec || DEFAULT_MAX_TIME_TO_HOLD_MESSAGES_S
Expand All @@ -108,6 +122,9 @@ export class SqsQueueService<TValue extends QueueMessageData>
abortSignal: abortOptions?.signal,
})
.then((result) => result.Messages)
.catch((err) => {
throw new Error(`Failed to receive message from SQS queue ${this.sqsQueueUrl}: ${err}`)
})

if (!messages || messages.length !== 1) {
return undefined
Expand All @@ -125,14 +142,21 @@ export class SqsQueueService<TValue extends QueueMessageData>
* Publishes a message to a sqs queue
* @param data the data you want to publish
*/
async sendMessage(data: TValue): Promise<void> {
async sendMessage(data: TValue, attempt = 0): Promise<void> {
const sendMessageCommandInput = {
QueueUrl: this.sqsQueueUrl,
MessageBody: JSON.stringify(this.messageType.encode(data)),
}

await this.sqsClient.send(new SendMessageCommand(sendMessageCommandInput)).catch((err) => {
throw new Error(`Failed to send message to SQS queue: ${err}`)
if (err.message.includes('Signature expired') && attempt < 3) {
logger.warn(
`Received a signature expired error while sending message to SQS queue ${this.sqsQueueUrl} during attempt ${attempt}`
)
return this.sendMessage(data, attempt + 1)
}

throw new Error(`Failed to send message to SQS queue ${this.sqsQueueUrl}: ${err}`)
})
}
}
Expand Down

0 comments on commit 7c3a551

Please sign in to comment.