From 78439daa778d38b2ac8eac3e0eb32cf40540b2c6 Mon Sep 17 00:00:00 2001 From: Sergey Ukustov Date: Wed, 12 Jun 2024 03:17:39 +0300 Subject: [PATCH] feat: Multiprocess (#1206) * feat: Multiprocess * fix: fix module name * wip: no workers when in Jest mode * wip: maybe teardown * fix: typing * trigger test * doc --------- Co-authored-by: Samika Kashyap --- src/ancillary/multiprocess.ts | 112 ++++++++++++++++++++++++++++++++++ src/server.ts | 32 +++++++--- 2 files changed, 134 insertions(+), 10 deletions(-) create mode 100644 src/ancillary/multiprocess.ts diff --git a/src/ancillary/multiprocess.ts b/src/ancillary/multiprocess.ts new file mode 100644 index 00000000..68e38e2a --- /dev/null +++ b/src/ancillary/multiprocess.ts @@ -0,0 +1,112 @@ +import EventEmitter from 'node:events' +import cluster from 'node:cluster' +import { cpus } from 'node:os' + +export type MultiprocessOptions = { + /** + * If `true`, a single worker failure should not kill us all. A failed worker will get respawned. + */ + keepAlive: boolean + /** + * If `true`, the workers are started right away. + */ + autostart: boolean + /** + * Number of child processes to spawn. By default, number of child processes equals to number of CPUs. + */ + workers: number | undefined +} + +const DEFAULT_OPTIONS: Partial = { + keepAlive: true, + autostart: true, +} + +export type TeardownFunction = () => void +export type MultiprocessWork = (() => void) | (() => TeardownFunction) + +export class Multiprocess extends EventEmitter { + private keepAlive: boolean + private readonly work: MultiprocessWork + private teardownFn: TeardownFunction | undefined = undefined + + constructor(work: MultiprocessWork, options: Partial) { + super() + const effectiveOptions = { ...DEFAULT_OPTIONS, ...options } + if (!work || typeof work !== 'function') { + throw new Error('You need to provide a worker function.') + } + + this.keepAlive = effectiveOptions.keepAlive ?? true + this.work = () => { + this.teardownFn = work() || undefined + } + this.fork = this.fork.bind(this) + this.stop = this.stop.bind(this) + + if (cluster.isPrimary) { + cluster.setupPrimary({ + silent: false, + }) + } + + if (effectiveOptions.autostart) { + if (cluster.isWorker) { + this.work() + } else { + this.start(effectiveOptions) + } + } + } + + start(options: Partial) { + if (options.workers === 0) { + this.work() + return + } + let processes = options.workers || cpus().length // TODO workers = -1 or undef means no workers + process.on('SIGINT', this.stop).on('SIGTERM', this.stop) + cluster.on('online', (wrk) => { + this.emit('worker', wrk.process.pid) + }) + cluster.on('exit', (wrk) => { + this.emit('exit', wrk.process.pid) + return this.fork() + }) + + while (processes) { + processes -= 1 + cluster.fork() + } + } + + stop() { + if (cluster.isPrimary) { + this.keepAlive = false + for (const worker of Object.values(cluster.workers || {})) { + if (worker) { + worker.process.kill() + worker.kill() + } + } + this.teardownFn?.() + this.emit('offline') + } + } + + fork(): void { + if (this.keepAlive) { + cluster.fork() + } + } +} + +/** + * Leverage node:cluster to spawn multiple identical child processes. + * + * @param work - execute inside a worker. + * @param options + */ +export function multiprocess(work: MultiprocessWork, options: Partial = {}) { + return new Multiprocess(work, options) +} diff --git a/src/server.ts b/src/server.ts index aac0818f..a2418147 100644 --- a/src/server.ts +++ b/src/server.ts @@ -2,14 +2,13 @@ import bodyParser from 'body-parser' import { Server } from '@overnightjs/core' import { auth } from './auth/index.js' import { expressLoggers, logger, expressErrorLogger } from './logger/index.js' - -import * as http from 'http' import { Config } from 'node-config-ts' +import { multiprocess, type Multiprocess } from './ancillary/multiprocess.js' const DEFAULT_SERVER_PORT = 8081 export class CeramicAnchorServer extends Server { - private _server?: http.Server + private _server?: Multiprocess constructor(controllers: any[], config: Config) { super(true) @@ -35,17 +34,30 @@ export class CeramicAnchorServer extends Server { * @param port - Server listening port */ start(port: number = DEFAULT_SERVER_PORT): Promise { + const workers = process.env['JEST_WORKER_ID'] ? 0 : undefined return new Promise((resolve, reject) => { - this._server = this.app - .listen(port, () => { - logger.imp(`Server ready: Listening on port ${port}`) - resolve() - }) - .on('error', (err) => reject(err)) + this._server = multiprocess( + () => { + const server = this.app + .listen(port, () => { + logger.imp(`Server ready: Listening on port ${port}`) + resolve() + }) + .on('error', (err) => reject(err)) + + return () => { + server.close() + } + }, + { + keepAlive: false, + workers: workers, + } + ) }) } stop(): void { - this._server?.close() + this._server?.stop() } }