From 748831ff1e5b997e7b5c8af117cfd98ad3d89133 Mon Sep 17 00:00:00 2001 From: stephhuynh18 Date: Fri, 8 Mar 2024 16:20:30 -0800 Subject: [PATCH] feat: exit with error when an anchor worker fails to anchor a batch --- src/app.ts | 10 +++--- .../__tests__/task-scheduler-service.test.ts | 26 ++++++---------- src/services/task-scheduler-service.ts | 31 +++++++++++-------- 3 files changed, 33 insertions(+), 34 deletions(-) diff --git a/src/app.ts b/src/app.ts index 11be4923e..ad5445d13 100644 --- a/src/app.ts +++ b/src/app.ts @@ -302,10 +302,12 @@ export class CeramicAnchorApp { const success = await anchorService .anchorRequests({ signal: controller.signal }) .catch((error: Error) => { - logger.err(`Error when anchoring: ${error}`) - Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, { - message: error.message.substring(0, 50), - }) + if (!controller.signal.aborted) { + logger.err(`Error when anchoring: ${error}`) + Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, { + message: error.message.substring(0, 50), + }) + } throw error }) diff --git a/src/services/__tests__/task-scheduler-service.test.ts b/src/services/__tests__/task-scheduler-service.test.ts index 4f8eb93e4..773bc52d5 100644 --- a/src/services/__tests__/task-scheduler-service.test.ts +++ b/src/services/__tests__/task-scheduler-service.test.ts @@ -2,6 +2,7 @@ import 'reflect-metadata' import { jest, describe, test, expect } from '@jest/globals' import { TaskSchedulerService } from '../task-scheduler-service.js' import { Utils } from '../../utils.js' +import { TestUtils } from '@ceramicnetwork/common' describe('scheduler service', () => { jest.setTimeout(20000) @@ -33,30 +34,15 @@ describe('scheduler service', () => { // test doesn't complete until 'done()' is called }) - test('will continue if the task fails', (done) => { - const numberOfRunsBeforeDone = 5 + test('will stop if the task fails', (done) => { const task = jest.fn() const testScheduler = new TaskSchedulerService() - const runChecks = () => { - // the task runs once right at the start before running every X seconds - expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1) - Utils.delay(3000).then(() => { - done() - }) - } - let count = 0 task.mockImplementation(async () => { - if (count === numberOfRunsBeforeDone) { - testScheduler.stop() - runChecks() - } - count = count + 1 - // the last two runs will be rejected - if (count > numberOfRunsBeforeDone - 2) { + if (count === 2) { return Promise.reject('test error') } @@ -64,6 +50,12 @@ describe('scheduler service', () => { }) testScheduler.start(task as any, 1000) + // @ts-ignore + testScheduler._subscription?.add(() => { + expect(task.mock.calls.length).toEqual(2) + testScheduler.stop() + done() + }) // test doesn't complete until 'done()' is called }) diff --git a/src/services/task-scheduler-service.ts b/src/services/task-scheduler-service.ts index 14f50a752..6e112e0e3 100644 --- a/src/services/task-scheduler-service.ts +++ b/src/services/task-scheduler-service.ts @@ -1,5 +1,15 @@ import { logger } from '../logger/index.js' -import { catchError, Observable, Subscription, defer, share, timer, expand, concatMap, EMPTY, retry } from 'rxjs' +import { + catchError, + Observable, + Subscription, + defer, + share, + timer, + expand, + concatMap, + EMPTY, +} from 'rxjs' import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability' import { METRIC_NAMES } from '../settings.js' @@ -11,12 +21,10 @@ export class TaskSchedulerService { private _controller: AbortController private _subscription?: Subscription - constructor() { this._controller = new AbortController() } - /** * Starts the scheduler which will run the provided task at regular intervals * @param task task to perform regularly with a delay of intervalMS between runs @@ -24,7 +32,11 @@ export class TaskSchedulerService { * @param cbAfterNoOp default undefined. cbAfterNoOp is the callback to run if the task returns false. A task returning false indicates that it did not do anything (no op) * cbAfterNoOp will not be called if the task errors. If cbAfterNoOp is not set then the scheduler will continue to run the task regardless if the task was a no op or not */ - start(task: () => Promise, intervalMS = 60000, cbAfterNoOp?: () => Promise): void { + start( + task: () => Promise, + intervalMS = 60000, + cbAfterNoOp?: () => Promise + ): void { if (this._scheduledTask$) { throw new Error('Task already scheduled') } @@ -34,7 +46,7 @@ export class TaskSchedulerService { return false } - return await task().then(result => result === undefined || result) + return await task().then((result) => result === undefined || result) }).pipe( catchError((err: Error) => { Metrics.count(METRIC_NAMES.SCHEDULER_TASK_UNCAUGHT_ERROR, 1) @@ -45,11 +57,6 @@ export class TaskSchedulerService { } throw err - }), - retry({ - delay: intervalMS, - count: 3, - resetOnSuccess: true, }) ) @@ -63,17 +70,15 @@ export class TaskSchedulerService { logger.imp(`Last run of the task was not successful. Stopping the task scheduler`) return EMPTY } - return timer(intervalMS).pipe(concatMap(() => taskWithRetryOnError$)) }), share() ) - this._subscription = this._scheduledTask$.subscribe({ complete: async () => { if (cbAfterNoOp) await cbAfterNoOp() - } + }, }) }