Avoid parallel streams with common thread pool to avoid deadlocks (#3137)

* Avoid parallel streams with common thread pool to avoid deadlocks

To mitigate issue #3125

Signed-off-by: Sami Salonen <ssalonen@gmail.com>
pull/3135/head
Sami Salonen 2022-11-01 21:05:54 +02:00 committed by GitHub
parent 221c80b12f
commit 18d063e272
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 10 additions and 9 deletions

View File

@ -44,7 +44,7 @@ public class Subscription {
public void add(MqttMessageSubscriber subscriber) {
if (subscribers.add(subscriber)) {
// new subscriber. deliver all known retained messages
retainedMessages.entrySet().parallelStream().forEach(entry -> {
retainedMessages.entrySet().stream().forEach(entry -> {
if (entry.getValue().length > 0) {
processMessage(subscriber, entry.getKey(), entry.getValue());
}
@ -81,7 +81,7 @@ public class Subscription {
if (retain || retainedMessages.containsKey(topic)) {
retainedMessages.put(topic, payload);
}
subscribers.parallelStream().forEach(subscriber -> processMessage(subscriber, topic, payload));
subscribers.stream().forEach(subscriber -> processMessage(subscriber, topic, payload));
}
private void processMessage(MqttMessageSubscriber subscriber, String topic, byte[] payload) {

View File

@ -12,6 +12,7 @@
*/
package org.openhab.core.thing.link;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -134,7 +135,7 @@ public abstract class AbstractLinkRegistry<L extends AbstractLink, P extends Pro
if (forItemName == null || forLinkedUID == null) {
return false;
} else {
return forItemName.parallelStream().anyMatch(forLinkedUID::contains);
return !Collections.disjoint(forItemName, forLinkedUID);
}
} finally {
toLinkLock.readLock().unlock();
@ -184,7 +185,7 @@ public abstract class AbstractLinkRegistry<L extends AbstractLink, P extends Pro
if (forLinkedUID == null) {
return Set.of();
}
return forLinkedUID.parallelStream().map(link -> link.getItemName()).collect(Collectors.toSet());
return forLinkedUID.stream().map(link -> link.getItemName()).collect(Collectors.toSet());
} finally {
toLinkLock.readLock().unlock();
}

View File

@ -67,12 +67,12 @@ public class ItemChannelLinkRegistry extends AbstractLinkRegistry<ItemChannelLin
* @return an unmodifiable set of bound channels for the given item name
*/
public Set<ChannelUID> getBoundChannels(final String itemName) {
return getLinks(itemName).parallelStream().map(link -> link.getLinkedUID()).collect(Collectors.toSet());
return getLinks(itemName).stream().map(link -> link.getLinkedUID()).collect(Collectors.toSet());
}
@Override
public Set<String> getLinkedItemNames(final UID uid) {
return super.getLinkedItemNames(uid).parallelStream().filter(itemName -> itemRegistry.get(itemName) != null)
return super.getLinkedItemNames(uid).stream().filter(itemName -> itemRegistry.get(itemName) != null)
.collect(Collectors.toSet());
}
@ -83,8 +83,8 @@ public class ItemChannelLinkRegistry extends AbstractLinkRegistry<ItemChannelLin
* @return an unmodifiable set of bound items for the given channel UID
*/
public Set<Item> getLinkedItems(final UID uid) {
return ((Stream<Item>) super.getLinkedItemNames(uid).parallelStream()
.map(itemName -> itemRegistry.get(itemName)).filter(Objects::nonNull)).collect(Collectors.toSet());
return ((Stream<Item>) super.getLinkedItemNames(uid).stream().map(itemName -> itemRegistry.get(itemName))
.filter(Objects::nonNull)).collect(Collectors.toSet());
}
/**
@ -94,7 +94,7 @@ public class ItemChannelLinkRegistry extends AbstractLinkRegistry<ItemChannelLin
* @return an unmodifiable set of bound things for the given item name
*/
public Set<Thing> getBoundThings(final String itemName) {
return ((Stream<Thing>) getBoundChannels(itemName).parallelStream()
return ((Stream<Thing>) getBoundChannels(itemName).stream()
.map(channelUID -> thingRegistry.get(channelUID.getThingUID())).filter(Objects::nonNull))
.collect(Collectors.toSet());
}