diff --git a/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/EventHandler.java b/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/EventHandler.java index 9d1c6ee666..56a91fbce4 100644 --- a/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/EventHandler.java +++ b/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/EventHandler.java @@ -16,10 +16,15 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; -import org.eclipse.smarthome.core.common.SafeCaller; +import org.eclipse.smarthome.core.common.NamedThreadFactory; import org.eclipse.smarthome.core.events.Event; import org.eclipse.smarthome.core.events.EventFactory; import org.eclipse.smarthome.core.events.EventFilter; @@ -33,26 +38,36 @@ import org.slf4j.LoggerFactory; * @author Markus Rathgeb - Initial contribution */ @NonNullByDefault -public class EventHandler { +public class EventHandler implements AutoCloseable { + + private static final long EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS = TimeUnit.SECONDS.toMillis(5); private final Logger logger = LoggerFactory.getLogger(EventHandler.class); private final Map> typedEventSubscribers; private final Map typedEventFactories; - private final SafeCaller safeCaller; + + private final ScheduledExecutorService watcher = Executors + .newSingleThreadScheduledExecutor(new NamedThreadFactory("EventHandlerWatcher")); + private final ExecutorService executor = Executors + .newSingleThreadExecutor(new NamedThreadFactory("EventHandlerExecutor")); /** * Create a new event handler. * * @param typedEventSubscribers the event subscribers indexed by the event type * @param typedEventFactories the event factories indexed by the event type - * @param safeCaller the safe caller to use */ public EventHandler(final Map> typedEventSubscribers, - final Map typedEventFactories, final SafeCaller safeCaller) { + final Map typedEventFactories) { this.typedEventSubscribers = typedEventSubscribers; this.typedEventFactories = typedEventFactories; - this.safeCaller = safeCaller; + } + + @Override + public void close() { + watcher.shutdownNow(); + executor.shutdownNow(); } public void handleEvent(org.osgi.service.event.Event osgiEvent) { @@ -131,17 +146,22 @@ public class EventHandler { EventFilter filter = eventSubscriber.getEventFilter(); if (filter == null || filter.apply(event)) { logger.trace("Delegate event to subscriber ({}).", eventSubscriber.getClass()); - safeCaller.create(eventSubscriber, EventSubscriber.class).withAsync().onTimeout(() -> { - logger.warn("Dispatching event to subscriber '{}' takes more than {}ms.", - eventSubscriber.toString(), SafeCaller.DEFAULT_TIMEOUT); - }).onException(e -> { - logger.error("Dispatching/filtering event for subscriber '{}' failed: {}", - EventSubscriber.class.getName(), e.getMessage(), e); - }).build().receive(event); + executor.submit(() -> { + ScheduledFuture logTimeout = watcher.schedule( + () -> logger.warn("Dispatching event to subscriber '{}' takes more than {}ms.", + eventSubscriber, EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS), + EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS, TimeUnit.MILLISECONDS); + try { + eventSubscriber.receive(event); + } catch (final Exception ex) { + logger.error("Dispatching/filtering event for subscriber '{}' failed: {}", + EventSubscriber.class.getName(), ex.getMessage(), ex); + } + logTimeout.cancel(false); + }); } else { logger.trace("Skip event subscriber ({}) because of its filter.", eventSubscriber.getClass()); } } } - } diff --git a/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/OSGiEventManager.java b/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/OSGiEventManager.java index 0328e587f1..8379a919df 100644 --- a/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/OSGiEventManager.java +++ b/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/OSGiEventManager.java @@ -18,7 +18,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; -import org.eclipse.smarthome.core.common.SafeCaller; import org.eclipse.smarthome.core.events.Event; import org.eclipse.smarthome.core.events.EventFactory; import org.eclipse.smarthome.core.events.EventSubscriber; @@ -51,11 +50,9 @@ public class OSGiEventManager implements EventHandler { private ThreadedEventHandler eventHandler; - private SafeCaller safeCaller; - @Activate protected void activate(ComponentContext componentContext) { - eventHandler = new ThreadedEventHandler(typedEventSubscribers, typedEventFactories, safeCaller); + eventHandler = new ThreadedEventHandler(typedEventSubscribers, typedEventFactories); eventHandler.open(); } @@ -116,15 +113,6 @@ public class OSGiEventManager implements EventHandler { } } - @Reference - protected void setSafeCaller(SafeCaller safeCaller) { - this.safeCaller = safeCaller; - } - - protected void unsetSafeCaller(SafeCaller safeCaller) { - this.safeCaller = null; - } - @Override public void handleEvent(org.osgi.service.event.Event osgiEvent) { eventHandler.handleEvent(osgiEvent); diff --git a/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/ThreadedEventHandler.java b/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/ThreadedEventHandler.java index d5c3d54f4d..6727c800ee 100644 --- a/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/ThreadedEventHandler.java +++ b/bundles/org.openhab.core/src/main/java/org/eclipse/smarthome/core/internal/events/ThreadedEventHandler.java @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jdt.annotation.NonNullByDefault; -import org.eclipse.smarthome.core.common.SafeCaller; import org.eclipse.smarthome.core.events.EventFactory; import org.eclipse.smarthome.core.events.EventSubscriber; import org.osgi.service.event.Event; @@ -50,31 +49,32 @@ public class ThreadedEventHandler implements Closeable { * * @param typedEventSubscribers the event subscribers * @param typedEventFactories the event factories indexed by the event type - * @param safeCaller the safe caller to use */ ThreadedEventHandler(Map> typedEventSubscribers, - final Map typedEventFactories, final SafeCaller safeCaller) { + final Map typedEventFactories) { thread = new Thread(() -> { - final EventHandler worker = new EventHandler(typedEventSubscribers, typedEventFactories, safeCaller); - while (running.get()) { - try { - logger.trace("wait for event"); - final Event event = queue.poll(1, TimeUnit.HOURS); - logger.trace("inspect event: {}", event); - if (event == null) { - logger.debug("Hey, you have really very few events."); - } else if (event == notifyEvent) { - // received an internal notification - } else { - worker.handleEvent(event); + try (EventHandler worker = new EventHandler(typedEventSubscribers, typedEventFactories)) { + while (running.get()) { + try { + logger.trace("wait for event"); + final Event event = queue.poll(1, TimeUnit.HOURS); + logger.trace("inspect event: {}", event); + if (event == null) { + logger.debug("Hey, you have really very few events."); + } else if (event == notifyEvent) { + // received an internal notification + } else { + worker.handleEvent(event); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (RuntimeException ex) { + logger.error("Error on event handling.", ex); } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } catch (RuntimeException ex) { - logger.error("Error on event handling.", ex); } } - }, "ESH-OSGiEventManager"); + }, "OH-OSGiEventManager"); + } void open() {