Skip to content

Commit

Permalink
chore: Optimize streaming processing and history
Browse files Browse the repository at this point in the history
  • Loading branch information
mykola-mokhnach committed Jun 28, 2024
1 parent 2c02ffb commit 01b3edd
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 102 deletions.
64 changes: 32 additions & 32 deletions lib/exec.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/* eslint-disable promise/prefer-await-to-callbacks */

import { spawn } from 'child_process';
import { quote } from 'shell-quote';
import B from 'bluebird';
import _ from 'lodash';
import { formatEnoent } from './helpers';
import {LRUCache} from 'lru-cache';


const MAX_BUFFER_SIZE = 100 * 1024 * 1024;

Expand All @@ -13,16 +13,16 @@ const MAX_BUFFER_SIZE = 100 * 1024 * 1024;
* @template {TeenProcessExecOptions} T
* @param {string} cmd - Program to execute
* @param {string[]} [args] - Arguments to pass to the program
* @param {T} [opts] - Options
* @param {T} [originalOpts] - Options
* @returns {Promise<BufferProp<T> extends true ? TeenProcessExecBufferResult : TeenProcessExecStringResult>}
*/
async function exec (cmd, args = [], opts = /** @type {T} */({})) {
async function exec (cmd, args = [], originalOpts = /** @type {T} */({})) {
// get a quoted representation of the command for error strings
const rep = quote([cmd, ...args]);

// extend default options; we're basically re-implementing exec's options
// for use here with spawn under the hood
opts = /** @type {T} */(_.defaults(opts, {
const opts = /** @type {T} */(_.defaults(_.cloneDeep(originalOpts), {
timeout: null,
encoding: 'utf8',
killSignal: 'SIGTERM',
Expand All @@ -43,8 +43,16 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) {
return await new B((resolve, reject) => {
// spawn the child process with options; we don't currently expose any of
// the other 'spawn' options through the API
let proc = spawn(cmd, args, {cwd: opts.cwd, env: opts.env, shell: opts.shell});
let stdoutArr = [], stderrArr = [], timer = null;
const proc = spawn(cmd, args, {cwd: opts.cwd, env: opts.env, shell: opts.shell});
// One chunk store per output stream, each sized from its own option and
// falling back to MAX_BUFFER_SIZE when the option is null/undefined.
// NOTE(review): lru-cache documents `max` as a cap on the number of entries,
// not on their combined byte size — confirm this matches the intent of the
// max*BufferSize options.
/** @type {LRUCache<number, Buffer>} */
const stdoutCache = new LRUCache({
  max: opts.maxStdoutBufferSize ?? MAX_BUFFER_SIZE,
});
/** @type {LRUCache<number, Buffer>} */
const stderrCache = new LRUCache({
  // stderr must honour its own limit; this previously reused the stdout option
  max: opts.maxStderrBufferSize ?? MAX_BUFFER_SIZE,
});
let timer = null;

// if the process errors out, reject the promise
proc.on('error', /** @param {NodeJS.ErrnoException} err */ async (err) => {
Expand All @@ -58,7 +66,7 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) {
reject(new Error(`Standard input '${err.syscall}' error: ${err.stack}`));
});
}
const handleStream = (streamType, streamProps) => {
const handleStream = (/** @type {string} */ streamType, /** @type {LRUCache<number, Buffer>} */ cache) => {
if (!proc[streamType]) {
return;
}
Expand All @@ -74,42 +82,34 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) {
}

// keep track of the stream if we don't want to ignore it
const {chunks, maxSize} = streamProps;
let size = 0;
proc[streamType].on('data', (chunk) => {
chunks.push(chunk);
size += chunk.length;
while (chunks.length > 1 && size >= maxSize) {
size -= chunks[0].length;
chunks.shift();
}
proc[streamType].on('data', (/** @type {Buffer} */ chunk) => {
cache.set(cache.size, chunk);
if (opts.logger && _.isFunction(opts.logger.debug)) {
opts.logger.debug(chunk.toString());
}
});
};
handleStream('stdout', {
maxSize: opts.maxStdoutBufferSize,
chunks: stdoutArr,
});
handleStream('stderr', {
maxSize: opts.maxStderrBufferSize,
chunks: stderrArr,
});
handleStream('stdout', stdoutCache);
handleStream('stderr', stderrCache);

/**
* @template {boolean} U
* @param {U} isBuffer
* @returns {U extends true ? {stdout: Buffer, stderr: Buffer} : {stdout: string, stderr: string}}
*/
function getStdio (isBuffer) {
let stdout, stderr;
/** @type {string | Buffer} */
let stdout;
/** @type {string | Buffer} */
let stderr;
if (isBuffer) {
stdout = Buffer.concat(stdoutArr);
stderr = Buffer.concat(stderrArr);
stdout = Buffer.concat(/** @type {Uint8Array[]} */ ([...stdoutCache.values()]));
stderr = Buffer.concat(/** @type {Uint8Array[]} */ ([...stderrCache.values()]));
} else {
stdout = Buffer.concat(stdoutArr).toString(opts.encoding);
stderr = Buffer.concat(stderrArr).toString(opts.encoding);
stdout = Buffer.concat(/** @type {Uint8Array[]} */ ([...stdoutCache.values()]))
.toString(opts.encoding);
stderr = Buffer.concat(/** @type {Uint8Array[]} */ ([...stderrCache.values()]))
.toString(opts.encoding);
}
return /** @type {U extends true ? {stdout: Buffer, stderr: Buffer} : {stdout: string, stderr: string}} */({stdout, stderr});
}
Expand All @@ -121,7 +121,7 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) {
if (timer) {
clearTimeout(timer);
}
let {stdout, stderr} = getStdio(isBuffer);
const {stdout, stderr} = getStdio(isBuffer);
if (code === 0) {
resolve(/** @type {BufferProp<T> extends true ? TeenProcessExecBufferResult : TeenProcessExecStringResult} */({stdout, stderr, code}));
} else {
Expand All @@ -136,7 +136,7 @@ async function exec (cmd, args = [], opts = /** @type {T} */({})) {
// have in case it's helpful in debugging
if (opts.timeout) {
timer = setTimeout(() => {
let {stdout, stderr} = getStdio(isBuffer);
const {stdout, stderr} = getStdio(isBuffer);
let err = new Error(`Command '${rep}' timed out after ${opts.timeout}ms`);
err = Object.assign(err, {stdout, stderr, code: null});
reject(err);
Expand Down
99 changes: 30 additions & 69 deletions lib/subprocess.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,14 @@
/* eslint-disable promise/prefer-await-to-callbacks */

import { spawn } from 'child_process';
import events from 'events';
const { EventEmitter } = events;
import B from 'bluebird';
import { quote } from 'shell-quote';
import _ from 'lodash';
import { formatEnoent } from './helpers';


// This is needed to avoid memory leaks
// when the process output is too long and contains
// no line breaks
const MAX_LINE_PORTION_LENGTH = 0xFFFF;

/**
 * Returns at most the last `suffixLength` characters of `str`.
 * Strings that are not longer than `suffixLength` are returned unchanged.
 *
 * @param {string} str
 * @param {number} suffixLength
 * @returns {string}
 */
function cutSuffix (str, suffixLength) {
  if (!(str.length > suffixLength)) {
    return str;
  }
  const tail = str.slice(str.length - suffixLength);
  // Prepend-and-strip trick, see https://bugs.chromium.org/p/v8/issues/detail?id=2869
  // (presumably forces a fresh copy so the long source string is not retained)
  return ` ${tail}`.slice(1);
}

import { createInterface } from 'node:readline/promises';

class SubProcess extends EventEmitter {

/**
* @type { {stdout: string, stderr: string} }
*/
lastLinePortion;

/** @type {import('child_process').ChildProcess?} */
proc;

Expand Down Expand Up @@ -74,8 +54,6 @@ class SubProcess extends EventEmitter {

// get a quoted representation of the command for error strings
this.rep = quote([cmd, ...args]);

this.lastLinePortion = {stdout: '', stderr: ''};
}

get isRunning () {
Expand All @@ -85,18 +63,24 @@ class SubProcess extends EventEmitter {

/**
*
* @param {string} stream
* @param {Iterable<string>} lines
* @param {string} streamName
* @param {Iterable<string>|string} lines
*/
emitLines (stream, lines) {
for (let line of lines) {
this.emit('stream-line', `[${stream.toUpperCase()}] ${line}`);
emitLines (streamName, lines) {
const doEmit = (/** @type {string} */ line) => this.emit('stream-line', `[${streamName.toUpperCase()}] ${line}`);

if (_.isString(lines)) {
doEmit(lines);
} else {
for (const line of lines) {
doEmit(line);
}
}
}

// spawn the subprocess and return control whenever we deem that it has fully
// "started"
/**
* spawn the subprocess and return control whenever we deem that it has fully
* "started"
*
* @param {StartDetector|number?} startDetector
* @param {number?} timeoutMs
Expand Down Expand Up @@ -148,7 +132,6 @@ class SubProcess extends EventEmitter {
if (this.proc.stderr) {
this.proc.stderr.setEncoding(this.opts.encoding || 'utf8');
}
this.lastLinePortion = {stdout: '', stderr: ''};

// this function handles output that we collect from the subproc
/**
Expand All @@ -170,31 +153,6 @@ class SubProcess extends EventEmitter {

// emit the actual output for whomever's listening
this.emit('output', stdout, stderr);

// we also want to emit lines, but it's more complex since output
// comes in chunks and a line could come in two different chunks, so
// we have logic to handle that case (using this.lastLinePortion to
// remember a line that started but did not finish in the last chunk)
for (const [streamName, streamData] of /** @type {[['stdout', string], ['stderr', string]]} */(_.toPairs(streams))) {
if (!streamData) continue; // eslint-disable-line curly
const lines = streamData.split('\n')
// https://bugs.chromium.org/p/v8/issues/detail?id=2869
.map((x) => ` ${x}`.substr(1));
if (lines.length > 1) {
lines[0] = this.lastLinePortion[streamName] + lines[0];
this.lastLinePortion[streamName] = cutSuffix(_.last(lines), MAX_LINE_PORTION_LENGTH);
const resultLines = lines.slice(0, -1);
this.emit(`lines-${streamName}`, resultLines);
this.emitLines(streamName, resultLines);
} else {
const currentPortion = cutSuffix(lines[0], MAX_LINE_PORTION_LENGTH);
if (this.lastLinePortion[streamName].length + currentPortion.length > MAX_LINE_PORTION_LENGTH) {
this.lastLinePortion[streamName] = currentPortion;
} else {
this.lastLinePortion[streamName] += currentPortion;
}
}
}
};

// if we get an error spawning the proc, reject and clean up the proc
Expand All @@ -211,20 +169,29 @@ class SubProcess extends EventEmitter {
this.proc = null;
});

/**
 * Wraps the given stream in a readline interface and forwards every line it
 * reports, both as a `lines-<streamName>` event and through emitLines.
 *
 * @param {string} streamName
 * @param {import('stream').Readable} input
 */
const handleStreamLines = (streamName, input) => {
  const lineReader = createInterface({input});
  lineReader.on('line', (text) => {
    // both consumers receive the same single-element array
    const batch = [text];
    this.emit(`lines-${streamName}`, batch);
    this.emitLines(streamName, batch);
  });
};

if (this.proc.stdout) {
this.proc.stdout.on('data', (chunk) => handleOutput({stdout: chunk.toString(), stderr: ''}));
handleStreamLines('stdout', this.proc.stdout);
}

if (this.proc.stderr) {
this.proc.stderr.on('data', (chunk) => handleOutput({stdout: '', stderr: chunk.toString()}));
handleStreamLines('stderr', this.proc.stderr);
}

// when the proc exits, we might still have a buffer of lines we were
// waiting on more chunks to complete. Go ahead and emit those, then
// re-emit the exit so a listener can handle the possibly-unexpected exit
this.proc.on('exit', (code, signal) => {
this.handleLastLines();

this.emit('exit', code, signal);

// in addition to the bare exit event, also emit one of three other
Expand Down Expand Up @@ -265,15 +232,12 @@ class SubProcess extends EventEmitter {
});
}

/**
* @deprecated This method is deprecated and will be removed
*/
handleLastLines () {
for (let stream of ['stdout', 'stderr']) {
if (this.lastLinePortion[stream]) {
const lastLines = [this.lastLinePortion[stream]];
this.emit(`lines-${stream}`, lastLines);
this.emitLines(stream, lastLines);
this.lastLinePortion[stream] = '';
}
}
// TODO: This is a no-op left for backward compatibility.
// TODO: Remove it after the next major version bump.
}

/**
Expand All @@ -286,9 +250,6 @@ class SubProcess extends EventEmitter {
if (!this.isRunning) {
throw new Error(`Can't stop process; it's not currently running (cmd: '${this.rep}')`);
}
// make sure to emit any data in our lines buffer whenever we're done with
// the proc
this.handleLastLines();
return await new B((resolve, reject) => {
this.proc?.on('close', resolve);
this.expectingExit = true;
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"dependencies": {
"bluebird": "^3.7.2",
"lodash": "^4.17.21",
"lru-cache": "^10.2.2",
"shell-quote": "^1.8.1",
"source-map-support": "^0.x"
},
Expand All @@ -73,7 +74,7 @@
"prettier": "^3.1.0",
"semantic-release": "^24.0.0",
"sinon": "^18.0.0",
"typescript": "~5.4",
"typescript": "^5.4.1",
"ts-node": "^10.9.1"
},
"engines": {
Expand Down

0 comments on commit 01b3edd

Please sign in to comment.