From ac158ca8c9594da7b619241faa046bf2cf1ab73a Mon Sep 17 00:00:00 2001 From: Rafal Augustyniak Date: Tue, 14 Mar 2023 10:11:53 +0100 Subject: [PATCH] bring back version before refactor --- index.js | 1 - lib/aggregatorRegistry.js | 95 ------------- lib/cluster.js | 288 ++++++++++++++++++++++++++------------ lib/requestQueue.js | 98 ------------- lib/workerThreads.js | 63 --------- test/clusterTest.js | 8 +- 6 files changed, 206 insertions(+), 347 deletions(-) delete mode 100644 lib/aggregatorRegistry.js delete mode 100644 lib/requestQueue.js delete mode 100644 lib/workerThreads.js diff --git a/index.js b/index.js index 2f6e1d51..11e25f87 100644 --- a/index.js +++ b/index.js @@ -23,4 +23,3 @@ exports.exponentialBuckets = exports.collectDefaultMetrics = require('./lib/defaultMetrics'); exports.aggregators = require('./lib/metricAggregators').aggregators; -exports.AggregatorRegistry = require('./lib/aggregatorRegistry'); diff --git a/lib/aggregatorRegistry.js b/lib/aggregatorRegistry.js deleted file mode 100644 index 8e62b95a..00000000 --- a/lib/aggregatorRegistry.js +++ /dev/null @@ -1,95 +0,0 @@ -'use strict'; - -const Registry = require('./registry'); -const { Grouper } = require('./util'); -const { aggregators } = require('./metricAggregators'); -const WorkerThreads = require('./workerThreads'); -const Cluster = require('./cluster'); - -class AggregatorRegistry extends Registry { - constructor() { - super(); - - this.registries = [Registry.globalRegistry]; - this.workerThreads = new WorkerThreads({ registries: this.registries }); - this.cluster = new Cluster({ registries: this.registries }); - } - - setWorkers(workers) { - this.workerThreads.set(workers); - } - - clusterMetrics() { - return this.cluster.getMetrics(); - } - - workerThreadsMetrics() { - return this.workerThreads.getMetrics(); - } - - /** - * Sets the registry or registries to be aggregated. Call from workers to - * use a registry/registries other than the default global registry. - * @param {Array|Registry} registries Registry or registries to be - * aggregated. - * @return {void} - */ - setRegistries(registries) { - if (!Array.isArray(registries)) registries = [registries]; - - registries.forEach(registry => { - if (!(registry instanceof Registry)) { - throw new TypeError(`Expected Registry, got ${typeof registry}`); - } - }); - - this.registries = registries; - } - - /** - * Creates a new Registry instance from an array of metrics that were - * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using - * the method specified by their `aggregator` property, or by summation if - * `aggregator` is undefined. - * @param {Array} metrics Array of metrics, each of which created by - * `registry.getMetricsAsJSON()`. - * @return {Registry} aggregated registry. - */ - static aggregate(metrics) { - const aggregatedRegistry = new Registry(); - const metricsByName = new Grouper(); - - // Gather by name - metrics.forEach(m => - m.forEach(metric => { - metricsByName.add(metric.name, metric); - }), - ); - - // Aggregate gathered metrics. - metricsByName.forEach(metric => { - const aggregatorName = metric[0].aggregator; - const aggregatorFn = aggregators[aggregatorName]; - - if (typeof aggregatorFn !== 'function') { - throw new Error(`'${aggregatorName}' is not a defined aggregator.`); - } - - const aggregatedMetric = aggregatorFn(metric); - // NB: The 'omit' aggregator returns undefined. - if (aggregatedMetric) { - const aggregatedMetricWrapper = Object.assign( - { - get: () => aggregatedMetric, - }, - aggregatedMetric, - ); - aggregatedRegistry.registerMetric(aggregatedMetricWrapper); - } - }); - - return aggregatedRegistry; - } -} - -module.exports = AggregatorRegistry; diff --git a/lib/cluster.js b/lib/cluster.js index 2a911a62..6382556a 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -1,6 +1,29 @@ 'use strict'; -const RequestQueue = require('./requestQueue'); +/** + * Extends the Registry class with a `clusterMetrics` method that returns + * aggregated metrics for all workers. + * + * In cluster workers, listens for and responds to requests for metrics by the + * cluster master. + */ + +const Registry = require('./registry'); +const { Grouper } = require('./util'); +const { aggregators } = require('./metricAggregators'); + +let parentPort, MessageChannel, isMainThread, Worker; +try { + /* eslint-disable node/no-unsupported-features/node-builtins */ + const worker_threads = require('worker_threads'); + + parentPort = worker_threads.parentPort; + MessageChannel = worker_threads.MessageChannel; + isMainThread = worker_threads.isMainThread; + Worker = worker_threads.Worker; +} catch { + // node version is too old +} // We need to lazy-load the 'cluster' module as some application servers - // namely Passenger - crash when it is imported. @@ -10,47 +33,59 @@ let cluster = () => { return data; }; -class Cluster { - constructor({ registries } = {}) { - this.registries = registries; - this.requests = new RequestQueue(); - - this.onInit(); - } +const GET_METRICS_REQ = 'prom-client:getMetricsReq'; +const GET_METRICS_RES = 'prom-client:getMetricsRes'; - onInit() { - if (this._isListenersAttached) { - return; - } +let registries = [Registry.globalRegistry]; +let requestCtr = 0; // Concurrency control +let listenersAdded = false; +let workersQueue = []; +const requests = new Map(); // Pending requests for workers' local metrics. - this.attachMasterListeners(); - this.attachWorkerListeners(); +class AggregatorRegistry extends Registry { + constructor() { + super(); + addListeners(); + } - this._isListenersAttached = true; + setWorkers(workers = []) { + workersQueue = [...workers]; } - attachMasterListeners() { - if (!cluster().isMaster) { - return; - } + /** + * Gets aggregated metrics for all workers. The optional callback and + * returned Promise resolve with the same value; either may be used. + * @return {Promise} Promise that resolves with the aggregated + * metrics. + */ + clusterMetrics() { + const requestId = requestCtr++; - cluster().on('message', (worker, message) => { - this.requests.handleResponse(message); - }); - } + return new Promise((resolve, reject) => { + let settled = false; + function done(err, result) { + if (settled) return; + settled = true; + if (err) reject(err); + else resolve(result); + } - attachWorkerListeners() { - if (!cluster().isWorker) { - return; - } + const request = { + responses: [], + pending: 0, + done, + errorTimeout: setTimeout(() => { + const err = new Error('Operation timed out.'); + request.done(err); + }, 5000), + }; + requests.set(requestId, request); - process.on('message', message => { - this.requests.sendResponse(this.registries, message, process.send); - }); - } + const message = { + type: GET_METRICS_REQ, + requestId, + }; - getMetrics() { - return this.requests.init((message, request) => { for (const id in cluster().workers) { // If the worker exits abruptly, it may still be in the workers // list but not able to communicate. @@ -59,8 +94,74 @@ class Cluster { request.pending++; } } + + if (request.pending === 0) { + // No workers were up + clearTimeout(request.errorTimeout); + process.nextTick(() => done(null, '')); + } }); } + + /** + * Creates a new Registry instance from an array of metrics that were + * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using + * the method specified by their `aggregator` property, or by summation if + * `aggregator` is undefined. + * @param {Array} metricsArr Array of metrics, each of which created by + * `registry.getMetricsAsJSON()`. + * @return {Registry} aggregated registry. + */ + static aggregate(metricsArr) { + const aggregatedRegistry = new Registry(); + const metricsByName = new Grouper(); + + // Gather by name + metricsArr.forEach(metrics => { + metrics.forEach(metric => { + metricsByName.add(metric.name, metric); + }); + }); + + // Aggregate gathered metrics. + metricsByName.forEach(metrics => { + const aggregatorName = metrics[0].aggregator; + const aggregatorFn = aggregators[aggregatorName]; + if (typeof aggregatorFn !== 'function') { + throw new Error(`'${aggregatorName}' is not a defined aggregator.`); + } + const aggregatedMetric = aggregatorFn(metrics); + // NB: The 'omit' aggregator returns undefined. + if (aggregatedMetric) { + const aggregatedMetricWrapper = Object.assign( + { + get: () => aggregatedMetric, + }, + aggregatedMetric, + ); + aggregatedRegistry.registerMetric(aggregatedMetricWrapper); + } + }); + + return aggregatedRegistry; + } + + /** + * Sets the registry or registries to be aggregated. Call from workers to + * use a registry/registries other than the default global registry. + * @param {Array|Registry} regs Registry or registries to be + * aggregated. + * @return {void} + */ + static setRegistries(regs) { + if (!Array.isArray(regs)) regs = [regs]; + regs.forEach(reg => { + if (!(reg instanceof Registry)) { + throw new TypeError(`Expected Registry, got ${typeof reg}`); + } + }); + registries = regs; + } } /** @@ -73,68 +174,34 @@ function addListeners() { listenersAdded = true; if (cluster().isMaster) { - // Listen for cluster responses to requests for local metrics - cluster().on('message', handleWorkerResponse); - } -} + // Listen for worker responses to requests for local metrics + cluster().on('message', (worker, message) => { + if (message.type === GET_METRICS_RES) { + const request = requests.get(message.requestId); + request.pending += message.workerRequests || 0; -function getWorkerThreadsMetrics(message) { - workersQueue.forEach(worker => { - if (worker && worker instanceof Worker) { - const metricsChannel = new MessageChannel(); - - worker.postMessage( - { - ...message, - port: metricsChannel.port1, - }, - [metricsChannel.port1], - ); - - metricsChannel.port2.on('message', response => { - if (response.type === GET_METRICS_RES) { - if (cluster().isWorker) { - process.send(response); - } + if (message.error) { + request.done(new Error(message.error)); + return; + } - if (cluster().isMaster) { - handleWorkerResponse(worker, response); - } + message.metrics.forEach(registry => request.responses.push(registry)); + request.pending--; - metricsChannel.port2.close(); - } - } - }); - } + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + clearTimeout(request.errorTimeout); - if (cluster().isWorker) { - // Respond to master's requests for worker's local metrics. - process.on('message', message => { - if (message.type === GET_METRICS_REQ) { - getWorkerThreadsMetrics(message); - - Promise.all(registries.map(r => r.getMetricsAsJSON())) - .then(metrics => { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - metrics, - workerRequests: workersQueue.length, - }); - }) - .catch(error => { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - error: error.message, - }); - }); + const registry = AggregatorRegistry.aggregate(request.responses); + const promString = registry.metrics(); + request.done(null, promString); + } } }); } } -// Respond to master's request for worker_threads worker local metrics if (!isMainThread) { parentPort.on('message', ({ type, requestId, port } = {}) => { if (type === GET_METRICS_REQ) { @@ -145,6 +212,8 @@ if (!isMainThread) { requestId, metrics, }); + + registries.forEach(r => r.resetMetrics()); }) .catch(error => { port.postMessage({ @@ -157,4 +226,51 @@ if (!isMainThread) { }); } +// Respond to master's requests for worker's local metrics. +process.on('message', message => { + if (cluster().isWorker && message.type === GET_METRICS_REQ) { + let workerRequests = 0; + workersQueue.forEach(worker => { + if (worker && worker instanceof Worker) { + const metricsChannel = new MessageChannel(); + + worker.postMessage( + { + ...message, + port: metricsChannel.port1, + }, + [metricsChannel.port1], + ); + + workerRequests++; + + metricsChannel.port2.on('message', response => { + if (response.type === GET_METRICS_RES) { + process.send(response); + + metricsChannel.port2.close(); + } + }); + } + }); + + Promise.all(registries.map(r => r.getMetricsAsJSON())) + .then(metrics => { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + metrics, + workerRequests, + }); + }) + .catch(error => { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + error: error.message, + }); + }); + } +}); + module.exports = AggregatorRegistry; diff --git a/lib/requestQueue.js b/lib/requestQueue.js deleted file mode 100644 index e92fe0ff..00000000 --- a/lib/requestQueue.js +++ /dev/null @@ -1,98 +0,0 @@ -'use strict'; - -const AggregatorRegistry = require('./aggregatorRegistry'); - -const GET_METRICS_REQ = 'prom-client:getMetricsReq'; -const GET_METRICS_RES = 'prom-client:getMetricsRes'; - -class RequestQueue { - constructor() { - this.requests = new Map(); - this.counter = 0; - } - - init(callback) { - const requestId = this.counter++; - - return new Promise((resolve, reject) => { - let settled = false; - - function done(err, result) { - if (settled) return; - settled = true; - if (err) reject(err); - else resolve(result); - } - - const request = { - responses: [], - pending: 0, - done, - errorTimeout: setTimeout(() => { - const error = new Error('Operation timed out.'); - request.done(error); - }, 5000), - }; - this.requests.set(requestId, request); - - const message = { - type: GET_METRICS_REQ, - requestId, - }; - - callback(message, request); - - if (request.pending === 0) { - // No workers were up - clearTimeout(request.errorTimeout); - process.nextTick(() => done(null, '')); - } - }); - } - - handleResponse(message) { - if (message.type === GET_METRICS_RES) { - const request = this.requests.get(message.requestId); - - if (message.error) { - request.done(new Error(message.error)); - return; - } - - message.metrics.forEach(registry => request.responses.push(registry)); - request.pending--; - - if (request.pending === 0) { - // finalize - this.requests.delete(message.requestId); - clearTimeout(request.errorTimeout); - - const registry = AggregatorRegistry.aggregate(request.responses); - const metrics = registry.metrics(); - request.done(null, metrics); - } - } - } - - sendResponse(registries, message, callback) { - if (message.type === GET_METRICS_REQ) { - Promise.all(registries.map(r => r.getMetricsAsJSON())) - .then(metrics => { - callback({ - type: GET_METRICS_RES, - requestId: message.requestId, - metrics, - }); - }) - .catch(error => { - callback({ - type: GET_METRICS_RES, - requestId: message.requestId, - error: error.message, - }); - }); - } - } -} - -module.exports = RequestQueue; diff --git a/lib/workerThreads.js b/lib/workerThreads.js deleted file mode 100644 index e545884c..00000000 --- a/lib/workerThreads.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict'; - -const RequestQueue = require('./requestQueue'); -const worker_threads = require('worker_threads'); - -const { Worker, isMainThread, parentPort, MessageChannel } = worker_threads; - -class WorkerThreads { - constructor({ registries, workers = [] } = {}) { - this.registries = registries; - this.requests = new RequestQueue(); - this.workers = workers; - - this.onInit(); - } - - onInit() { - if (this._isListenersAttached) { - return; - } - - this.attachWorkerListeners(); - - this._isListenersAttached = true; - } - - attachWorkerListeners() { - if (isMainThread) { - return; - } - - parentPort.on('message', message => { - this.requests.sendResponse( - this.registries, - message, - message.port.postMessage, - ); - }); - } - - getMetrics() { - return this.requests.init((message, request) => { - this.workers.forEach(worker => { - if (worker && worker instanceof Worker) { - const metricsChannel = new MessageChannel(); - - worker.postMessage({ ...message, port: metricsChannel.port1 }, [ - metricsChannel.port1, - ]); - request.pending++; - - metricsChannel.port2.on('message', response => { - this.requests.handleResponse(response); - - metricsChannel.port2.close(); - }); - } - }); - }); - } -} - -module.exports = WorkerThreads; diff --git a/test/clusterTest.js b/test/clusterTest.js index 1c0ead90..8569ea99 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -7,13 +7,13 @@ describe('AggregatorRegistry', () => { it('requiring the cluster should not add any listeners on the cluster module', () => { const originalListenerCount = cluster.listenerCount('message'); - require('../lib/aggregatorRegistry'); + require('../lib/cluster'); expect(cluster.listenerCount('message')).toBe(originalListenerCount); jest.resetModules(); - require('../lib/aggregatorRegistry'); + require('../lib/cluster'); expect(cluster.listenerCount('message')).toBe(originalListenerCount); }); @@ -34,7 +34,7 @@ describe('AggregatorRegistry', () => { describe('aggregatorRegistry.clusterMetrics()', () => { it('works properly if there are no cluster workers', async () => { - const AggregatorRegistry = require('../lib/aggregatorRegistry'); + const AggregatorRegistry = require('../lib/cluster'); const ar = new AggregatorRegistry(); const metrics = await ar.clusterMetrics(); expect(metrics).toEqual(''); @@ -42,7 +42,7 @@ describe('AggregatorRegistry', () => { }); describe('AggregatorRegistry.aggregate()', () => { - const Registry = require('../lib/aggregatorRegistry'); + const Registry = require('../lib/cluster'); // These mimic the output of `getMetricsAsJSON`. const metricsArr1 = [ {