Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for https://github.com/wso2/streaming-integrator/issues/165 #118

Merged
merged 8 commits into from
Feb 17, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,17 @@ protected Object[] process(Object[] data) {
}
}
} catch (FileSystemException e) {
throw new SiddhiAppRuntimeException("Exception occurred when getting the file type " +
uri, e);
log.error("Exception occurred when getting the file type " + uri, e);
return new Object[]{false};
} finally {
if (rootFileObject != null) {
try {
rootFileObject.close();
} catch (FileSystemException e) {
throw new SiddhiAppRuntimeException("Exception occurred when closing file object for " +
log.error("Exception occurred when closing file object for " +
rootFileObject.getName().getPath(), e);
return new Object[]{false};

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
Expand Down Expand Up @@ -55,13 +54,13 @@
description = "This function performs copying file from one directory to another.\n",
parameters = {
@Parameter(
name = "uri",
name = "path",
description = "Absolute file or directory path.",
type = DataType.STRING,
dynamic = true
),
@Parameter(
name = "destination.dir.uri",
name = "destination.dir.path",
description = "Absolute file path to the destination directory.\n" +
"Note: Parent folder structure will be created if it does not exist.",
type = DataType.STRING,
Expand All @@ -73,14 +72,16 @@
"Note: Add an empty string to match all files",
type = DataType.STRING,
optional = true,
defaultValue = "<Empty_String>"
defaultValue = "<Empty_String>",
dynamic = true
),
@Parameter(
name = "exclude.root.dir",
description = "Exclude parent folder when moving the content.",
type = DataType.BOOL,
optional = true,
defaultValue = "false"
defaultValue = "false",
dynamic = true
),
@Parameter(
name = "file.system.options",
Expand All @@ -95,16 +96,16 @@
},
parameterOverloads = {
@ParameterOverload(
parameterNames = {"uri", "destination.dir.uri"}
parameterNames = {"path", "destination.dir.path"}
),
@ParameterOverload(
parameterNames = {"uri", "destination.dir.uri", "include.by.regexp"}
parameterNames = {"path", "destination.dir.path", "include.by.regexp"}
),
@ParameterOverload(
parameterNames = {"uri", "destination.dir.uri", "include.by.regexp", "exclude.root.dir"}
parameterNames = {"path", "destination.dir.path", "include.by.regexp", "exclude.root.dir"}
),
@ParameterOverload(
parameterNames = {"uri", "destination.dir.uri", "include.by.regexp", "exclude.root.dir",
parameterNames = {"path", "destination.dir.path", "include.by.regexp", "exclude.root.dir",
"file.system.options"}
)
},
Expand Down Expand Up @@ -193,19 +194,22 @@ protected Object[] process(Object[] data) {
boolean excludeParentFolder = false;
if (inputExecutorLength == 3) {
regex = (String) data[2];
} else if (inputExecutorLength == 4) {
regex = (String) data[2];
excludeParentFolder = (Boolean) data[3];
}
if (pattern == null) {
pattern = Pattern.compile(regex);
}
try {
FileObject rootFileObject = Utils.getFileObject(uri, fileSystemOptions);
if (!rootFileObject.exists()) {
return new Object[]{false};
}
if (rootFileObject.getType().hasContent() &&
pattern.matcher(rootFileObject.getName().getBaseName()).lookingAt()) {
moveFileToDestination(rootFileObject, destinationDirUri, pattern);
} else if (rootFileObject.getType().hasChildren()) {
if (inputExecutorLength == 4) {
excludeParentFolder = (Boolean) data[3];
}
if (!excludeParentFolder) {
destinationDirUri =
destinationDirUri.concat(File.separator + rootFileObject.getName().getBaseName());
Expand All @@ -216,21 +220,16 @@ protected Object[] process(Object[] data) {
if (sourceFileObject.getType().hasContent() &&
pattern.matcher(sourceFileObject.getName().getBaseName()).lookingAt()) {
String sourcePartialUri = sourceFileObject.getName().getPath();
if (excludeParentFolder) {
sourcePartialUri = sourcePartialUri.replace(uri +
rootFileObject.getName().getBaseName(), "");
} else {
sourcePartialUri = sourcePartialUri.replace(uri, "").
replace(sourceFileObject.getName().getBaseName(), "");
}
sourcePartialUri = sourcePartialUri.replace(rootFileObject.getName().getPath(), "").
replace(sourceFileObject.getName().getBaseName(), "");
moveFileToDestination(sourceFileObject, destinationDirUri + sourcePartialUri,
pattern);
}
}
}
} catch (FileSystemException e) {
throw new SiddhiAppRuntimeException("Exception occurred when getting the file type " +
uri, e);
log.error("Exception occurred when getting the file type " + uri, e);
return new Object[]{false};
}
return new Object[]{true};
}
Expand All @@ -250,7 +249,8 @@ public void stop() {

}

private void moveFileToDestination(FileObject sourceFileObject, String destinationDirUri, Pattern pattern) {
private void moveFileToDestination(FileObject sourceFileObject, String destinationDirUri, Pattern pattern)
throws FileSystemException {
try {
String fileName = sourceFileObject.getName().getBaseName();
String destinationPath;
Expand Down Expand Up @@ -280,8 +280,7 @@ private void moveFileToDestination(FileObject sourceFileObject, String destinati
if (fileMoveMetrics != null) {
fileMoveMetrics.getMoveMetric(0);
}
throw new SiddhiAppRuntimeException("Exception occurred when doing file operations when moving for file: " +
sourceFileObject.getName().getPath(), e);
throw e;
}
}
}
39 changes: 27 additions & 12 deletions component/src/main/java/io/siddhi/extension/io/file/FileSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ public class FileSource extends Source<FileSource.FileSourceState> {
private FileSourceConfiguration fileSourceConfiguration;
private RemoteFileSystemConnectorFactory fileSystemConnectorFactory;
private FileSourceServiceProvider fileSourceServiceProvider;
private RemoteFileSystemServerConnector fileSystemServerConnector;
private String filePointer = "0";
private String[] requiredProperties;
private boolean isTailingEnabled = true;
Expand All @@ -401,6 +400,7 @@ public class FileSource extends Source<FileSource.FileSourceState> {
private long timeout = 5000;
private boolean fileServerConnectorStarted = false;
private ScheduledFuture scheduledFuture;
private FileSourcePoller fileSourcePoller;
private ConnectionCallback connectionCallback;
private String headerPresent;
private String readOnlyHeader;
Expand Down Expand Up @@ -577,7 +577,6 @@ public void connect(ConnectionCallback connectionCallback, FileSourceState fileS
@Override
public void disconnect() {
try {
fileSystemServerConnector = null;
if (isTailingEnabled && fileSourceConfiguration.getFileServerConnector() != null) {
fileSourceConfiguration.getFileServerConnector().stop();
fileSourceConfiguration.setFileServerConnector(null);
Expand Down Expand Up @@ -618,11 +617,23 @@ public void pause() {
}

public void resume() {
try {
updateSourceConf();
deployServers();
} catch (ConnectionUnavailableException e) {
throw new SiddhiAppRuntimeException("Failed to resume siddhi app runtime.", e);
if (dirUri != null && scheduledFuture != null) {
this.scheduledFuture = siddhiAppContext.getScheduledExecutorService().
scheduleAtFixedRate(fileSourcePoller, 0, 1, TimeUnit.SECONDS);
}
if (isTailingEnabled && fileSourceConfiguration.getFileServerConnector() != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If tailing disabled, and if its a large file which will take time more than the persisting interval, the file processing may stop. Isnt it?
Shall we use filePointer and resolve that issue?
This may affect for the files that will be processing in a given dir.uri as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Ramindu,
In the existing pause() method, the file processing thread is not stopped which is the reason why I thought that there is nothing to resume in the resume() method, assuming that the file will be processed anyway. But let's check this further, to make sure that happens. I have created a separate issue to track this particular issue: #120.

Thanks

FileServerConnector fileServerConnector = fileSourceConfiguration.getFileServerConnector();
Runnable runnableServer = () -> {
try {
fileServerConnector.start();
} catch (ServerConnectorException e) {
log.error(String.format("For the siddhi app '" + siddhiAppContext.getName() +
",' failed to resume the server for file '%s'." +
"Hence starting to process next file.", fileUri));
}
};
fileSourceConfiguration.getExecutorService().execute(runnableServer);
this.fileServerConnectorStarted = true;
}
}

Expand Down Expand Up @@ -756,9 +767,11 @@ private void deployServers() throws ConnectionUnavailableException {
FileSystemListener fileSystemListener = new FileSystemListener(sourceEventListener,
fileSourceConfiguration, metrics, schemeFileOptions);
try {
fileSystemServerConnector = fileSystemConnectorFactory.createServerConnector(
siddhiAppContext.getName(), properties, fileSystemListener);
RemoteFileSystemServerConnector fileSystemServerConnector =
fileSystemConnectorFactory.createServerConnector(
siddhiAppContext.getName(), properties, fileSystemListener);
fileSourceConfiguration.setFileSystemServerConnector(fileSystemServerConnector);

FileSourcePoller.CompletionCallback fileSourceCompletionCallback = (Throwable error) ->
{
if (error.getClass().equals(RemoteFileSystemConnectorException.class)) {
Expand All @@ -768,7 +781,7 @@ private void deployServers() throws ConnectionUnavailableException {
throw new SiddhiAppRuntimeException("File Polling mode run failed.", error);
}
};
FileSourcePoller fileSourcePoller =
this.fileSourcePoller =
new FileSourcePoller(fileSystemServerConnector, siddhiAppContext.getName());
fileSourcePoller.setCompletionCallback(fileSourceCompletionCallback);
this.scheduledFuture = siddhiAppContext.getScheduledExecutorService().
Expand Down Expand Up @@ -950,8 +963,8 @@ public Map<String, Object> snapshot() {
filePointer = FileSource.this.fileSourceConfiguration.getFilePointer();
state.put(Constants.FILE_POINTER, fileSourceConfiguration.getFilePointer());
state.put(Constants.TAILED_FILE, fileSourceConfiguration.getTailedFileURIMap());
state.put(Constants.TAILING_REGEX_STRING_BUILDER,
fileSourceConfiguration.getTailingRegexStringBuilder());
state.put(Constants.TAILING_REGEX_STRING_BUILDER, fileSourceConfiguration.getTailingRegexStringBuilder());
state.put(Constants.PROCESSED_FILE_LIST, fileSourceConfiguration.getProcessedFileList());
return state;
}

Expand All @@ -963,6 +976,8 @@ public void restore(Map<String, Object> map) {
fileSourceConfiguration.setTailedFileURIMap(tailedFileURIMap);
fileSourceConfiguration.updateTailingRegexStringBuilder(
(StringBuilder) map.get(Constants.TAILING_REGEX_STRING_BUILDER));
fileSourceConfiguration.setProcessedFileList(
(List<String>) map.get(Constants.PROCESSED_FILE_LIST));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
RemoteFileSystemEvent remoteFileSystemEvent = (RemoteFileSystemEvent) remoteFileSystemBaseEvent;
for (int i = 0; i < remoteFileSystemEvent.getAddedFiles().size(); i++) {
String fileURI = remoteFileSystemEvent.getAddedFiles().get(i).getPath();
if (!fileSourceConfiguration.addFileToListIfAbsent(fileURI)) {
continue;
}
VFSClientConnector vfsClientConnector;
FileProcessor fileProcessor;
fileSourceConfiguration.setCurrentlyReadingFileURI(fileURI);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public class Constants {

public static final String UTF_8 = "UTF-8";

public static final String PROCESSED_FILE_LIST = "processedFileList";

/*prometheus reporte values*/
public static final String PROMETHEUS_REPORTER_NAME = "prometheus";
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class FileSourceConfiguration {

private FileServerConnector fileServerConnector;
private RemoteFileSystemServerConnector fileSystemServerConnector;
private List<String> processedFileList = new ArrayList<>();
private List<String> tailedFileURIMap;
private ExecutorService executorService = null;
private String[] requiredProperties = null;
Expand Down Expand Up @@ -307,4 +308,25 @@ public void setScheduler(Scheduler scheduler) {
public Scheduler getScheduler() {
return scheduler;
}

public List<String> getProcessedFileList() {
return processedFileList;
}

public void setProcessedFileList(List<String> processedFileList) {
this.processedFileList = processedFileList;
}

/**
* Maintains the processedFileList. Adds a file URL to the list if it has not being added already.
* @param fileURI the file URI which needs to be added to the list
* @return true if the fileURI is absent in the current list and adds to it; false if the URI is already present.
*/
public boolean addFileToListIfAbsent(String fileURI) {
if (processedFileList.contains(fileURI)) {
return false;
}
processedFileList.add(fileURI);
return true;
}
}
Loading