Prevent duplicate events in WatchService (#3404)
On Samba shares a single MODIFY event may result in different sequences of CREATE/DELETE/MODIFY events. Unfortunately this is not properly handled by the underlying library and we have to re-introduce our ugly workaround. The solution itself is a bit improved over the old one as it not only keeps the last event but determines the correct event by examining the sequence of received events. Signed-off-by: Jan N. Klug <github@klug.nrw>pull/3427/head
parent
ac88faa5dc
commit
07e8ff56c7
|
@ -53,7 +53,7 @@ public class ConfigDispatcherFileWatcher implements WatchService.WatchEventListe
|
|||
|
||||
this.watchService = watchService;
|
||||
|
||||
watchService.registerListener(this, Path.of(servicesFolder));
|
||||
watchService.registerListener(this, Path.of(servicesFolder), false);
|
||||
configDispatcher.processConfigFile(Path.of(OpenHAB.getConfigFolder(), servicesFolder).toFile());
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
*/
|
||||
package org.openhab.core.model.core.internal.folder;
|
||||
|
||||
import static java.nio.file.StandardWatchEventKinds.*;
|
||||
import static org.openhab.core.service.WatchService.Kind.CREATE;
|
||||
import static org.openhab.core.service.WatchService.Kind.MODIFY;
|
||||
|
||||
|
@ -27,8 +26,8 @@ import java.util.Dictionary;
|
|||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -64,7 +63,7 @@ import org.slf4j.LoggerFactory;
|
|||
@Component(name = "org.openhab.core.folder", immediate = true, configurationPid = "org.openhab.folder", configurationPolicy = ConfigurationPolicy.REQUIRE)
|
||||
public class FolderObserver implements WatchService.WatchEventListener {
|
||||
private final WatchService watchService;
|
||||
private Logger logger = LoggerFactory.getLogger(FolderObserver.class);
|
||||
private final Logger logger = LoggerFactory.getLogger(FolderObserver.class);
|
||||
|
||||
/* the model repository is provided as a service */
|
||||
private final ModelRepository modelRepository;
|
||||
|
@ -75,7 +74,7 @@ public class FolderObserver implements WatchService.WatchEventListener {
|
|||
private boolean activated;
|
||||
|
||||
/* map that stores a list of valid file extensions for each folder */
|
||||
private final Map<String, String[]> folderFileExtMap = new ConcurrentHashMap<>();
|
||||
private final Map<String, List<String>> folderFileExtMap = new ConcurrentHashMap<>();
|
||||
|
||||
/* set of file extensions for which we have parsers already registered */
|
||||
private final Set<String> parsers = new HashSet<>();
|
||||
|
@ -124,16 +123,16 @@ public class FolderObserver implements WatchService.WatchEventListener {
|
|||
|
||||
String[] fileExts = ((String) config.get(foldername)).split(",");
|
||||
|
||||
File folder = getFile(foldername);
|
||||
File folder = watchService.getWatchPath().resolve(foldername).toFile();
|
||||
if (folder.exists() && folder.isDirectory()) {
|
||||
folderFileExtMap.put(foldername, fileExts);
|
||||
folderFileExtMap.put(foldername, Arrays.asList(fileExts));
|
||||
} else {
|
||||
logger.warn("Directory '{}' does not exist in '{}'. Please check your configuration settings!",
|
||||
foldername, OpenHAB.getConfigFolder());
|
||||
}
|
||||
}
|
||||
|
||||
watchService.registerListener(this, folderFileExtMap.keySet().stream().map(Path::of).toList());
|
||||
watchService.registerListener(this, Path.of(""));
|
||||
addModelsToRepo();
|
||||
this.activated = true;
|
||||
}
|
||||
|
@ -162,9 +161,9 @@ public class FolderObserver implements WatchService.WatchEventListener {
|
|||
private void addModelsToRepo() {
|
||||
if (!folderFileExtMap.isEmpty()) {
|
||||
for (String folderName : folderFileExtMap.keySet()) {
|
||||
final String[] validExtension = folderFileExtMap.get(folderName);
|
||||
if (validExtension != null && validExtension.length > 0) {
|
||||
File folder = getFile(folderName);
|
||||
final List<String> validExtension = folderFileExtMap.get(folderName);
|
||||
if (validExtension != null && validExtension.size() > 0) {
|
||||
File folder = watchService.getWatchPath().resolve(folderName).toFile();
|
||||
|
||||
File[] files = folder.listFiles(new FileExtensionsFilter(validExtension));
|
||||
if (files != null) {
|
||||
|
@ -196,9 +195,9 @@ public class FolderObserver implements WatchService.WatchEventListener {
|
|||
|
||||
protected static class FileExtensionsFilter implements FilenameFilter {
|
||||
|
||||
private final String[] validExtensions;
|
||||
private final List<String> validExtensions;
|
||||
|
||||
public FileExtensionsFilter(String[] validExtensions) {
|
||||
public FileExtensionsFilter(List<String> validExtensions) {
|
||||
this.validExtensions = validExtensions;
|
||||
}
|
||||
|
||||
|
@ -213,7 +212,6 @@ public class FolderObserver implements WatchService.WatchEventListener {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private void checkFile(final ModelRepository modelRepository, final File file, final WatchService.Kind kind) {
|
||||
try {
|
||||
synchronized (FolderObserver.class) {
|
||||
|
@ -238,33 +236,6 @@ public class FolderObserver implements WatchService.WatchEventListener {
|
|||
}
|
||||
}
|
||||
|
||||
private @Nullable File getFileByFileExtMap(Map<String, String[]> folderFileExtMap, String filename) {
|
||||
if (!filename.trim().isEmpty() && !folderFileExtMap.isEmpty()) {
|
||||
String extension = getExtension(filename);
|
||||
if (extension != null && !extension.trim().isEmpty()) {
|
||||
Set<Entry<String, String[]>> entries = folderFileExtMap.entrySet();
|
||||
for (Entry<String, String[]> entry : entries) {
|
||||
if (Arrays.asList(entry.getValue()).contains(extension)) {
|
||||
return new File(getFile(entry.getKey()) + File.separator + filename);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link File} object for the given filename. <br />
|
||||
* It must be contained in the configuration folder
|
||||
*
|
||||
* @param filename
|
||||
* the file name to get the {@link File} for
|
||||
* @return the corresponding {@link File}
|
||||
*/
|
||||
protected File getFile(String filename) {
|
||||
return new File(OpenHAB.getConfigFolder() + File.separator + filename);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the extension of the given file
|
||||
*
|
||||
|
@ -282,9 +253,18 @@ public class FolderObserver implements WatchService.WatchEventListener {
|
|||
|
||||
@Override
|
||||
public void processWatchEvent(WatchService.Kind kind, Path path) {
|
||||
File toCheck = getFileByFileExtMap(folderFileExtMap, path.getFileName().toString());
|
||||
if (toCheck != null && !toCheck.isHidden()) {
|
||||
checkFile(modelRepository, toCheck, kind);
|
||||
if (path.getNameCount() != 2) {
|
||||
logger.trace("{} event for {} ignored, only depth 1 is allowed.", kind, path);
|
||||
return;
|
||||
}
|
||||
|
||||
String fileExtension = getExtension(path.getFileName().toString());
|
||||
List<String> validExtensions = folderFileExtMap.get(path.getName(0).toString());
|
||||
if (fileExtension != null && validExtensions != null && validExtensions.contains(fileExtension)) {
|
||||
File toCheck = watchService.getWatchPath().resolve(path).toFile();
|
||||
if (!toCheck.isHidden()) {
|
||||
checkFile(modelRepository, toCheck, kind);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -91,13 +91,9 @@ public class FolderObserverTest extends JavaTest {
|
|||
|
||||
when(modelParserMock.getExtension()).thenReturn("java");
|
||||
when(contextMock.getProperties()).thenReturn(configProps);
|
||||
when(watchServiceMock.getWatchPath()).thenReturn(WATCHED_DIRECTORY.toPath());
|
||||
|
||||
folderObserver = new FolderObserver(modelRepoMock, readyServiceMock, watchServiceMock) {
|
||||
@Override
|
||||
protected File getFile(String filename) {
|
||||
return new File(WATCHED_DIRECTORY + File.separator + filename);
|
||||
}
|
||||
};
|
||||
folderObserver = new FolderObserver(modelRepoMock, readyServiceMock, watchServiceMock);
|
||||
folderObserver.addModelParser(modelParserMock);
|
||||
}
|
||||
|
||||
|
@ -134,7 +130,7 @@ public class FolderObserverTest extends JavaTest {
|
|||
Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE);
|
||||
|
||||
waitForAssert(() -> assertThat(file.exists(), is(true)));
|
||||
folderObserver.processWatchEvent(CREATE, file.toPath());
|
||||
folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath()));
|
||||
|
||||
verify(modelRepoMock).addOrRefreshModel(eq(file.getName()), any());
|
||||
verifyNoMoreInteractions(modelRepoMock);
|
||||
|
@ -159,12 +155,12 @@ public class FolderObserverTest extends JavaTest {
|
|||
Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE);
|
||||
|
||||
waitForAssert(() -> assertThat(file.exists(), is(true)));
|
||||
folderObserver.processWatchEvent(CREATE, file.toPath());
|
||||
folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath()));
|
||||
|
||||
String text = "Additional content";
|
||||
Files.writeString(file.toPath(), text, StandardCharsets.UTF_8, StandardOpenOption.APPEND);
|
||||
|
||||
folderObserver.processWatchEvent(MODIFY, file.toPath());
|
||||
folderObserver.processWatchEvent(MODIFY, WATCHED_DIRECTORY.toPath().relativize(file.toPath()));
|
||||
|
||||
verify(modelRepoMock, times(2)).addOrRefreshModel(eq(file.getName()), any());
|
||||
verifyNoMoreInteractions(modelRepoMock);
|
||||
|
@ -188,7 +184,7 @@ public class FolderObserverTest extends JavaTest {
|
|||
Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE);
|
||||
waitForAssert(() -> assertThat(file.exists(), is(true)));
|
||||
|
||||
folderObserver.processWatchEvent(CREATE, file.toPath());
|
||||
folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath()));
|
||||
|
||||
verifyNoInteractions(modelRepoMock);
|
||||
}
|
||||
|
@ -208,7 +204,7 @@ public class FolderObserverTest extends JavaTest {
|
|||
File file = new File(EXISTING_SUBDIR_PATH, "NewlyCreatedMockFile.java");
|
||||
Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE);
|
||||
waitForAssert(() -> assertThat(file.exists(), is(true)));
|
||||
folderObserver.processWatchEvent(CREATE, file.toPath());
|
||||
folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath()));
|
||||
|
||||
verifyNoInteractions(modelRepoMock);
|
||||
}
|
||||
|
@ -261,7 +257,7 @@ public class FolderObserverTest extends JavaTest {
|
|||
Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE);
|
||||
waitForAssert(() -> assertThat(file.exists(), is(true)));
|
||||
|
||||
folderObserver.processWatchEvent(CREATE, file.toPath());
|
||||
folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath()));
|
||||
|
||||
verifyNoInteractions(modelRepoMock);
|
||||
}
|
||||
|
@ -270,13 +266,7 @@ public class FolderObserverTest extends JavaTest {
|
|||
public void testException() throws Exception {
|
||||
when(modelRepoMock.addOrRefreshModel(any(), any())).thenThrow(new IllegalStateException("intentional failure"));
|
||||
|
||||
FolderObserver localFolderObserver = new FolderObserver(modelRepoMock, readyServiceMock, watchServiceMock) {
|
||||
@Override
|
||||
protected File getFile(String filename) {
|
||||
return new File(WATCHED_DIRECTORY + File.separator + filename);
|
||||
}
|
||||
};
|
||||
|
||||
FolderObserver localFolderObserver = new FolderObserver(modelRepoMock, readyServiceMock, watchServiceMock);
|
||||
localFolderObserver.addModelParser(modelParserMock);
|
||||
|
||||
String validExtension = "java";
|
||||
|
@ -286,11 +276,13 @@ public class FolderObserverTest extends JavaTest {
|
|||
File mockFileWithValidExt = new File(EXISTING_SUBDIR_PATH, "MockFileForModification." + validExtension);
|
||||
Files.writeString(mockFileWithValidExt.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8,
|
||||
StandardOpenOption.CREATE);
|
||||
localFolderObserver.processWatchEvent(CREATE, mockFileWithValidExt.toPath());
|
||||
localFolderObserver.processWatchEvent(CREATE,
|
||||
WATCHED_DIRECTORY.toPath().relativize(mockFileWithValidExt.toPath()));
|
||||
|
||||
Files.writeString(mockFileWithValidExt.toPath(), "Additional content", StandardCharsets.UTF_8,
|
||||
StandardOpenOption.APPEND);
|
||||
localFolderObserver.processWatchEvent(MODIFY, mockFileWithValidExt.toPath());
|
||||
localFolderObserver.processWatchEvent(MODIFY,
|
||||
WATCHED_DIRECTORY.toPath().relativize(mockFileWithValidExt.toPath()));
|
||||
|
||||
verify(modelRepoMock, times(2)).addOrRefreshModel(eq(mockFileWithValidExt.getName()), any());
|
||||
}
|
||||
|
|
|
@ -17,12 +17,18 @@ import java.nio.file.Files;
|
|||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Dictionary;
|
||||
import java.util.HashMap;
|
||||
import java.util.Hashtable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
|
@ -52,6 +58,8 @@ import io.methvin.watcher.DirectoryWatcher;
|
|||
@Component(immediate = true, service = WatchService.class, configurationPid = WatchService.SERVICE_PID, configurationPolicy = ConfigurationPolicy.REQUIRE)
|
||||
public class WatchServiceImpl implements WatchService, DirectoryChangeListener {
|
||||
|
||||
public static final int PROCESSING_TIME = 1000;
|
||||
|
||||
public @interface WatchServiceConfiguration {
|
||||
String name() default "";
|
||||
|
||||
|
@ -63,6 +71,8 @@ public class WatchServiceImpl implements WatchService, DirectoryChangeListener {
|
|||
private final List<Listener> dirPathListeners = new CopyOnWriteArrayList<>();
|
||||
private final List<Listener> subDirPathListeners = new CopyOnWriteArrayList<>();
|
||||
private final ExecutorService executor;
|
||||
private final ScheduledExecutorService scheduler;
|
||||
|
||||
private final String name;
|
||||
private final BundleContext bundleContext;
|
||||
|
||||
|
@ -70,6 +80,9 @@ public class WatchServiceImpl implements WatchService, DirectoryChangeListener {
|
|||
private @Nullable DirectoryWatcher dirWatcher;
|
||||
private @Nullable ServiceRegistration<WatchService> reg;
|
||||
|
||||
private final Map<Path, ScheduledFuture<?>> scheduledEvents = new HashMap<>();
|
||||
private final Map<Path, List<Kind>> scheduledEventKinds = new ConcurrentHashMap<>();
|
||||
|
||||
@Activate
|
||||
public WatchServiceImpl(WatchServiceConfiguration config, BundleContext bundleContext) throws IOException {
|
||||
this.bundleContext = bundleContext;
|
||||
|
@ -79,7 +92,7 @@ public class WatchServiceImpl implements WatchService, DirectoryChangeListener {
|
|||
|
||||
this.name = config.name();
|
||||
executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
|
||||
|
||||
scheduler = ThreadPoolManager.getScheduledPool("watchservice");
|
||||
modified(config);
|
||||
}
|
||||
|
||||
|
@ -207,6 +220,50 @@ public class WatchServiceImpl implements WatchService, DirectoryChangeListener {
|
|||
case OVERFLOW -> Kind.OVERFLOW;
|
||||
};
|
||||
|
||||
synchronized (scheduledEvents) {
|
||||
ScheduledFuture<?> future = scheduledEvents.remove(path);
|
||||
if (future != null && !future.isDone()) {
|
||||
future.cancel(true);
|
||||
}
|
||||
future = scheduler.schedule(() -> notifyListeners(path), PROCESSING_TIME, TimeUnit.MILLISECONDS);
|
||||
scheduledEventKinds.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(kind);
|
||||
scheduledEvents.put(path, future);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyListeners(Path path) {
|
||||
List<Kind> kinds = scheduledEventKinds.remove(path);
|
||||
if (kinds == null || kinds.isEmpty()) {
|
||||
logger.debug("Tried to notify listeners of change events for '{}', but the event list is empty.", path);
|
||||
return;
|
||||
}
|
||||
|
||||
if (kinds.size() == 1) {
|
||||
// we have only one event
|
||||
doNotify(path, kinds.get(0));
|
||||
return;
|
||||
}
|
||||
|
||||
Kind firstElement = kinds.get(0);
|
||||
Kind lastElement = kinds.get(kinds.size() - 1);
|
||||
|
||||
// determine final event
|
||||
if (lastElement == Kind.DELETE) {
|
||||
if (firstElement == Kind.CREATE) {
|
||||
logger.debug("Discarding events for '{}' because file was immediately deleted bafter creation", path);
|
||||
return;
|
||||
}
|
||||
doNotify(path, Kind.DELETE);
|
||||
} else if (firstElement == Kind.CREATE) {
|
||||
doNotify(path, Kind.CREATE);
|
||||
} else {
|
||||
doNotify(path, Kind.MODIFY);
|
||||
}
|
||||
}
|
||||
|
||||
private void doNotify(Path path, Kind kind) {
|
||||
logger.trace("Notifying listeners of '{}' event for '{}'.", kind, path);
|
||||
subDirPathListeners.stream().filter(isChildOf(path)).forEach(l -> l.notify(path, kind));
|
||||
dirPathListeners.stream().filter(isDirectChildOf(path)).forEach(l -> l.notify(path, kind));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue