Fix ThreadPool monitoring
* Base UnstoppableExecutorService on ThreadPoolExecutor which is supported by ExecutorServiceMetrics of micrometer-core * Fix handling of references to ExecutorServiceMetrics to avoid garbage collection removing the instances * Remove duplicate registration of ThreadPools Signed-off-by: Holger Friedrich <mail@holger-friedrich.de>pull/4693/head
parent
2a12cd63c3
commit
d7383d9d5e
|
@ -10,5 +10,6 @@ Import-Package: \
|
||||||
Export-Package: \
|
Export-Package: \
|
||||||
com.codahale.metrics.*;-split-package:=merge-first,\
|
com.codahale.metrics.*;-split-package:=merge-first,\
|
||||||
io.micrometer.core.*;-split-package:=merge-first,\
|
io.micrometer.core.*;-split-package:=merge-first,\
|
||||||
|
io.micrometer.common.*;-split-package:=merge-first,\
|
||||||
org.HdrHistogram.*;-split-package:=merge-first,\
|
org.HdrHistogram.*;-split-package:=merge-first,\
|
||||||
org.LatencyUtils.*;-split-package:=merge-first
|
org.LatencyUtils.*;-split-package:=merge-first
|
||||||
|
|
|
@ -91,7 +91,6 @@ public class DefaultMetricsRegistration implements ReadyService.ReadyTracker, Me
|
||||||
meters.add(new ThingStateMetric(bundleContext, thingRegistry, tags));
|
meters.add(new ThingStateMetric(bundleContext, thingRegistry, tags));
|
||||||
meters.add(new EventCountMetric(bundleContext, tags));
|
meters.add(new EventCountMetric(bundleContext, tags));
|
||||||
meters.add(new RuleMetric(bundleContext, tags, ruleRegistry));
|
meters.add(new RuleMetric(bundleContext, tags, ruleRegistry));
|
||||||
meters.add(new ThreadPoolMetric(tags));
|
|
||||||
|
|
||||||
meters.forEach(m -> m.bindTo(registry));
|
meters.forEach(m -> m.bindTo(registry));
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ public class ThreadPoolMetric implements OpenhabCoreMeterBinder {
|
||||||
private static final String POOLNAME_TAG_NAME = "pool";
|
private static final String POOLNAME_TAG_NAME = "pool";
|
||||||
private final Set<Tag> tags = new HashSet<>();
|
private final Set<Tag> tags = new HashSet<>();
|
||||||
private @Nullable MeterRegistry meterRegistry;
|
private @Nullable MeterRegistry meterRegistry;
|
||||||
|
private Set<ExecutorServiceMetrics> executorServiceMetricsSet = new HashSet<>();
|
||||||
|
|
||||||
public ThreadPoolMetric(Collection<Tag> tags) {
|
public ThreadPoolMetric(Collection<Tag> tags) {
|
||||||
this.tags.addAll(tags);
|
this.tags.addAll(tags);
|
||||||
|
@ -66,7 +67,9 @@ public class ThreadPoolMetric implements OpenhabCoreMeterBinder {
|
||||||
}
|
}
|
||||||
Set<Tag> tagsWithPoolname = new HashSet<>(tags);
|
Set<Tag> tagsWithPoolname = new HashSet<>(tags);
|
||||||
tagsWithPoolname.add(Tag.of(POOLNAME_TAG_NAME, poolName));
|
tagsWithPoolname.add(Tag.of(POOLNAME_TAG_NAME, poolName));
|
||||||
new ExecutorServiceMetrics(es, poolName, tagsWithPoolname).bindTo(meterRegistry);
|
ExecutorServiceMetrics metrics = new ExecutorServiceMetrics(es, poolName, tagsWithPoolname);
|
||||||
|
metrics.bindTo(meterRegistry);
|
||||||
|
executorServiceMetricsSet.add(metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -81,5 +84,6 @@ public class ThreadPoolMetric implements OpenhabCoreMeterBinder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.meterRegistry = null;
|
this.meterRegistry = null;
|
||||||
|
executorServiceMetricsSet.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -219,13 +221,18 @@ public class ThreadPoolManager {
|
||||||
return new HashSet<>(pools.keySet());
|
return new HashSet<>(pools.keySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
static class UnstoppableExecutorService<T extends ExecutorService> implements ExecutorService {
|
// needs to be of a class supported by micrometer-core, see ExecutorServiceMetrics.java,
|
||||||
|
// originally this class was intended to be defined as "implements ExecutorService"
|
||||||
|
static class UnstoppableExecutorService<T extends ExecutorService> extends ThreadPoolExecutor {
|
||||||
|
|
||||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||||
protected final T delegate;
|
protected final T delegate;
|
||||||
protected final String threadPoolName;
|
protected final String threadPoolName;
|
||||||
|
|
||||||
private UnstoppableExecutorService(String threadPoolName, T delegate) {
|
private UnstoppableExecutorService(String threadPoolName, T delegate) {
|
||||||
|
// although nearly all methods of ThreadPoolExecutor are overwritten, super() needs to be
|
||||||
|
// called with valid parameters
|
||||||
|
super(0, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
|
||||||
this.threadPoolName = threadPoolName;
|
this.threadPoolName = threadPoolName;
|
||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
}
|
}
|
||||||
|
@ -304,6 +311,61 @@ public class ThreadPoolManager {
|
||||||
T getDelegate() {
|
T getDelegate() {
|
||||||
return delegate;
|
return delegate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the following methods of ThreadPoolExecutor will be queried for collection of metrics,
|
||||||
|
// they need to be replaced and the query is to be delegated to out ThreadPoolExecutor
|
||||||
|
// referenced in variable "delegate"
|
||||||
|
|
||||||
|
// not part of monitoring
|
||||||
|
// public long getTaskCount() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getCompletedTaskCount() {
|
||||||
|
// executor_completed_tasks_total
|
||||||
|
return ((ThreadPoolExecutor) delegate).getCompletedTaskCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveCount() {
|
||||||
|
// executor_active_threads
|
||||||
|
return ((ThreadPoolExecutor) delegate).getActiveCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaximumPoolSize() {
|
||||||
|
// executor_pool_max_threads
|
||||||
|
return ((ThreadPoolExecutor) delegate).getMaximumPoolSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
// not part of monitoring
|
||||||
|
// public int getLargestPoolSize() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getCorePoolSize() {
|
||||||
|
// executor_pool_core_threads
|
||||||
|
return ((ThreadPoolExecutor) delegate).getCorePoolSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPoolSize() {
|
||||||
|
// executor_pool_size_threads
|
||||||
|
return ((ThreadPoolExecutor) delegate).getPoolSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BlockingQueue<Runnable> getQueue() {
|
||||||
|
return new ArrayBlockingQueue<Runnable>(1) {
|
||||||
|
public int remainingCapacity() {
|
||||||
|
// executor_queue_remaining_tasks
|
||||||
|
return ((ThreadPoolExecutor) delegate).getQueue().remainingCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int size() {
|
||||||
|
// executor_queued_tasks
|
||||||
|
return ((ThreadPoolExecutor) delegate).getQueue().size();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class UnstoppableScheduledExecutorService extends UnstoppableExecutorService<ScheduledExecutorService>
|
static class UnstoppableScheduledExecutorService extends UnstoppableExecutorService<ScheduledExecutorService>
|
||||||
|
|
Loading…
Reference in New Issue