diff --git a/CHANGELOG.md b/CHANGELOG.md index 68d15104..08dc3820 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ project adheres to [Semantic Versioning](http://semver.org/). ### Added - Support for OpenMetrics and Exemplars +- Support aggregating metrics across workers in a Node.js `worker_threads` ## [14.2.0] - 2023-03-06 diff --git a/example/cluster-with-worker-threads/index.js b/example/cluster-with-worker-threads/index.js new file mode 100644 index 00000000..12a7aaec --- /dev/null +++ b/example/cluster-with-worker-threads/index.js @@ -0,0 +1,27 @@ +'use strict'; + +const cluster = require('cluster'); +const server = require('./server'); + +const AggregatorRegistry = require('../..').AggregatorRegistry; +const aggregatorRegistry = new AggregatorRegistry(); + +if (cluster.isMaster) { + console.log(`Master ${process.pid} is running`); + + for (let i = 0; i < 2; i++) { + cluster.fork(); + } + + cluster.on('exit', worker => { + console.log(`worker ${worker.process.pid} died`); + console.log("Let's fork another worker!"); + cluster.fork(); + }); +} else { + server({ + metrics: { + contentType: aggregatorRegistry.contentType, + }, + }); +} diff --git a/example/cluster-with-worker-threads/server.js b/example/cluster-with-worker-threads/server.js new file mode 100644 index 00000000..4f054d1d --- /dev/null +++ b/example/cluster-with-worker-threads/server.js @@ -0,0 +1,40 @@ +'use strict'; + +const express = require('express'); +const { Counter } = require('../..'); +const AggregatorRegistry = require('../..').AggregatorRegistry; +const aggregatorRegistry = new AggregatorRegistry(); +const worker = require('./worker'); +const PORT = 3000; + +const http_request_total = new Counter({ + name: 'http_request_total', + help: 'request count | clusterId="20212" statusCode="2xx|4xx|5xx"', + labelNames: ['clusterId', 'statusCode'], +}); + +module.exports = (options = {}) => { + const { + metrics: { contentType = '' }, + } = options; + + const app = express(); + + app.get('/', async (req, res) => { + http_request_total.inc({ clusterId: process.pid, statusCode: 200 }); + const result = await worker(); + + res.send(result); + }); + + app.get('/metrics', async (req, res) => { + const metrics = await aggregatorRegistry.workersMetrics(); + + res.set('Content-Type', contentType); + res.send(metrics); + }); + + app.listen(PORT, () => { + console.log(`cluster: #${process.pid} - listening on port ${PORT}`); + }); +}; diff --git a/example/cluster-with-worker-threads/worker.js b/example/cluster-with-worker-threads/worker.js new file mode 100644 index 00000000..67f0d2b5 --- /dev/null +++ b/example/cluster-with-worker-threads/worker.js @@ -0,0 +1,29 @@ +'use strict'; + +const { Counter } = require('../..'); +const { + Worker, + isMainThread, + parentPort, + threadId, +} = require('worker_threads'); + +if (isMainThread) { + module.exports = () => + new Promise((resolve, reject) => { + const worker = new Worker(__filename); + + worker.on('message', resolve); + worker.on('error', reject); + }); +} else { + const worker_invocation_total = new Counter({ + name: 'worker_invocation_total', + help: 'worker invocation count | threadId="20212"', + labelNames: ['threadId'], + }); + + worker_invocation_total.inc({ threadId }); + + parentPort.postMessage(`result: ${Math.random()}`); +} diff --git a/example/cluster.js b/example/cluster.js deleted file mode 100644 index b91e1ceb..00000000 --- a/example/cluster.js +++ /dev/null @@ -1,31 +0,0 @@ -'use strict'; - -const cluster = require('cluster'); -const express = require('express'); -const metricsServer = express(); -const AggregatorRegistry = require('../').AggregatorRegistry; -const aggregatorRegistry = new AggregatorRegistry(); - -if (cluster.isMaster) { - for (let i = 0; i < 4; i++) { - cluster.fork(); - } - - metricsServer.get('/cluster_metrics', async (req, res) => { - try { - const metrics = await aggregatorRegistry.clusterMetrics(); - res.set('Content-Type', aggregatorRegistry.contentType); - res.send(metrics); - } catch (ex) { - res.statusCode = 500; - res.send(ex.message); - } - }); - - metricsServer.listen(3001); - console.log( - 'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics', - ); -} else { - require('./server.js'); -} diff --git a/example/cluster/index.js b/example/cluster/index.js new file mode 100644 index 00000000..12a7aaec --- /dev/null +++ b/example/cluster/index.js @@ -0,0 +1,27 @@ +'use strict'; + +const cluster = require('cluster'); +const server = require('./server'); + +const AggregatorRegistry = require('../..').AggregatorRegistry; +const aggregatorRegistry = new AggregatorRegistry(); + +if (cluster.isMaster) { + console.log(`Master ${process.pid} is running`); + + for (let i = 0; i < 2; i++) { + cluster.fork(); + } + + cluster.on('exit', worker => { + console.log(`worker ${worker.process.pid} died`); + console.log("Let's fork another worker!"); + cluster.fork(); + }); +} else { + server({ + metrics: { + contentType: aggregatorRegistry.contentType, + }, + }); +} diff --git a/example/cluster/server.js b/example/cluster/server.js new file mode 100644 index 00000000..d7e66a94 --- /dev/null +++ b/example/cluster/server.js @@ -0,0 +1,38 @@ +'use strict'; + +const express = require('express'); +const { Counter } = require('../..'); +const AggregatorRegistry = require('../..').AggregatorRegistry; +const aggregatorRegistry = new AggregatorRegistry(); +const PORT = 3000; + +const http_request_total = new Counter({ + name: 'http_request_total', + help: 'request count | clusterId="20212" statusCode="2xx|4xx|5xx"', + labelNames: ['clusterId', 'statusCode'], +}); + +module.exports = (options = {}) => { + const { + metrics: { contentType = '' }, + } = options; + + const app = express(); + + app.get('/', (req, res) => { + http_request_total.inc({ clusterId: process.pid, statusCode: 200 }); + + res.send('OK'); + }); + + app.get('/metrics', async (req, res) => { + const metrics = await aggregatorRegistry.clusterMetrics(); + + res.set('Content-Type', contentType); + res.send(metrics); + }); + + app.listen(PORT, () => { + console.log(`cluster: #${process.pid} - listening on port ${PORT}`); + }); +}; diff --git a/example/server.js b/example/server.js index 7818c680..ba91ec7c 100644 --- a/example/server.js +++ b/example/server.js @@ -1,7 +1,6 @@ 'use strict'; const express = require('express'); -const cluster = require('cluster'); const server = express(); const register = require('../').register; @@ -68,13 +67,6 @@ setInterval(() => { g.labels('post', '300').inc(); }, 100); -if (cluster.isWorker) { - // Expose some worker-specific metric as an example - setInterval(() => { - c.inc({ code: `worker_${cluster.worker.id}` }); - }, 2000); -} - const t = []; setInterval(() => { for (let i = 0; i < 100; i++) { diff --git a/example/worker-threads/index.js b/example/worker-threads/index.js new file mode 100644 index 00000000..a3c4c2db --- /dev/null +++ b/example/worker-threads/index.js @@ -0,0 +1,34 @@ +'use strict'; + +const express = require('express'); +const { Counter } = require('../..'); +const AggregatorRegistry = require('../..').AggregatorRegistry; +const aggregatorRegistry = new AggregatorRegistry(); +const worker = require('./worker'); +const PORT = 3000; + +const http_request_total = new Counter({ + name: 'http_request_total', + help: 'request count | statusCode="2xx|4xx|5xx"', + labelNames: ['statusCode'], +}); + +const app = express(); + +app.get('/', async (req, res) => { + http_request_total.inc({ statusCode: 200 }); + const result = await worker(); + + res.send(result); +}); + +app.get('/metrics', async (req, res) => { + const metrics = await aggregatorRegistry.workersMetrics(); + + res.set('Content-Type', aggregatorRegistry.contentType); + res.send(metrics); +}); + +app.listen(PORT, () => { + console.log(`listening on port ${PORT}`); +}); diff --git a/example/worker-threads/worker.js b/example/worker-threads/worker.js new file mode 100644 index 00000000..cb909212 --- /dev/null +++ b/example/worker-threads/worker.js @@ -0,0 +1,30 @@ +'use strict'; + +const { Counter } = require('../..'); +const { + Worker, + isMainThread, + parentPort, + threadId, +} = require('worker_threads'); + +if (isMainThread) { + const worker = new Worker(__filename); + + module.exports.workers = [worker]; + module.exports.calculate = () => + new Promise((resolve, reject) => { + worker.emit('message', resolve); + worker.on('error', reject); + }); +} else { + const worker_invocation_total = new Counter({ + name: 'worker_invocation_total', + help: 'worker invocation count | threadId="20212"', + labelNames: ['threadId'], + }); + + worker_invocation_total.inc({ threadId }); + + parentPort.postMessage(`result: ${Math.random()}`); +} diff --git a/lib/cluster.js b/lib/cluster.js index 5cb707ed..d3757787 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -11,6 +11,20 @@ const Registry = require('./registry'); const { Grouper } = require('./util'); const { aggregators } = require('./metricAggregators'); + +let parentPort, MessageChannel, isMainThread, Worker; +try { + /* eslint-disable node/no-unsupported-features/node-builtins */ + const worker_threads = require('worker_threads'); + + parentPort = worker_threads.parentPort; + MessageChannel = worker_threads.MessageChannel; + isMainThread = worker_threads.isMainThread; + Worker = worker_threads.Worker; +} catch { + // node version is too old +} + // We need to lazy-load the 'cluster' module as some application servers - // namely Passenger - crash when it is imported. let cluster = () => { @@ -25,6 +39,7 @@ const GET_METRICS_RES = 'prom-client:getMetricsRes'; let registries = [Registry.globalRegistry]; let requestCtr = 0; // Concurrency control let listenersAdded = false; +let workersQueue = []; const requests = new Map(); // Pending requests for workers' local metrics. class AggregatorRegistry extends Registry { @@ -33,6 +48,10 @@ class AggregatorRegistry extends Registry { addListeners(); } + setWorkers(workers = []) { + workersQueue = [...workers]; + } + /** * Gets aggregated metrics for all workers. The optional callback and * returned Promise resolve with the same value; either may be used. @@ -170,6 +189,7 @@ function addListeners() { cluster().on('message', (worker, message) => { if (message.type === GET_METRICS_RES) { const request = requests.get(message.requestId); + request.pending += message.workerRequests || 0; if (message.error) { request.done(new Error(message.error)); @@ -191,29 +211,77 @@ function addListeners() { } }); } +} + +if (!isMainThread) { + parentPort.on('message', ({ type, requestId, port } = {}) => { + if (type === GET_METRICS_REQ) { + Promise.all(registries.map(r => r.getMetricsAsJSON())) + .then(metrics => { + port.postMessage({ + type: GET_METRICS_RES, + requestId, + metrics, + }); - if (cluster().isWorker) { - // Respond to master's requests for worker's local metrics. - process.on('message', message => { - if (message.type === GET_METRICS_REQ) { - Promise.all(registries.map(r => r.getMetricsAsJSON())) - .then(metrics => { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - metrics, - }); - }) - .catch(error => { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - error: error.message, - }); + registries.forEach(r => r.resetMetrics()); + }) + .catch(error => { + port.postMessage({ + type: GET_METRICS_RES, + requestId, + error: error.message, }); + }); + } + }); +} + +// Respond to master's requests for worker's local metrics. +process.on('message', message => { + if (cluster().isWorker && message.type === GET_METRICS_REQ) { + let workerRequests = 0; + workersQueue.forEach(worker => { + if (worker && worker instanceof Worker) { + const metricsChannel = new MessageChannel(); + + worker.postMessage( + { + ...message, + port: metricsChannel.port1, + }, + [metricsChannel.port1], + ); + + workerRequests++; + + metricsChannel.port2.on('message', response => { + if (response.type === GET_METRICS_RES) { + process.send(response); + + metricsChannel.port2.close(); + } + }); } }); + + Promise.all(registries.map(r => r.getMetricsAsJSON())) + .then(metrics => { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + metrics, + workerRequests, + }); + }) + .catch(error => { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + error: error.message, + }); + }); } -} +}); module.exports = AggregatorRegistry;