diff --git a/src/ancillary/multiprocess.ts b/src/ancillary/multiprocess.ts index 68e38e2a..5acdf797 100644 --- a/src/ancillary/multiprocess.ts +++ b/src/ancillary/multiprocess.ts @@ -1,6 +1,43 @@ import EventEmitter from 'node:events' import cluster from 'node:cluster' import { cpus } from 'node:os' +import { readFileSync } from 'node:fs' + +/** + * Read a number from a file. + */ +function readNumber(filename: string | URL): number { + return Number(readFileSync(filename, 'utf-8')) +} + +/** + * Read CPU limits from /sys/fs. + * See https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt + * and https://github.com/xiaoxiaojx/get_cpus_length + */ +function readCPULimits(): number { + try { + return ( + readNumber('/sys/fs/cgroup/cpu/cpu.cfs_quota_us') / + readNumber('/sys/fs/cgroup/cpu/cpu.cfs_period_us') + ) + } catch (err) { + return -1 + } +} + +/** + * Return number of CPUs available. + */ +function cpuSize(): number { + const maybeResult = readCPULimits() + + if (maybeResult > 0) { + return maybeResult + } + + return cpus().length +} export type MultiprocessOptions = { /** @@ -64,7 +101,10 @@ export class Multiprocess extends EventEmitter { this.work() return } - let processes = options.workers || cpus().length // TODO workers = -1 or undef means no workers + const workersFromEnv = process.env['MULTIPROCESS_SIZE'] + ? parseInt(process.env['MULTIPROCESS_SIZE'], 10) + : undefined + let processes = options.workers || workersFromEnv || cpuSize() process.on('SIGINT', this.stop).on('SIGTERM', this.stop) cluster.on('online', (wrk) => { this.emit('worker', wrk.process.pid)