Skip to content

Commit

Permalink
feat: add metrics to track queue sizes and add operations
Browse files Browse the repository at this point in the history
  • Loading branch information
gvelez17 committed Oct 7, 2024
1 parent 8616ca3 commit 4c8ac1b
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 0 deletions.
3 changes: 3 additions & 0 deletions packages/cli/src/s3-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import PQueue from 'p-queue'
import AWSSDK from 'aws-sdk'
import { Mutex } from 'await-semaphore'
import type { DeepNonNullable } from 'ts-essentials'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'

/**
* **Remove** `undefined` fields from an S3 Level search params.
Expand Down Expand Up @@ -185,6 +186,8 @@ class S3KVStore implements IKVStore {
}

get(key: string): Promise<any> {
Metrics.count(LOAD_S3_QUEUE_ADD, 1)
Metrics.observe(LOAD_S3_SIZE, this.#loadingLimit.size)
return this.#loadingLimit.add(async () => {
const value = await this.level.get(key)
return JSON.parse(value)
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/ancillary/task-queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import PQueue from 'p-queue'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'

const TASK_QUEUE_SIZE = 'task_queue_size'
const TASK_QUEUE_SIZE_PENDING = 'task_queue_size_pending'

export const noop = () => {
// Do Nothing
Expand Down Expand Up @@ -52,6 +56,8 @@ export class TaskQueue implements TaskQueueLike {
* Size of the queue. Counts both deferred and currently running tasks.
*/
get size(): number {
Metrics.observe(TASK_QUEUE_SIZE, this.#pq.size)
Metrics.observe(TASK_QUEUE_SIZE_PENDING, this.#pq.pending)
return this.#pq.size + this.#pq.pending
}

Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/state-management/named-task-queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { noop, TaskQueue } from '../ancillary/task-queue.js'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'

const NAMED_TASK_QUEUE_SIZE = 'named_task_queue_size'
const NAMED_TASK_QUEUE_RUN = 'named_task_queue_run'
const NAMED_TASK_QUEUE_ADD = 'named_task_queue_add'

/**
* Set of named PQueues.
Expand Down Expand Up @@ -49,6 +54,8 @@ export class NamedTaskQueue {
*/
run<A>(name: string, task: () => Promise<A>): Promise<A> {
const queue = this.queue(name)
Metrics.observe(NAMED_TASK_QUEUE_SIZE, queue.size, {'name': name})
Metrics.count(NAMED_TASK_QUEUE_RUN, 1, {'name': name})
return queue.run(task).finally(() => {
this.remove(name)
})
Expand All @@ -62,6 +69,8 @@ export class NamedTaskQueue {
*/
add(name: string, task: () => Promise<void>): void {
const queue = this.queue(name)
Metrics.observe(NAMED_TASK_QUEUE_SIZE, queue.size, {'name': name})
Metrics.count(NAMED_TASK_QUEUE_ADD, 1, {'name': name})
queue.add(
() => task(),
() => this.remove(name)
Expand Down
4 changes: 4 additions & 0 deletions packages/indexing/src/history-sync/workers/rebuild-anchor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import type { Worker, Job } from '@ceramicnetwork/job-queue'
import { CID } from 'multiformats/cid'
import { pathString } from '@ceramicnetwork/anchor-utils'
import PQueue from 'p-queue'
import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'

// NOTE: In V' history sync will need to be reworked (ex. use CAR files, use recon)

// Up to 1024 streams could be present in an anchor
const IPFS_LOAD_CONCURRENCY = 16

const REBUILD_ANCHOR_QUEUE_SIZE = 'rebuild_anchor_queue_size'

const REBUILD_ANCHOR_JOB_OPTIONS: SendOptions = {
retryLimit: 5,
retryDelay: 60, // 1 minute
Expand Down Expand Up @@ -151,6 +154,7 @@ export class RebuildAnchorWorker implements Worker<RebuildAnchorJobData> {

const queue = new PQueue({ concurrency: IPFS_LOAD_CONCURRENCY })
await queue.addAll(tasks)
Metrics.observe(REBUILD_ANCHOR_QUEUE_SIZE, queue.size)

this.logger.debug(
`Rebuild anchor job completed for models ${jobData.models}, root ${jobData.root}, and txHash ${jobData.txHash}`
Expand Down

0 comments on commit 4c8ac1b

Please sign in to comment.