Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge master registries to cluster metrics (#183) #442

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ label. (See `example/server.js` for an example using
Metrics are aggregated from the global registry by default. To use a different
registry, call
`client.AggregatorRegistry.setRegistries(registryOrArrayOfRegistries)` from the
worker processes.
master or worker processes.

## API

Expand Down
76 changes: 54 additions & 22 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/**
* Extends the Registry class with a `clusterMetrics` method that returns
* aggregated metrics for all workers.
* aggregated metrics for master and all workers.
*
* In cluster workers, listens for and responds to requests for metrics by the
* cluster master.
Expand Down Expand Up @@ -34,8 +34,8 @@ class AggregatorRegistry extends Registry {
}

/**
* Gets aggregated metrics for all workers. The optional callback and
* returned Promise resolve with the same value; either may be used.
* Gets aggregated metrics for master and all workers. The optional callback
* and returned Promise resolve with the same value; either may be used.
* @return {Promise<string>} Promise that resolves with the aggregated
* metrics.
*/
Expand All @@ -62,6 +62,27 @@ class AggregatorRegistry extends Registry {
};
requests.set(requestId, request);

// Get metrics from master
if (registries && registries.length > 0) {
Promise.all(registries.map(r => r.getMetricsAsJSON()))
.then(metrics => {
processMetricsResponse({
type: GET_METRICS_RES,
requestId,
metrics,
});
})
.catch(error => {
processMetricsResponse({
type: GET_METRICS_RES,
requestId,
error: error.message,
});
});
request.pending++;
}

// Get metrics from workers
const message = {
type: GET_METRICS_REQ,
requestId,
Expand All @@ -78,6 +99,7 @@ class AggregatorRegistry extends Registry {

if (request.pending === 0) {
// No workers were up
requests.delete(requestId);
clearTimeout(request.errorTimeout);
process.nextTick(() => done(null, ''));
}
Expand Down Expand Up @@ -145,6 +167,34 @@ class AggregatorRegistry extends Registry {
}
}

/**
* Adds metrics from master and worker to request and finalizes request when
* all metrics are collected.
* @param {object} message - GET_METRICS_RES message object containing metrics
* @return {void}
*/
function processMetricsResponse(message) {
const request = requests.get(message.requestId);

if (message.error) {
request.done(new Error(message.error));
return;
}

message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics().then(metrics => metrics.trim());
request.done(null, promString);
}
}

/**
* Adds event listeners for cluster aggregation. Idempotent (safe to call more
* than once).
Expand All @@ -158,25 +208,7 @@ function addListeners() {
// Listen for worker responses to requests for local metrics
cluster().on('message', (worker, message) => {
if (message.type === GET_METRICS_RES) {
const request = requests.get(message.requestId);

if (message.error) {
request.done(new Error(message.error));
return;
}

message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
}
processMetricsResponse(message);
}
});
}
Expand Down