diff --git a/src/app.ts b/src/app.ts index 11be4923e..ad5445d13 100644 --- a/src/app.ts +++ b/src/app.ts @@ -302,10 +302,12 @@ export class CeramicAnchorApp { const success = await anchorService .anchorRequests({ signal: controller.signal }) .catch((error: Error) => { - logger.err(`Error when anchoring: ${error}`) - Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, { - message: error.message.substring(0, 50), - }) + if (!controller.signal.aborted) { + logger.err(`Error when anchoring: ${error}`) + Metrics.count(METRIC_NAMES.ERROR_WHEN_ANCHORING, 1, { + message: error.message.substring(0, 50), + }) + } throw error }) diff --git a/src/services/__tests__/task-scheduler-service.test.ts b/src/services/__tests__/task-scheduler-service.test.ts index 4f8eb93e4..23a8e1169 100644 --- a/src/services/__tests__/task-scheduler-service.test.ts +++ b/src/services/__tests__/task-scheduler-service.test.ts @@ -2,91 +2,73 @@ import 'reflect-metadata' import { jest, describe, test, expect } from '@jest/globals' import { TaskSchedulerService } from '../task-scheduler-service.js' import { Utils } from '../../utils.js' +import { TestUtils } from '@ceramicnetwork/common' describe('scheduler service', () => { jest.setTimeout(20000) - test('will run the task repeatedly', (done) => { + test('will run the task repeatedly', async () => { const numberOfRunsBeforeDone = 3 const task = jest.fn() const testScheduler = new TaskSchedulerService() - const runChecks = () => { - // the task runs once right at the start before running every X seconds - expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1) - done() - } - let count = 0 task.mockImplementation(async () => { - if (count === numberOfRunsBeforeDone) { - testScheduler.stop() - runChecks() - } - count = count + 1 return Promise.resolve() }) testScheduler.start(task as any, 1000) - // test doesn't complete until 'done()' is called + await TestUtils.delay(1000 * numberOfRunsBeforeDone) + await testScheduler.stop() + expect(task.mock.calls.length).toBeGreaterThanOrEqual(numberOfRunsBeforeDone) }) - test('will continue if the task fails', (done) => { - const numberOfRunsBeforeDone = 5 + test('will stop if the task fails', async () => { + const mockExit = jest.spyOn(process, 'exit').mockImplementation(() => { + return + }) + const task = jest.fn() const testScheduler = new TaskSchedulerService() - const runChecks = () => { - // the task runs once right at the start before running every X seconds - expect(task.mock.calls.length).toEqual(numberOfRunsBeforeDone + 1) - Utils.delay(3000).then(() => { - done() - }) - } - let count = 0 task.mockImplementation(async () => { - if (count === numberOfRunsBeforeDone) { - testScheduler.stop() - runChecks() - } - count = count + 1 - // the last two runs will be rejected - if (count > numberOfRunsBeforeDone - 2) { - return Promise.reject('test error') + if (count === 2) { + throw Error('test error') } - return Promise.resolve() + return }) testScheduler.start(task as any, 1000) - // test doesn't complete until 'done()' is called + await TestUtils.waitForConditionOrTimeout(async () => { + // @ts-ignore + return testScheduler._subscription?.closed || false + }) + expect(mockExit).toHaveBeenCalled() }) - test('Will complete current task if stop is called', (done) => { + test('Will complete current task if stop is called', async () => { let calls = 0 const task = async () => { await Utils.delay(2000) calls = calls + 1 } + const testScheduler = new TaskSchedulerService() + testScheduler.start(task as any, 1000) + await Utils.delay(500) // stop is called during the task // stop should only return once the task completes - Utils.delay(1000).then(async () => { - await testScheduler.stop() - await Utils.delay(3000) - // task should have compelted once - expect(calls).toEqual(1) - done() - }) - - testScheduler.start(task as any, 1000) - // test doesn't complete until 'done()' is called + await testScheduler.stop() + await Utils.delay(3000) + // task should have completed once + expect(calls).toEqual(1) }) test('Will run cbAfterNoOp after failure if set', async () => { diff --git a/src/services/task-scheduler-service.ts b/src/services/task-scheduler-service.ts index 14f50a752..e6ef36b3b 100644 --- a/src/services/task-scheduler-service.ts +++ b/src/services/task-scheduler-service.ts @@ -1,7 +1,15 @@ import { logger } from '../logger/index.js' -import { catchError, Observable, Subscription, defer, share, timer, expand, concatMap, EMPTY, retry } from 'rxjs' -import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability' -import { METRIC_NAMES } from '../settings.js' +import { + catchError, + Observable, + Subscription, + defer, + share, + timer, + expand, + concatMap, + EMPTY, +} from 'rxjs' /** * Repeatedly triggers a task to be run after a configured amount of ms @@ -11,12 +19,10 @@ export class TaskSchedulerService { private _controller: AbortController private _subscription?: Subscription - constructor() { this._controller = new AbortController() } - /** * Starts the scheduler which will run the provided task at regular intervals * @param task task to perform regularly with a delay of intervalMS between runs @@ -24,7 +30,11 @@ export class TaskSchedulerService { * @param cbAfterNoOp default undefined. cbAfterNoOp is the callback to run if the task returns false. A task returning false indicates that it did not do anything (no op) * cbAfterNoOp will not be called if the task errors. If cbAfterNoOp is not set then the scheduler will continue to run the task regardless if the task was a no op or not */ - start(task: () => Promise, intervalMS = 60000, cbAfterNoOp?: () => Promise): void { + start( + task: () => Promise, + intervalMS = 60000, + cbAfterNoOp?: () => Promise + ): void { if (this._scheduledTask$) { throw new Error('Task already scheduled') } @@ -34,22 +44,14 @@ export class TaskSchedulerService { return false } - return await task().then(result => result === undefined || result) + return await task().then((result) => result === undefined || result) }).pipe( catchError((err: Error) => { - Metrics.count(METRIC_NAMES.SCHEDULER_TASK_UNCAUGHT_ERROR, 1) - logger.err(`Scheduler task failed: ${err}`) - if (this._controller.signal.aborted) { return EMPTY } throw err - }), - retry({ - delay: intervalMS, - count: 3, - resetOnSuccess: true, }) ) @@ -63,17 +65,19 @@ export class TaskSchedulerService { logger.imp(`Last run of the task was not successful. Stopping the task scheduler`) return EMPTY } - return timer(intervalMS).pipe(concatMap(() => taskWithRetryOnError$)) }), share() ) - this._subscription = this._scheduledTask$.subscribe({ complete: async () => { if (cbAfterNoOp) await cbAfterNoOp() - } + }, + error: (err) => { + logger.err(`Task scheduler exiting because of error: ${err}`) + process.exit(1) + }, }) } diff --git a/src/settings.ts b/src/settings.ts index af217802b..0bc69fd2c 100644 --- a/src/settings.ts +++ b/src/settings.ts @@ -59,10 +59,6 @@ export enum METRIC_NAMES { WITNESS_CAR_CACHE_HIT = 'witness_car_cache_hit', WITNESS_CAR_CACHE_MISS = 'witness_car_cache_miss', - // *******************************************************************// - // Scheduler Service - SCHEDULER_TASK_UNCAUGHT_ERROR = 'scheduler_task_uncaught_error', - // *******************************************************************// // Ceramic Service PIN_SUCCEEDED = 'pin_succeeded', @@ -80,4 +76,3 @@ export enum METRIC_NAMES { REQUEST_NOT_CREATED = 'request_not_created', REQUEST_NOT_FOUND = 'request_not_found', } -