drop blocking async SSE implementation (#754)
There workaround for a servlet implementation lower then 3 can be removed. After we migrated from ESH to openHAB Core we could set servlet >= 3 as a requirement. Signed-off-by: Markus Rathgeb <maggu2810@gmail.com>pull/762/head
parent
e3cb062610
commit
ce72e1083f
|
@ -115,17 +115,6 @@ public class SseResource {
|
|||
// events at the moment of sending them.
|
||||
response.addHeader(HttpHeaders.CONTENT_ENCODING, "identity");
|
||||
|
||||
if (!SseUtil.SERVLET3_SUPPORT) {
|
||||
// Response headers are written now, since the thread will be
|
||||
// blocked later on.
|
||||
response.setStatus(HttpServletResponse.SC_OK);
|
||||
response.setContentType(SseFeature.SERVER_SENT_EVENTS);
|
||||
response.flushBuffer();
|
||||
|
||||
// enable blocking for this thread
|
||||
SseUtil.enableBlockingSse();
|
||||
}
|
||||
|
||||
return eventOutput;
|
||||
}
|
||||
|
||||
|
|
|
@ -12,8 +12,6 @@
|
|||
*/
|
||||
package org.eclipse.smarthome.io.rest.sse.internal;
|
||||
|
||||
import org.eclipse.smarthome.io.rest.sse.internal.async.BlockingAsyncFeature;
|
||||
import org.eclipse.smarthome.io.rest.sse.internal.util.SseUtil;
|
||||
import org.glassfish.jersey.media.sse.SseFeature;
|
||||
import org.osgi.framework.BundleActivator;
|
||||
import org.osgi.framework.BundleContext;
|
||||
|
@ -51,12 +49,6 @@ public class SseActivator implements BundleActivator {
|
|||
logger.debug("SSE API - SseFeature registered.");
|
||||
}
|
||||
|
||||
if (!SseUtil.SERVLET3_SUPPORT) {
|
||||
blockingAsyncFeatureRegistration = bc.registerService(BlockingAsyncFeature.class.getName(),
|
||||
new BlockingAsyncFeature(), null);
|
||||
|
||||
logger.debug("SSE API - SSE BlockingAsyncFeature registered.");
|
||||
}
|
||||
logger.debug("SSE API has been started.");
|
||||
}
|
||||
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2010-2019 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.smarthome.io.rest.sse.internal.async;
|
||||
|
||||
import org.glassfish.hk2.utilities.binding.AbstractBinder;
|
||||
import org.glassfish.jersey.internal.inject.CustomAnnotationLiteral;
|
||||
import org.glassfish.jersey.servlet.spi.AsyncContextDelegateProvider;
|
||||
|
||||
/**
|
||||
* An {@link AbstractBinder} implementation that registers our custom {@link BlockingAsyncContextDelegateProvider} class
|
||||
* as an implementation of
|
||||
* the {@link AsyncContextDelegateProvider} SPI interface.
|
||||
*
|
||||
* @author Ivan Iliev - Initial Contribution and API
|
||||
*
|
||||
*/
|
||||
public class BlockingAsyncBinder extends AbstractBinder {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
// the qualifiedBy is needed in order for our implementation to be used
|
||||
// if there are multiple implementations of AsyncContextDelegateProvider
|
||||
bind(new BlockingAsyncContextDelegateProvider()).to(AsyncContextDelegateProvider.class).qualifiedBy(
|
||||
CustomAnnotationLiteral.INSTANCE);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,98 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2010-2019 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.smarthome.io.rest.sse.internal.async;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.smarthome.io.rest.sse.internal.util.SseUtil;
|
||||
import org.glassfish.jersey.media.sse.SseFeature;
|
||||
import org.glassfish.jersey.servlet.spi.AsyncContextDelegate;
|
||||
import org.glassfish.jersey.servlet.spi.AsyncContextDelegateProvider;
|
||||
|
||||
/**
|
||||
* An {@link AsyncContextDelegateProvider} implementation that returns a
|
||||
* blocking {@link AsyncContextDelegate}, which blocks while the connection is
|
||||
* alive if the response content-type is {@link SseFeature #SERVER_SENT_EVENTS} or throws an
|
||||
* UnsupportedOperationException otherwise. The blocking continues
|
||||
* until the response can longer be written to.
|
||||
*
|
||||
* @author Ivan Iliev - Initial Contribution and API
|
||||
*
|
||||
*/
|
||||
public class BlockingAsyncContextDelegateProvider implements AsyncContextDelegateProvider {
|
||||
|
||||
@Override
|
||||
public final AsyncContextDelegate createDelegate(final HttpServletRequest request,
|
||||
final HttpServletResponse response) {
|
||||
return new BlockingAsyncContextDelegate(request, response);
|
||||
}
|
||||
|
||||
private static final class BlockingAsyncContextDelegate implements AsyncContextDelegate {
|
||||
private static final int PING_TIMEOUT = 15 * 1000;
|
||||
|
||||
private final HttpServletResponse response;
|
||||
|
||||
private volatile boolean isRunning;
|
||||
|
||||
private BlockingAsyncContextDelegate(final HttpServletRequest request, final HttpServletResponse response) {
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete() {
|
||||
isRunning = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void suspend() throws IllegalStateException {
|
||||
if (SseUtil.shouldAsyncBlock()) {
|
||||
isRunning = true;
|
||||
|
||||
synchronized (this) {
|
||||
while (isRunning) {
|
||||
try {
|
||||
this.wait(PING_TIMEOUT);
|
||||
ServletOutputStream outputStream = response.getOutputStream();
|
||||
|
||||
// write a new line to the OutputStream and flush to
|
||||
// check connectivity. If the other peer closes the
|
||||
// connection, the first flush() should generate a
|
||||
// TCP reset that is detected on the second flush()
|
||||
outputStream.write('\n');
|
||||
response.flushBuffer();
|
||||
|
||||
outputStream.write('\n');
|
||||
response.flushBuffer();
|
||||
} catch (Exception exception) {
|
||||
// If an exception has occurred during write and
|
||||
// flush we consider the connection closed, attempt
|
||||
// to close the outputstream and stop blocking.
|
||||
try {
|
||||
response.getOutputStream().close();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
|
||||
isRunning = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new UnsupportedOperationException("ASYNCHRONOUS PROCESSING IS NOT SUPPORTED!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,36 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2010-2019 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.smarthome.io.rest.sse.internal.async;
|
||||
|
||||
import javax.ws.rs.core.Feature;
|
||||
import javax.ws.rs.core.FeatureContext;
|
||||
|
||||
/**
|
||||
* A {@link Feature} implementation that registers our custom {@link BlockingAsyncBinder}.
|
||||
*
|
||||
* @author Ivan Iliev - Initial Contribution and API
|
||||
*
|
||||
*/
|
||||
public class BlockingAsyncFeature implements Feature {
|
||||
|
||||
@Override
|
||||
public boolean configure(FeatureContext context) {
|
||||
if (context.getConfiguration().isEnabled(BlockingAsyncFeature.class)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
context.register(new BlockingAsyncBinder());
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -16,7 +16,6 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import javax.servlet.ServletRequest;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
@ -26,35 +25,19 @@ import org.glassfish.jersey.media.sse.OutboundEvent;
|
|||
|
||||
/**
|
||||
* Utility class containing helper methods for the SSE implementation.
|
||||
*
|
||||
*
|
||||
* @author Ivan Iliev - Initial Contribution and API
|
||||
* @author Dennis Nobel - Changed EventBean
|
||||
*/
|
||||
public class SseUtil {
|
||||
static final String TOPIC_VALIDATE_PATTERN = "(\\w*\\*?\\/?,?\\s*)*";
|
||||
|
||||
static {
|
||||
boolean servlet3 = false;
|
||||
try {
|
||||
servlet3 = ServletRequest.class.getMethod("startAsync") != null;
|
||||
} catch (Exception e) {
|
||||
} finally {
|
||||
SERVLET3_SUPPORT = servlet3;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* True if the {@link ServletRequest} class has a "startAsync" method,
|
||||
* otherwise false.
|
||||
*/
|
||||
public static final boolean SERVLET3_SUPPORT;
|
||||
|
||||
/**
|
||||
* Creates a new {@link OutboundEvent} object containing an {@link EventBean} created for the given Eclipse
|
||||
* SmartHome {@link Event}.
|
||||
*
|
||||
*
|
||||
* @param event the event
|
||||
*
|
||||
*
|
||||
* @return a new OutboundEvent
|
||||
*/
|
||||
public static OutboundEvent buildEvent(Event event) {
|
||||
|
@ -70,40 +53,12 @@ public class SseUtil {
|
|||
return outboundEvent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to mark our current thread(request processing) that SSE blocking
|
||||
* should be enabled.
|
||||
*/
|
||||
private static ThreadLocal<Boolean> blockingSseEnabled = new ThreadLocal<Boolean>() {
|
||||
@Override
|
||||
protected Boolean initialValue() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns true if the current thread is processing an SSE request that
|
||||
* should block.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static boolean shouldAsyncBlock() {
|
||||
return blockingSseEnabled.get().booleanValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks the current thread as processing a blocking sse request.
|
||||
*/
|
||||
public static void enableBlockingSse() {
|
||||
blockingSseEnabled.set(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the given topicFilter
|
||||
*
|
||||
*
|
||||
* @param topicFilter
|
||||
* @return true if the given input filter is empty or a valid topic filter string
|
||||
*
|
||||
*
|
||||
*/
|
||||
public static boolean isValidTopicFilter(String topicFilter) {
|
||||
return StringUtils.isEmpty(topicFilter) || topicFilter.matches(TOPIC_VALIDATE_PATTERN);
|
||||
|
@ -112,7 +67,7 @@ public class SseUtil {
|
|||
/**
|
||||
* Splits the given topicFilter at any commas (",") and for each token replaces any wildcards(*) with the regex
|
||||
* pattern (.*)
|
||||
*
|
||||
*
|
||||
* @param topicFilter
|
||||
* @return
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue