Skip to content

Commit

Permalink
Merge with upstream/master
Browse files Browse the repository at this point in the history
  • Loading branch information
sahandilshan committed Jun 3, 2020
2 parents 85ee5fb + f9f8b79 commit 927d758
Show file tree
Hide file tree
Showing 9 changed files with 739 additions and 159 deletions.
292 changes: 178 additions & 114 deletions component/src/main/java/io/siddhi/extension/io/file/FileSource.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.siddhi.extension.io.file.listeners;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.file.processors.FileProcessor;
import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.io.file.util.FileSourceConfiguration;
import io.siddhi.extension.io.file.util.VFSClientConnectorCallback;
import org.apache.log4j.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.wso2.carbon.messaging.BinaryCarbonMessage;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
import org.wso2.transport.file.connector.sender.VFSClientConnector;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import static io.siddhi.extension.io.file.util.Util.constructPath;
import static io.siddhi.extension.io.file.util.Util.generateProperties;
import static io.siddhi.extension.io.file.util.Util.getFileName;
import static io.siddhi.extension.io.file.util.Util.reProcessFileGenerateProperties;

/**
* FileCronExecutor is executed when the cron expression is given. If the current time satisfied by the cron
* expression then the file processing will be executed.
*/
public class FileCronExecutor implements Job {
private static final Logger log = Logger.getLogger(FileCronExecutor.class);

public FileCronExecutor() {
}

/**
* To initialize the cron job to execute at given cron expression
*/
public static void scheduleJob(FileSourceConfiguration fileSourceConfiguration,
SourceEventListener sourceEventListener, SiddhiAppContext siddhiAppContext) {
try {
JobKey jobKey = new JobKey(Constants.JOB_NAME, Constants.JOB_GROUP);

Scheduler scheduler = new StdSchedulerFactory().getScheduler();
fileSourceConfiguration.setScheduler(scheduler);
if (scheduler.checkExists(jobKey)) {
scheduler.deleteJob(jobKey);
}
scheduler.start();
// JobDataMap used to access the object in the job class
JobDataMap dataMap = new JobDataMap();
dataMap.put(Constants.FILE_SOURCE_CONFIGURATION, fileSourceConfiguration);
dataMap.put(Constants.SOURCE_EVENT_LISTENER, sourceEventListener);

// Define instances of Jobs
JobDetail cron = JobBuilder.newJob(FileCronExecutor.class)
.usingJobData(dataMap)
.withIdentity(jobKey)
.build();
//Trigger the job to at given cron expression
Trigger trigger = TriggerBuilder
.newTrigger()
.withIdentity(Constants.TRIGGER_NAME, Constants.TRIGGER_GROUP)
.withSchedule(
CronScheduleBuilder.cronSchedule(fileSourceConfiguration.getCronExpression())).build();
// Tell quartz to schedule the job using our trigger
scheduler.scheduleJob(cron, trigger);
} catch (SchedulerException e) {
log.error("The error occurs at scheduler start in SiddhiApp " + siddhiAppContext.getName() + " : " + e);
}
}

/**
* Method gets called when the cron Expression satisfies the system time
*/
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
FileSourceConfiguration fileSourceConfiguration = (FileSourceConfiguration) dataMap.get(
Constants.FILE_SOURCE_CONFIGURATION);
SourceEventListener sourceEventListener = (SourceEventListener) dataMap.get(
Constants.SOURCE_EVENT_LISTENER);
File listeningFileObject = new File(fileSourceConfiguration.getUri());
if (listeningFileObject.isDirectory()) {
File[] listOfFiles = listeningFileObject.listFiles();
if (listOfFiles != null) {
for (File file : listOfFiles) {
if (file.isFile()) {
processFile(file.toURI().toString(), jobExecutionContext, sourceEventListener);
}
}
}
} else {
processFile(listeningFileObject.toURI().toString(), jobExecutionContext, sourceEventListener);
}
}

/**
* Action taken while processing a file
*/
public void processFile(String fileURI, JobExecutionContext jobExecutionContext,
SourceEventListener sourceEventListener) {
JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
FileSourceConfiguration fileSourceConfiguration = (FileSourceConfiguration) dataMap.get(
Constants.FILE_SOURCE_CONFIGURATION);
FileProcessor fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, null);
VFSClientConnector vfsClientConnector = new VFSClientConnector();
vfsClientConnector.setMessageProcessor(fileProcessor);
Map<String, String> properties = generateProperties(fileSourceConfiguration, fileURI);
VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback();
initialProcessFile(vfsClientConnector, carbonCallback, properties, fileURI,
fileSourceConfiguration, fileProcessor);
}

