diff --git a/config/default.json b/config/default.json index 575155447..4ac937c0d 100644 --- a/config/default.json +++ b/config/default.json @@ -84,6 +84,8 @@ "type": "sqs", "awsRegion": "us-east-1", "sqsQueueUrl": "", + "s3BucketName": "myS3Bucket", + "s3Endpoint": "", "maxTimeToHoldMessageSec": 21600, "waitTimeForMessageSec": 0 } diff --git a/config/env/dev.json b/config/env/dev.json index bde134af9..a59d7b584 100644 --- a/config/env/dev.json +++ b/config/env/dev.json @@ -79,6 +79,8 @@ "type": "sqs", "awsRegion": "@@AWS_REGION", "sqsQueueUrl": "@@SQS_QUEUE_URL", + "s3BucketName": "@@S3_BUCKET_NAME", + "s3Endpoint": "@@S3_ENDPOINT", "maxTimeToHoldMessageSec": "@@MAX_TIME_TO_HOLD_MESSAGE_SEC", "waitTimeForMessageSec": "@@WAIT_TIME_FOR_MESSAGE_SEC" } diff --git a/config/env/prod.json b/config/env/prod.json index bde134af9..a59d7b584 100644 --- a/config/env/prod.json +++ b/config/env/prod.json @@ -79,6 +79,8 @@ "type": "sqs", "awsRegion": "@@AWS_REGION", "sqsQueueUrl": "@@SQS_QUEUE_URL", + "s3BucketName": "@@S3_BUCKET_NAME", + "s3Endpoint": "@@S3_ENDPOINT", "maxTimeToHoldMessageSec": "@@MAX_TIME_TO_HOLD_MESSAGE_SEC", "waitTimeForMessageSec": "@@WAIT_TIME_FOR_MESSAGE_SEC" } diff --git a/config/env/test.json b/config/env/test.json index e3d883447..59b3598fb 100644 --- a/config/env/test.json +++ b/config/env/test.json @@ -61,6 +61,7 @@ "type": "sqs", "awsRegion": "us-east-1", "sqsQueueUrl": "", + "s3BucketName": "ceramic-tnet-cas", "maxTimeToHoldMessageSec": 10800, "waitTimeForMessageSec": 10 } diff --git a/src/services/anchor-service.ts b/src/services/anchor-service.ts index fe4420bd1..cddae5736 100644 --- a/src/services/anchor-service.ts +++ b/src/services/anchor-service.ts @@ -217,7 +217,7 @@ export class AnchorService { } try { - logger.imp('Anchoring requests from the batch') + logger.imp(`Anchoring ${batchMessage.data.rids.length} requests from batch ${batchMessage.data.bid}`) const requests = await this.requestRepository.findByIds(batchMessage.data.rids) const requestsNotReplaced = requests.filter( diff --git a/src/services/queue/sqs-queue-service.ts b/src/services/queue/sqs-queue-service.ts index a59047852..acdda4b33 100644 --- a/src/services/queue/sqs-queue-service.ts +++ b/src/services/queue/sqs-queue-service.ts @@ -6,6 +6,9 @@ import { ChangeMessageVisibilityCommand, SendMessageCommand, } from '@aws-sdk/client-sqs' +import AWSSDK from 'aws-sdk' +import LevelUp from 'levelup' +import S3LevelDOWN from 's3leveldown' import { IpfsPubSubPublishQMessage, QueueMessageData } from '../../models/queue-message.js' import { IQueueConsumerService, @@ -19,6 +22,8 @@ import { AbortOptions } from '@ceramicnetwork/common' const DEFAULT_MAX_TIME_TO_HOLD_MESSAGES_S = 21600 const DEFAULT_WAIT_TIME_FOR_MESSAGE_S = 10 +const BATCH_STORE_PATH = '/cas/anchor/batch' + /** * Sqs Queue Message received by consumers. * Once the message is done processing you can either "ack" the message (remove the message from the queue) or "nack" the message (put the message back on the queue) @@ -60,6 +65,28 @@ export class SqsQueueMessage implements IQueueM } } +// This wrapper around SqsQueueMessage is used to handle the case where the list of batch request IDs is empty and must +// be fetched from S3. The underlying SqsQueueMessage remains the same (and is what is used for n/acking the message), +// but the data is updated to include the batch request IDs. +export class BatchQueueMessage implements IQueueMessage { + readonly data: AnchorBatchQMessage + + constructor( + private readonly anchorBatchMessage: IQueueMessage, + batchJson: any + ) { + this.data = decode(AnchorBatchQMessage, batchJson) + } + + async ack(): Promise { + await this.anchorBatchMessage.ack() + } + + async nack(): Promise { + await this.anchorBatchMessage.nack() + } +} + /** * Consumer and Producer for Sqs Queues */ @@ -149,10 +176,53 @@ export class ValidationSqsQueueService extends SqsQueueService * AnchorBatchSqsQueueService is used to consume and publish anchor batch messages. These batches are anchored by anchor workers */ export class AnchorBatchSqsQueueService extends SqsQueueService { - constructor(config: Config) { + constructor( + config: Config, + private s3StorePath = config.queue.s3BucketName + BATCH_STORE_PATH, + private s3Endpoint = config.queue.s3Endpoint ? config.queue.s3Endpoint : undefined, + private _s3store?: LevelUp.LevelUp + ) { const queueUrl = config.queue.sqsQueueUrl + 'batch' super(config, queueUrl, AnchorBatchQMessage) } + + /** + * `new LevelUp` attempts to open a database, which leads to a request to AWS. + * Let's make initialization lazy. + */ + get s3store(): LevelUp.LevelUp { + if (!this._s3store) { + const levelDown = this.s3Endpoint + ? new S3LevelDOWN( + this.s3StorePath, + new AWSSDK.S3({ + endpoint: this.s3Endpoint, + s3ForcePathStyle: true, + }) + ) + : new S3LevelDOWN(this.s3StorePath) + + this._s3store = new LevelUp(levelDown) + } + return this._s3store + } + + override async receiveMessage( + abortOptions?: AbortOptions + ): Promise | undefined> { + const anchorBatchMessage: IQueueMessage | undefined = + await super.receiveMessage(abortOptions) + // If the list of batch request IDs is empty, we need to fetch the full batch from S3. + if (anchorBatchMessage && anchorBatchMessage.data.rids.length === 0) { + try { + const batchJson = await this.s3store.get(anchorBatchMessage.data.bid) + return new BatchQueueMessage(anchorBatchMessage, JSON.parse(batchJson)) + } catch (err: any) { + throw Error(`Error retrieving batch ${anchorBatchMessage.data.bid} from S3: ${err.message}`) + } + } + return anchorBatchMessage + } } /**