diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java index eab812e9..36771cc3 100644 --- a/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java +++ b/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java @@ -226,15 +226,17 @@ protected Object[] process(Object[] data) { } } } catch (FileSystemException e) { - throw new SiddhiAppRuntimeException("Exception occurred when getting the file type " + - uri, e); + log.error("Exception occurred when getting the file type " + uri, e); + return new Object[]{false}; } finally { if (rootFileObject != null) { try { rootFileObject.close(); } catch (FileSystemException e) { - throw new SiddhiAppRuntimeException("Exception occurred when closing file object for " + + log.error("Exception occurred when closing file object for " + rootFileObject.getName().getPath(), e); + return new Object[]{false}; + } } } diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java index 3324734f..e384762d 100644 --- a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java +++ b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java @@ -24,7 +24,6 @@ import io.siddhi.annotation.ReturnAttribute; import io.siddhi.annotation.util.DataType; import io.siddhi.core.config.SiddhiQueryContext; -import io.siddhi.core.exception.SiddhiAppRuntimeException; import io.siddhi.core.executor.ConstantExpressionExecutor; import io.siddhi.core.executor.ExpressionExecutor; import io.siddhi.core.query.processor.ProcessingMode; @@ -55,13 +54,13 @@ description = "This function performs copying file from one directory to another.\n", parameters = { @Parameter( - name = "uri", + name = "path", description = "Absolute file or directory path.", type = DataType.STRING, dynamic = true ), @Parameter( - name = "destination.dir.uri", + name = "destination.dir.path", description = "Absolute file path to the destination directory.\n" + "Note: Parent folder structure will be created if it does not exist.", type = DataType.STRING, @@ -73,14 +72,16 @@ "Note: Add an empty string to match all files", type = DataType.STRING, optional = true, - defaultValue = "" + defaultValue = "", + dynamic = true ), @Parameter( name = "exclude.root.dir", description = "Exclude parent folder when moving the content.", type = DataType.BOOL, optional = true, - defaultValue = "false" + defaultValue = "false", + dynamic = true ), @Parameter( name = "file.system.options", @@ -95,16 +96,16 @@ }, parameterOverloads = { @ParameterOverload( - parameterNames = {"uri", "destination.dir.uri"} + parameterNames = {"path", "destination.dir.path"} ), @ParameterOverload( - parameterNames = {"uri", "destination.dir.uri", "include.by.regexp"} + parameterNames = {"path", "destination.dir.path", "include.by.regexp"} ), @ParameterOverload( - parameterNames = {"uri", "destination.dir.uri", "include.by.regexp", "exclude.root.dir"} + parameterNames = {"path", "destination.dir.path", "include.by.regexp", "exclude.root.dir"} ), @ParameterOverload( - parameterNames = {"uri", "destination.dir.uri", "include.by.regexp", "exclude.root.dir", + parameterNames = {"path", "destination.dir.path", "include.by.regexp", "exclude.root.dir", "file.system.options"} ) }, @@ -193,19 +194,22 @@ protected Object[] process(Object[] data) { boolean excludeParentFolder = false; if (inputExecutorLength == 3) { regex = (String) data[2]; + } else if (inputExecutorLength == 4) { + regex = (String) data[2]; + excludeParentFolder = (Boolean) data[3]; } if (pattern == null) { pattern = Pattern.compile(regex); } try { FileObject rootFileObject = Utils.getFileObject(uri, fileSystemOptions); + if (!rootFileObject.exists()) { + return new Object[]{false}; + } if (rootFileObject.getType().hasContent() && pattern.matcher(rootFileObject.getName().getBaseName()).lookingAt()) { moveFileToDestination(rootFileObject, destinationDirUri, pattern); } else if (rootFileObject.getType().hasChildren()) { - if (inputExecutorLength == 4) { - excludeParentFolder = (Boolean) data[3]; - } if (!excludeParentFolder) { destinationDirUri = destinationDirUri.concat(File.separator + rootFileObject.getName().getBaseName()); @@ -216,21 +220,16 @@ protected Object[] process(Object[] data) { if (sourceFileObject.getType().hasContent() && pattern.matcher(sourceFileObject.getName().getBaseName()).lookingAt()) { String sourcePartialUri = sourceFileObject.getName().getPath(); - if (excludeParentFolder) { - sourcePartialUri = sourcePartialUri.replace(uri + - rootFileObject.getName().getBaseName(), ""); - } else { - sourcePartialUri = sourcePartialUri.replace(uri, ""). - replace(sourceFileObject.getName().getBaseName(), ""); - } + sourcePartialUri = sourcePartialUri.replace(rootFileObject.getName().getPath(), ""). + replace(sourceFileObject.getName().getBaseName(), ""); moveFileToDestination(sourceFileObject, destinationDirUri + sourcePartialUri, pattern); } } } } catch (FileSystemException e) { - throw new SiddhiAppRuntimeException("Exception occurred when getting the file type " + - uri, e); + log.error("Exception occurred when getting the file type " + uri, e); + return new Object[]{false}; } return new Object[]{true}; } @@ -250,7 +249,8 @@ public void stop() { } - private void moveFileToDestination(FileObject sourceFileObject, String destinationDirUri, Pattern pattern) { + private void moveFileToDestination(FileObject sourceFileObject, String destinationDirUri, Pattern pattern) + throws FileSystemException { try { String fileName = sourceFileObject.getName().getBaseName(); String destinationPath; @@ -280,8 +280,7 @@ private void moveFileToDestination(FileObject sourceFileObject, String destinati if (fileMoveMetrics != null) { fileMoveMetrics.getMoveMetric(0); } - throw new SiddhiAppRuntimeException("Exception occurred when doing file operations when moving for file: " + - sourceFileObject.getName().getPath(), e); + throw e; } } } diff --git a/component/src/main/java/io/siddhi/extension/io/file/FileSource.java b/component/src/main/java/io/siddhi/extension/io/file/FileSource.java index d21615f4..ef33b0c8 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/FileSource.java +++ b/component/src/main/java/io/siddhi/extension/io/file/FileSource.java @@ -378,7 +378,6 @@ public class FileSource extends Source { private FileSourceConfiguration fileSourceConfiguration; private RemoteFileSystemConnectorFactory fileSystemConnectorFactory; private FileSourceServiceProvider fileSourceServiceProvider; - private RemoteFileSystemServerConnector fileSystemServerConnector; private String filePointer = "0"; private String[] requiredProperties; private boolean isTailingEnabled = true; @@ -401,6 +400,7 @@ public class FileSource extends Source { private long timeout = 5000; private boolean fileServerConnectorStarted = false; private ScheduledFuture scheduledFuture; + private FileSourcePoller fileSourcePoller; private ConnectionCallback connectionCallback; private String headerPresent; private String readOnlyHeader; @@ -577,7 +577,6 @@ public void connect(ConnectionCallback connectionCallback, FileSourceState fileS @Override public void disconnect() { try { - fileSystemServerConnector = null; if (isTailingEnabled && fileSourceConfiguration.getFileServerConnector() != null) { fileSourceConfiguration.getFileServerConnector().stop(); fileSourceConfiguration.setFileServerConnector(null); @@ -618,11 +617,23 @@ public void pause() { } public void resume() { - try { - updateSourceConf(); - deployServers(); - } catch (ConnectionUnavailableException e) { - throw new SiddhiAppRuntimeException("Failed to resume siddhi app runtime.", e); + if (dirUri != null && scheduledFuture != null) { + this.scheduledFuture = siddhiAppContext.getScheduledExecutorService(). + scheduleAtFixedRate(fileSourcePoller, 0, 1, TimeUnit.SECONDS); + } + if (isTailingEnabled && fileSourceConfiguration.getFileServerConnector() != null) { + FileServerConnector fileServerConnector = fileSourceConfiguration.getFileServerConnector(); + Runnable runnableServer = () -> { + try { + fileServerConnector.start(); + } catch (ServerConnectorException e) { + log.error(String.format("For the siddhi app '" + siddhiAppContext.getName() + + ",' failed to resume the server for file '%s'." + + "Hence starting to process next file.", fileUri)); + } + }; + fileSourceConfiguration.getExecutorService().execute(runnableServer); + this.fileServerConnectorStarted = true; } } @@ -756,9 +767,11 @@ private void deployServers() throws ConnectionUnavailableException { FileSystemListener fileSystemListener = new FileSystemListener(sourceEventListener, fileSourceConfiguration, metrics, schemeFileOptions); try { - fileSystemServerConnector = fileSystemConnectorFactory.createServerConnector( - siddhiAppContext.getName(), properties, fileSystemListener); + RemoteFileSystemServerConnector fileSystemServerConnector = + fileSystemConnectorFactory.createServerConnector( + siddhiAppContext.getName(), properties, fileSystemListener); fileSourceConfiguration.setFileSystemServerConnector(fileSystemServerConnector); + FileSourcePoller.CompletionCallback fileSourceCompletionCallback = (Throwable error) -> { if (error.getClass().equals(RemoteFileSystemConnectorException.class)) { @@ -768,7 +781,7 @@ private void deployServers() throws ConnectionUnavailableException { throw new SiddhiAppRuntimeException("File Polling mode run failed.", error); } }; - FileSourcePoller fileSourcePoller = + this.fileSourcePoller = new FileSourcePoller(fileSystemServerConnector, siddhiAppContext.getName()); fileSourcePoller.setCompletionCallback(fileSourceCompletionCallback); this.scheduledFuture = siddhiAppContext.getScheduledExecutorService(). @@ -950,8 +963,8 @@ public Map snapshot() { filePointer = FileSource.this.fileSourceConfiguration.getFilePointer(); state.put(Constants.FILE_POINTER, fileSourceConfiguration.getFilePointer()); state.put(Constants.TAILED_FILE, fileSourceConfiguration.getTailedFileURIMap()); - state.put(Constants.TAILING_REGEX_STRING_BUILDER, - fileSourceConfiguration.getTailingRegexStringBuilder()); + state.put(Constants.TAILING_REGEX_STRING_BUILDER, fileSourceConfiguration.getTailingRegexStringBuilder()); + state.put(Constants.PROCESSED_FILE_LIST, fileSourceConfiguration.getProcessedFileList()); return state; } @@ -963,6 +976,8 @@ public void restore(Map map) { fileSourceConfiguration.setTailedFileURIMap(tailedFileURIMap); fileSourceConfiguration.updateTailingRegexStringBuilder( (StringBuilder) map.get(Constants.TAILING_REGEX_STRING_BUILDER)); + fileSourceConfiguration.setProcessedFileList( + (List) map.get(Constants.PROCESSED_FILE_LIST)); } } } diff --git a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java index 49128a4e..19ec1a49 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java +++ b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java @@ -81,6 +81,9 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent) RemoteFileSystemEvent remoteFileSystemEvent = (RemoteFileSystemEvent) remoteFileSystemBaseEvent; for (int i = 0; i < remoteFileSystemEvent.getAddedFiles().size(); i++) { String fileURI = remoteFileSystemEvent.getAddedFiles().get(i).getPath(); + if (!fileSourceConfiguration.addFileToListIfAbsent(fileURI)) { + continue; + } VFSClientConnector vfsClientConnector; FileProcessor fileProcessor; fileSourceConfiguration.setCurrentlyReadingFileURI(fileURI); diff --git a/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java b/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java index 7f013493..a9bab3b8 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java +++ b/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java @@ -117,6 +117,8 @@ public class Constants { public static final String UTF_8 = "UTF-8"; + public static final String PROCESSED_FILE_LIST = "processedFileList"; + /*prometheus reporte values*/ public static final String PROMETHEUS_REPORTER_NAME = "prometheus"; } diff --git a/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java b/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java index 2fd83cc3..aae8c817 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java +++ b/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java @@ -48,6 +48,7 @@ public class FileSourceConfiguration { private FileServerConnector fileServerConnector; private RemoteFileSystemServerConnector fileSystemServerConnector; + private List processedFileList = new ArrayList<>(); private List tailedFileURIMap; private ExecutorService executorService = null; private String[] requiredProperties = null; @@ -307,4 +308,25 @@ public void setScheduler(Scheduler scheduler) { public Scheduler getScheduler() { return scheduler; } + + public List getProcessedFileList() { + return processedFileList; + } + + public void setProcessedFileList(List processedFileList) { + this.processedFileList = processedFileList; + } + + /** + * Maintains the processedFileList. Adds a file URL to the list if it has not being added already. + * @param fileURI the file URI which needs to be added to the list + * @return true if the fileURI is absent in the current list and adds to it; false if the URI is already present. + */ + public boolean addFileToListIfAbsent(String fileURI) { + if (processedFileList.contains(fileURI)) { + return false; + } + processedFileList.add(fileURI); + return true; + } } diff --git a/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java b/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java index fdc6c08d..850245bc 100644 --- a/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java @@ -707,6 +707,45 @@ public void receive(Event[] events) { AssertJUnit.assertFalse(isFileExist(tempSource + "/archive/test.txt", false)); } + @Test + public void folderMoveNegativeTestcase() throws InterruptedException, IOException { + FileUtils.copyDirectory(sourceRoot, tempSource); + log.info("file:move() function should return false when the source folder is not found. " + + "This test case validates that"); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination", false)); + count.set(0); + String app = "" + + "@App:name('TestSiddhiApp')" + + "define stream MoveFileStream(sample string);\n" + + "from MoveFileStream" + + "#file:move('" + tempSource + "/archivee/', '" + sourceRoot + "/destination', '')\n" + + "select sample, isSuccess \n" + + "insert into ResultStream;"; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("MoveFileStream"); + siddhiAppRuntime.addCallback("ResultStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + int n = count.getAndIncrement(); + for (Event event : events) { + if (n == 0) { + AssertJUnit.assertEquals("WSO2", event.getData(0)); + AssertJUnit.assertEquals(false, event.getData(1)); + } else { + AssertJUnit.fail("More events received than expected."); + } + } + } + }); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{"WSO2"}); + Thread.sleep(100); + siddhiAppRuntime.shutdown(); + } + @Test public void folderMoveWithRegexFunction() throws InterruptedException, IOException { FileUtils.copyDirectory(sourceRoot, tempSource); @@ -747,6 +786,86 @@ public void receive(Event[] events) { AssertJUnit.assertFalse(isFileExist(tempSource + "/archive/subFolder/test3.txt", false)); } + @Test + public void folderMoveWithDynamicParams() throws InterruptedException, IOException { + FileUtils.copyDirectory(sourceRoot, tempSource); + log.info("test Siddhi Io File move() allows dynamic params"); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination", false)); + String app = "" + + "@App:name('TestSiddhiApp')" + + "define stream CopyFileStream(regex string);\n" + + "from CopyFileStream#file:move" + + "('" + tempSource + "/archive', '" + sourceRoot + "/destination', regex)\n" + + "select *\n" + + "insert into ResultStream;"; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("CopyFileStream"); + siddhiAppRuntime.addCallback("ResultStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + int n = count.getAndIncrement(); + for (Event event : events) { + if (n == 0) { + AssertJUnit.assertEquals(".*test3.txt$", event.getData(0)); + AssertJUnit.assertEquals(true, event.getData(1)); + } else { + AssertJUnit.fail("More events received than expected."); + } + } + } + }); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{".*test3.txt$"}); + Thread.sleep(100); + siddhiAppRuntime.shutdown(); + AssertJUnit.assertTrue(isFileExist(sourceRoot + "/destination/archive/subFolder/test3.txt", false)); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination/archive/test.txt", false)); + AssertJUnit.assertFalse(isFileExist(tempSource + "/archive/subFolder/test3.txt", false)); + } + + @Test + public void folderMoveExcludingParentFolder() throws InterruptedException, IOException { + FileUtils.copyDirectory(sourceRoot, tempSource); + log.info("test Siddhi Io File move() with a dynamic value for exclude.root.dir parameter"); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination", false)); + String app = "" + + "@App:name('TestSiddhiApp')" + + "define stream CopyFileStream(regex string, excludeParent bool);\n" + + "from CopyFileStream#file:move" + + "('" + tempSource + "/archive', '" + sourceRoot + "/destination', regex, excludeParent)\n" + + "select *\n" + + "insert into ResultStream;"; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("CopyFileStream"); + siddhiAppRuntime.addCallback("ResultStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + int n = count.getAndIncrement(); + for (Event event : events) { + if (n == 0) { + AssertJUnit.assertEquals(".*test3.txt$", event.getData(0)); + AssertJUnit.assertEquals(true, event.getData(1)); + AssertJUnit.assertEquals(true, event.getData(2)); + } else { + AssertJUnit.fail("More events received than expected."); + } + } + } + }); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{".*test3.txt$", true}); + Thread.sleep(1000); + siddhiAppRuntime.shutdown(); + AssertJUnit.assertTrue(isFileExist(sourceRoot + "/destination/subFolder/test3.txt", false)); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination/test.txt", false)); + AssertJUnit.assertFalse(isFileExist(tempSource + "/archive/subFolder/test3.txt", false)); + } @Test public void fileIsFileFunction() throws InterruptedException { diff --git a/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java b/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java index 34f205f4..a5a9815d 100644 --- a/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java @@ -1228,6 +1228,58 @@ public void receive(Event[] events) { siddhiAppRuntime.shutdown(); } + @Test + public void siddhiIOFileKeepWithStatePersistence() throws InterruptedException { + log.info("test SiddhiIOFile: Keep file with state persistence enabled"); + String streams = "" + + "@App:name('TestSiddhiApp')\n" + + "@source(type='file', mode='line', dir.uri='file:" + newRoot + "/line/header', " + + "read.only.header='true', action.after.process='keep', tailing='false', \n" + + "@map(type='csv', delimiter='|'))\n" + + "define stream FileReaderStream (code string, serialNo string, amount string);\n" + + "@sink(type='log')\n" + + "define stream FileResultStream (code string, serialNo string, amount string);\n"; + + String query = "" + + "from FileReaderStream\n" + + "select *\n" + + "insert into FileResultStream;"; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + SiddhiAppRuntime siddhiAppRuntime2 = siddhiManager.createSiddhiAppRuntime(streams + query); + siddhiAppRuntime.addCallback("FileResultStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + count.incrementAndGet(); + } + }); + siddhiAppRuntime.start(); + SiddhiTestHelper.waitForEvents(100, 1, count.get(), 3000); + byte[] snapshot = siddhiAppRuntime.snapshot(); + siddhiAppRuntime.shutdown(); + + Thread.sleep(1000); + + try { + siddhiAppRuntime2.restore(snapshot); + } catch (CannotRestoreSiddhiAppStateException e) { + log.error("Failed to restore siddhi app state. Reason: " + e.getMessage(), e); + AssertJUnit.fail("Failed to restore siddhi app state"); + } + siddhiAppRuntime2.start(); + Thread.sleep(1000); + siddhiAppRuntime2.addCallback("FileResultStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + AssertJUnit.fail("When state persistence is enabled, the file should not be processed again."); + } + }); + Thread.sleep(1000); + siddhiAppRuntime2.shutdown(); + } + @Test public void siddhiIOFileTestCronSupportForFile() throws InterruptedException { log.info("Siddhi IO File test for Cron support via file.uri");