public void initialProcessFile(VFSClientConnector vfsClientConnector, VFSClientConnectorCallback carbonCallback,
Map<String, String> properties, String fileURI,
FileSourceConfiguration fileSourceConfiguration, FileProcessor fileProcessor) {
vfsClientConnector.setMessageProcessor(fileProcessor);
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(
fileURI.getBytes(StandardCharsets.UTF_8)), true);
try {
vfsClientConnector.send(carbonMessage, carbonCallback, properties);
try {
carbonCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileURI);
} catch (InterruptedException e) {
log.error(String.format("Failed to get callback from vfs-client for file '%s'.",
fileURI), e);
}
reProcessFile(vfsClientConnector, carbonCallback, properties, fileURI, fileSourceConfiguration);
} catch (ClientConnectorException e) {
log.error(String.format("Failed to provide file '%s' for consuming.", fileURI), e);
}
}

/**
* Method use to move file from one path to another if action.after.process is 'move'
*/
public void reProcessFile(VFSClientConnector vfsClientConnector,
VFSClientConnectorCallback vfsClientConnectorCallback,
Map<String, String> properties, String fileUri,
FileSourceConfiguration fileSourceConfiguration) {
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(
fileUri.getBytes(StandardCharsets.UTF_8)), true);
String moveAfterProcess = fileSourceConfiguration.getMoveAfterProcess();
Map<String, String> reGeneratedProperties = reProcessFileGenerateProperties(fileSourceConfiguration, fileUri,
properties);
try {
File file = new File(fileSourceConfiguration.getUri());
if (file.isFile()) {
reGeneratedProperties.put(Constants.DESTINATION, moveAfterProcess);
} else {
String destination = constructPath(moveAfterProcess, getFileName(fileUri,
fileSourceConfiguration.getProtocolForMoveAfterProcess()));
if (destination != null) {
reGeneratedProperties.put(Constants.DESTINATION, destination);
}
}
vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, reGeneratedProperties);
vfsClientConnectorCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileUri);
} catch (ClientConnectorException e) {
log.error(String.format("Failure occurred in vfs-client while reading the file '%s '.", fileUri), e);
} catch (InterruptedException e) {
log.error(String.format("Failed to get callback from vfs-client for file '%s '.", fileUri), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import static io.siddhi.extension.io.file.util.Util.constructPath;
import static io.siddhi.extension.io.file.util.Util.generateProperties;
import static io.siddhi.extension.io.file.util.Util.getFileName;
import static io.siddhi.extension.io.file.util.Util.reProcessFileGenerateProperties;

/**
* Test {@link RemoteFileSystemListener} implementation for testing purpose.
*/
Expand Down Expand Up @@ -89,14 +93,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
vfsClientConnector = new VFSClientConnector();
fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, metrics);
vfsClientConnector.setMessageProcessor(fileProcessor);
Map<String, String> properties = new HashMap<>();
properties.put(Constants.URI, fileURI);
properties.put(Constants.READ_FILE_FROM_BEGINNING, Constants.TRUE);
properties.put(Constants.ACTION, Constants.READ);
properties.put(Constants.POLLING_INTERVAL, fileSourceConfiguration.getFilePollingInterval());
properties.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY,
fileSourceConfiguration.getFileReadWaitTimeout());
properties.put(Constants.MODE, mode);
Map<String, String> properties = generateProperties(fileSourceConfiguration, fileURI);
VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback();
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(
ByteBuffer.wrap(fileURI.getBytes(StandardCharsets.UTF_8)), true);
Expand All @@ -122,15 +119,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
vfsClientConnector = new VFSClientConnector();
fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, metrics);
vfsClientConnector.setMessageProcessor(fileProcessor);

Map<String, String> properties = new HashMap<>();
properties.put(Constants.URI, fileURI);
properties.put(Constants.READ_FILE_FROM_BEGINNING, Constants.TRUE);
properties.put(Constants.ACTION, Constants.READ);
properties.put(Constants.POLLING_INTERVAL, fileSourceConfiguration.getFilePollingInterval());
properties.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY,
fileSourceConfiguration.getFileReadWaitTimeout());
properties.put(Constants.MODE, mode);
Map<String, String> properties = generateProperties(fileSourceConfiguration, fileURI);
VFSClientConnectorCallback carbonCallback = new VFSClientConnectorCallback();
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(
ByteBuffer.wrap(fileURI.getBytes(StandardCharsets.UTF_8)), true);
Expand All @@ -153,15 +142,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
}
}
} else if (Constants.LINE.equalsIgnoreCase(mode) || Constants.REGEX.equalsIgnoreCase(mode)) {
Map<String, String> properties = new HashMap<>();
properties.put(Constants.ACTION, Constants.READ);
properties.put(Constants.MAX_LINES_PER_POLL, "10");
properties.put(Constants.POLLING_INTERVAL, fileSourceConfiguration.getFilePollingInterval());
properties.put(Constants.FILE_READ_WAIT_TIMEOUT_KEY,
fileSourceConfiguration.getFileReadWaitTimeout());
properties.put(Constants.MODE, mode);
properties.put(Constants.HEADER_PRESENT, fileSourceConfiguration.getHeaderPresent());
properties.put(Constants.READ_ONLY_HEADER, fileSourceConfiguration.getReadOnlyHeader());
Map<String, String> properties = generateProperties(fileSourceConfiguration, fileURI);
if (fileSourceConfiguration.isTailingEnabled()) {
fileSourceConfiguration.setTailedFileURI(fileURI);
if (metrics != null) {
Expand Down Expand Up @@ -189,7 +170,6 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent)
fileSourceConfiguration.getExecutorService().execute(fileServerExecutor);
}
} else {
properties.put(Constants.URI, fileURI);
vfsClientConnector = new VFSClientConnector();
fileProcessor = new FileProcessor(sourceEventListener, fileSourceConfiguration, metrics);
vfsClientConnector.setMessageProcessor(fileProcessor);
Expand Down Expand Up @@ -255,17 +235,18 @@ public void run() {
log.error(String.format("Failed to start the server for file '%s'. " +
"Hence starting to process next file.", fileURI));
carbonCallback.done(carbonMessage);
metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(fileURI), StreamStatus.ERROR);
if (metrics != null) {
metrics.getSourceFileStatusMap().replace(Utils.getShortFilePath(fileURI), StreamStatus.ERROR);
}
}
}
}

