Skip to content

Commit

Permalink
Add Support for Activating and Deactivating File Inbound Endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
sajinieKavindya committed Dec 13, 2024
1 parent 6d45b9d commit b747327
Show file tree
Hide file tree
Showing 35 changed files with 876 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQTask;
import org.wso2.micro.integrator.mediation.ntask.NTaskTaskManager;

import java.util.Objects;

import static org.wso2.carbon.inbound.endpoint.common.Constants.SUPER_TENANT_DOMAIN_NAME;

/**
Expand All @@ -44,6 +46,7 @@ public abstract class InboundOneTimeTriggerRequestProcessor implements InboundRe
protected SynapseEnvironment synapseEnvironment;
protected String name;
protected boolean coordination;
protected boolean startInPausedMode;

private OneTimeTriggerInboundRunner inboundRunner;
private Thread runningThread;
Expand Down Expand Up @@ -134,4 +137,27 @@ public void destroy(boolean removeTask) {
}
}
}

@Override
public boolean activate() {

return false;
}

@Override
public boolean deactivate() {

return false;
}

@Override
public boolean isDeactivated() {

if (Objects.nonNull(startUpController)) {
return !startUpController.isTaskActive();
} else if (Objects.nonNull(runningThread)) {
return !runningThread.isAlive();
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.wso2.carbon.inbound.endpoint.persistence.InboundEndpointsDataStore;
import org.wso2.carbon.inbound.endpoint.protocol.jms.JMSTask;
import org.wso2.micro.integrator.mediation.ntask.NTaskTaskManager;
import org.wso2.micro.integrator.ntask.core.TaskUtils;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -43,6 +44,7 @@ public abstract class InboundRequestProcessorImpl implements InboundRequestProce
protected long interval;
protected String name;
protected boolean coordination;
protected boolean startInPausedMode;

private List<StartUpController> startUpControllersList = new ArrayList<>();
private HashMap<Thread, InboundRunner> inboundRunnersThreadsMap = new HashMap<>();
Expand All @@ -63,8 +65,8 @@ public InboundRequestProcessorImpl() {
* @param endpointPostfix
*/
protected void start(InboundTask task, String endpointPostfix) {
log.info("Starting the inbound endpoint " + name + ", with coordination " + coordination + ". Interval : "
+ interval + ". Type : " + endpointPostfix);
log.info("Starting the inbound endpoint [" + name + "] " + (startInPausedMode ? "in suspended mode" : "")
+ ", with coordination " + coordination + ". Interval : " + interval + ". Type : " + endpointPostfix);
if (coordination) {
try {
TaskDescription taskDescription = new TaskDescription();
Expand All @@ -78,6 +80,10 @@ protected void start(InboundTask task, String endpointPostfix) {
taskDescription.setIntervalInMs(true);
taskDescription.addResource(TaskDescription.INSTANCE, task);
taskDescription.addResource(TaskDescription.CLASSNAME, task.getClass().getName());
taskDescription.setTaskImplClassName(task.getClass().getName());
taskDescription.addProperty(TaskUtils.TASK_OWNER_PROPERTY, TaskUtils.TASK_BELONGS_TO_INBOUND_ENDPOINT);
taskDescription.addProperty(TaskUtils.TASK_OWNER_NAME, name);
taskDescription.addProperty(TaskUtils.START_IN_PAUSED_MODE, String.valueOf(startInPausedMode));
StartUpController startUpController = new StartUpController();
startUpController.setTaskDescription(taskDescription);
startUpController.init(synapseEnvironment);
Expand All @@ -96,12 +102,22 @@ protected void start(InboundTask task, String endpointPostfix) {
}
} else {

startInboundRunnerThread(task, Constants.SUPER_TENANT_DOMAIN_NAME, false);
startInboundRunnerThread(task, Constants.SUPER_TENANT_DOMAIN_NAME, false, startInPausedMode);
}
}

private void startInboundRunnerThread(InboundTask task, String tenantDomain, boolean mgrOverride) {
InboundRunner inboundRunner = new InboundRunner(task, interval, tenantDomain, mgrOverride);
/**
* Starts a new thread to execute the given inbound task by creating a new {@link InboundRunner} instance
* and running it in a separate thread.
*
* @param task The inbound task to be executed by the thread.
* @param tenantDomain The tenant domain under which the task should be run.
* @param mgrOverride A flag indicating whether the manager override is enabled.
* @param startInPausedMode A flag indicating whether the task should start in paused mode.
*/
private void startInboundRunnerThread(InboundTask task, String tenantDomain, boolean mgrOverride,
boolean startInPausedMode) {
InboundRunner inboundRunner = new InboundRunner(task, interval, tenantDomain, mgrOverride, startInPausedMode);
Thread runningThread = new Thread(inboundRunner);
inboundRunnersThreadsMap.put(runningThread, inboundRunner);
runningThread.start();
Expand Down Expand Up @@ -140,4 +156,105 @@ public void destroy() {
}
}

/**
* Activates the Inbound Endpoint by activating any associated startup controllers
* or resuming inbound runner threads if no startup controllers are present.
*
* <p>This method first checks if there are any startup controllers. If there are, it attempts to activate
* each controller and sets the success flag accordingly. If no startup controllers are present, it resumes
* any inbound runner threads that may be running. The method returns a boolean indicating whether
* the activation was successful.</p>
*
* @return {@code true} if at least one associated startup controller was successfully activated or inbound runner
* threads were resumed; {@code false} if activation task failed for all the startup controllers or
* if no startup controllers or inbound runner threads present.
*/
@Override
public boolean activate() {
log.info("Activating the Inbound Endpoint [" + name + "].");

boolean isSuccessfullyActivated = false;
if (!startUpControllersList.isEmpty()) {
for (StartUpController sc : startUpControllersList) {
if (sc.activateTask()) {
isSuccessfullyActivated = true;
} else {
if (log.isDebugEnabled()) {
log.debug("Failed to activate the consumer: " + sc.getTaskDescription().getName());
}
}
}
} else if (!inboundRunnersThreadsMap.isEmpty()) {
for (Map.Entry<Thread, InboundRunner> threadInboundRunnerEntry : inboundRunnersThreadsMap.entrySet()) {
InboundRunner inboundRunner = (InboundRunner) ((Map.Entry) threadInboundRunnerEntry).getValue();
inboundRunner.resume();
}
isSuccessfullyActivated = true;
}
return isSuccessfullyActivated;
}

/**
* Deactivates the Inbound Endpoint by deactivating any associated startup controllers
* or pausing inbound runner threads if no startup controllers are present.
*
* <p>This method first checks if there are any startup controllers. If there are, it attempts to deactivate
* each controller and sets the success flag accordingly. If no startup controllers are present, it pauses
* any inbound runner threads that may be running. The method returns a boolean indicating whether
* the deactivation was successful.</p>
*
* @return {@code true} if all associated startup controllers were successfully deactivated or inbound runner threads
* were paused; {@code false} if any deactivation task failed.
*/
@Override
public boolean deactivate() {
log.info("Deactivating the Inbound Endpoint [" + name + "].");

boolean isSuccessfullyDeactivated = true;
if (!startUpControllersList.isEmpty()) {
for (StartUpController sc : startUpControllersList) {
if (!sc.deactivateTask()) {
if (log.isDebugEnabled()) {
log.debug("Failed to deactivate the consumer: " + sc.getTaskDescription().getName());
}
isSuccessfullyDeactivated = false;
}
}
} else if (!inboundRunnersThreadsMap.isEmpty()) {
for (Map.Entry<Thread, InboundRunner> threadInboundRunnerEntry : inboundRunnersThreadsMap.entrySet()) {
InboundRunner inboundRunner = (InboundRunner) ((Map.Entry<?, ?>) threadInboundRunnerEntry).getValue();
inboundRunner.pause();
}
}
return isSuccessfullyDeactivated;
}

/**
* Checks if the Inbound Endpoint is deactivated. This method checks the status of any associated
* startup controllers or inbound runner threads. The endpoint is considered deactivated if all
* startup controllers are inactive and all inbound runner threads are paused.
*
* @return {@code true} if all startup controllers are inactive and all inbound runner threads are paused;
* {@code false} if any startup controller is active or any inbound runner thread is not paused.
*/
@Override
public boolean isDeactivated() {
if (!startUpControllersList.isEmpty()) {
for (StartUpController sc : startUpControllersList) {
if (sc.isTaskActive()) {
// Inbound Endpoint is considered active if at least one consumer is alive.
return false;
}
}
} else if (!inboundRunnersThreadsMap.isEmpty()) {
for (Map.Entry<Thread, InboundRunner> threadInboundRunnerEntry : inboundRunnersThreadsMap.entrySet()) {
InboundRunner inboundRunner = (InboundRunner) ((Map.Entry<?, ?>) threadInboundRunnerEntry).getValue();
if (!inboundRunner.isPaused()) {
// Inbound Endpoint is considered active if at least one consumer is alive.
return false;
}
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class InboundRunner implements Runnable {
private long interval;

private volatile boolean execute = true;
private volatile boolean isPaused;
private volatile boolean init = false;
// Following will be used to calculate the sleeping interval
private long lastRuntime;
Expand All @@ -43,19 +44,57 @@ public class InboundRunner implements Runnable {
private static final String CLUSTERING_PATTERN = "clusteringPattern";
private static final String CLUSTERING_PATTERN_WORKER_MANAGER = "WorkerManager";
private static final Log log = LogFactory.getLog(InboundRunner.class);
private final Object lock = new Object();

public InboundRunner(InboundTask task, long interval, String tenantDomain, boolean mgrOverride) {
public InboundRunner(InboundTask task, long interval, String tenantDomain, boolean mgrOverride, boolean startInPausedMode) {
this.task = task;
this.interval = interval;
this.tenantDomain = tenantDomain;
this.runOnManagerOverride = mgrOverride;
this.isPaused = startInPausedMode;
}

/**
* Pauses the execution of the thread.
* <p>
* This method sets the {@code isPaused} flag to {@code true}, indicating that
* the thread should pause its execution. Threads can check this flag and
* enter a wait state if necessary.
* </p>
*/
public void pause() {
synchronized (lock) {
isPaused = true;
}
}

/**
* Resumes the execution of a paused thread.
* <p>
* This method sets the {@code isPaused} flag to {@code false} and notifies
* all threads waiting on the {@code lock} object, allowing the thread to continue execution.
* </p>
*/
public void resume() {
synchronized (lock) {
isPaused = false;
lock.notifyAll(); // Wake up the thread
}
}

public boolean isPaused() {
return isPaused;
}

/**
* Exit the running while loop and terminate the thread
*/
protected void terminate() {
execute = false;
public void terminate() {
synchronized (lock) {
execute = false;
isPaused = false; // Ensure the thread is not stuck in pause
lock.notifyAll(); // Wake up the thread to exit
}
}

@Override
Expand All @@ -65,7 +104,22 @@ public void run() {
log.debug("Configuration context loaded. Running the Inbound Endpoint.");
// Run the poll cycles
while (execute) {
log.debug("Executing the Inbound Endpoint.");
synchronized (lock) {
while (isPaused && execute) {
try {
lock.wait(); // Pause the thread
} catch (InterruptedException e) {
if (log.isDebugEnabled()) {
log.debug("Inbound thread got interrupted while paused, but continuing...");
}
}
}
}
if (!execute) break; // Exit right away if the thread is terminated

if (log.isDebugEnabled()) {
log.debug("Executing the Inbound Endpoint.");
}
lastRuntime = getTime();
try {
task.taskExecute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,9 +1012,17 @@ protected Properties getInboundProperties() {

void destroy() {
fsManager.close();
this.close();
}

void close() {
isClosed = true;
}

void start() {
isClosed = false;
}

private String sanitizeFileUriWithSub(String originalFileUri) {
String[] splitUri = originalFileUri.split("\\?");
splitUri[0] = splitUri[0].substring(0, splitUri[0].length() - INCLUDE_SUB_DIR_SYMBOL_LENGTH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ public VFSProcessor(InboundProcessorParams params) {
this.injectingSeq = params.getInjectingSeq();
this.onErrorSeq = params.getOnErrorSeq();
this.synapseEnvironment = params.getSynapseEnvironment();
this.startInPausedMode = params.startInPausedMode();
}

/**
* This will be called at the time of synapse artifact deployment.
*/
public void init() {
log.info("Inbound file listener " + name + " starting ...");
log.info("Inbound file listener [" + name + "] is initializing"
+ (this.startInPausedMode ? " but will remain in suspended mode..." : "..."));
fileScanner = new FilePollingConsumer(vfsProperties, name, synapseEnvironment, interval);
fileScanner.registerHandler(
new FileInjectHandler(injectingSeq, onErrorSeq, sequential, synapseEnvironment, vfsProperties));
Expand All @@ -95,6 +97,18 @@ public void update() {
// This will not be called for inbound endpoints
}

@Override
public boolean deactivate() {
fileScanner.close();
return super.deactivate();
}

@Override
public boolean activate() {
fileScanner.start();
return super.activate();
}

/**
* Remove inbound endpoints.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class GenericEventBasedListener extends InboundOneTimeTriggerEventBasedPr
private String onErrorSeq;
private String classImpl;
private boolean sequential;
private boolean startInPausedMode;
private static final Log log = LogFactory.getLog(GenericEventBasedListener.class);

private static final String ENDPOINT_POSTFIX = "CLASS" + COMMON_ENDPOINT_POSTFIX;
Expand Down Expand Up @@ -71,9 +72,24 @@ public GenericEventBasedListener(InboundProcessorParams params) {
this.onErrorSeq = params.getOnErrorSeq();
this.synapseEnvironment = params.getSynapseEnvironment();
this.classImpl = params.getClassImpl();
this.startInPausedMode = params.startInPausedMode();
}

public void init() {
/*
* The activate/deactivate functionality is not currently implemented
* for this Inbound Endpoint type.
*
* Therefore, the following check has been added to immediately return if the "suspend"
* attribute is set to true in the inbound endpoint configuration.
*
* Note: This implementation is temporary and should be revisited and improved once
* the activate/deactivate capability is implemented.
*/
if (startInPausedMode) {
log.info("Inbound endpoint [" + name + "] is currently suspended.");
return;
}
log.info("Inbound event based listener " + name + " for class " + classImpl + " starting ...");
try {
Class c = Class.forName(classImpl);
Expand Down
Loading

0 comments on commit b747327

Please sign in to comment.