Ensure the PoolBasedSequentialScheduledExecutorService does keep a minimum size (#4288)

Signed-off-by: Jörg Sautter <joerg.sautter@gmx.net>
pull/4295/head
joerg1985 2024-07-01 22:26:27 +02:00 committed by GitHub
parent 425d0f43f2
commit cef917fac9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 94 additions and 33 deletions

View File

@ -18,7 +18,6 @@ import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -29,16 +28,18 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.internal.common.WrappedScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A ScheduledExecutorService that will sequentially perform the tasks like a
@ -55,20 +56,73 @@ import org.eclipse.jdt.annotation.Nullable;
@NonNullByDefault
final class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService {
private static final WeakHashMap<ScheduledThreadPoolExecutor, @NonNull AtomicInteger> PENDING_BY_POOL = new WeakHashMap<>();
static class BasePoolExecutor extends WrappedScheduledExecutorService {
protected final Logger logger = LoggerFactory.getLogger(BasePoolExecutor.class);
private final String threadPoolName;
private final AtomicInteger pending;
private volatile int minimumPoolSize;
public BasePoolExecutor(String threadPoolName, int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
this.threadPoolName = threadPoolName;
// set to one does ensure at least one thread more than tasks running
this.pending = new AtomicInteger(1);
}
public synchronized void resizePool(int mandatoryPoolSize) {
int corePoolSize = getCorePoolSize();
if (minimumPoolSize > mandatoryPoolSize) {
mandatoryPoolSize = minimumPoolSize;
}
if (mandatoryPoolSize > corePoolSize) {
// two more than needed, they will time out if there is no work for them im time
setMaximumPoolSize(mandatoryPoolSize + 2);
setCorePoolSize(mandatoryPoolSize);
} else if (mandatoryPoolSize < corePoolSize) {
setCorePoolSize(mandatoryPoolSize);
// ensure we drop not needed threads, this is only needed under higher load when none of the
// started threads have a chance to timeout
setMaximumPoolSize(mandatoryPoolSize + 2);
}
}
public int getMinimumPoolSize() {
return minimumPoolSize;
}
public void setMinimumPoolSize(int minimumPoolSize) {
this.minimumPoolSize = minimumPoolSize;
resizePool(getCorePoolSize());
}
@Override
public void shutdown() {
logger.warn("shutdown() invoked on a shared thread pool '{}'. This is a bug, please submit a bug report",
threadPoolName, new IllegalStateException());
}
@Override
@NonNullByDefault({})
public List<Runnable> shutdownNow() {
logger.warn("shutdownNow() invoked on a shared thread pool '{}'. This is a bug, please submit a bug report",
threadPoolName, new IllegalStateException());
return List.of();
}
}
private final WorkQueueEntry empty;
private final ScheduledThreadPoolExecutor pool;
private final AtomicInteger pending;
private final BasePoolExecutor pool;
private final List<RunnableFuture<?>> scheduled;
private final ScheduledFuture<?> cleaner;
private @Nullable WorkQueueEntry tail;
public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor pool) {
if (pool.getMaximumPoolSize() != Integer.MAX_VALUE) {
throw new IllegalArgumentException("the pool must scale unlimited to avoid potential dead locks!");
}
public PoolBasedSequentialScheduledExecutorService(BasePoolExecutor pool) {
this.pool = pool;
// prepare the WorkQueueEntry we are using when no tasks are pending
@ -81,20 +135,6 @@ final class PoolBasedSequentialScheduledExecutorService implements ScheduledExec
tail = empty;
// we need one pending counter per pool
synchronized (PENDING_BY_POOL) {
AtomicInteger fromCache = PENDING_BY_POOL.get(pool);
if (fromCache == null) {
// set to one does ensure at least one thread more than tasks running
fromCache = new AtomicInteger(1);
PENDING_BY_POOL.put(pool, fromCache);
}
pending = fromCache;
}
// clean up to ensure we do not keep references to old tasks
cleaner = this.scheduleWithFixedDelay(() -> {
synchronized (this) {
@ -197,6 +237,10 @@ final class PoolBasedSequentialScheduledExecutorService implements ScheduledExec
@Override
public void shutdown() {
synchronized (this) {
if (tail == null) {
return;
}
cleaner.cancel(false);
scheduled.removeIf((sf) -> {
sf.cancel(false);
@ -302,7 +346,7 @@ final class PoolBasedSequentialScheduledExecutorService implements ScheduledExec
// a small hack to throw the Exception unchecked
throw PoolBasedSequentialScheduledExecutorService.unchecked(ex);
} finally {
pending.decrementAndGet();
pool.pending.decrementAndGet();
}
};
@ -314,8 +358,8 @@ final class PoolBasedSequentialScheduledExecutorService implements ScheduledExec
throw new RejectedExecutionException("this scheduled executor has been shutdown before");
}
// set the core pool size even if it does not change, this triggers idle threads to stop
pool.setCorePoolSize(pending.incrementAndGet());
var mandatoryPoolSize = pool.pending.incrementAndGet();
pool.resizePool(mandatoryPoolSize);
// avoid waiting for one pool thread to finish inside a pool thread
runNow = inPool && tail.future.isDone();
@ -335,7 +379,7 @@ final class PoolBasedSequentialScheduledExecutorService implements ScheduledExec
try {
cf.run();
} finally {
pending.decrementAndGet();
pool.pending.decrementAndGet();
}
}

View File

@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.openhab.core.common.PoolBasedSequentialScheduledExecutorService.BasePoolExecutor;
import org.openhab.core.internal.common.WrappedScheduledExecutorService;
import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentConstants;
@ -100,7 +101,10 @@ public class ThreadPoolManager {
Integer poolSize = Integer.valueOf(string);
configs.put(poolName, poolSize);
ThreadPoolExecutor pool = (ThreadPoolExecutor) pools.get(poolName);
if (pool instanceof ScheduledThreadPoolExecutor) {
if (pool instanceof BasePoolExecutor basePool) {
basePool.setMinimumPoolSize(poolSize);
LOGGER.debug("Updated scheduled thread pool '{}' to minimum size {}", poolName, poolSize);
} else if (pool instanceof ScheduledThreadPoolExecutor) {
pool.setCorePoolSize(poolSize);
LOGGER.debug("Updated scheduled thread pool '{}' to size {}", poolName, poolSize);
} else if (pool instanceof QueueingThreadPoolExecutor) {
@ -129,9 +133,22 @@ public class ThreadPoolManager {
public static ScheduledExecutorService getPoolBasedSequentialScheduledExecutorService(String poolName,
String threadName) {
if (configs.getOrDefault(poolName, 0) > 0) {
ScheduledThreadPoolExecutor pool = getScheduledPoolUnwrapped(poolName);
ExecutorService pool = pools.computeIfAbsent(poolName, name -> {
int cfg = getConfig(name);
ScheduledThreadPoolExecutor executor = new BasePoolExecutor(name, cfg,
new NamedThreadFactory(name, true, Thread.NORM_PRIORITY));
executor.setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(true);
executor.setRemoveOnCancelPolicy(true);
LOGGER.debug("Created scheduled pool based thread pool '{}' of size {}", name, cfg);
return executor;
});
return new PoolBasedSequentialScheduledExecutorService((ScheduledThreadPoolExecutor) pool);
if (pool instanceof BasePoolExecutor service) {
return new PoolBasedSequentialScheduledExecutorService(service);
} else {
throw new IllegalArgumentException("Pool " + poolName + " is not a base pool!");
}
} else {
return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(threadName));
}

View File

@ -202,7 +202,7 @@ public class ThreadPoolManagerTest {
block.countDown();
}, 20, TimeUnit.MILLISECONDS);
assertTrue(check.await(80, TimeUnit.MILLISECONDS));
assertTrue(check.await(800, TimeUnit.MILLISECONDS));
}
}