private void reProcessFile(VFSClientConnector vfsClientConnector,
VFSClientConnectorCallback vfsClientConnectorCallback,
Map<String, String> properties, String fileUri) {
String actionAfterProcess = fileSourceConfiguration.getActionAfterProcess();
properties.put(Constants.URI, fileUri);
properties.put(Constants.ACK_TIME_OUT, "1000");
Map<String, String> reGeneratedProperties = reProcessFileGenerateProperties(fileSourceConfiguration, fileUri,
properties);
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(
fileUri.getBytes(StandardCharsets.UTF_8)), true);
String moveAfterProcess = fileSourceConfiguration.getMoveAfterProcess();
Expand All @@ -274,16 +255,14 @@ private void reProcessFile(VFSClientConnector vfsClientConnector,
}
try {
if (fileSourceConfiguration.getActionAfterProcess() != null) {
properties.put(Constants.URI, fileUri);
properties.put(Constants.ACTION, actionAfterProcess);
if (fileSourceConfiguration.getMoveAfterProcess() != null) {
String destination = constructPath(moveAfterProcess, getFileName(fileUri,
fileSourceConfiguration.getProtocolForMoveAfterProcess()));
if (destination != null) {
properties.put(Constants.DESTINATION, destination);
reGeneratedProperties.put(Constants.DESTINATION, destination);
}
}
vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, properties);
vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, reGeneratedProperties);
vfsClientConnectorCallback.waitTillDone(fileSourceConfiguration.getTimeout(), fileUri);
if (metrics != null) {
fileSourceConfiguration.getExecutorService().execute(() -> {
Expand All @@ -300,7 +279,7 @@ private void reProcessFile(VFSClientConnector vfsClientConnector,
}
log.error(String.format("Failure occurred in vfs-client while reading the file '%s'.", fileUri), e);
} catch (InterruptedException e) {
log.error(String.format("Failed to get callback from vfs-client for file '%s'.", fileUri), e);
log.error(String.format("Failed to get callback from vfs-client for file '%s'.", fileUri), e);
} finally {
if (metrics != null) {
metrics.setFilePath(fileUri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
* Constants used in siddhi-io-file extension.
*/
public class Constants {
private Constants() {
}

/* configuration parameters*/
public static final String URI = "uri";
Expand Down Expand Up @@ -51,6 +49,7 @@ private Constants() {
public static final int WAIT_TILL_DONE = 5000;
public static final String HEADER_PRESENT = "header.present";
public static final String READ_ONLY_HEADER = "read.only.header";
public static final String CRON_EXPRESSION = "cron.expression";

/* configuration param values*/
public static final String MOVE = "move";
Expand Down Expand Up @@ -92,6 +91,13 @@ private Constants() {
public static final String ANNOTATION_TYPE_ELEMENT_NAME = "type";
public static final String MAP_ANNOTATION_BINARY_TYPE = "binary";
public static final String SOURCE_ANNOTATION_FILE_TYPE_NAME = "file";
public static final String SOURCE_EVENT_LISTENER = "SourceEventListener";
public static final String FILE_SOURCE_CONFIGURATION = "FileSourceConfiguration";
public static final String JOB_GROUP = "JobGroup";
public static final String JOB_NAME = "JobName";
public static final String TRIGGER_NAME = "TriggerName";
public static final String TRIGGER_GROUP = "TriggerGroup";


/*source property keys*/
public static final String TAILED_FILE = "tailedFile";
Expand Down
Loading

0 comments on commit 927d758

Please sign in to comment.