diff --git a/lib/exec.js b/lib/exec.js index 72d650a..4daac01 100644 --- a/lib/exec.js +++ b/lib/exec.js @@ -1,10 +1,10 @@ -/* eslint-disable promise/prefer-await-to-callbacks */ - import { spawn } from 'child_process'; import { quote } from 'shell-quote'; import B from 'bluebird'; import _ from 'lodash'; import { formatEnoent } from './helpers'; +import {LRUCache} from 'lru-cache'; + const MAX_BUFFER_SIZE = 100 * 1024 * 1024; @@ -13,16 +13,16 @@ const MAX_BUFFER_SIZE = 100 * 1024 * 1024; * @template {TeenProcessExecOptions} T * @param {string} cmd - Program to execute * @param {string[]} [args] - Arguments to pass to the program - * @param {T} [opts] - Options + * @param {T} [originalOpts] - Options * @returns {Promise extends true ? TeenProcessExecBufferResult : TeenProcessExecStringResult>} */ -async function exec (cmd, args = [], opts = /** @type {T} */({})) { +async function exec (cmd, args = [], originalOpts = /** @type {T} */({})) { // get a quoted representation of the command for error strings const rep = quote([cmd, ...args]); // extend default options; we're basically re-implementing exec's options // for use here with spawn under the hood - opts = /** @type {T} */(_.defaults(opts, { + const opts = /** @type {T} */(_.defaults(_.cloneDeep(originalOpts), { timeout: null, encoding: 'utf8', killSignal: 'SIGTERM', @@ -43,8 +43,16 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) { return await new B((resolve, reject) => { // spawn the child process with options; we don't currently expose any of // the other 'spawn' options through the API - let proc = spawn(cmd, args, {cwd: opts.cwd, env: opts.env, shell: opts.shell}); - let stdoutArr = [], stderrArr = [], timer = null; + const proc = spawn(cmd, args, {cwd: opts.cwd, env: opts.env, shell: opts.shell}); + /** @type {LRUCache} */ + const stdoutCache = new LRUCache({ + max: opts.maxStdoutBufferSize ?? MAX_BUFFER_SIZE, + });; + /** @type {LRUCache} */ + const stderrCache = new LRUCache({ + max: opts.maxStdoutBufferSize ?? MAX_BUFFER_SIZE, + });; + let timer = null; // if the process errors out, reject the promise proc.on('error', /** @param {NodeJS.ErrnoException} err */ async (err) => { @@ -58,7 +66,7 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) { reject(new Error(`Standard input '${err.syscall}' error: ${err.stack}`)); }); } - const handleStream = (streamType, streamProps) => { + const handleStream = (/** @type {string} */ streamType, /** @type {LRUCache} */ cache) => { if (!proc[streamType]) { return; } @@ -74,28 +82,15 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) { } // keep track of the stream if we don't want to ignore it - const {chunks, maxSize} = streamProps; - let size = 0; - proc[streamType].on('data', (chunk) => { - chunks.push(chunk); - size += chunk.length; - while (chunks.length > 1 && size >= maxSize) { - size -= chunks[0].length; - chunks.shift(); - } + proc[streamType].on('data', (/** @type {Buffer} */ chunk) => { + cache.set(cache.size, chunk); if (opts.logger && _.isFunction(opts.logger.debug)) { opts.logger.debug(chunk.toString()); } }); }; - handleStream('stdout', { - maxSize: opts.maxStdoutBufferSize, - chunks: stdoutArr, - }); - handleStream('stderr', { - maxSize: opts.maxStderrBufferSize, - chunks: stderrArr, - }); + handleStream('stdout', stdoutCache); + handleStream('stderr', stderrCache); /** * @template {boolean} U @@ -103,13 +98,18 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) { * @returns {U extends true ? {stdout: Buffer, stderr: Buffer} : {stdout: string, stderr: string}} */ function getStdio (isBuffer) { - let stdout, stderr; + /** @type {string | Buffer} */ + let stdout; + /** @type {string | Buffer} */ + let stderr; if (isBuffer) { - stdout = Buffer.concat(stdoutArr); - stderr = Buffer.concat(stderrArr); + stdout = Buffer.concat(/** @type {Uint8Array[]} */ ([...stdoutCache.values()])); + stderr = Buffer.concat(/** @type {Uint8Array[]} */ ([...stderrCache.values()])); } else { - stdout = Buffer.concat(stdoutArr).toString(opts.encoding); - stderr = Buffer.concat(stderrArr).toString(opts.encoding); + stdout = Buffer.concat(/** @type {Uint8Array[]} */ ([...stdoutCache.values()])) + .toString(opts.encoding); + stderr = Buffer.concat(/** @type {Uint8Array[]} */ ([...stderrCache.values()])) + .toString(opts.encoding); } return /** @type {U extends true ? {stdout: Buffer, stderr: Buffer} : {stdout: string, stderr: string}} */({stdout, stderr}); } @@ -121,7 +121,7 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) { if (timer) { clearTimeout(timer); } - let {stdout, stderr} = getStdio(isBuffer); + const {stdout, stderr} = getStdio(isBuffer); if (code === 0) { resolve(/** @type {BufferProp extends true ? TeenProcessExecBufferResult : TeenProcessExecStringResult} */({stdout, stderr, code})); } else { @@ -136,7 +136,7 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) { // have in case it's helpful in debugging if (opts.timeout) { timer = setTimeout(() => { - let {stdout, stderr} = getStdio(isBuffer); + const {stdout, stderr} = getStdio(isBuffer); let err = new Error(`Command '${rep}' timed out after ${opts.timeout}ms`); err = Object.assign(err, {stdout, stderr, code: null}); reject(err); diff --git a/lib/subprocess.js b/lib/subprocess.js index e92a43f..7934c21 100644 --- a/lib/subprocess.js +++ b/lib/subprocess.js @@ -1,5 +1,3 @@ -/* eslint-disable promise/prefer-await-to-callbacks */ - import { spawn } from 'child_process'; import events from 'events'; const { EventEmitter } = events; @@ -7,28 +5,10 @@ import B from 'bluebird'; import { quote } from 'shell-quote'; import _ from 'lodash'; import { formatEnoent } from './helpers'; - - -// This is needed to avoid memory leaks -// when the process output is too long and contains -// no line breaks -const MAX_LINE_PORTION_LENGTH = 0xFFFF; - -function cutSuffix (str, suffixLength) { - return str.length > suffixLength - // https://bugs.chromium.org/p/v8/issues/detail?id=2869 - ? ` ${str.substr(str.length - suffixLength)}`.substr(1) - : str; -} - +import { createInterface } from 'node:readline/promises'; class SubProcess extends EventEmitter { - /** - * @type { {stdout: string, stderr: string} } - */ - lastLinePortion; - /** @type {import('child_process').ChildProcess?} */ proc; @@ -74,8 +54,6 @@ class SubProcess extends EventEmitter { // get a quoted representation of the command for error strings this.rep = quote([cmd, ...args]); - - this.lastLinePortion = {stdout: '', stderr: ''}; } get isRunning () { @@ -85,18 +63,24 @@ class SubProcess extends EventEmitter { /** * - * @param {string} stream - * @param {Iterable} lines + * @param {string} streamName + * @param {Iterable|string} lines */ - emitLines (stream, lines) { - for (let line of lines) { - this.emit('stream-line', `[${stream.toUpperCase()}] ${line}`); + emitLines (streamName, lines) { + const doEmit = (/** @type {string} */ line) => this.emit('stream-line', `[${streamName.toUpperCase()}] ${line}`); + + if (_.isString(lines)) { + doEmit(lines); + } else { + for (const line of lines) { + doEmit(line); + } } } - // spawn the subprocess and return control whenever we deem that it has fully - // "started" /** + * spawn the subprocess and return control whenever we deem that it has fully + * "started" * * @param {StartDetector|number?} startDetector * @param {number?} timeoutMs @@ -148,7 +132,6 @@ class SubProcess extends EventEmitter { if (this.proc.stderr) { this.proc.stderr.setEncoding(this.opts.encoding || 'utf8'); } - this.lastLinePortion = {stdout: '', stderr: ''}; // this function handles output that we collect from the subproc /** @@ -170,31 +153,6 @@ class SubProcess extends EventEmitter { // emit the actual output for whomever's listening this.emit('output', stdout, stderr); - - // we also want to emit lines, but it's more complex since output - // comes in chunks and a line could come in two different chunks, so - // we have logic to handle that case (using this.lastLinePortion to - // remember a line that started but did not finish in the last chunk) - for (const [streamName, streamData] of /** @type {[['stdout', string], ['stderr', string]]} */(_.toPairs(streams))) { - if (!streamData) continue; // eslint-disable-line curly - const lines = streamData.split('\n') - // https://bugs.chromium.org/p/v8/issues/detail?id=2869 - .map((x) => ` ${x}`.substr(1)); - if (lines.length > 1) { - lines[0] = this.lastLinePortion[streamName] + lines[0]; - this.lastLinePortion[streamName] = cutSuffix(_.last(lines), MAX_LINE_PORTION_LENGTH); - const resultLines = lines.slice(0, -1); - this.emit(`lines-${streamName}`, resultLines); - this.emitLines(streamName, resultLines); - } else { - const currentPortion = cutSuffix(lines[0], MAX_LINE_PORTION_LENGTH); - if (this.lastLinePortion[streamName].length + currentPortion.length > MAX_LINE_PORTION_LENGTH) { - this.lastLinePortion[streamName] = currentPortion; - } else { - this.lastLinePortion[streamName] += currentPortion; - } - } - } }; // if we get an error spawning the proc, reject and clean up the proc @@ -211,20 +169,29 @@ class SubProcess extends EventEmitter { this.proc = null; }); + const handleStreamLines = (/** @type {string} */ streamName, /** @type {import('stream').Readable} */ input) => { + const rl = createInterface({input}); + rl.on('line', (line) => { + const emitArg = [line]; + this.emit(`lines-${streamName}`, emitArg); + this.emitLines(streamName, emitArg); + }); + }; + if (this.proc.stdout) { this.proc.stdout.on('data', (chunk) => handleOutput({stdout: chunk.toString(), stderr: ''})); + handleStreamLines('stdout', this.proc.stdout); } if (this.proc.stderr) { this.proc.stderr.on('data', (chunk) => handleOutput({stdout: '', stderr: chunk.toString()})); + handleStreamLines('stderr', this.proc.stderr); } // when the proc exits, we might still have a buffer of lines we were // waiting on more chunks to complete. Go ahead and emit those, then // re-emit the exit so a listener can handle the possibly-unexpected exit this.proc.on('exit', (code, signal) => { - this.handleLastLines(); - this.emit('exit', code, signal); // in addition to the bare exit event, also emit one of three other @@ -265,15 +232,12 @@ class SubProcess extends EventEmitter { }); } + /** + * @deprecated This method is deprecated and will be removed + */ handleLastLines () { - for (let stream of ['stdout', 'stderr']) { - if (this.lastLinePortion[stream]) { - const lastLines = [this.lastLinePortion[stream]]; - this.emit(`lines-${stream}`, lastLines); - this.emitLines(stream, lastLines); - this.lastLinePortion[stream] = ''; - } - } + // TODO: THis is a noop left for backward compatibility. + // TODO: Remove it after the major version bump } /** @@ -286,9 +250,6 @@ class SubProcess extends EventEmitter { if (!this.isRunning) { throw new Error(`Can't stop process; it's not currently running (cmd: '${this.rep}')`); } - // make sure to emit any data in our lines buffer whenever we're done with - // the proc - this.handleLastLines(); return await new B((resolve, reject) => { this.proc?.on('close', resolve); this.expectingExit = true; diff --git a/package.json b/package.json index 05b6a90..4940cf1 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "dependencies": { "bluebird": "^3.7.2", "lodash": "^4.17.21", + "lru-cache": "^10.2.2", "shell-quote": "^1.8.1", "source-map-support": "^0.x" }, @@ -73,7 +74,7 @@ "prettier": "^3.1.0", "semantic-release": "^24.0.0", "sinon": "^18.0.0", - "typescript": "~5.4", + "typescript": "^5.4.1", "ts-node": "^10.9.1" }, "engines": {