Skip to content

Commit

Permalink
aggregated registry refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafal Augustyniak committed Oct 14, 2022
1 parent a1a2a16 commit 2bb230a
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 170 deletions.
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ exports.exponentialBuckets =
exports.collectDefaultMetrics = require('./lib/defaultMetrics');

exports.aggregators = require('./lib/metricAggregators').aggregators;
exports.AggregatorRegistry = require('./lib/cluster');
exports.AggregatorRegistry = require('./lib/aggregatorRegistry');
95 changes: 95 additions & 0 deletions lib/aggregatorRegistry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
'use strict';

const Registry = require('./registry');
const { Grouper } = require('./util');
const { aggregators } = require('./metricAggregators');
const WorkerThreads = require('./workerThreads');
const Cluster = require('./cluster');

class AggregatorRegistry extends Registry {
constructor() {
super();

this.registries = [Registry.globalRegistry];
this.workerThreads = new WorkerThreads({ registries: this.registries });
this.cluster = new Cluster({ registries: this.registries });
}

setWorkers(workers) {
this.workerThreads.set(workers);
}

clusterMetrics() {
return this.cluster.getMetrics();
}

workerThreadsMetrics() {
return this.workerThreads.getMetrics();
}

/**
* Sets the registry or registries to be aggregated. Call from workers to
* use a registry/registries other than the default global registry.
* @param {Array<Registry>|Registry} registries Registry or registries to be
* aggregated.
* @return {void}
*/
setRegistries(registries) {
if (!Array.isArray(registries)) registries = [registries];

registries.forEach(registry => {
if (!(registry instanceof Registry)) {
throw new TypeError(`Expected Registry, got ${typeof registry}`);
}
});

this.registries = registries;
}

/**
* Creates a new Registry instance from an array of metrics that were
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
* the method specified by their `aggregator` property, or by summation if
* `aggregator` is undefined.
* @param {Array} metrics Array of metrics, each of which created by
* `registry.getMetricsAsJSON()`.
* @return {Registry} aggregated registry.
*/
static aggregate(metrics) {
const aggregatedRegistry = new Registry();
const metricsByName = new Grouper();

// Gather by name
metrics.forEach(m =>
m.forEach(metric => {
metricsByName.add(metric.name, metric);
}),
);

// Aggregate gathered metrics.
metricsByName.forEach(metric => {
const aggregatorName = metric[0].aggregator;
const aggregatorFn = aggregators[aggregatorName];

if (typeof aggregatorFn !== 'function') {
throw new Error(`'${aggregatorName}' is not a defined aggregator.`);
}

const aggregatedMetric = aggregatorFn(metric);
// NB: The 'omit' aggregator returns undefined.
if (aggregatedMetric) {
const aggregatedMetricWrapper = Object.assign(
{
get: () => aggregatedMetric,
},
aggregatedMetric,
);
aggregatedRegistry.registerMetric(aggregatedMetricWrapper);
}
});

return aggregatedRegistry;
}
}

module.exports = AggregatorRegistry;
197 changes: 32 additions & 165 deletions lib/cluster.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,6 @@
'use strict';

/**
* Extends the Registry class with a `clusterMetrics` method that returns
* aggregated metrics for all workers.
*
* In cluster workers, listens for and responds to requests for metrics by the
* cluster master.
*/

const Registry = require('./registry');
const { Grouper } = require('./util');
const { aggregators } = require('./metricAggregators');

let parentPort, MessageChannel, isMainThread, Worker;
try {
/* eslint-disable node/no-unsupported-features/node-builtins */
const worker_threads = require('worker_threads');

parentPort = worker_threads.parentPort;
MessageChannel = worker_threads.MessageChannel;
isMainThread = worker_threads.isMainThread;
Worker = worker_threads.Worker;
} catch {
// node version is too old
}
const RequestQueue = require('./requestQueue');

// We need to lazy-load the 'cluster' module as some application servers -
// namely Passenger - crash when it is imported.
Expand All @@ -33,63 +10,47 @@ let cluster = () => {
return data;
};

