diff --git a/src/services/queue/sqs-queue-service.ts b/src/services/queue/sqs-queue-service.ts index dca622fb..376f146a 100644 --- a/src/services/queue/sqs-queue-service.ts +++ b/src/services/queue/sqs-queue-service.ts @@ -17,6 +17,7 @@ import type { Config } from 'node-config-ts' import { AnchorBatchQMessage, RequestQMessage } from '../../models/queue-message.js' import { Codec, decode } from 'codeco' import { AbortOptions } from '@ceramicnetwork/common' +import { logger } from '../../logger/index.js' const DEFAULT_MAX_TIME_TO_HOLD_MESSAGES_S = 21600 const DEFAULT_WAIT_TIME_FOR_MESSAGE_S = 10 @@ -78,10 +79,23 @@ export class SqsQueueService private readonly sqsQueueUrl: string, private readonly messageType: Codec ) { + const awsLogger = { + // eslint-disable-next-line @typescript-eslint/no-empty-function + info: () => {}, + error: (msg: any) => { + logger.err(msg) + }, + // eslint-disable-next-line @typescript-eslint/no-empty-function + debug: () => {}, + // eslint-disable-next-line @typescript-eslint/no-empty-function + warn: () => {}, + } + // Set the AWS Region. this.sqsClient = new SQSClient({ region: config.queue.awsRegion, endpoint: this.sqsQueueUrl, + logger: awsLogger, }) this.maxTimeToHoldMessageSec = config.queue.maxTimeToHoldMessageSec || DEFAULT_MAX_TIME_TO_HOLD_MESSAGES_S @@ -108,6 +122,9 @@ export class SqsQueueService abortSignal: abortOptions?.signal, }) .then((result) => result.Messages) + .catch((err) => { + throw new Error(`Failed to receive message from SQS queue ${this.sqsQueueUrl}: ${err}`) + }) if (!messages || messages.length !== 1) { return undefined @@ -125,14 +142,21 @@ export class SqsQueueService * Publishes a message to a sqs queue * @param data the data you want to publish */ - async sendMessage(data: TValue): Promise { + async sendMessage(data: TValue, attempt = 0): Promise { const sendMessageCommandInput = { QueueUrl: this.sqsQueueUrl, MessageBody: JSON.stringify(this.messageType.encode(data)), } await this.sqsClient.send(new SendMessageCommand(sendMessageCommandInput)).catch((err) => { - throw new Error(`Failed to send message to SQS queue: ${err}`) + if (err.message.includes('Signature expired') && attempt < 3) { + logger.warn( + `Received a signature expired error while sending message to SQS queue ${this.sqsQueueUrl} during attempt ${attempt}` + ) + return this.sendMessage(data, attempt + 1) + } + + throw new Error(`Failed to send message to SQS queue ${this.sqsQueueUrl}: ${err}`) }) } }