[tr064] reduce network load and improve XML handling (#9693)

* reduce network load
* adjust soap timeout

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
pull/9762/head
J-N-K 2021-01-09 10:41:22 +01:00 committed by GitHub
parent 43a0439089
commit ef87af3712
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 175 additions and 49 deletions

View File

@ -17,6 +17,7 @@ import static org.openhab.binding.tr064.internal.Tr064BindingConstants.THING_TYP
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -42,6 +43,7 @@ import org.openhab.binding.tr064.internal.phonebook.PhonebookActions;
import org.openhab.binding.tr064.internal.phonebook.PhonebookProvider;
import org.openhab.binding.tr064.internal.phonebook.Tr064PhonebookImpl;
import org.openhab.binding.tr064.internal.soap.SOAPConnector;
import org.openhab.binding.tr064.internal.soap.SOAPRequest;
import org.openhab.binding.tr064.internal.soap.SOAPValueConverter;
import org.openhab.binding.tr064.internal.util.SCPDUtil;
import org.openhab.binding.tr064.internal.util.Util;
@ -81,8 +83,8 @@ public class Tr064RootHandler extends BaseBridgeHandler implements PhonebookProv
private final Map<ChannelUID, Tr064ChannelConfig> channels = new HashMap<>();
// caching is used to prevent excessive calls to the same action
private final ExpiringCacheMap<ChannelUID, State> stateCache = new ExpiringCacheMap<>(2000);
private Collection<Phonebook> phonebooks = Collections.emptyList();
private final ExpiringCacheMap<ChannelUID, State> stateCache = new ExpiringCacheMap<>(Duration.ofMillis(2000));
private Collection<Phonebook> phonebooks = List.of();
private @Nullable ScheduledFuture<?> connectFuture;
private @Nullable ScheduledFuture<?> pollFuture;
@ -222,8 +224,8 @@ public class Tr064RootHandler extends BaseBridgeHandler implements PhonebookProv
this.deviceType = device.getDeviceType();
// try to get security (https) port
SOAPMessage soapResponse = soapConnector.doSOAPRequest(deviceService, "GetSecurityPort",
Collections.emptyMap());
SOAPMessage soapResponse = soapConnector
.doSOAPRequest(new SOAPRequest(deviceService, "GetSecurityPort"));
if (!soapResponse.getSOAPBody().hasFault()) {
SOAPValueConverter soapValueConverter = new SOAPValueConverter(httpClient);
soapValueConverter.getStateFromSOAPValue(soapResponse, "NewSecurityPort", null)
@ -248,8 +250,8 @@ public class Tr064RootHandler extends BaseBridgeHandler implements PhonebookProv
"Could not get service definition for 'urn:DeviceInfo-com:serviceId:DeviceInfo1'"))
.getActionList().stream().filter(action -> action.getName().equals("GetInfo")).findFirst()
.orElseThrow(() -> new SCPDException("Action 'GetInfo' not found"));
SOAPMessage soapResponse1 = soapConnector.doSOAPRequest(deviceService, getInfoAction.getName(),
Collections.emptyMap());
SOAPMessage soapResponse1 = soapConnector
.doSOAPRequest(new SOAPRequest(deviceService, getInfoAction.getName()));
SOAPValueConverter soapValueConverter = new SOAPValueConverter(httpClient);
Map<String, String> properties = editProperties();
PROPERTY_ARGUMENTS.forEach(argumentName -> getInfoAction.getArgumentList().stream()
@ -276,7 +278,7 @@ public class Tr064RootHandler extends BaseBridgeHandler implements PhonebookProv
*/
public List<SCPDDeviceType> getAllSubDevices() {
final SCPDUtil scpdUtil = this.scpdUtil;
return (scpdUtil == null) ? Collections.emptyList() : scpdUtil.getAllSubDevices();
return (scpdUtil == null) ? List.of() : scpdUtil.getAllSubDevices();
}
/**
@ -334,8 +336,8 @@ public class Tr064RootHandler extends BaseBridgeHandler implements PhonebookProv
.map(phonebookList -> Arrays.stream(phonebookList.toString().split(","))).orElse(Stream.empty())
.map(index -> {
try {
SOAPMessage soapMessageURL = soapConnector.doSOAPRequest(scpdService, "GetPhonebook",
Map.of("NewPhonebookID", index));
SOAPMessage soapMessageURL = soapConnector.doSOAPRequest(
new SOAPRequest(scpdService, "GetPhonebook", Map.of("NewPhonebookID", index)));
return soapValueConverter.getStateFromSOAPValue(soapMessageURL, "NewPhonebookURL", null)
.map(url -> (Phonebook) new Tr064PhonebookImpl(httpClient, url.toString()));
} catch (Tr064CommunicationException e) {
@ -357,12 +359,12 @@ public class Tr064RootHandler extends BaseBridgeHandler implements PhonebookProv
phonebooks = scpdService.map(service -> {
try {
return processPhonebookList(
soapConnector.doSOAPRequest(service, "GetPhonebookList", Collections.emptyMap()), service);
return processPhonebookList(soapConnector.doSOAPRequest(new SOAPRequest(service, "GetPhonebookList")),
service);
} catch (Tr064CommunicationException e) {
return Collections.<Phonebook> emptyList();
}
}).orElse(Collections.emptyList());
}).orElse(List.of());
if (phonebooks.isEmpty()) {
logger.warn("Could not get phonebooks for thing {}", thing.getUID());

View File

@ -17,9 +17,11 @@ import static org.openhab.binding.tr064.internal.util.Util.getSOAPElement;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -57,12 +59,15 @@ import org.slf4j.LoggerFactory;
*/
@NonNullByDefault
public class SOAPConnector {
private static final int SOAP_TIMEOUT = 2000; // in ms
private static final int SOAP_TIMEOUT = 5; // in
private final Logger logger = LoggerFactory.getLogger(SOAPConnector.class);
private final HttpClient httpClient;
private final String endpointBaseURL;
private final SOAPValueConverter soapValueConverter;
private final ExpiringCacheMap<SOAPRequest, SOAPMessage> soapMessageCache = new ExpiringCacheMap<>(
Duration.ofMillis(2000));
public SOAPConnector(HttpClient httpClient, String endpointBaseURL) {
this.httpClient = httpClient;
this.endpointBaseURL = endpointBaseURL;
@ -72,15 +77,12 @@ public class SOAPConnector {
/**
* prepare a SOAP request for an action request to a service
*
* @param service the service
* @param soapAction the action to send
* @param arguments arguments to send along with the request
* @param soapRequest the request to be generated
* @return a jetty Request containing the full SOAP message
* @throws IOException if a problem while writing the SOAP message to the Request occurs
* @throws SOAPException if a problem with creating the SOAP message occurs
*/
private Request prepareSOAPRequest(SCPDServiceType service, String soapAction, Map<String, String> arguments)
throws IOException, SOAPException {
private Request prepareSOAPRequest(SOAPRequest soapRequest) throws IOException, SOAPException {
MessageFactory messageFactory = MessageFactory.newInstance();
SOAPMessage soapMessage = messageFactory.createMessage();
SOAPPart soapPart = soapMessage.getSOAPPart();
@ -89,8 +91,9 @@ public class SOAPConnector {
// SOAP body
SOAPBody soapBody = envelope.getBody();
SOAPElement soapBodyElem = soapBody.addChildElement(soapAction, "u", service.getServiceType());
arguments.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(argument -> {
SOAPElement soapBodyElem = soapBody.addChildElement(soapRequest.soapAction, "u",
soapRequest.service.getServiceType());
soapRequest.arguments.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(argument -> {
try {
soapBodyElem.addChildElement(argument.getKey()).setTextContent(argument.getValue());
} catch (SOAPException e) {
@ -101,11 +104,12 @@ public class SOAPConnector {
// SOAP headers
MimeHeaders headers = soapMessage.getMimeHeaders();
headers.addHeader("SOAPAction", service.getServiceType() + "#" + soapAction);
headers.addHeader("SOAPAction", soapRequest.service.getServiceType() + "#" + soapRequest.soapAction);
soapMessage.saveChanges();
// create Request and add headers and content
Request request = httpClient.newRequest(endpointBaseURL + service.getControlURL()).method(HttpMethod.POST);
Request request = httpClient.newRequest(endpointBaseURL + soapRequest.service.getControlURL())
.method(HttpMethod.POST);
((Iterator<MimeHeader>) soapMessage.getMimeHeaders().getAllHeaders())
.forEachRemaining(header -> request.header(header.getName(), header.getValue()));
try (final ByteArrayOutputStream os = new ByteArrayOutputStream()) {
@ -118,19 +122,46 @@ public class SOAPConnector {
}
/**
* execute a SOAP request
* execute a SOAP request with cache
*
* @param service the service to send the action to
* @param soapAction the action itself
* @param arguments arguments to send along with the request
* @param soapRequest the request itself
* @return the SOAPMessage answer from the remote host
* @throws Tr064CommunicationException if an error occurs during the request
*/
public synchronized SOAPMessage doSOAPRequest(SCPDServiceType service, String soapAction,
Map<String, String> arguments) throws Tr064CommunicationException {
public SOAPMessage doSOAPRequest(SOAPRequest soapRequest) throws Tr064CommunicationException {
try {
Request request = prepareSOAPRequest(service, soapAction, arguments).timeout(SOAP_TIMEOUT,
TimeUnit.MILLISECONDS);
SOAPMessage soapMessage = Objects.requireNonNull(soapMessageCache.putIfAbsentAndGet(soapRequest, () -> {
try {
SOAPMessage newValue = doSOAPRequestUncached(soapRequest);
logger.trace("Storing in cache: {}", newValue);
return newValue;
} catch (Tr064CommunicationException e) {
// wrap exception
throw new IllegalArgumentException(e);
}
}));
logger.trace("Returning from cache: {}", soapMessage);
return soapMessage;
} catch (IllegalArgumentException e) {
Throwable cause = e.getCause();
if (cause instanceof Tr064CommunicationException) {
throw (Tr064CommunicationException) cause;
} else {
throw e;
}
}
}
/**
* execute a SOAP request without cache
*
* @param soapRequest the request itself
* @return the SOAPMessage answer from the remote host
* @throws Tr064CommunicationException if an error occurs during the request
*/
public synchronized SOAPMessage doSOAPRequestUncached(SOAPRequest soapRequest) throws Tr064CommunicationException {
try {
Request request = prepareSOAPRequest(soapRequest).timeout(SOAP_TIMEOUT, TimeUnit.SECONDS);
if (logger.isTraceEnabled()) {
request.getContent().forEach(buffer -> logger.trace("Request: {}", new String(buffer.array())));
}
@ -140,8 +171,7 @@ public class SOAPConnector {
// retry once if authentication expired
logger.trace("Re-Auth needed.");
httpClient.getAuthenticationStore().clearAuthenticationResults();
request = prepareSOAPRequest(service, soapAction, arguments).timeout(SOAP_TIMEOUT,
TimeUnit.MILLISECONDS);
request = prepareSOAPRequest(soapRequest).timeout(SOAP_TIMEOUT, TimeUnit.SECONDS);
response = request.send();
}
try (final ByteArrayInputStream is = new ByteArrayInputStream(response.getContent())) {
@ -186,7 +216,9 @@ public class SOAPConnector {
channelConfig.getChannelTypeDescription().getGetAction().getParameter().getName(),
parameter);
}
doSOAPRequest(service, channelTypeDescription.getSetAction().getName(), arguments);
SOAPRequest soapRequest = new SOAPRequest(service,
channelTypeDescription.getSetAction().getName(), arguments);
doSOAPRequestUncached(soapRequest);
} catch (Tr064CommunicationException e) {
logger.warn("Could not send command {}: {}", command, e.getMessage());
}
@ -224,8 +256,8 @@ public class SOAPConnector {
if (parameter != null && !action.getParameter().isInternalOnly()) {
arguments.put(action.getParameter().getName(), parameter);
}
SOAPMessage soapResponse = doSOAPRequest(channelConfig.getService(), getAction.getName(), arguments);
SOAPMessage soapResponse = doSOAPRequest(
new SOAPRequest(channelConfig.getService(), getAction.getName(), arguments));
String argumentName = channelConfig.getChannelTypeDescription().getGetAction().getArgument();
// find all other channels with the same action that are already in cache, so we can update them
Map<ChannelUID, Tr064ChannelConfig> channelsInRequest = channelConfigMap.entrySet().stream()

View File

@ -0,0 +1,72 @@
/**
* Copyright (c) 2010-2021 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.tr064.internal.soap;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.tr064.internal.dto.scpd.root.SCPDServiceType;
/**
* The {@link SOAPRequest} is a wrapper for SOAP requests
*
* @author Jan N. Klug - Initial contribution
*/
@NonNullByDefault
public class SOAPRequest {
public SCPDServiceType service;
public String soapAction;
public Map<String, String> arguments = Map.of();
public SOAPRequest(SCPDServiceType service, String soapAction) {
this.service = service;
this.soapAction = soapAction;
}
public SOAPRequest(SCPDServiceType service, String soapAction, Map<String, String> arguments) {
this.service = service;
this.soapAction = soapAction;
this.arguments = arguments;
}
@Override
public boolean equals(@Nullable Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
SOAPRequest that = (SOAPRequest) o;
if (!service.equals(that.service))
return false;
if (!soapAction.equals(that.soapAction))
return false;
return arguments.equals(that.arguments);
}
@Override
public int hashCode() {
int result = service.hashCode();
result = 31 * result + soapAction.hashCode();
result = 31 * result + arguments.hashCode();
return result;
}
@Override
public String toString() {
return "SOAPRequest{" + "service=" + service + ", soapAction='" + soapAction + '\'' + ", arguments=" + arguments
+ '}';
}
}

View File

@ -17,6 +17,7 @@ import static org.openhab.binding.tr064.internal.Tr064BindingConstants.*;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -49,6 +50,7 @@ import org.openhab.binding.tr064.internal.dto.config.ChannelTypeDescriptions;
import org.openhab.binding.tr064.internal.dto.config.ParameterType;
import org.openhab.binding.tr064.internal.dto.scpd.root.SCPDServiceType;
import org.openhab.binding.tr064.internal.dto.scpd.service.*;
import org.openhab.core.cache.ExpiringCacheMap;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.binding.builder.ChannelBuilder;
@ -67,6 +69,10 @@ import org.w3c.dom.NodeList;
@NonNullByDefault
public class Util {
private static final Logger LOGGER = LoggerFactory.getLogger(Util.class);
private static final int HTTP_REQUEST_TIMEOUT = 5; // in s
// cache XML content for 5s
private static final ExpiringCacheMap<String, Object> XML_OBJECT_CACHE = new ExpiringCacheMap<>(
Duration.ofMillis(3000));
/**
* read the channel config from the resource file (static initialization)
@ -317,23 +323,37 @@ public class Util {
* @param clazz the class describing the XML file
* @return unmarshalling result
*/
@SuppressWarnings("unchecked")
public static <T> @Nullable T getAndUnmarshalXML(HttpClient httpClient, String uri, Class<T> clazz) {
try {
ContentResponse contentResponse = httpClient.newRequest(uri).timeout(2, TimeUnit.SECONDS)
.method(HttpMethod.GET).send();
byte[] response = contentResponse.getContent();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("XML = {}", new String(response));
}
InputStream xml = new ByteArrayInputStream(response);
T returnValue = (T) XML_OBJECT_CACHE.putIfAbsentAndGet(uri, () -> {
try {
LOGGER.trace("Refreshing cache for '{}'", uri);
ContentResponse contentResponse = httpClient.newRequest(uri)
.timeout(HTTP_REQUEST_TIMEOUT, TimeUnit.SECONDS).method(HttpMethod.GET).send();
byte[] response = contentResponse.getContent();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("XML = {}", new String(response));
}
InputStream xml = new ByteArrayInputStream(response);
JAXBContext context = JAXBContext.newInstance(clazz);
Unmarshaller um = context.createUnmarshaller();
return um.unmarshal(new StreamSource(xml), clazz).getValue();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
LOGGER.debug("HTTP Failed to GET uri '{}': {}", uri, e.getMessage());
} catch (JAXBException e) {
LOGGER.debug("Unmarshalling failed: {}", e.getMessage());
JAXBContext context = JAXBContext.newInstance(clazz);
Unmarshaller um = context.createUnmarshaller();
T newValue = um.unmarshal(new StreamSource(xml), clazz).getValue();
LOGGER.trace("Storing in cache {}", newValue);
return newValue;
} catch (ExecutionException | InterruptedException | TimeoutException e) {
LOGGER.debug("HTTP Failed to GET uri '{}': {}", uri, e.getMessage());
throw new IllegalArgumentException();
} catch (JAXBException e) {
LOGGER.debug("Unmarshalling failed: {}", e.getMessage());
throw new IllegalArgumentException();
}
});
LOGGER.trace("Returning from cache: {}", returnValue);
return returnValue;
} catch (IllegalArgumentException e) {
// already logged
}
return null;
}