Fix dangling SSE subscriptions (#2983)
Unlike the sitemap SSE subscriptions generic event subscriptions did not implement a connection lost monitor. Signed-off-by: Jan N. Klug <github@klug.nrw>pull/3005/head
parent
af0566af95
commit
84d5d38606
|
@ -20,6 +20,9 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.annotation.security.RolesAllowed;
|
||||
import javax.inject.Singleton;
|
||||
|
@ -42,6 +45,7 @@ import javax.ws.rs.sse.SseEventSink;
|
|||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.auth.Role;
|
||||
import org.openhab.core.common.ThreadPoolManager;
|
||||
import org.openhab.core.events.Event;
|
||||
import org.openhab.core.io.rest.RESTConstants;
|
||||
import org.openhab.core.io.rest.RESTResource;
|
||||
|
@ -99,6 +103,10 @@ public class SseResource implements RESTResource, SsePublisher {
|
|||
|
||||
private final Logger logger = LoggerFactory.getLogger(SseResource.class);
|
||||
|
||||
private final ScheduledExecutorService scheduler = ThreadPoolManager
|
||||
.getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
|
||||
private final ScheduledFuture<?> cleanSubscriptionsJob;
|
||||
|
||||
private @Context @NonNullByDefault({}) Sse sse;
|
||||
|
||||
private final SseBroadcaster<SseSinkItemInfo> itemStatesBroadcaster = new SseBroadcaster<>();
|
||||
|
@ -111,6 +119,13 @@ public class SseResource implements RESTResource, SsePublisher {
|
|||
public SseResource(@Reference SseItemStatesEventBuilder itemStatesEventBuilder) {
|
||||
this.executorService = Executors.newSingleThreadExecutor();
|
||||
this.itemStatesEventBuilder = itemStatesEventBuilder;
|
||||
|
||||
cleanSubscriptionsJob = scheduler.scheduleWithFixedDelay(() -> {
|
||||
logger.debug("Run clean SSE subscriptions job");
|
||||
OutboundSseEvent outboundSseEvent = sse.newEventBuilder().name("event")
|
||||
.mediaType(MediaType.APPLICATION_JSON_TYPE).data(new ServerAliveEvent()).build();
|
||||
topicBroadcaster.send(outboundSseEvent);
|
||||
}, 1, 2, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
@Deactivate
|
||||
|
@ -118,6 +133,7 @@ public class SseResource implements RESTResource, SsePublisher {
|
|||
itemStatesBroadcaster.close();
|
||||
topicBroadcaster.close();
|
||||
executorService.shutdown();
|
||||
cleanSubscriptionsJob.cancel(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -245,4 +261,8 @@ public class SseResource implements RESTResource, SsePublisher {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class ServerAliveEvent {
|
||||
public final String type = "ALIVE";
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue