Skip to content

Commit

Permalink
feat: Multiprocess (#1206)
Browse files Browse the repository at this point in the history
* feat: Multiprocess

* fix: fix module name

* wip: no workers when in Jest mode

* wip: maybe teardown

* fix: typing

* trigger test

* doc

---------

Co-authored-by: Samika Kashyap <samikas@samikas-mbp.lan>
  • Loading branch information
ukstv and Samika Kashyap authored Jun 12, 2024
1 parent a9c6283 commit 78439da
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 10 deletions.
112 changes: 112 additions & 0 deletions src/ancillary/multiprocess.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import EventEmitter from 'node:events'
import cluster from 'node:cluster'
import { cpus } from 'node:os'

export type MultiprocessOptions = {
/**
* If `true`, a single worker failure should not kill us all. A failed worker will get respawned.
*/
keepAlive: boolean
/**
* If `true`, the workers are started right away.
*/
autostart: boolean
/**
* Number of child processes to spawn. By default, number of child processes equals to number of CPUs.
*/
workers: number | undefined
}

const DEFAULT_OPTIONS: Partial<MultiprocessOptions> = {
keepAlive: true,
autostart: true,
}

export type TeardownFunction = () => void
export type MultiprocessWork = (() => void) | (() => TeardownFunction)

export class Multiprocess extends EventEmitter {
private keepAlive: boolean
private readonly work: MultiprocessWork
private teardownFn: TeardownFunction | undefined = undefined

constructor(work: MultiprocessWork, options: Partial<MultiprocessOptions>) {
super()
const effectiveOptions = { ...DEFAULT_OPTIONS, ...options }
if (!work || typeof work !== 'function') {
throw new Error('You need to provide a worker function.')
}

this.keepAlive = effectiveOptions.keepAlive ?? true
this.work = () => {
this.teardownFn = work() || undefined
}
this.fork = this.fork.bind(this)
this.stop = this.stop.bind(this)

if (cluster.isPrimary) {
cluster.setupPrimary({
silent: false,
})
}

if (effectiveOptions.autostart) {
if (cluster.isWorker) {
this.work()
} else {
this.start(effectiveOptions)
}
}
}

start(options: Partial<MultiprocessOptions>) {
if (options.workers === 0) {
this.work()
return
}
let processes = options.workers || cpus().length // TODO workers = -1 or undef means no workers
process.on('SIGINT', this.stop).on('SIGTERM', this.stop)
cluster.on('online', (wrk) => {
this.emit('worker', wrk.process.pid)
})
cluster.on('exit', (wrk) => {
this.emit('exit', wrk.process.pid)
return this.fork()
})

while (processes) {
processes -= 1
cluster.fork()
}
}

stop() {
if (cluster.isPrimary) {
this.keepAlive = false
for (const worker of Object.values(cluster.workers || {})) {
if (worker) {
worker.process.kill()
worker.kill()
}
}
this.teardownFn?.()
this.emit('offline')
}
}

fork(): void {
if (this.keepAlive) {
cluster.fork()
}
}
}

/**
* Leverage node:cluster to spawn multiple identical child processes.
*
* @param work - execute inside a worker.
* @param options
*/
export function multiprocess(work: MultiprocessWork, options: Partial<MultiprocessOptions> = {}) {
return new Multiprocess(work, options)
}
32 changes: 22 additions & 10 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ import bodyParser from 'body-parser'
import { Server } from '@overnightjs/core'
import { auth } from './auth/index.js'
import { expressLoggers, logger, expressErrorLogger } from './logger/index.js'

import * as http from 'http'
import { Config } from 'node-config-ts'
import { multiprocess, type Multiprocess } from './ancillary/multiprocess.js'

const DEFAULT_SERVER_PORT = 8081

export class CeramicAnchorServer extends Server {
private _server?: http.Server
private _server?: Multiprocess

constructor(controllers: any[], config: Config) {
super(true)
Expand All @@ -35,17 +34,30 @@ export class CeramicAnchorServer extends Server {
* @param port - Server listening port
*/
start(port: number = DEFAULT_SERVER_PORT): Promise<void> {
const workers = process.env['JEST_WORKER_ID'] ? 0 : undefined
return new Promise<void>((resolve, reject) => {
this._server = this.app
.listen(port, () => {
logger.imp(`Server ready: Listening on port ${port}`)
resolve()
})
.on('error', (err) => reject(err))
this._server = multiprocess(
() => {
const server = this.app
.listen(port, () => {
logger.imp(`Server ready: Listening on port ${port}`)
resolve()
})
.on('error', (err) => reject(err))

return () => {
server.close()
}
},
{
keepAlive: false,
workers: workers,
}
)
})
}

stop(): void {
this._server?.close()
this._server?.stop()
}
}

0 comments on commit 78439da

Please sign in to comment.