Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

关于线程池监控的一个PR #60

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@

import com.alibaba.metrics.Counter;
import com.alibaba.metrics.Meter;
import com.alibaba.metrics.MetricLevel;
import com.alibaba.metrics.MetricName;
import com.alibaba.metrics.MetricRegistry;
import com.alibaba.metrics.PersistentGauge;
import com.alibaba.metrics.ReservoirType;
import com.alibaba.metrics.Timer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -69,12 +76,75 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi
* @param name name for this executor service.
*/
public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {
this(delegate, registry, name, ReservoirType.EXPONENTIALLY_DECAYING);
}

/**
* Wraps an {@link ExecutorService} with an explicit name.
*
* @param delegate {@link ExecutorService} to wrap.
* @param registry {@link MetricRegistry} that will contain the metrics.
* @param name name for this executor service.
* @param reservoirType reservoirType for timer inner Histogram metric.
*/
public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name, ReservoirType reservoirType) {
this(delegate, registry, name, new HashMap<String, String>(), MetricLevel.CRITICAL, reservoirType);
}

/**
* Wraps an {@link ExecutorService} with an explicit name.
*
* @param delegate {@link ExecutorService} to wrap.
* @param registry {@link MetricRegistry} that will contain the metrics.
* @param name name for this executor service.
* @param metricLevel metricLevel of metric.
* @param metricTags metricTags of metric.
* @param reservoirType reservoirType of metric.
*/
public InstrumentedExecutorService(final ExecutorService delegate, final MetricRegistry registry, final String name, final Map<String, String> metricTags, final MetricLevel metricLevel, final ReservoirType reservoirType) {
this.delegate = delegate;
this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));
this.running = registry.counter(MetricRegistry.name(name, "running"));
this.completed = registry.meter(MetricRegistry.name(name, "completed"));
this.duration = registry.timer(MetricRegistry.name(name, "duration"));
this.rejected = registry.meter(MetricRegistry.name(name, "rejected"));
if (delegate instanceof ThreadPoolExecutor) {
final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) delegate;
registry.register(new MetricName(name + MetricName.SEPARATOR + "maxThreadPoolSize", metricTags, metricLevel), new PersistentGauge() {
@Override
public Object getValue() {
return threadPoolExecutor.getMaximumPoolSize();
}
});

registry.register(new MetricName(name + MetricName.SEPARATOR + "coreThreadPoolSize", metricTags, metricLevel), new PersistentGauge() {
@Override
public Object getValue() {
return threadPoolExecutor.getCorePoolSize();
}
});

registry.register(new MetricName(name + MetricName.SEPARATOR + "currentThreadPoolSize", metricTags, metricLevel), new PersistentGauge() {
@Override
public Object getValue() {
return threadPoolExecutor.getPoolSize();
}
});

registry.register(new MetricName(name + MetricName.SEPARATOR + "activeThreadPoolSize", metricTags, metricLevel), new PersistentGauge() {
@Override
public Object getValue() {
return threadPoolExecutor.getActiveCount();
}
});

registry.register(new MetricName(name + MetricName.SEPARATOR + "threadPoolQueueSize", metricTags, metricLevel), new PersistentGauge() {
@Override
public Object getValue() {
return threadPoolExecutor.getQueue().size();
}
});
}
this.submitted = registry.meter(new MetricName(name + MetricName.SEPARATOR + "submitted", metricTags, metricLevel));
this.running = registry.counter(new MetricName(name + MetricName.SEPARATOR + "running", metricTags, metricLevel));
this.completed = registry.meter(new MetricName(name + MetricName.SEPARATOR + "completed", metricTags, metricLevel));
this.duration = registry.timer(new MetricName(name + MetricName.SEPARATOR + "duration", metricTags, metricLevel), reservoirType);
this.rejected = registry.meter(new MetricName(name + MetricName.SEPARATOR + "rejected", metricTags, metricLevel));
}

/**
Expand Down