const GET_METRICS_REQ = 'prom-client:getMetricsReq';
const GET_METRICS_RES = 'prom-client:getMetricsRes';
class Cluster {
constructor({ registries } = {}) {
this.registries = registries;
this.requests = new RequestQueue();

let registries = [Registry.globalRegistry];
let requestCtr = 0; // Concurrency control
let listenersAdded = false;
const workersQueue = [];
const requests = new Map(); // Pending requests for workers' local metrics.

class AggregatorRegistry extends Registry {
constructor() {
super();
addListeners();
this.onInit();
}

attachWorkers(workers = []) {
for (const worker in workers) {
if (worker instanceof Worker) {
workersQueue.push(worker);
}
onInit() {
if (this._isListenersAttached) {
return;
}

this.attachMasterListeners();
this.attachWorkerListeners();

this._isListenersAttached = true;
}

/**
* Gets aggregated metrics for all workers. The optional callback and
* returned Promise resolve with the same value; either may be used.
* @return {Promise<string>} Promise that resolves with the aggregated
* metrics.
*/
clusterMetrics() {
const requestId = requestCtr++;
attachMasterListeners() {
if (!cluster().isMaster) {
return;
}

return new Promise((resolve, reject) => {
let settled = false;
function done(err, result) {
if (settled) return;
settled = true;
if (err) reject(err);
else resolve(result);
}
cluster().on('message', (worker, message) => {
this.requests.handleResponse(message);
});
}

const request = {
responses: [],
pending: 0,
done,
errorTimeout: setTimeout(() => {
const err = new Error('Operation timed out.');
request.done(err);
}, 5000),
};
requests.set(requestId, request);
attachWorkerListeners() {
if (!cluster().isWorker) {
return;
}

const message = {
type: GET_METRICS_REQ,
requestId,
};
process.on('message', message => {
this.requests.sendResponse(this.registries, message, process.send);
});
}

getMetrics() {
return this.requests.init((message, request) => {
for (const id in cluster().workers) {
// If the worker exits abruptly, it may still be in the workers
// list but not able to communicate.
Expand All @@ -98,102 +59,8 @@ class AggregatorRegistry extends Registry {
request.pending++;
}
}

getWorkerThreadsMetrics(message);
request.pending += workersQueue.length || 0;

if (request.pending === 0) {
// No workers were up
clearTimeout(request.errorTimeout);
process.nextTick(() => done(null, ''));
}
});
}

/**
* Creates a new Registry instance from an array of metrics that were
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
* the method specified by their `aggregator` property, or by summation if
* `aggregator` is undefined.
* @param {Array} metricsArr Array of metrics, each of which created by
* `registry.getMetricsAsJSON()`.
* @return {Registry} aggregated registry.
*/
static aggregate(metricsArr) {
const aggregatedRegistry = new Registry();
const metricsByName = new Grouper();

// Gather by name
metricsArr.forEach(metrics => {
metrics.forEach(metric => {
metricsByName.add(metric.name, metric);
});
});

// Aggregate gathered metrics.
metricsByName.forEach(metrics => {
const aggregatorName = metrics[0].aggregator;
const aggregatorFn = aggregators[aggregatorName];
if (typeof aggregatorFn !== 'function') {
throw new Error(`'${aggregatorName}' is not a defined aggregator.`);
}
const aggregatedMetric = aggregatorFn(metrics);
// NB: The 'omit' aggregator returns undefined.
if (aggregatedMetric) {
const aggregatedMetricWrapper = Object.assign(
{
get: () => aggregatedMetric,
},
aggregatedMetric,
);
aggregatedRegistry.registerMetric(aggregatedMetricWrapper);
}
});

return aggregatedRegistry;
}

/**
* Sets the registry or registries to be aggregated. Call from workers to
* use a registry/registries other than the default global registry.
* @param {Array<Registry>|Registry} regs Registry or registries to be
* aggregated.
* @return {void}
*/
static setRegistries(regs) {
if (!Array.isArray(regs)) regs = [regs];
regs.forEach(reg => {
if (!(reg instanceof Registry)) {
throw new TypeError(`Expected Registry, got ${typeof reg}`);
}
});
registries = regs;
}
}

function handleWorkerResponse(worker, message) {
if (message.type === GET_METRICS_RES) {
const request = requests.get(message.requestId);
request.pending += message.workerRequests || 0;

if (message.error) {
request.done(new Error(message.error));
return;
}

message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
}
}
}

/**
Expand Down
Loading

0 comments on commit 2bb230a

Please sign in to comment.