diff --git a/bundles/org.openhab.core.io.monitor/bnd.bnd b/bundles/org.openhab.core.io.monitor/bnd.bnd index 185b8d95c8..645fa424ab 100644 --- a/bundles/org.openhab.core.io.monitor/bnd.bnd +++ b/bundles/org.openhab.core.io.monitor/bnd.bnd @@ -10,5 +10,6 @@ Import-Package: \ Export-Package: \ com.codahale.metrics.*;-split-package:=merge-first,\ io.micrometer.core.*;-split-package:=merge-first,\ + io.micrometer.common.*;-split-package:=merge-first,\ org.HdrHistogram.*;-split-package:=merge-first,\ org.LatencyUtils.*;-split-package:=merge-first diff --git a/bundles/org.openhab.core.io.monitor/src/main/java/org/openhab/core/io/monitor/internal/DefaultMetricsRegistration.java b/bundles/org.openhab.core.io.monitor/src/main/java/org/openhab/core/io/monitor/internal/DefaultMetricsRegistration.java index e81c82065f..116a20ee5a 100644 --- a/bundles/org.openhab.core.io.monitor/src/main/java/org/openhab/core/io/monitor/internal/DefaultMetricsRegistration.java +++ b/bundles/org.openhab.core.io.monitor/src/main/java/org/openhab/core/io/monitor/internal/DefaultMetricsRegistration.java @@ -91,7 +91,6 @@ public class DefaultMetricsRegistration implements ReadyService.ReadyTracker, Me meters.add(new ThingStateMetric(bundleContext, thingRegistry, tags)); meters.add(new EventCountMetric(bundleContext, tags)); meters.add(new RuleMetric(bundleContext, tags, ruleRegistry)); - meters.add(new ThreadPoolMetric(tags)); meters.forEach(m -> m.bindTo(registry)); } diff --git a/bundles/org.openhab.core.io.monitor/src/main/java/org/openhab/core/io/monitor/internal/metrics/ThreadPoolMetric.java b/bundles/org.openhab.core.io.monitor/src/main/java/org/openhab/core/io/monitor/internal/metrics/ThreadPoolMetric.java index da5626885e..4a81c50dfc 100644 --- a/bundles/org.openhab.core.io.monitor/src/main/java/org/openhab/core/io/monitor/internal/metrics/ThreadPoolMetric.java +++ b/bundles/org.openhab.core.io.monitor/src/main/java/org/openhab/core/io/monitor/internal/metrics/ThreadPoolMetric.java @@ -41,6 +41,7 @@ public class ThreadPoolMetric implements OpenhabCoreMeterBinder { private static final String POOLNAME_TAG_NAME = "pool"; private final Set tags = new HashSet<>(); private @Nullable MeterRegistry meterRegistry; + private Set executorServiceMetricsSet = new HashSet<>(); public ThreadPoolMetric(Collection tags) { this.tags.addAll(tags); @@ -66,7 +67,9 @@ public class ThreadPoolMetric implements OpenhabCoreMeterBinder { } Set tagsWithPoolname = new HashSet<>(tags); tagsWithPoolname.add(Tag.of(POOLNAME_TAG_NAME, poolName)); - new ExecutorServiceMetrics(es, poolName, tagsWithPoolname).bindTo(meterRegistry); + ExecutorServiceMetrics metrics = new ExecutorServiceMetrics(es, poolName, tagsWithPoolname); + metrics.bindTo(meterRegistry); + executorServiceMetricsSet.add(metrics); } @Override @@ -81,5 +84,6 @@ public class ThreadPoolMetric implements OpenhabCoreMeterBinder { } } this.meterRegistry = null; + executorServiceMetricsSet.clear(); } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java index c3e381c856..a66a83071e 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java @@ -18,6 +18,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -219,13 +221,18 @@ public class ThreadPoolManager { return new HashSet<>(pools.keySet()); } - static class UnstoppableExecutorService implements ExecutorService { + // needs to be of a class supported by micrometer-core, see ExecutorServiceMetrics.java, + // originally this class was intended to be defined as "implements ExecutorService" + static class UnstoppableExecutorService extends ThreadPoolExecutor { protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final T delegate; protected final String threadPoolName; private UnstoppableExecutorService(String threadPoolName, T delegate) { + // although nearly all methods of ThreadPoolExecutor are overwritten, super() needs to be + // called with valid parameters + super(0, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(1)); this.threadPoolName = threadPoolName; this.delegate = delegate; } @@ -304,6 +311,61 @@ public class ThreadPoolManager { T getDelegate() { return delegate; } + + // the following methods of ThreadPoolExecutor will be queried for collection of metrics, + // they need to be replaced and the query is to be delegated to out ThreadPoolExecutor + // referenced in variable "delegate" + + // not part of monitoring + // public long getTaskCount() {} + + @Override + public long getCompletedTaskCount() { + // executor_completed_tasks_total + return ((ThreadPoolExecutor) delegate).getCompletedTaskCount(); + } + + @Override + public int getActiveCount() { + // executor_active_threads + return ((ThreadPoolExecutor) delegate).getActiveCount(); + } + + @Override + public int getMaximumPoolSize() { + // executor_pool_max_threads + return ((ThreadPoolExecutor) delegate).getMaximumPoolSize(); + } + + // not part of monitoring + // public int getLargestPoolSize() {} + + @Override + public int getCorePoolSize() { + // executor_pool_core_threads + return ((ThreadPoolExecutor) delegate).getCorePoolSize(); + } + + @Override + public int getPoolSize() { + // executor_pool_size_threads + return ((ThreadPoolExecutor) delegate).getPoolSize(); + } + + @Override + public BlockingQueue getQueue() { + return new ArrayBlockingQueue(1) { + public int remainingCapacity() { + // executor_queue_remaining_tasks + return ((ThreadPoolExecutor) delegate).getQueue().remainingCapacity(); + } + + public int size() { + // executor_queued_tasks + return ((ThreadPoolExecutor) delegate).getQueue().size(); + } + }; + } } static class UnstoppableScheduledExecutorService extends UnstoppableExecutorService