diff --git a/component/src/main/java/io/siddhi/extension/io/file/FileSource.java b/component/src/main/java/io/siddhi/extension/io/file/FileSource.java index 84dd422b..71f918f6 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/FileSource.java +++ b/component/src/main/java/io/siddhi/extension/io/file/FileSource.java @@ -39,9 +39,14 @@ import io.siddhi.extension.io.file.util.FileSourceConfiguration; import io.siddhi.extension.io.file.util.FileSourceServiceProvider; import io.siddhi.extension.io.file.util.VFSClientConnectorCallback; +import io.siddhi.extension.util.Utils; import io.siddhi.query.api.annotation.Annotation; import io.siddhi.query.api.annotation.Element; +import org.apache.commons.vfs2.FileObject; import org.apache.log4j.Logger; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; import org.wso2.carbon.messaging.ServerConnector; import org.wso2.carbon.messaging.exceptions.ClientConnectorException; import org.wso2.carbon.messaging.exceptions.ServerConnectorException; @@ -65,6 +70,10 @@ import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; +import static io.siddhi.extension.io.file.listeners.FileCronExecutor.scheduleJob; +import static org.quartz.CronExpression.isValidExpression; + + /** * Implementation of siddhi-io-file source. */ @@ -248,6 +257,15 @@ type = {DataType.BOOL}, defaultValue = "false" ), + @Parameter( + name = "cron.expression", + description = "This is used to specify a timestamp in cron expression. " + + "The file or files in the given dir.uri or file.uri will be processed when the " + + "given expression satisfied by the system time.", + optional = true, + type = {DataType.STRING}, + defaultValue = "None" + ) }, examples = { @Example( @@ -333,6 +351,7 @@ public class FileSource extends Source { private String beginRegex; private String endRegex; private List tailedFileURIMap; + private String uri; private String dirUri; private String fileUri; private String dirPollingInterval; @@ -344,6 +363,7 @@ public class FileSource extends Source { private ConnectionCallback connectionCallback; private String headerPresent; private String readOnlyHeader; + private String cronExpression; @Override protected ServiceDeploymentInfo exposeServiceDeploymentInfo() { @@ -365,10 +385,14 @@ public StateFactory init(SourceEventListener sourceEventListene if (optionHolder.isOptionExists(Constants.DIR_URI)) { dirUri = optionHolder.validateAndGetStaticValue(Constants.DIR_URI); validateURL(dirUri, "dir.uri"); + FileObject listeningFileObject = Utils.getFileObject(dirUri); + uri = listeningFileObject.getName().getPath(); } if (optionHolder.isOptionExists(Constants.FILE_URI)) { fileUri = optionHolder.validateAndGetStaticValue(Constants.FILE_URI); validateURL(fileUri, "file.uri"); + FileObject listeningFileObject = Utils.getFileObject(fileUri); + uri = listeningFileObject.getName().getPath(); } if (dirUri != null && fileUri != null) { @@ -456,6 +480,15 @@ public StateFactory init(SourceEventListener sourceEventListene fileReadWaitTimeout = optionHolder.validateAndGetStaticValue(Constants.FILE_READ_WAIT_TIMEOUT, "1000"); headerPresent = optionHolder.validateAndGetStaticValue(Constants.HEADER_PRESENT, "false"); readOnlyHeader = optionHolder.validateAndGetStaticValue(Constants.READ_ONLY_HEADER, "false"); + + if (optionHolder.isOptionExists(Constants.CRON_EXPRESSION)) { + cronExpression = optionHolder.validateAndGetStaticValue(Constants.CRON_EXPRESSION, null); + if (!isValidExpression(cronExpression)) { + throw new SiddhiAppCreationException("Cron Expression " + cronExpression + " is not valid."); + } + } else { + cronExpression = null; + } validateParameters(); createInitialSourceConf(); updateSourceConf(); @@ -489,9 +522,16 @@ public void disconnect() { if (executorService != null && !executorService.isShutdown()) { executorService.shutdown(); } + Scheduler scheduler = fileSourceConfiguration.getScheduler(); + if (scheduler != null) { + scheduler.deleteJob(new JobKey(Constants.JOB_NAME, Constants.JOB_GROUP)); + } } catch (ServerConnectorException e) { throw new SiddhiAppRuntimeException("Failed to stop the file server when shutting down the siddhi app '" + siddhiAppContext.getName() + "' due to " + e.getMessage(), e); + } catch (SchedulerException e) { + throw new SiddhiAppRuntimeException("Failed to delete the cron job of the siddhi app '" + + siddhiAppContext.getName() + "' due to " + e.getMessage(), e); } } @@ -523,6 +563,7 @@ public void resume() { } private void createInitialSourceConf() { + fileSourceConfiguration.setUri(uri); fileSourceConfiguration.setBeginRegex(beginRegex); fileSourceConfiguration.setEndRegex(endRegex); fileSourceConfiguration.setMode(mode); @@ -536,6 +577,7 @@ private void createInitialSourceConf() { fileSourceConfiguration.setFileReadWaitTimeout(fileReadWaitTimeout); fileSourceConfiguration.setHeaderPresent(headerPresent); fileSourceConfiguration.setReadOnlyHeader(readOnlyHeader); + fileSourceConfiguration.setCronExpression(cronExpression); } private void updateSourceConf() { @@ -556,6 +598,7 @@ private Map getFileSystemServerProperties() { map.put(Constants.CREATE_MOVE_DIR, Constants.TRUE.toUpperCase(Locale.ENGLISH)); map.put(Constants.ACK_TIME_OUT, "5000"); map.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY, fileReadWaitTimeout); + map.put(Constants.CRON_EXPRESSION, cronExpression); if (Constants.BINARY_FULL.equalsIgnoreCase(mode) || Constants.TEXT_FULL.equalsIgnoreCase(mode)) { @@ -589,6 +632,12 @@ private void validateParameters() { } } + if (isTailingEnabled && cronExpression != null) { + throw new SiddhiAppCreationException("Tailing has been enabled by user or by default. " + + "'cron.expression' cannot be used when tailing is enabled. " + + "Hence stopping the siddhi app '" + siddhiAppContext.getName() + "'."); + } + if (isTailingEnabled && moveAfterProcess != null) { throw new SiddhiAppCreationException("Tailing has been enabled by user or by default." + "'moveAfterProcess' cannot be used when tailing is enabled. " + @@ -601,6 +650,17 @@ private void validateParameters() { "Hence stopping the siddhi app '" + siddhiAppContext.getName() + "'."); } + if (!(Constants.MOVE.equalsIgnoreCase(actionAfterProcess)) && (cronExpression != null)) { + throw new SiddhiAppCreationException("'cronExpression' can only be used when 'action.after.process' " + + "is 'move'. Hence stopping the siddhi app '" + siddhiAppContext.getName() + "'."); + } + + if (cronExpression != null && moveAfterProcess == null) { + throw new SiddhiAppCreationException("'move.after.process' has not been provided where it is mandatory " + + "when 'cron.expression' is given. Hence stopping the siddhi app " + + siddhiAppContext.getName() + "."); + } + if (Constants.MOVE.equalsIgnoreCase(actionAfterProcess) && (moveAfterProcess == null)) { throw new SiddhiAppCreationException("'moveAfterProcess' has not been provided where it is mandatory when" + " 'actionAfterProcess' is 'move'. Hence stopping the siddhi app '" + @@ -618,103 +678,107 @@ private void deployServers() throws ConnectionUnavailableException { ExecutorService executorService = siddhiAppContext.getExecutorService(); createInitialSourceConf(); fileSourceConfiguration.setExecutorService(executorService); - if (dirUri != null) { - Map properties = getFileSystemServerProperties(); - FileSystemListener fileSystemListener = new FileSystemListener(sourceEventListener, - fileSourceConfiguration); - try { - fileSystemServerConnector = fileSystemConnectorFactory.createServerConnector( - siddhiAppContext.getName(), properties, fileSystemListener); - fileSourceConfiguration.setFileSystemServerConnector(fileSystemServerConnector); - FileSourcePoller.CompletionCallback fileSourceCompletionCallback = (Throwable error) -> - { - if (error.getClass().equals(RemoteFileSystemConnectorException.class)) { - connectionCallback.onError(new ConnectionUnavailableException( - "Connection to the file directory is lost.", error)); - } else { - throw new SiddhiAppRuntimeException("File Polling mode run failed.", error); - } - }; - FileSourcePoller fileSourcePoller = - new FileSourcePoller(fileSystemServerConnector, siddhiAppContext.getName()); - fileSourcePoller.setCompletionCallback(fileSourceCompletionCallback); - this.scheduledFuture = siddhiAppContext.getScheduledExecutorService(). - scheduleAtFixedRate(fileSourcePoller, 0, 1, TimeUnit.SECONDS); - } catch (RemoteFileSystemConnectorException e) { - throw new ConnectionUnavailableException("Connection to the file directory is lost.", e); - } - } else if (fileUri != null && !fileServerConnectorStarted) { - Map properties = new HashMap<>(); - properties.put(Constants.ACTION, Constants.READ); - properties.put(Constants.MAX_LINES_PER_POLL, "10"); - properties.put(Constants.POLLING_INTERVAL, filePollingInterval); - properties.put(Constants.HEADER_PRESENT, headerPresent); - properties.put(Constants.READ_ONLY_HEADER, readOnlyHeader); - if (actionAfterFailure != null) { - properties.put(Constants.ACTION_AFTER_FAILURE_KEY, actionAfterFailure); - } - if (moveAfterFailure != null) { - properties.put(Constants.MOVE_AFTER_FAILURE_KEY, moveAfterFailure); - } - if (fileSourceConfiguration.isTailingEnabled()) { - if (fileSourceConfiguration.getTailedFileURIMap() == null) { - fileSourceConfiguration.setTailedFileURI(fileUri); - } - if (fileSourceConfiguration.getTailedFileURIMap().get(0).toString().equalsIgnoreCase(fileUri)) { - properties.put(Constants.START_POSITION, fileSourceConfiguration.getFilePointer()); - properties.put(Constants.PATH, fileUri); - FileServerConnectorProvider fileServerConnectorProvider = - fileSourceServiceProvider.getFileServerConnectorProvider(); - FileProcessor fileProcessor = new FileProcessor(sourceEventListener, - fileSourceConfiguration); - final ServerConnector fileServerConnector = fileServerConnectorProvider - .createConnector("file-server-connector", properties); - fileServerConnector.setMessageProcessor(fileProcessor); - fileSourceConfiguration.setFileServerConnector((FileServerConnector) fileServerConnector); - Runnable runnableServer = () -> { - try { - fileServerConnector.start(); - } catch (ServerConnectorException e) { - log.error(String.format("For the siddhi app '" + siddhiAppContext.getName() + - ",' failed to start the server for file '%s'." + - "Hence starting to process next file.", fileUri)); + if (fileSourceConfiguration.getCronExpression() != null) { + scheduleJob(fileSourceConfiguration, sourceEventListener, siddhiAppContext); + } else { + if (dirUri != null) { + Map properties = getFileSystemServerProperties(); + FileSystemListener fileSystemListener = new FileSystemListener(sourceEventListener, + fileSourceConfiguration); + try { + fileSystemServerConnector = fileSystemConnectorFactory.createServerConnector( + siddhiAppContext.getName(), properties, fileSystemListener); + fileSourceConfiguration.setFileSystemServerConnector(fileSystemServerConnector); + FileSourcePoller.CompletionCallback fileSourceCompletionCallback = (Throwable error) -> + { + if (error.getClass().equals(RemoteFileSystemConnectorException.class)) { + connectionCallback.onError(new ConnectionUnavailableException( + "Connection to the file directory is lost.", error)); + } else { + throw new SiddhiAppRuntimeException("File Polling mode run failed.", error); } }; - fileSourceConfiguration.getExecutorService().execute(runnableServer); - this.fileServerConnectorStarted = true; + FileSourcePoller fileSourcePoller = + new FileSourcePoller(fileSystemServerConnector, siddhiAppContext.getName()); + fileSourcePoller.setCompletionCallback(fileSourceCompletionCallback); + this.scheduledFuture = siddhiAppContext.getScheduledExecutorService(). + scheduleAtFixedRate(fileSourcePoller, 0, 1, TimeUnit.SECONDS); + } catch (RemoteFileSystemConnectorException e) { + throw new ConnectionUnavailableException("Connection to the file directory is lost.", e); } - } else { - properties.put(Constants.URI, fileUri); - properties.put(Constants.ACK_TIME_OUT, "1000"); - properties.put(Constants.MODE, mode); + } else if (fileUri != null && !fileServerConnectorStarted) { + Map properties = new HashMap<>(); + properties.put(Constants.ACTION, Constants.READ); + properties.put(Constants.MAX_LINES_PER_POLL, "10"); + properties.put(Constants.POLLING_INTERVAL, filePollingInterval); properties.put(Constants.HEADER_PRESENT, headerPresent); properties.put(Constants.READ_ONLY_HEADER, readOnlyHeader); - VFSClientConnector vfsClientConnector = new VFSClientConnector(); - FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration); - vfsClientConnector.setMessageProcessor(fileProcessor); - VFSClientConnectorCallback vfsClientConnectorCallback = new VFSClientConnectorCallback(); - Runnable runnableClient = () -> { - try { - vfsClientConnector.send(null, vfsClientConnectorCallback, properties); - vfsClientConnectorCallback.waitTillDone(timeout, fileUri); - if (actionAfterProcess != null) { - properties.put(Constants.URI, fileUri); - properties.put(Constants.ACTION, actionAfterProcess); - if (moveAfterProcess != null) { - properties.put(Constants.DESTINATION, moveAfterProcess); + if (actionAfterFailure != null) { + properties.put(Constants.ACTION_AFTER_FAILURE_KEY, actionAfterFailure); + } + if (moveAfterFailure != null) { + properties.put(Constants.MOVE_AFTER_FAILURE_KEY, moveAfterFailure); + } + if (fileSourceConfiguration.isTailingEnabled()) { + if (fileSourceConfiguration.getTailedFileURIMap() == null) { + fileSourceConfiguration.setTailedFileURI(fileUri); + } + if (fileSourceConfiguration.getTailedFileURIMap().get(0).toString().equalsIgnoreCase(fileUri)) { + properties.put(Constants.START_POSITION, fileSourceConfiguration.getFilePointer()); + properties.put(Constants.PATH, fileUri); + FileServerConnectorProvider fileServerConnectorProvider = + fileSourceServiceProvider.getFileServerConnectorProvider(); + FileProcessor fileProcessor = new FileProcessor(sourceEventListener, + fileSourceConfiguration); + final ServerConnector fileServerConnector = fileServerConnectorProvider + .createConnector("file-server-connector", properties); + fileServerConnector.setMessageProcessor(fileProcessor); + fileSourceConfiguration.setFileServerConnector((FileServerConnector) fileServerConnector); + Runnable runnableServer = () -> { + try { + fileServerConnector.start(); + } catch (ServerConnectorException e) { + log.error(String.format("For the siddhi app '" + siddhiAppContext.getName() + + ",' failed to start the server for file '%s'." + + "Hence starting to process next file.", fileUri)); } + }; + fileSourceConfiguration.getExecutorService().execute(runnableServer); + this.fileServerConnectorStarted = true; + } + } else { + properties.put(Constants.URI, fileUri); + properties.put(Constants.ACK_TIME_OUT, "1000"); + properties.put(Constants.MODE, mode); + properties.put(Constants.HEADER_PRESENT, headerPresent); + properties.put(Constants.READ_ONLY_HEADER, readOnlyHeader); + VFSClientConnector vfsClientConnector = new VFSClientConnector(); + FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration); + vfsClientConnector.setMessageProcessor(fileProcessor); + VFSClientConnectorCallback vfsClientConnectorCallback = new VFSClientConnectorCallback(); + Runnable runnableClient = () -> { + try { vfsClientConnector.send(null, vfsClientConnectorCallback, properties); vfsClientConnectorCallback.waitTillDone(timeout, fileUri); + if (actionAfterProcess != null) { + properties.put(Constants.URI, fileUri); + properties.put(Constants.ACTION, actionAfterProcess); + if (moveAfterProcess != null) { + properties.put(Constants.DESTINATION, moveAfterProcess); + } + vfsClientConnector.send(null, vfsClientConnectorCallback, properties); + vfsClientConnectorCallback.waitTillDone(timeout, fileUri); + } + } catch (ClientConnectorException e) { + log.error(String.format("Failure occurred in vfs-client while reading the file '%s' " + + "through siddhi app '%s'.", fileUri, siddhiAppContext.getName()), e); + } catch (InterruptedException e) { + log.error(String.format("Failed to get callback from vfs-client for file '%s' through " + + "siddhi app '%s'.", fileUri, siddhiAppContext.getName()), e); } - } catch (ClientConnectorException e) { - log.error(String.format("Failure occurred in vfs-client while reading the file '%s' through " + - "siddhi app '%s'.", fileUri, siddhiAppContext.getName()), e); - } catch (InterruptedException e) { - log.error(String.format("Failed to get callback from vfs-client for file '%s' through " + - "siddhi app '%s'.", fileUri, siddhiAppContext.getName()), e); - } - }; - fileSourceConfiguration.getExecutorService().execute(runnableClient); + }; + fileSourceConfiguration.getExecutorService().execute(runnableClient); + } } } } diff --git a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileCronExecutor.java b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileCronExecutor.java new file mode 100644 index 00000000..ccb13ab6 --- /dev/null +++ b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileCronExecutor.java @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.extension.io.file.listeners; + +import io.siddhi.core.config.SiddhiAppContext; +import io.siddhi.core.stream.input.source.SourceEventListener; +import io.siddhi.extension.io.file.processors.FileProcessor; +import io.siddhi.extension.io.file.util.Constants; +import io.siddhi.extension.io.file.util.FileSourceConfiguration; +import io.siddhi.extension.io.file.util.VFSClientConnectorCallback; +import org.apache.log4j.Logger; +import org.quartz.CronScheduleBuilder; +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.StdSchedulerFactory; +import org.wso2.carbon.messaging.BinaryCarbonMessage; +import org.wso2.carbon.messaging.exceptions.ClientConnectorException; +import org.wso2.transport.file.connector.sender.VFSClientConnector; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import static io.siddhi.extension.io.file.util.Util.constructPath; +import static io.siddhi.extension.io.file.util.Util.generateProperties; +import static io.siddhi.extension.io.file.util.Util.getFileName; +import static io.siddhi.extension.io.file.util.Util.reProcessFileGenerateProperties; + +/** + * FileCronExecutor is executed when the cron expression is given. If the current time satisfied by the cron + * expression then the file processing will be executed. + */ +public class FileCronExecutor implements Job { + private static final Logger log = Logger.getLogger(FileCronExecutor.class); + + public FileCronExecutor() { + } + + /** + * To initialize the cron job to execute at given cron expression + */ + public static void scheduleJob(FileSourceConfiguration fileSourceConfiguration, + SourceEventListener sourceEventListener, SiddhiAppContext siddhiAppContext) { + try { + JobKey jobKey = new JobKey(Constants.JOB_NAME, Constants.JOB_GROUP); + + Scheduler scheduler = new StdSchedulerFactory().getScheduler(); + fileSourceConfiguration.setScheduler(scheduler); + if (scheduler.checkExists(jobKey)) { + scheduler.deleteJob(jobKey); + } + scheduler.start(); + // JobDataMap used to access the object in the job class + JobDataMap dataMap = new JobDataMap(); + dataMap.put(Constants.FILE_SOURCE_CONFIGURATION, fileSourceConfiguration); + dataMap.put(Constants.SOURCE_EVENT_LISTENER, sourceEventListener); + + // Define instances of Jobs + JobDetail cron = JobBuilder.newJob(FileCronExecutor.class) + .usingJobData(dataMap) + .withIdentity(jobKey) + .build(); + //Trigger the job to at given cron expression + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity(Constants.TRIGGER_NAME, Constants.TRIGGER_GROUP) + .withSchedule( + CronScheduleBuilder.cronSchedule(fileSourceConfiguration.getCronExpression())).build(); + // Tell quartz to schedule the job using our trigger + scheduler.scheduleJob(cron, trigger); + } catch (SchedulerException e) { + log.error("The error occurs at scheduler start in SiddhiApp " + siddhiAppContext.getName() + " : " + e); + } + } + + /** + * Method gets called when the cron Expression satisfies the system time + */ + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + FileSourceConfiguration fileSourceConfiguration = (FileSourceConfiguration) dataMap.get( + Constants.FILE_SOURCE_CONFIGURATION); + SourceEventListener sourceEventListener = (SourceEventListener) dataMap.get( + Constants.SOURCE_EVENT_LISTENER); + File listeningFileObject = new File(fileSourceConfiguration.getUri()); + if (listeningFileObject.isDirectory()) { + File[] listOfFiles = listeningFileObject.listFiles(); + if (listOfFiles != null) { + for (File file : listOfFiles) { + if (file.isFile()) { + processFile(file.toURI().toString(), jobExecutionContext, sourceEventListener); + } + } + } + } else { + processFile(listeningFileObject.toURI().toString(), jobExecutionContext, sourceEventListener); + } + } + + /** + * Action taken while processing a file + */ + public void processFile(String fileURI, JobExecutionContext jobExecutionContext, + SourceEventListener sourceEventListener) { + JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + FileSourceConfiguration fileSourceConfiguration = (FileSourceConfiguration) dataMap.get( + Constants.FILE_SOURCE_CONFIGURATION); + FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration); + VFSClientConnector vfsClientConnector = new VFSClientConnector(); + vfsClientConnector.setMessageProcessor(fileProcessor); + Map properties = generateProperties(fileSourceConfiguration, fileURI); + VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback(); + initialProcessFile(vfsClientConnector, carbonCallback, properties, fileURI, + fileSourceConfiguration, fileProcessor); + } + + public void initialProcessFile(VFSClientConnector vfsClientConnector, VFSClientConnectorCallback carbonCallback, + Map properties, String fileURI, + FileSourceConfiguration fileSourceConfiguration, FileProcessor fileProcessor) { + vfsClientConnector.setMessageProcessor(fileProcessor); + BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap( + fileURI.getBytes(StandardCharsets.UTF_8)), true); + try { + vfsClientConnector.send(carbonMessage, carbonCallback, properties); + try { + carbonCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileURI); + } catch (InterruptedException e) { + log.error(String.format("Failed to get callback from vfs-client for file '%s'.", + fileURI), e); + } + reProcessFile(vfsClientConnector, carbonCallback, properties, fileURI, fileSourceConfiguration); + } catch (ClientConnectorException e) { + log.error(String.format("Failed to provide file '%s' for consuming.", fileURI), e); + } + } + + /** + * Method use to move file from one path to another if action.after.process is 'move' + */ + public void reProcessFile(VFSClientConnector vfsClientConnector, + VFSClientConnectorCallback vfsClientConnectorCallback, + Map properties, String fileUri, + FileSourceConfiguration fileSourceConfiguration) { + BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap( + fileUri.getBytes(StandardCharsets.UTF_8)), true); + String moveAfterProcess = fileSourceConfiguration.getMoveAfterProcess(); + Map reGeneratedProperties = reProcessFileGenerateProperties(fileSourceConfiguration, fileUri, + properties); + try { + File file = new File(fileSourceConfiguration.getUri()); + if (file.isFile()) { + reGeneratedProperties.put(Constants.DESTINATION, moveAfterProcess); + } else { + String destination = constructPath(moveAfterProcess, getFileName(fileUri, + fileSourceConfiguration.getProtocolForMoveAfterProcess())); + if (destination != null) { + reGeneratedProperties.put(Constants.DESTINATION, destination); + } + } + vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, reGeneratedProperties); + vfsClientConnectorCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileUri); + } catch (ClientConnectorException e) { + log.error(String.format("Failure occurred in vfs-client while reading the file '%s '.", fileUri), e); + } catch (InterruptedException e) { + log.error(String.format("Failed to get callback from vfs-client for file '%s '.", fileUri), e); + } + } +} diff --git a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java index 611885fb..a1824158 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java +++ b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java @@ -24,7 +24,6 @@ import io.siddhi.extension.io.file.util.FileSourceConfiguration; import io.siddhi.extension.io.file.util.FileSourceServiceProvider; import io.siddhi.extension.io.file.util.VFSClientConnectorCallback; -import org.apache.commons.io.FilenameUtils; import org.apache.log4j.Logger; import org.wso2.carbon.messaging.BinaryCarbonMessage; import org.wso2.carbon.messaging.CarbonCallback; @@ -40,13 +39,15 @@ import org.wso2.transport.remotefilesystem.message.RemoteFileSystemEvent; import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.HashMap; import java.util.Map; +import static io.siddhi.extension.io.file.util.Util.constructPath; +import static io.siddhi.extension.io.file.util.Util.generateProperties; +import static io.siddhi.extension.io.file.util.Util.getFileName; +import static io.siddhi.extension.io.file.util.Util.reProcessFileGenerateProperties; + /** * Test {@link RemoteFileSystemListener} implementation for testing purpose. */ @@ -78,14 +79,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent) vfsClientConnector = new VFSClientConnector(); fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration); vfsClientConnector.setMessageProcessor(fileProcessor); - Map properties = new HashMap<>(); - properties.put(Constants.URI, fileURI); - properties.put(Constants.READ_FILE_FROM_BEGINNING, Constants.TRUE); - properties.put(Constants.ACTION, Constants.READ); - properties.put(Constants.POLLING_INTERVAL, fileSourceConfiguration.getFilePollingInterval()); - properties.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY, - fileSourceConfiguration.getFileReadWaitTimeout()); - properties.put(Constants.MODE, mode); + Map properties = generateProperties(fileSourceConfiguration, fileURI); VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback(); BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage( ByteBuffer.wrap(fileURI.getBytes(StandardCharsets.UTF_8)), true); @@ -108,15 +102,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent) vfsClientConnector = new VFSClientConnector(); fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration); vfsClientConnector.setMessageProcessor(fileProcessor); - - Map properties = new HashMap<>(); - properties.put(Constants.URI, fileURI); - properties.put(Constants.READ_FILE_FROM_BEGINNING, Constants.TRUE); - properties.put(Constants.ACTION, Constants.READ); - properties.put(Constants.POLLING_INTERVAL, fileSourceConfiguration.getFilePollingInterval()); - properties.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY, - fileSourceConfiguration.getFileReadWaitTimeout()); - properties.put(Constants.MODE, mode); + Map properties = generateProperties(fileSourceConfiguration, fileURI); VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback(); BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage( ByteBuffer.wrap(fileURI.getBytes(StandardCharsets.UTF_8)), true); @@ -136,15 +122,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent) log.error(String.format("Failed to provide file '%s' for consuming.", fileURI), e); } } else if (Constants.LINE.equalsIgnoreCase(mode) || Constants.REGEX.equalsIgnoreCase(mode)) { - Map properties = new HashMap<>(); - properties.put(Constants.ACTION, Constants.READ); - properties.put(Constants.MAX_LINES_PER_POLL, "10"); - properties.put(Constants.POLLING_INTERVAL, fileSourceConfiguration.getFilePollingInterval()); - properties.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY, - fileSourceConfiguration.getFileReadWaitTimeout()); - properties.put(Constants.MODE, mode); - properties.put(Constants.HEADER_PRESENT, fileSourceConfiguration.getHeaderPresent()); - properties.put(Constants.READ_ONLY_HEADER, fileSourceConfiguration.getReadOnlyHeader()); + Map properties = generateProperties(fileSourceConfiguration, fileURI); if (fileSourceConfiguration.isTailingEnabled()) { fileSourceConfiguration.setTailedFileURI(fileURI); if (fileSourceConfiguration.getTailedFileURIMap().contains(fileURI)) { @@ -169,7 +147,6 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent) fileSourceConfiguration.getExecutorService().execute(fileServerExecutor); } } else { - properties.put(Constants.URI, fileURI); vfsClientConnector = new VFSClientConnector(); fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration); vfsClientConnector.setMessageProcessor(fileProcessor); @@ -237,52 +214,27 @@ public void run() { private void reProcessFile(VFSClientConnector vfsClientConnector, VFSClientConnectorCallback vfsClientConnectorCallback, Map properties, String fileUri) { - String actionAfterProcess = fileSourceConfiguration.getActionAfterProcess(); - properties.put(Constants.URI, fileUri); - properties.put(Constants.ACK_TIME_OUT, "1000"); + Map reGeneratedProperties = reProcessFileGenerateProperties(fileSourceConfiguration, fileUri, + properties); BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap( fileUri.getBytes(StandardCharsets.UTF_8)), true); String moveAfterProcess = fileSourceConfiguration.getMoveAfterProcess(); try { if (fileSourceConfiguration.getActionAfterProcess() != null) { - properties.put(Constants.URI, fileUri); - properties.put(Constants.ACTION, actionAfterProcess); if (fileSourceConfiguration.getMoveAfterProcess() != null) { String destination = constructPath(moveAfterProcess, getFileName(fileUri, fileSourceConfiguration.getProtocolForMoveAfterProcess())); if (destination != null) { - properties.put(Constants.DESTINATION, destination); + reGeneratedProperties.put(Constants.DESTINATION, destination); } } - vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, properties); + vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, reGeneratedProperties); vfsClientConnectorCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileUri); } } catch (ClientConnectorException e) { log.error(String.format("Failure occurred in vfs-client while reading the file '%s'.", fileUri), e); } catch (InterruptedException e) { - log.error(String.format("Failed to get callback from vfs-client for file '%s'.", fileUri), e); - } - } - - private String getFileName(String uri, String protocol) { - try { - URL url = new URL(String.format("%s%s%s", protocol, File.separator, uri)); - return FilenameUtils.getName(url.getPath()); - } catch (MalformedURLException e) { - log.error(String.format("Failed to extract file name from the uri '%s'.", uri), e); - return null; - } - } - - private String constructPath(String baseUri, String fileName) { - if (baseUri != null && fileName != null) { - if (baseUri.endsWith(File.separator)) { - return String.format("%s%s", baseUri, fileName); - } else { - return String.format("%s%s%s", baseUri, File.separator, fileName); - } - } else { - return null; + log.error(String.format("Failed to get callback from vfs-client for file '%s'.", fileUri), e); } } } diff --git a/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java b/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java index e3921e17..c11acb7d 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java +++ b/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java @@ -22,8 +22,6 @@ * Constants used in siddhi-io-file extension. */ public class Constants { - private Constants() { - } /* configuration parameters*/ public static final String URI = "uri"; @@ -51,6 +49,7 @@ private Constants() { public static final int WAIT_TILL_DONE = 5000; public static final String HEADER_PRESENT = "header.present"; public static final String READ_ONLY_HEADER = "read.only.header"; + public static final String CRON_EXPRESSION = "cron.expression"; /* configuration param values*/ public static final String MOVE = "move"; @@ -92,6 +91,13 @@ private Constants() { public static final String ANNOTATION_TYPE_ELEMENT_NAME = "type"; public static final String MAP_ANNOTATION_BINARY_TYPE = "binary"; public static final String SOURCE_ANNOTATION_FILE_TYPE_NAME = "file"; + public static final String SOURCE_EVENT_LISTENER = "SourceEventListener"; + public static final String FILE_SOURCE_CONFIGURATION = "FileSourceConfiguration"; + public static final String JOB_GROUP = "JobGroup"; + public static final String JOB_NAME = "JobName"; + public static final String TRIGGER_NAME = "TriggerName"; + public static final String TRIGGER_GROUP = "TriggerGroup"; + /*source property keys*/ public static final String TAILED_FILE = "tailedFile"; diff --git a/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java b/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java index 720a34fb..24426a80 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java +++ b/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java @@ -18,6 +18,7 @@ package io.siddhi.extension.io.file.util; +import org.quartz.Scheduler; import org.wso2.transport.file.connector.server.FileServerConnector; import org.wso2.transport.remotefilesystem.server.connector.contract.RemoteFileSystemServerConnector; @@ -56,6 +57,9 @@ public class FileSourceConfiguration { private String actionAfterFailure = null; private String moveAfterProcess = null; private String fileReadWaitTimeout; + private String cronExpression = null; + private String uri = null; + private Scheduler scheduler = null; public FileSourceConfiguration() { tailingRegexStringBuilder = new StringBuilder(); @@ -259,4 +263,28 @@ public String getReadOnlyHeader() { public void setReadOnlyHeader(String readOnlyHeader) { this.readOnlyHeader = readOnlyHeader; } + + public String getCronExpression() { + return cronExpression; + } + + public void setCronExpression(String cronExpression) { + this.cronExpression = cronExpression; + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public void setScheduler(Scheduler scheduler) { + this.scheduler = scheduler; + } + + public Scheduler getScheduler() { + return scheduler; + } } diff --git a/component/src/main/java/io/siddhi/extension/io/file/util/Util.java b/component/src/main/java/io/siddhi/extension/io/file/util/Util.java index 6828f544..ed23fa2a 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/util/Util.java +++ b/component/src/main/java/io/siddhi/extension/io/file/util/Util.java @@ -19,15 +19,22 @@ package io.siddhi.extension.io.file.util; import io.siddhi.core.event.Event; +import org.apache.commons.io.FilenameUtils; +import org.apache.log4j.Logger; import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Util Class. * This method used to get the fileHandlerEvent */ public class Util { + private static final Logger log = Logger.getLogger(Util.class); public static Event getFileHandlerEvent(final File file, List fileObjectList, Status enumStatus) { boolean listenerEventsURLValidated = false; String status; @@ -58,4 +65,75 @@ public static Event getFileHandlerEvent(final File file, List fileObject } return null; } + + public static String getFileName(String uri, String protocol) { + try { + URL url = new URL(String.format("%s%s%s", protocol, File.separator, uri)); + return FilenameUtils.getName(url.getPath()); + } catch (MalformedURLException e) { + log.error(String.format("Failed to extract file name from the uri '%s '.", uri), e); + return null; + } + } + + public static String constructPath(String baseUri, String fileName) { + if (baseUri != null && fileName != null) { + if (baseUri.endsWith(File.separator)) { + return String.format("%s%s", baseUri, fileName); + } else { + return String.format("%s%s%s", baseUri, File.separator, fileName); + } + } else { + return null; + } + } + + public static Map generateProperties(FileSourceConfiguration fileSourceConfiguration, + String fileURI) { + Map properties; + String mode = fileSourceConfiguration.getMode(); + if (Constants.TEXT_FULL.equalsIgnoreCase(mode)) { + properties = new HashMap<>(); + properties.put(Constants.URI, fileURI); + properties.put(Constants.READ_FILE_FROM_BEGINNING, Constants.TRUE); + properties.put(Constants.ACTION, Constants.READ); + properties.put(Constants.POLLING_INTERVAL, fileSourceConfiguration.getFilePollingInterval()); + properties.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY, + fileSourceConfiguration.getFileReadWaitTimeout()); + properties.put(Constants.MODE, mode); + properties.put(Constants.CRON_EXPRESSION, fileSourceConfiguration.getCronExpression()); + } else if (Constants.BINARY_FULL.equalsIgnoreCase(mode)) { + properties = new HashMap<>(); + properties.put(Constants.URI, fileURI); + properties.put(Constants.READ_FILE_FROM_BEGINNING, Constants.TRUE); + properties.put(Constants.ACTION, Constants.READ); + properties.put(Constants.POLLING_INTERVAL, fileSourceConfiguration.getFilePollingInterval()); + properties.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY, + fileSourceConfiguration.getFileReadWaitTimeout()); + properties.put(Constants.MODE, mode); + properties.put(Constants.CRON_EXPRESSION, fileSourceConfiguration.getCronExpression()); + } else { + properties = new HashMap<>(); + properties.put(Constants.ACTION, Constants.READ); + properties.put(Constants.MAX_LINES_PER_POLL, "10"); + properties.put(Constants.POLLING_INTERVAL, fileSourceConfiguration.getFilePollingInterval()); + properties.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY, + fileSourceConfiguration.getFileReadWaitTimeout()); + properties.put(Constants.MODE, mode); + properties.put(Constants.HEADER_PRESENT, fileSourceConfiguration.getHeaderPresent()); + properties.put(Constants.READ_ONLY_HEADER, fileSourceConfiguration.getReadOnlyHeader()); + properties.put(Constants.CRON_EXPRESSION, fileSourceConfiguration.getCronExpression()); + properties.put(Constants.URI, fileURI); + } + return properties; + } + + public static Map reProcessFileGenerateProperties(FileSourceConfiguration fileSourceConfiguration, + String fileURI, Map properties) { + String actionAfterProcess = fileSourceConfiguration.getActionAfterProcess(); + properties.put(Constants.URI, fileURI); + properties.put(Constants.ACK_TIME_OUT, "1000"); + properties.put(Constants.ACTION, actionAfterProcess); + return properties; + } } diff --git a/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java b/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java index 66ad296e..16801720 100644 --- a/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java @@ -1321,4 +1321,35 @@ public void receive(Event[] events) { SiddhiTestHelper.waitForEvents(100, 1, count.get(), 3000); siddhiAppRuntime.shutdown(); } + + @Test + public void siddhiIOFileTestCronSupportForFile() throws InterruptedException { + log.info("Siddhi IO File test for Cron support via file.uri"); + String streams = "" + + "@App:name('TestSiddhiApp')" + + "@source(type='file', mode='line'," + + "file.uri='file:" + newRoot + "/line/header/test.txt', cron.expression='*/5 * * * * ?', " + + "action.after.process='move', tailing='false', " + + "move.after.process='file:/" + moveAfterProcessDir + "/line/header/test.txt', " + + "@map( type='csv', delimiter='|'))" + + "define stream FileReaderStream (code string, serialNo string, amount double); " + + "define stream FileResultStream (code string, serialNo string, amount double); "; + String query = "" + + "from FileReaderStream " + + "select * " + + "insert into FileResultStream; "; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + siddhiAppRuntime.addCallback("FileResultStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + count.incrementAndGet(); + } + }); + siddhiAppRuntime.start(); + SiddhiTestHelper.waitForEvents(100, 7, count.get(), 6000); + AssertJUnit.assertEquals("Number of events", 7, count.get()); + siddhiAppRuntime.shutdown(); + } } diff --git a/component/src/test/java/io/siddhi/extension/io/file/FileSourceTextFullModeTestCase.java b/component/src/test/java/io/siddhi/extension/io/file/FileSourceTextFullModeTestCase.java index 48ca980b..52891c92 100644 --- a/component/src/test/java/io/siddhi/extension/io/file/FileSourceTextFullModeTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/file/FileSourceTextFullModeTestCase.java @@ -37,14 +37,16 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Date; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * Test cases for siddhi-io-file source. - * */ + */ public class FileSourceTextFullModeTestCase { private static final Logger log = Logger.getLogger(FileSourceTextFullModeTestCase.class); private AtomicInteger count = new AtomicInteger(); @@ -103,8 +105,8 @@ public void doAfterMethod() { } /** - * Test cases for 'mode = text.full'. - * */ + * Test cases for 'mode = text.full'. + */ @Test public void siddhiIoFileTest1() throws InterruptedException { log.info("test SiddhiIoFile [mode = text.full] 1"); @@ -474,8 +476,8 @@ public void siddhiIoFileTestForEOFAndFileNameForTextFull() throws InterruptedExc "action.after.process='delete', " + "tailing='false', " + "@map(type='json', enclosing.element=\"$.event\", " + - "@attributes(symbol = \"symbol\", price = \"price\", volume = \"volume\", " + - "eof = 'trp:eof', fp = 'trp:file.path')))\n" + + "@attributes(symbol = \"symbol\", price = \"price\", volume = \"volume\", " + + "eof = 'trp:eof', fp = 'trp:file.path')))\n" + "define stream FooStream (symbol string, price float, volume long, eof String, fp String); " + "define stream BarStream (symbol string, price float, volume long, eof String, fp String); "; String query = "" + @@ -552,4 +554,201 @@ public void siddhiIoFileTest9() throws InterruptedException { Thread.sleep(1000); siddhiAppRuntime.shutdown(); } + + @Test + public void siddhiIoFileTestCronSupportForDirectory() throws InterruptedException, IOException { + log.info("Siddhi IO File Test for cron Support for directory"); + String streams = "" + + "@App:name('TestSiddhiApp')" + + "@source(type='file',mode='text.full'," + + "dir.uri='file:/" + dirUri + "/text_full_single', " + + "action.after.process='move', tailing='false', cron.expression='*/5 * * * * ?', " + + "move.after.process='file:/" + moveAfterProcessDir + "/text_full', " + + "@map(type='json'))" + + "define stream FooStream (symbol string, price float, volume long); " + + "define stream BarStream (symbol string, price float, volume long); "; + + String query = "" + + "from FooStream " + + "select * " + + "insert into BarStream; "; + + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("BarStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + int n = count.incrementAndGet(); + log.info("Count is : " + n + " Time is : " + new Date(System.currentTimeMillis())); + for (Event event : events) { + AssertJUnit.assertTrue(companies.contains(event.getData(0).toString())); + } + } + }); + + siddhiAppRuntime.start(); + SiddhiTestHelper.waitForEvents(waitTime, 1, count, timeout); + File file2 = new File(dirUri + "/text_full/cloudbees.json"); + File file3 = new File(dirUri + "/text_full_single/cloudbees.json"); + FileUtils.copyFile(file2, file3); + SiddhiTestHelper.waitForEvents(waitTime, 2, count, timeout); + + File file = new File(moveAfterProcessDir + "/text_full"); + AssertJUnit.assertEquals(2, (Objects.requireNonNull(file.list()).length)); + + //assert event count + AssertJUnit.assertEquals("Number of events", 2, count.get()); + siddhiAppRuntime.shutdown(); + } + + @Test + public void siddhiIOFileTestCronExpressionInDirectory() throws InterruptedException, IOException { + log.info("Siddhi IO File for Cron Support to check whether Cron Expression satisfies the time"); + long[] time = new long[3]; + String streams = "" + + "@App:name('TestSiddhiApp')" + + "@source(type='file', dir.uri='file:/" + dirUri + "/text_full_single', " + + "action.after.process='move', tailing='false', cron.expression='*/5 * * * * ?', " + + "move.after.process='file:/" + moveAfterProcessDir + "/text_full', " + + "@map(type='json'))" + + "define stream FooStream (symbol string, price float, volume long); " + + "define stream BarStream (symbol string, price float, volume long); "; + + String query = "" + + "from FooStream " + + "select * " + + "insert into BarStream; "; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + siddhiAppRuntime.addCallback("BarStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + int n = count.incrementAndGet(); + time[n] = System.currentTimeMillis(); + log.info("Count is : " + n + " Time is : " + new Date(System.currentTimeMillis())); + for (Event event : events) { + AssertJUnit.assertTrue(companies.contains(event.getData(0).toString())); + } + } + }); + + siddhiAppRuntime.start(); + SiddhiTestHelper.waitForEvents(waitTime, 1, count, timeout); + File file2 = new File(dirUri + "/text_full/cloudbees.json"); + File file3 = new File(dirUri + "/text_full_single/cloudbees.json"); + FileUtils.copyFile(file2, file3); + SiddhiTestHelper.waitForEvents(waitTime, 2, count, timeout); + long val = time[2] - time[1]; + log.info("Difference between " + time[2] + " and " + time[1] + " : " + val); + if (val < 4000) { + AssertJUnit.fail("Cron Time is not satisfied"); + } else { + //assert event count + AssertJUnit.assertEquals("Number of events", 2, count.get()); + } + siddhiAppRuntime.shutdown(); + } + + @Test(expectedExceptions = SiddhiAppCreationException.class) + public void siddhiIoFileTest10() throws InterruptedException { + log.info("Cron is not null and action.after.process is in default value"); + String streams = "" + + "@App:name('SiddhiApp')" + + "@source(type='file', file.uri='file:/" + dirUri + "/text_full_single/apache.json', " + + "move.after.process='file:/" + moveAfterProcessDir + "/apache.json', " + + "cron.expression='*/5 * * * * ?', tailing='false', @map(type='csv'))\n" + + "define stream InputStream (symbol string, price float, volume long);" + + "@sink(type='log')" + + "define stream OutputStream (symbol string, price float, volume long);"; + + String query = "" + + "from InputStream " + + "select * " + + "insert into OutputStream; "; + + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + siddhiAppRuntime.start(); + SiddhiTestHelper.waitForEvents(100, 0, count.get(), 1000); + siddhiAppRuntime.shutdown(); + } + + @Test(expectedExceptions = SiddhiAppCreationException.class) + public void siddhiIoFileTest11() throws InterruptedException { + log.info("Cron is not null tailing is true"); + String streams = "" + + "@App:name('SiddhiApp')" + + "@source(type='file', file.uri='file:/" + dirUri + "/text_full_single/apache.json', " + + "action.after.process='MOVE', cron.expression='*/5 * * * * ?', " + + "move.after.process='file:/" + moveAfterProcessDir + "/apache.json', " + + "tailing='true', @map(type='csv'))\n" + + "define stream InputStream (symbol string, price float, volume long);" + + "@sink(type='log')" + + "define stream OutputStream (symbol string, price float, volume long);"; + + String query = "" + + "from InputStream " + + "select * " + + "insert into OutputStream; "; + + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + siddhiAppRuntime.start(); + SiddhiTestHelper.waitForEvents(100, 0, count.get(), 1000); + siddhiAppRuntime.shutdown(); + } + + @Test(expectedExceptions = SiddhiAppCreationException.class) + public void siddhiIoFileTest12() throws InterruptedException { + log.info("Move after process is null but cron is not null"); + String streams = "" + + "@App:name('SiddhiApp')" + + "@source(type='file', file.uri='file:/" + dirUri + "/text_full_single/apache.json', " + + "action.after.process='MOVE', " + + "cron.expression='*/5 * * * * ?', tailing='false', @map(type='csv'))\n" + + "define stream InputStream (symbol string, price float, volume long);" + + "@sink(type='log')" + + "define stream OutputStream (symbol string, price float, volume long);"; + + String query = "" + + "from InputStream " + + "select * " + + "insert into OutputStream; "; + + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + siddhiAppRuntime.start(); + SiddhiTestHelper.waitForEvents(100, 0, count.get(), 1000); + siddhiAppRuntime.shutdown(); + } + + @Test(expectedExceptions = SiddhiAppCreationException.class) + public void siddhiIoFileTest13() throws InterruptedException { + log.info("Cron expression is not valid"); + String streams = "" + + "@App:name('SiddhiApp')" + + "@source(type='file', file.uri='file:/" + dirUri + "/text_full_single/apache.json', " + + "action.after.process='MOVE', move.after.process='file:/" + moveAfterProcessDir + "/apache.json', " + + "cron.expression='* 5 * * * * ?', tailing='false', @map(type='csv'))\n" + + "define stream InputStream (symbol string, price float, volume long);" + + "@sink(type='log')" + + "define stream OutputStream (symbol string, price float, volume long);"; + + String query = "" + + "from InputStream " + + "select * " + + "insert into OutputStream; "; + + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + siddhiAppRuntime.start(); + SiddhiTestHelper.waitForEvents(100, 0, count.get(), 1000); + siddhiAppRuntime.shutdown(); + } } diff --git a/component/src/test/resources/testng.xml b/component/src/test/resources/testng.xml index 6ee7949b..42c327db 100644 --- a/component/src/test/resources/testng.xml +++ b/component/src/test/resources/testng.xml @@ -22,8 +22,8 @@ - +