From 07e8ff56c7365743bb62632eaa4bc81c5c9b5a20 Mon Sep 17 00:00:00 2001 From: J-N-K Date: Sun, 5 Mar 2023 09:44:23 +0100 Subject: [PATCH] Prevent duplicate events in WatchService (#3404) On Samba shares a single MODIFY event may result in different sequences of CREATE/DELETE/MODIFY events. Unfortunately this is not properly handled by the underlying library and we have to re-introduce our ugly workaround. The solution itself is a bit improved over the old one as it not only keeps the last event but determines the correct event by examining the sequence of received events. Signed-off-by: Jan N. Klug --- .../internal/ConfigDispatcherFileWatcher.java | 2 +- .../core/internal/folder/FolderObserver.java | 66 +++++++------------ .../internal/folder/FolderObserverTest.java | 34 ++++------ .../internal/service/WatchServiceImpl.java | 59 ++++++++++++++++- 4 files changed, 95 insertions(+), 66 deletions(-) diff --git a/bundles/org.openhab.core.config.dispatch/src/main/java/org/openhab/core/config/dispatch/internal/ConfigDispatcherFileWatcher.java b/bundles/org.openhab.core.config.dispatch/src/main/java/org/openhab/core/config/dispatch/internal/ConfigDispatcherFileWatcher.java index 05442c79f..6d64fb491 100644 --- a/bundles/org.openhab.core.config.dispatch/src/main/java/org/openhab/core/config/dispatch/internal/ConfigDispatcherFileWatcher.java +++ b/bundles/org.openhab.core.config.dispatch/src/main/java/org/openhab/core/config/dispatch/internal/ConfigDispatcherFileWatcher.java @@ -53,7 +53,7 @@ public class ConfigDispatcherFileWatcher implements WatchService.WatchEventListe this.watchService = watchService; - watchService.registerListener(this, Path.of(servicesFolder)); + watchService.registerListener(this, Path.of(servicesFolder), false); configDispatcher.processConfigFile(Path.of(OpenHAB.getConfigFolder(), servicesFolder).toFile()); } diff --git a/bundles/org.openhab.core.model.core/src/main/java/org/openhab/core/model/core/internal/folder/FolderObserver.java b/bundles/org.openhab.core.model.core/src/main/java/org/openhab/core/model/core/internal/folder/FolderObserver.java index 7d3cfacd7..ed4d2f77f 100644 --- a/bundles/org.openhab.core.model.core/src/main/java/org/openhab/core/model/core/internal/folder/FolderObserver.java +++ b/bundles/org.openhab.core.model.core/src/main/java/org/openhab/core/model/core/internal/folder/FolderObserver.java @@ -12,7 +12,6 @@ */ package org.openhab.core.model.core.internal.folder; -import static java.nio.file.StandardWatchEventKinds.*; import static org.openhab.core.service.WatchService.Kind.CREATE; import static org.openhab.core.service.WatchService.Kind.MODIFY; @@ -27,8 +26,8 @@ import java.util.Dictionary; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -64,7 +63,7 @@ import org.slf4j.LoggerFactory; @Component(name = "org.openhab.core.folder", immediate = true, configurationPid = "org.openhab.folder", configurationPolicy = ConfigurationPolicy.REQUIRE) public class FolderObserver implements WatchService.WatchEventListener { private final WatchService watchService; - private Logger logger = LoggerFactory.getLogger(FolderObserver.class); + private final Logger logger = LoggerFactory.getLogger(FolderObserver.class); /* the model repository is provided as a service */ private final ModelRepository modelRepository; @@ -75,7 +74,7 @@ public class FolderObserver implements WatchService.WatchEventListener { private boolean activated; /* map that stores a list of valid file extensions for each folder */ - private final Map folderFileExtMap = new ConcurrentHashMap<>(); + private final Map> folderFileExtMap = new ConcurrentHashMap<>(); /* set of file extensions for which we have parsers already registered */ private final Set parsers = new HashSet<>(); @@ -124,16 +123,16 @@ public class FolderObserver implements WatchService.WatchEventListener { String[] fileExts = ((String) config.get(foldername)).split(","); - File folder = getFile(foldername); + File folder = watchService.getWatchPath().resolve(foldername).toFile(); if (folder.exists() && folder.isDirectory()) { - folderFileExtMap.put(foldername, fileExts); + folderFileExtMap.put(foldername, Arrays.asList(fileExts)); } else { logger.warn("Directory '{}' does not exist in '{}'. Please check your configuration settings!", foldername, OpenHAB.getConfigFolder()); } } - watchService.registerListener(this, folderFileExtMap.keySet().stream().map(Path::of).toList()); + watchService.registerListener(this, Path.of("")); addModelsToRepo(); this.activated = true; } @@ -162,9 +161,9 @@ public class FolderObserver implements WatchService.WatchEventListener { private void addModelsToRepo() { if (!folderFileExtMap.isEmpty()) { for (String folderName : folderFileExtMap.keySet()) { - final String[] validExtension = folderFileExtMap.get(folderName); - if (validExtension != null && validExtension.length > 0) { - File folder = getFile(folderName); + final List validExtension = folderFileExtMap.get(folderName); + if (validExtension != null && validExtension.size() > 0) { + File folder = watchService.getWatchPath().resolve(folderName).toFile(); File[] files = folder.listFiles(new FileExtensionsFilter(validExtension)); if (files != null) { @@ -196,9 +195,9 @@ public class FolderObserver implements WatchService.WatchEventListener { protected static class FileExtensionsFilter implements FilenameFilter { - private final String[] validExtensions; + private final List validExtensions; - public FileExtensionsFilter(String[] validExtensions) { + public FileExtensionsFilter(List validExtensions) { this.validExtensions = validExtensions; } @@ -213,7 +212,6 @@ public class FolderObserver implements WatchService.WatchEventListener { } } - @SuppressWarnings("rawtypes") private void checkFile(final ModelRepository modelRepository, final File file, final WatchService.Kind kind) { try { synchronized (FolderObserver.class) { @@ -238,33 +236,6 @@ public class FolderObserver implements WatchService.WatchEventListener { } } - private @Nullable File getFileByFileExtMap(Map folderFileExtMap, String filename) { - if (!filename.trim().isEmpty() && !folderFileExtMap.isEmpty()) { - String extension = getExtension(filename); - if (extension != null && !extension.trim().isEmpty()) { - Set> entries = folderFileExtMap.entrySet(); - for (Entry entry : entries) { - if (Arrays.asList(entry.getValue()).contains(extension)) { - return new File(getFile(entry.getKey()) + File.separator + filename); - } - } - } - } - return null; - } - - /** - * Returns the {@link File} object for the given filename.
- * It must be contained in the configuration folder - * - * @param filename - * the file name to get the {@link File} for - * @return the corresponding {@link File} - */ - protected File getFile(String filename) { - return new File(OpenHAB.getConfigFolder() + File.separator + filename); - } - /** * Returns the extension of the given file * @@ -282,9 +253,18 @@ public class FolderObserver implements WatchService.WatchEventListener { @Override public void processWatchEvent(WatchService.Kind kind, Path path) { - File toCheck = getFileByFileExtMap(folderFileExtMap, path.getFileName().toString()); - if (toCheck != null && !toCheck.isHidden()) { - checkFile(modelRepository, toCheck, kind); + if (path.getNameCount() != 2) { + logger.trace("{} event for {} ignored, only depth 1 is allowed.", kind, path); + return; + } + + String fileExtension = getExtension(path.getFileName().toString()); + List validExtensions = folderFileExtMap.get(path.getName(0).toString()); + if (fileExtension != null && validExtensions != null && validExtensions.contains(fileExtension)) { + File toCheck = watchService.getWatchPath().resolve(path).toFile(); + if (!toCheck.isHidden()) { + checkFile(modelRepository, toCheck, kind); + } } } } diff --git a/bundles/org.openhab.core.model.core/src/test/java/org/openhab/core/model/core/internal/folder/FolderObserverTest.java b/bundles/org.openhab.core.model.core/src/test/java/org/openhab/core/model/core/internal/folder/FolderObserverTest.java index 0fbed211b..75f45a445 100644 --- a/bundles/org.openhab.core.model.core/src/test/java/org/openhab/core/model/core/internal/folder/FolderObserverTest.java +++ b/bundles/org.openhab.core.model.core/src/test/java/org/openhab/core/model/core/internal/folder/FolderObserverTest.java @@ -91,13 +91,9 @@ public class FolderObserverTest extends JavaTest { when(modelParserMock.getExtension()).thenReturn("java"); when(contextMock.getProperties()).thenReturn(configProps); + when(watchServiceMock.getWatchPath()).thenReturn(WATCHED_DIRECTORY.toPath()); - folderObserver = new FolderObserver(modelRepoMock, readyServiceMock, watchServiceMock) { - @Override - protected File getFile(String filename) { - return new File(WATCHED_DIRECTORY + File.separator + filename); - } - }; + folderObserver = new FolderObserver(modelRepoMock, readyServiceMock, watchServiceMock); folderObserver.addModelParser(modelParserMock); } @@ -134,7 +130,7 @@ public class FolderObserverTest extends JavaTest { Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE); waitForAssert(() -> assertThat(file.exists(), is(true))); - folderObserver.processWatchEvent(CREATE, file.toPath()); + folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath())); verify(modelRepoMock).addOrRefreshModel(eq(file.getName()), any()); verifyNoMoreInteractions(modelRepoMock); @@ -159,12 +155,12 @@ public class FolderObserverTest extends JavaTest { Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE); waitForAssert(() -> assertThat(file.exists(), is(true))); - folderObserver.processWatchEvent(CREATE, file.toPath()); + folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath())); String text = "Additional content"; Files.writeString(file.toPath(), text, StandardCharsets.UTF_8, StandardOpenOption.APPEND); - folderObserver.processWatchEvent(MODIFY, file.toPath()); + folderObserver.processWatchEvent(MODIFY, WATCHED_DIRECTORY.toPath().relativize(file.toPath())); verify(modelRepoMock, times(2)).addOrRefreshModel(eq(file.getName()), any()); verifyNoMoreInteractions(modelRepoMock); @@ -188,7 +184,7 @@ public class FolderObserverTest extends JavaTest { Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE); waitForAssert(() -> assertThat(file.exists(), is(true))); - folderObserver.processWatchEvent(CREATE, file.toPath()); + folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath())); verifyNoInteractions(modelRepoMock); } @@ -208,7 +204,7 @@ public class FolderObserverTest extends JavaTest { File file = new File(EXISTING_SUBDIR_PATH, "NewlyCreatedMockFile.java"); Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE); waitForAssert(() -> assertThat(file.exists(), is(true))); - folderObserver.processWatchEvent(CREATE, file.toPath()); + folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath())); verifyNoInteractions(modelRepoMock); } @@ -261,7 +257,7 @@ public class FolderObserverTest extends JavaTest { Files.writeString(file.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE); waitForAssert(() -> assertThat(file.exists(), is(true))); - folderObserver.processWatchEvent(CREATE, file.toPath()); + folderObserver.processWatchEvent(CREATE, WATCHED_DIRECTORY.toPath().relativize(file.toPath())); verifyNoInteractions(modelRepoMock); } @@ -270,13 +266,7 @@ public class FolderObserverTest extends JavaTest { public void testException() throws Exception { when(modelRepoMock.addOrRefreshModel(any(), any())).thenThrow(new IllegalStateException("intentional failure")); - FolderObserver localFolderObserver = new FolderObserver(modelRepoMock, readyServiceMock, watchServiceMock) { - @Override - protected File getFile(String filename) { - return new File(WATCHED_DIRECTORY + File.separator + filename); - } - }; - + FolderObserver localFolderObserver = new FolderObserver(modelRepoMock, readyServiceMock, watchServiceMock); localFolderObserver.addModelParser(modelParserMock); String validExtension = "java"; @@ -286,11 +276,13 @@ public class FolderObserverTest extends JavaTest { File mockFileWithValidExt = new File(EXISTING_SUBDIR_PATH, "MockFileForModification." + validExtension); Files.writeString(mockFileWithValidExt.toPath(), INITIAL_FILE_CONTENT, StandardCharsets.UTF_8, StandardOpenOption.CREATE); - localFolderObserver.processWatchEvent(CREATE, mockFileWithValidExt.toPath()); + localFolderObserver.processWatchEvent(CREATE, + WATCHED_DIRECTORY.toPath().relativize(mockFileWithValidExt.toPath())); Files.writeString(mockFileWithValidExt.toPath(), "Additional content", StandardCharsets.UTF_8, StandardOpenOption.APPEND); - localFolderObserver.processWatchEvent(MODIFY, mockFileWithValidExt.toPath()); + localFolderObserver.processWatchEvent(MODIFY, + WATCHED_DIRECTORY.toPath().relativize(mockFileWithValidExt.toPath())); verify(modelRepoMock, times(2)).addOrRefreshModel(eq(mockFileWithValidExt.getName()), any()); } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/service/WatchServiceImpl.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/service/WatchServiceImpl.java index 456d13a94..fb0afbebe 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/service/WatchServiceImpl.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/service/WatchServiceImpl.java @@ -17,12 +17,18 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.Dictionary; +import java.util.HashMap; import java.util.Hashtable; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -52,6 +58,8 @@ import io.methvin.watcher.DirectoryWatcher; @Component(immediate = true, service = WatchService.class, configurationPid = WatchService.SERVICE_PID, configurationPolicy = ConfigurationPolicy.REQUIRE) public class WatchServiceImpl implements WatchService, DirectoryChangeListener { + public static final int PROCESSING_TIME = 1000; + public @interface WatchServiceConfiguration { String name() default ""; @@ -63,6 +71,8 @@ public class WatchServiceImpl implements WatchService, DirectoryChangeListener { private final List dirPathListeners = new CopyOnWriteArrayList<>(); private final List subDirPathListeners = new CopyOnWriteArrayList<>(); private final ExecutorService executor; + private final ScheduledExecutorService scheduler; + private final String name; private final BundleContext bundleContext; @@ -70,6 +80,9 @@ public class WatchServiceImpl implements WatchService, DirectoryChangeListener { private @Nullable DirectoryWatcher dirWatcher; private @Nullable ServiceRegistration reg; + private final Map> scheduledEvents = new HashMap<>(); + private final Map> scheduledEventKinds = new ConcurrentHashMap<>(); + @Activate public WatchServiceImpl(WatchServiceConfiguration config, BundleContext bundleContext) throws IOException { this.bundleContext = bundleContext; @@ -79,7 +92,7 @@ public class WatchServiceImpl implements WatchService, DirectoryChangeListener { this.name = config.name(); executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name)); - + scheduler = ThreadPoolManager.getScheduledPool("watchservice"); modified(config); } @@ -207,6 +220,50 @@ public class WatchServiceImpl implements WatchService, DirectoryChangeListener { case OVERFLOW -> Kind.OVERFLOW; }; + synchronized (scheduledEvents) { + ScheduledFuture future = scheduledEvents.remove(path); + if (future != null && !future.isDone()) { + future.cancel(true); + } + future = scheduler.schedule(() -> notifyListeners(path), PROCESSING_TIME, TimeUnit.MILLISECONDS); + scheduledEventKinds.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(kind); + scheduledEvents.put(path, future); + + } + } + + private void notifyListeners(Path path) { + List kinds = scheduledEventKinds.remove(path); + if (kinds == null || kinds.isEmpty()) { + logger.debug("Tried to notify listeners of change events for '{}', but the event list is empty.", path); + return; + } + + if (kinds.size() == 1) { + // we have only one event + doNotify(path, kinds.get(0)); + return; + } + + Kind firstElement = kinds.get(0); + Kind lastElement = kinds.get(kinds.size() - 1); + + // determine final event + if (lastElement == Kind.DELETE) { + if (firstElement == Kind.CREATE) { + logger.debug("Discarding events for '{}' because file was immediately deleted bafter creation", path); + return; + } + doNotify(path, Kind.DELETE); + } else if (firstElement == Kind.CREATE) { + doNotify(path, Kind.CREATE); + } else { + doNotify(path, Kind.MODIFY); + } + } + + private void doNotify(Path path, Kind kind) { + logger.trace("Notifying listeners of '{}' event for '{}'.", kind, path); subDirPathListeners.stream().filter(isChildOf(path)).forEach(l -> l.notify(path, kind)); dirPathListeners.stream().filter(isDirectChildOf(path)).forEach(l -> l.notify(path, kind)); }