From 2bb230a836e03e13cabef5f1f45f09b8f1f752cf Mon Sep 17 00:00:00 2001 From: Rafal Augustyniak Date: Thu, 18 Mar 2021 07:14:27 +0100 Subject: [PATCH] aggregated registry refactor --- index.js | 2 +- lib/aggregatorRegistry.js | 95 ++++++++++++++++++ lib/cluster.js | 197 +++++++------------------------------- lib/requestQueue.js | 98 +++++++++++++++++++ lib/workerThreads.js | 74 ++++++++++++++ test/clusterTest.js | 8 +- 6 files changed, 304 insertions(+), 170 deletions(-) create mode 100644 lib/aggregatorRegistry.js create mode 100644 lib/requestQueue.js create mode 100644 lib/workerThreads.js diff --git a/index.js b/index.js index 3199417f..2f6e1d51 100644 --- a/index.js +++ b/index.js @@ -23,4 +23,4 @@ exports.exponentialBuckets = exports.collectDefaultMetrics = require('./lib/defaultMetrics'); exports.aggregators = require('./lib/metricAggregators').aggregators; -exports.AggregatorRegistry = require('./lib/cluster'); +exports.AggregatorRegistry = require('./lib/aggregatorRegistry'); diff --git a/lib/aggregatorRegistry.js b/lib/aggregatorRegistry.js new file mode 100644 index 00000000..8e62b95a --- /dev/null +++ b/lib/aggregatorRegistry.js @@ -0,0 +1,95 @@ +'use strict'; + +const Registry = require('./registry'); +const { Grouper } = require('./util'); +const { aggregators } = require('./metricAggregators'); +const WorkerThreads = require('./workerThreads'); +const Cluster = require('./cluster'); + +class AggregatorRegistry extends Registry { + constructor() { + super(); + + this.registries = [Registry.globalRegistry]; + this.workerThreads = new WorkerThreads({ registries: this.registries }); + this.cluster = new Cluster({ registries: this.registries }); + } + + setWorkers(workers) { + this.workerThreads.set(workers); + } + + clusterMetrics() { + return this.cluster.getMetrics(); + } + + workerThreadsMetrics() { + return this.workerThreads.getMetrics(); + } + + /** + * Sets the registry or registries to be aggregated. Call from workers to + * use a registry/registries other than the default global registry. + * @param {Array|Registry} registries Registry or registries to be + * aggregated. + * @return {void} + */ + setRegistries(registries) { + if (!Array.isArray(registries)) registries = [registries]; + + registries.forEach(registry => { + if (!(registry instanceof Registry)) { + throw new TypeError(`Expected Registry, got ${typeof registry}`); + } + }); + + this.registries = registries; + } + + /** + * Creates a new Registry instance from an array of metrics that were + * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using + * the method specified by their `aggregator` property, or by summation if + * `aggregator` is undefined. + * @param {Array} metrics Array of metrics, each of which created by + * `registry.getMetricsAsJSON()`. + * @return {Registry} aggregated registry. + */ + static aggregate(metrics) { + const aggregatedRegistry = new Registry(); + const metricsByName = new Grouper(); + + // Gather by name + metrics.forEach(m => + m.forEach(metric => { + metricsByName.add(metric.name, metric); + }), + ); + + // Aggregate gathered metrics. + metricsByName.forEach(metric => { + const aggregatorName = metric[0].aggregator; + const aggregatorFn = aggregators[aggregatorName]; + + if (typeof aggregatorFn !== 'function') { + throw new Error(`'${aggregatorName}' is not a defined aggregator.`); + } + + const aggregatedMetric = aggregatorFn(metric); + // NB: The 'omit' aggregator returns undefined. + if (aggregatedMetric) { + const aggregatedMetricWrapper = Object.assign( + { + get: () => aggregatedMetric, + }, + aggregatedMetric, + ); + aggregatedRegistry.registerMetric(aggregatedMetricWrapper); + } + }); + + return aggregatedRegistry; + } +} + +module.exports = AggregatorRegistry; diff --git a/lib/cluster.js b/lib/cluster.js index 5fe3956d..2a911a62 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -1,29 +1,6 @@ 'use strict'; -/** - * Extends the Registry class with a `clusterMetrics` method that returns - * aggregated metrics for all workers. - * - * In cluster workers, listens for and responds to requests for metrics by the - * cluster master. - */ - -const Registry = require('./registry'); -const { Grouper } = require('./util'); -const { aggregators } = require('./metricAggregators'); - -let parentPort, MessageChannel, isMainThread, Worker; -try { - /* eslint-disable node/no-unsupported-features/node-builtins */ - const worker_threads = require('worker_threads'); - - parentPort = worker_threads.parentPort; - MessageChannel = worker_threads.MessageChannel; - isMainThread = worker_threads.isMainThread; - Worker = worker_threads.Worker; -} catch { - // node version is too old -} +const RequestQueue = require('./requestQueue'); // We need to lazy-load the 'cluster' module as some application servers - // namely Passenger - crash when it is imported. @@ -33,63 +10,47 @@ let cluster = () => { return data; }; -const GET_METRICS_REQ = 'prom-client:getMetricsReq'; -const GET_METRICS_RES = 'prom-client:getMetricsRes'; +class Cluster { + constructor({ registries } = {}) { + this.registries = registries; + this.requests = new RequestQueue(); -let registries = [Registry.globalRegistry]; -let requestCtr = 0; // Concurrency control -let listenersAdded = false; -const workersQueue = []; -const requests = new Map(); // Pending requests for workers' local metrics. - -class AggregatorRegistry extends Registry { - constructor() { - super(); - addListeners(); + this.onInit(); } - attachWorkers(workers = []) { - for (const worker in workers) { - if (worker instanceof Worker) { - workersQueue.push(worker); - } + onInit() { + if (this._isListenersAttached) { + return; } + + this.attachMasterListeners(); + this.attachWorkerListeners(); + + this._isListenersAttached = true; } - /** - * Gets aggregated metrics for all workers. The optional callback and - * returned Promise resolve with the same value; either may be used. - * @return {Promise} Promise that resolves with the aggregated - * metrics. - */ - clusterMetrics() { - const requestId = requestCtr++; + attachMasterListeners() { + if (!cluster().isMaster) { + return; + } - return new Promise((resolve, reject) => { - let settled = false; - function done(err, result) { - if (settled) return; - settled = true; - if (err) reject(err); - else resolve(result); - } + cluster().on('message', (worker, message) => { + this.requests.handleResponse(message); + }); + } - const request = { - responses: [], - pending: 0, - done, - errorTimeout: setTimeout(() => { - const err = new Error('Operation timed out.'); - request.done(err); - }, 5000), - }; - requests.set(requestId, request); + attachWorkerListeners() { + if (!cluster().isWorker) { + return; + } - const message = { - type: GET_METRICS_REQ, - requestId, - }; + process.on('message', message => { + this.requests.sendResponse(this.registries, message, process.send); + }); + } + getMetrics() { + return this.requests.init((message, request) => { for (const id in cluster().workers) { // If the worker exits abruptly, it may still be in the workers // list but not able to communicate. @@ -98,102 +59,8 @@ class AggregatorRegistry extends Registry { request.pending++; } } - - getWorkerThreadsMetrics(message); - request.pending += workersQueue.length || 0; - - if (request.pending === 0) { - // No workers were up - clearTimeout(request.errorTimeout); - process.nextTick(() => done(null, '')); - } }); } - - /** - * Creates a new Registry instance from an array of metrics that were - * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using - * the method specified by their `aggregator` property, or by summation if - * `aggregator` is undefined. - * @param {Array} metricsArr Array of metrics, each of which created by - * `registry.getMetricsAsJSON()`. - * @return {Registry} aggregated registry. - */ - static aggregate(metricsArr) { - const aggregatedRegistry = new Registry(); - const metricsByName = new Grouper(); - - // Gather by name - metricsArr.forEach(metrics => { - metrics.forEach(metric => { - metricsByName.add(metric.name, metric); - }); - }); - - // Aggregate gathered metrics. - metricsByName.forEach(metrics => { - const aggregatorName = metrics[0].aggregator; - const aggregatorFn = aggregators[aggregatorName]; - if (typeof aggregatorFn !== 'function') { - throw new Error(`'${aggregatorName}' is not a defined aggregator.`); - } - const aggregatedMetric = aggregatorFn(metrics); - // NB: The 'omit' aggregator returns undefined. - if (aggregatedMetric) { - const aggregatedMetricWrapper = Object.assign( - { - get: () => aggregatedMetric, - }, - aggregatedMetric, - ); - aggregatedRegistry.registerMetric(aggregatedMetricWrapper); - } - }); - - return aggregatedRegistry; - } - - /** - * Sets the registry or registries to be aggregated. Call from workers to - * use a registry/registries other than the default global registry. - * @param {Array|Registry} regs Registry or registries to be - * aggregated. - * @return {void} - */ - static setRegistries(regs) { - if (!Array.isArray(regs)) regs = [regs]; - regs.forEach(reg => { - if (!(reg instanceof Registry)) { - throw new TypeError(`Expected Registry, got ${typeof reg}`); - } - }); - registries = regs; - } -} - -function handleWorkerResponse(worker, message) { - if (message.type === GET_METRICS_RES) { - const request = requests.get(message.requestId); - request.pending += message.workerRequests || 0; - - if (message.error) { - request.done(new Error(message.error)); - return; - } - - message.metrics.forEach(registry => request.responses.push(registry)); - request.pending--; - - if (request.pending === 0) { - // finalize - requests.delete(message.requestId); - clearTimeout(request.errorTimeout); - - const registry = AggregatorRegistry.aggregate(request.responses); - const promString = registry.metrics(); - request.done(null, promString); - } - } } /** diff --git a/lib/requestQueue.js b/lib/requestQueue.js new file mode 100644 index 00000000..e92fe0ff --- /dev/null +++ b/lib/requestQueue.js @@ -0,0 +1,98 @@ +'use strict'; + +const AggregatorRegistry = require('./aggregatorRegistry'); + +const GET_METRICS_REQ = 'prom-client:getMetricsReq'; +const GET_METRICS_RES = 'prom-client:getMetricsRes'; + +class RequestQueue { + constructor() { + this.requests = new Map(); + this.counter = 0; + } + + init(callback) { + const requestId = this.counter++; + + return new Promise((resolve, reject) => { + let settled = false; + + function done(err, result) { + if (settled) return; + settled = true; + if (err) reject(err); + else resolve(result); + } + + const request = { + responses: [], + pending: 0, + done, + errorTimeout: setTimeout(() => { + const error = new Error('Operation timed out.'); + request.done(error); + }, 5000), + }; + this.requests.set(requestId, request); + + const message = { + type: GET_METRICS_REQ, + requestId, + }; + + callback(message, request); + + if (request.pending === 0) { + // No workers were up + clearTimeout(request.errorTimeout); + process.nextTick(() => done(null, '')); + } + }); + } + + handleResponse(message) { + if (message.type === GET_METRICS_RES) { + const request = this.requests.get(message.requestId); + + if (message.error) { + request.done(new Error(message.error)); + return; + } + + message.metrics.forEach(registry => request.responses.push(registry)); + request.pending--; + + if (request.pending === 0) { + // finalize + this.requests.delete(message.requestId); + clearTimeout(request.errorTimeout); + + const registry = AggregatorRegistry.aggregate(request.responses); + const metrics = registry.metrics(); + request.done(null, metrics); + } + } + } + + sendResponse(registries, message, callback) { + if (message.type === GET_METRICS_REQ) { + Promise.all(registries.map(r => r.getMetricsAsJSON())) + .then(metrics => { + callback({ + type: GET_METRICS_RES, + requestId: message.requestId, + metrics, + }); + }) + .catch(error => { + callback({ + type: GET_METRICS_RES, + requestId: message.requestId, + error: error.message, + }); + }); + } + } +} + +module.exports = RequestQueue; diff --git a/lib/workerThreads.js b/lib/workerThreads.js new file mode 100644 index 00000000..1e1971f7 --- /dev/null +++ b/lib/workerThreads.js @@ -0,0 +1,74 @@ +'use strict'; + +const RequestQueue = require('./requestQueue'); +const REQUIRE_NODE = 'required Node version for worker_threads is 11 or higher'; +/* eslint-disable node/no-unsupported-features/node-builtins */ +let worker_threads = { parentPort: { on: () => {} } }; +try { + worker_threads = require('worker_threads'); +} catch { + console.log(REQUIRE_NODE); +} + +const { Worker, isMainThread, parentPort, MessageChannel } = worker_threads; + +class WorkerThreads { + constructor({ registries, workers = [] } = {}) { + this.registries = registries; + this.requests = new RequestQueue(); + this.workers = workers; + + this.onInit(); + } + + onInit() { + if (this._isListenersAttached) { + return; + } + + this.attachWorkerListeners(); + + this._isListenersAttached = true; + } + + attachWorkerListeners() { + if (isMainThread) { + return; + } + + parentPort.on('message', message => { + this.requests.sendResponse( + this.registries, + message, + message.port.postMessage, + ); + }); + } + + getMetrics() { + if (!MessageChannel) { + throw Error(REQUIRE_NODE); + } + + return this.requests.init((message, request) => { + this.workers.forEach(worker => { + if (worker && worker instanceof Worker) { + const metricsChannel = new MessageChannel(); + + worker.postMessage({ ...message, port: metricsChannel.port1 }, [ + metricsChannel.port1, + ]); + request.pending++; + + metricsChannel.port2.on('message', response => { + this.requests.handleResponse(response); + + metricsChannel.port2.close(); + }); + } + }); + }); + } +} + +module.exports = WorkerThreads; diff --git a/test/clusterTest.js b/test/clusterTest.js index 8569ea99..1c0ead90 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -7,13 +7,13 @@ describe('AggregatorRegistry', () => { it('requiring the cluster should not add any listeners on the cluster module', () => { const originalListenerCount = cluster.listenerCount('message'); - require('../lib/cluster'); + require('../lib/aggregatorRegistry'); expect(cluster.listenerCount('message')).toBe(originalListenerCount); jest.resetModules(); - require('../lib/cluster'); + require('../lib/aggregatorRegistry'); expect(cluster.listenerCount('message')).toBe(originalListenerCount); }); @@ -34,7 +34,7 @@ describe('AggregatorRegistry', () => { describe('aggregatorRegistry.clusterMetrics()', () => { it('works properly if there are no cluster workers', async () => { - const AggregatorRegistry = require('../lib/cluster'); + const AggregatorRegistry = require('../lib/aggregatorRegistry'); const ar = new AggregatorRegistry(); const metrics = await ar.clusterMetrics(); expect(metrics).toEqual(''); @@ -42,7 +42,7 @@ describe('AggregatorRegistry', () => { }); describe('AggregatorRegistry.aggregate()', () => { - const Registry = require('../lib/cluster'); + const Registry = require('../lib/aggregatorRegistry'); // These mimic the output of `getMetricsAsJSON`. const metricsArr1 = [ {