diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundOneTimeTriggerRequestProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundOneTimeTriggerRequestProcessor.java index 94ded925d4..675cc3e360 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundOneTimeTriggerRequestProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundOneTimeTriggerRequestProcessor.java @@ -30,6 +30,8 @@ import org.wso2.carbon.inbound.endpoint.protocol.rabbitmq.RabbitMQTask; import org.wso2.micro.integrator.mediation.ntask.NTaskTaskManager; +import java.util.Objects; + import static org.wso2.carbon.inbound.endpoint.common.Constants.SUPER_TENANT_DOMAIN_NAME; /** @@ -44,6 +46,7 @@ public abstract class InboundOneTimeTriggerRequestProcessor implements InboundRe protected SynapseEnvironment synapseEnvironment; protected String name; protected boolean coordination; + protected boolean startInPausedMode; private OneTimeTriggerInboundRunner inboundRunner; private Thread runningThread; @@ -134,4 +137,27 @@ public void destroy(boolean removeTask) { } } } + + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + if (Objects.nonNull(startUpController)) { + return !startUpController.isTaskActive(); + } else if (Objects.nonNull(runningThread)) { + return !runningThread.isAlive(); + } + return true; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRequestProcessorImpl.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRequestProcessorImpl.java index e9c1f35f09..a55c2753fb 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRequestProcessorImpl.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRequestProcessorImpl.java @@ -27,6 +27,7 @@ import org.wso2.carbon.inbound.endpoint.persistence.InboundEndpointsDataStore; import org.wso2.carbon.inbound.endpoint.protocol.jms.JMSTask; import org.wso2.micro.integrator.mediation.ntask.NTaskTaskManager; +import org.wso2.micro.integrator.ntask.core.TaskUtils; import java.util.ArrayList; import java.util.HashMap; @@ -43,6 +44,7 @@ public abstract class InboundRequestProcessorImpl implements InboundRequestProce protected long interval; protected String name; protected boolean coordination; + protected boolean startInPausedMode; private List startUpControllersList = new ArrayList<>(); private HashMap inboundRunnersThreadsMap = new HashMap<>(); @@ -63,8 +65,8 @@ public InboundRequestProcessorImpl() { * @param endpointPostfix */ protected void start(InboundTask task, String endpointPostfix) { - log.info("Starting the inbound endpoint " + name + ", with coordination " + coordination + ". Interval : " - + interval + ". Type : " + endpointPostfix); + log.info("Starting the inbound endpoint [" + name + "] " + (startInPausedMode ? "in suspended mode" : "") + + ", with coordination " + coordination + ". Interval : " + interval + ". Type : " + endpointPostfix); if (coordination) { try { TaskDescription taskDescription = new TaskDescription(); @@ -78,6 +80,10 @@ protected void start(InboundTask task, String endpointPostfix) { taskDescription.setIntervalInMs(true); taskDescription.addResource(TaskDescription.INSTANCE, task); taskDescription.addResource(TaskDescription.CLASSNAME, task.getClass().getName()); + taskDescription.setTaskImplClassName(task.getClass().getName()); + taskDescription.addProperty(TaskUtils.TASK_OWNER_PROPERTY, TaskUtils.TASK_BELONGS_TO_INBOUND_ENDPOINT); + taskDescription.addProperty(TaskUtils.TASK_OWNER_NAME, name); + taskDescription.addProperty(TaskUtils.START_IN_PAUSED_MODE, String.valueOf(startInPausedMode)); StartUpController startUpController = new StartUpController(); startUpController.setTaskDescription(taskDescription); startUpController.init(synapseEnvironment); @@ -96,12 +102,22 @@ protected void start(InboundTask task, String endpointPostfix) { } } else { - startInboundRunnerThread(task, Constants.SUPER_TENANT_DOMAIN_NAME, false); + startInboundRunnerThread(task, Constants.SUPER_TENANT_DOMAIN_NAME, false, startInPausedMode); } } - private void startInboundRunnerThread(InboundTask task, String tenantDomain, boolean mgrOverride) { - InboundRunner inboundRunner = new InboundRunner(task, interval, tenantDomain, mgrOverride); + /** + * Starts a new thread to execute the given inbound task by creating a new {@link InboundRunner} instance + * and running it in a separate thread. + * + * @param task The inbound task to be executed by the thread. + * @param tenantDomain The tenant domain under which the task should be run. + * @param mgrOverride A flag indicating whether the manager override is enabled. + * @param startInPausedMode A flag indicating whether the task should start in paused mode. + */ + private void startInboundRunnerThread(InboundTask task, String tenantDomain, boolean mgrOverride, + boolean startInPausedMode) { + InboundRunner inboundRunner = new InboundRunner(task, interval, tenantDomain, mgrOverride, startInPausedMode); Thread runningThread = new Thread(inboundRunner); inboundRunnersThreadsMap.put(runningThread, inboundRunner); runningThread.start(); @@ -140,4 +156,105 @@ public void destroy() { } } + /** + * Activates the Inbound Endpoint by activating any associated startup controllers + * or resuming inbound runner threads if no startup controllers are present. + * + *

This method first checks if there are any startup controllers. If there are, it attempts to activate + * each controller and sets the success flag accordingly. If no startup controllers are present, it resumes + * any inbound runner threads that may be running. The method returns a boolean indicating whether + * the activation was successful.

+ * + * @return {@code true} if at least one associated startup controller was successfully activated or inbound runner + * threads were resumed; {@code false} if activation task failed for all the startup controllers or + * if no startup controllers or inbound runner threads present. + */ + @Override + public boolean activate() { + log.info("Activating the Inbound Endpoint [" + name + "]."); + + boolean isSuccessfullyActivated = false; + if (!startUpControllersList.isEmpty()) { + for (StartUpController sc : startUpControllersList) { + if (sc.activateTask()) { + isSuccessfullyActivated = true; + } else { + if (log.isDebugEnabled()) { + log.debug("Failed to activate the consumer: " + sc.getTaskDescription().getName()); + } + } + } + } else if (!inboundRunnersThreadsMap.isEmpty()) { + for (Map.Entry threadInboundRunnerEntry : inboundRunnersThreadsMap.entrySet()) { + InboundRunner inboundRunner = (InboundRunner) ((Map.Entry) threadInboundRunnerEntry).getValue(); + inboundRunner.resume(); + } + isSuccessfullyActivated = true; + } + return isSuccessfullyActivated; + } + + /** + * Deactivates the Inbound Endpoint by deactivating any associated startup controllers + * or pausing inbound runner threads if no startup controllers are present. + * + *

This method first checks if there are any startup controllers. If there are, it attempts to deactivate + * each controller and sets the success flag accordingly. If no startup controllers are present, it pauses + * any inbound runner threads that may be running. The method returns a boolean indicating whether + * the deactivation was successful.

+ * + * @return {@code true} if all associated startup controllers were successfully deactivated or inbound runner threads + * were paused; {@code false} if any deactivation task failed. + */ + @Override + public boolean deactivate() { + log.info("Deactivating the Inbound Endpoint [" + name + "]."); + + boolean isSuccessfullyDeactivated = true; + if (!startUpControllersList.isEmpty()) { + for (StartUpController sc : startUpControllersList) { + if (!sc.deactivateTask()) { + if (log.isDebugEnabled()) { + log.debug("Failed to deactivate the consumer: " + sc.getTaskDescription().getName()); + } + isSuccessfullyDeactivated = false; + } + } + } else if (!inboundRunnersThreadsMap.isEmpty()) { + for (Map.Entry threadInboundRunnerEntry : inboundRunnersThreadsMap.entrySet()) { + InboundRunner inboundRunner = (InboundRunner) ((Map.Entry) threadInboundRunnerEntry).getValue(); + inboundRunner.pause(); + } + } + return isSuccessfullyDeactivated; + } + + /** + * Checks if the Inbound Endpoint is deactivated. This method checks the status of any associated + * startup controllers or inbound runner threads. The endpoint is considered deactivated if all + * startup controllers are inactive and all inbound runner threads are paused. + * + * @return {@code true} if all startup controllers are inactive and all inbound runner threads are paused; + * {@code false} if any startup controller is active or any inbound runner thread is not paused. + */ + @Override + public boolean isDeactivated() { + if (!startUpControllersList.isEmpty()) { + for (StartUpController sc : startUpControllersList) { + if (sc.isTaskActive()) { + // Inbound Endpoint is considered active if at least one consumer is alive. + return false; + } + } + } else if (!inboundRunnersThreadsMap.isEmpty()) { + for (Map.Entry threadInboundRunnerEntry : inboundRunnersThreadsMap.entrySet()) { + InboundRunner inboundRunner = (InboundRunner) ((Map.Entry) threadInboundRunnerEntry).getValue(); + if (!inboundRunner.isPaused()) { + // Inbound Endpoint is considered active if at least one consumer is alive. + return false; + } + } + } + return true; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRunner.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRunner.java index 341fb0588c..8ffda4366f 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRunner.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/common/InboundRunner.java @@ -32,6 +32,7 @@ public class InboundRunner implements Runnable { private long interval; private volatile boolean execute = true; + private volatile boolean isPaused; private volatile boolean init = false; // Following will be used to calculate the sleeping interval private long lastRuntime; @@ -43,19 +44,57 @@ public class InboundRunner implements Runnable { private static final String CLUSTERING_PATTERN = "clusteringPattern"; private static final String CLUSTERING_PATTERN_WORKER_MANAGER = "WorkerManager"; private static final Log log = LogFactory.getLog(InboundRunner.class); + private final Object lock = new Object(); - public InboundRunner(InboundTask task, long interval, String tenantDomain, boolean mgrOverride) { + public InboundRunner(InboundTask task, long interval, String tenantDomain, boolean mgrOverride, boolean startInPausedMode) { this.task = task; this.interval = interval; this.tenantDomain = tenantDomain; this.runOnManagerOverride = mgrOverride; + this.isPaused = startInPausedMode; + } + + /** + * Pauses the execution of the thread. + *

+ * This method sets the {@code isPaused} flag to {@code true}, indicating that + * the thread should pause its execution. Threads can check this flag and + * enter a wait state if necessary. + *

+ */ + public void pause() { + synchronized (lock) { + isPaused = true; + } + } + + /** + * Resumes the execution of a paused thread. + *

+ * This method sets the {@code isPaused} flag to {@code false} and notifies + * all threads waiting on the {@code lock} object, allowing the thread to continue execution. + *

+ */ + public void resume() { + synchronized (lock) { + isPaused = false; + lock.notifyAll(); // Wake up the thread + } + } + + public boolean isPaused() { + return isPaused; } /** * Exit the running while loop and terminate the thread */ - protected void terminate() { - execute = false; + public void terminate() { + synchronized (lock) { + execute = false; + isPaused = false; // Ensure the thread is not stuck in pause + lock.notifyAll(); // Wake up the thread to exit + } } @Override @@ -65,7 +104,22 @@ public void run() { log.debug("Configuration context loaded. Running the Inbound Endpoint."); // Run the poll cycles while (execute) { - log.debug("Executing the Inbound Endpoint."); + synchronized (lock) { + while (isPaused && execute) { + try { + lock.wait(); // Pause the thread + } catch (InterruptedException e) { + if (log.isDebugEnabled()) { + log.debug("Inbound thread got interrupted while paused, but continuing..."); + } + } + } + } + if (!execute) break; // Exit right away if the thread is terminated + + if (log.isDebugEnabled()) { + log.debug("Executing the Inbound Endpoint."); + } lastRuntime = getTime(); try { task.taskExecute(); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/FilePollingConsumer.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/FilePollingConsumer.java index cf1032d152..1a65416c53 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/FilePollingConsumer.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/FilePollingConsumer.java @@ -1012,9 +1012,17 @@ protected Properties getInboundProperties() { void destroy() { fsManager.close(); + this.close(); + } + + void close() { isClosed = true; } + void start() { + isClosed = false; + } + private String sanitizeFileUriWithSub(String originalFileUri) { String[] splitUri = originalFileUri.split("\\?"); splitUri[0] = splitUri[0].substring(0, splitUri[0].length() - INCLUDE_SUB_DIR_SYMBOL_LENGTH); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/VFSProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/VFSProcessor.java index ab4efc2608..5be6b2c18d 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/VFSProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/file/VFSProcessor.java @@ -62,13 +62,15 @@ public VFSProcessor(InboundProcessorParams params) { this.injectingSeq = params.getInjectingSeq(); this.onErrorSeq = params.getOnErrorSeq(); this.synapseEnvironment = params.getSynapseEnvironment(); + this.startInPausedMode = params.startInPausedMode(); } /** * This will be called at the time of synapse artifact deployment. */ public void init() { - log.info("Inbound file listener " + name + " starting ..."); + log.info("Inbound file listener [" + name + "] is initializing" + + (this.startInPausedMode ? " but will remain in suspended mode..." : "...")); fileScanner = new FilePollingConsumer(vfsProperties, name, synapseEnvironment, interval); fileScanner.registerHandler( new FileInjectHandler(injectingSeq, onErrorSeq, sequential, synapseEnvironment, vfsProperties)); @@ -95,6 +97,18 @@ public void update() { // This will not be called for inbound endpoints } + @Override + public boolean deactivate() { + fileScanner.close(); + return super.deactivate(); + } + + @Override + public boolean activate() { + fileScanner.start(); + return super.activate(); + } + /** * Remove inbound endpoints. * diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericEventBasedListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericEventBasedListener.java index 5419693dac..610e4cd300 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericEventBasedListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericEventBasedListener.java @@ -38,6 +38,7 @@ public class GenericEventBasedListener extends InboundOneTimeTriggerEventBasedPr private String onErrorSeq; private String classImpl; private boolean sequential; + private boolean startInPausedMode; private static final Log log = LogFactory.getLog(GenericEventBasedListener.class); private static final String ENDPOINT_POSTFIX = "CLASS" + COMMON_ENDPOINT_POSTFIX; @@ -71,9 +72,24 @@ public GenericEventBasedListener(InboundProcessorParams params) { this.onErrorSeq = params.getOnErrorSeq(); this.synapseEnvironment = params.getSynapseEnvironment(); this.classImpl = params.getClassImpl(); + this.startInPausedMode = params.startInPausedMode(); } public void init() { + /* + * The activate/deactivate functionality is not currently implemented + * for this Inbound Endpoint type. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("Inbound event based listener " + name + " for class " + classImpl + " starting ..."); try { Class c = Class.forName(classImpl); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericInboundListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericInboundListener.java index b68f462692..be3e683d45 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericInboundListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericInboundListener.java @@ -95,4 +95,22 @@ protected static void handleException(String msg, Exception e) { log.error(msg, e); throw new SynapseException(msg, e); } + + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + return false; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericProcessor.java index 42b111d601..30cea4c8f7 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/generic/GenericProcessor.java @@ -76,9 +76,24 @@ public GenericProcessor(InboundProcessorParams params) { this.onErrorSeq = params.getOnErrorSeq(); this.synapseEnvironment = params.getSynapseEnvironment(); this.classImpl = params.getClassImpl(); + this.startInPausedMode = params.startInPausedMode(); } public void init() { + /* + * The activate/deactivate functionality is not currently implemented + * for this Inbound Endpoint type. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("Inbound listener " + name + " for class " + classImpl + " starting ..."); try { Class c = Class.forName(classImpl); @@ -138,4 +153,15 @@ public void update() { start(); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/grpc/InboundGRPCListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/grpc/InboundGRPCListener.java index 6b4c901815..9ba943b43f 100755 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/grpc/InboundGRPCListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/grpc/InboundGRPCListener.java @@ -31,13 +31,16 @@ import org.wso2.carbon.inbound.endpoint.protocol.grpc.util.Event; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.TimeUnit; public class InboundGRPCListener implements InboundRequestProcessor { private int port; + private String name; private GRPCInjectHandler injectHandler; private static final Log log = LogFactory.getLog(InboundGRPCListener.class.getName()); private Server server; + private boolean startInPausedMode; public InboundGRPCListener(InboundProcessorParams params) { String injectingSeq = params.getInjectingSeq(); @@ -51,11 +54,27 @@ public InboundGRPCListener(InboundProcessorParams params) { " property. Setting the port as " + InboundGRPCConstants.DEFAULT_INBOUND_ENDPOINT_GRPC_PORT); port = InboundGRPCConstants.DEFAULT_INBOUND_ENDPOINT_GRPC_PORT; } + name = params.getName(); injectHandler = new GRPCInjectHandler(injectingSeq, onErrorSeq, false, synapseEnvironment); + startInPausedMode = params.startInPausedMode(); } public void init() { try { + /* + * The activate/deactivate functionality for the GRPC protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for GRPC listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } this.start(); } catch (IOException e) { throw new SynapseException("IOException when starting gRPC server: " + e.getMessage(), e); @@ -70,6 +89,26 @@ public void destroy() { } } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + if (Objects.isNull(server)) { + return true; + } + return server.isTerminated(); + } + public void start() throws IOException { if (server != null) { throw new IllegalStateException("gRPC Listener Server already started"); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/hl7/core/InboundHL7Listener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/hl7/core/InboundHL7Listener.java index ce6ef34f3b..f35ee81045 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/hl7/core/InboundHL7Listener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/hl7/core/InboundHL7Listener.java @@ -32,13 +32,29 @@ public class InboundHL7Listener implements InboundRequestProcessor { private int port; private InboundProcessorParams params; + private boolean startInPausedMode; public InboundHL7Listener(InboundProcessorParams params) { this.params = params; + startInPausedMode = params.startInPausedMode(); } @Override public void init() { + /* + * The activate/deactivate functionality for the HL7 protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HL7 listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + params.getName() + "] is currently suspended."); + return; + } if (!InboundHL7IOReactor.isStarted()) { log.info("Starting MLLP Transport Reactor"); try { @@ -67,4 +83,21 @@ public void destroy() { HL7EndpointManager.getInstance().closeEndpoint(port); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + return !InboundHL7IOReactor.isStarted(); + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/InboundHttpListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/InboundHttpListener.java index 5bec1eb17e..230871843e 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/InboundHttpListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/InboundHttpListener.java @@ -44,6 +44,7 @@ public class InboundHttpListener implements InboundRequestProcessor { private String name; private int port; private InboundProcessorParams processorParams; + protected boolean startInPausedMode; public InboundHttpListener(InboundProcessorParams params) { processorParams = params; @@ -58,10 +59,25 @@ public InboundHttpListener(InboundProcessorParams params) { handleException("Please provide port number as integer instead of port " + portParam, e); } name = params.getName(); + startInPausedMode = params.startInPausedMode(); } @Override public void init() { + /* + * The activate/deactivate functionality for the HTTP protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HTTP listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } if (isPortUsedByAnotherApplication(port)) { log.warn("Port " + port + " used by inbound endpoint " + name + " is already used by another application " + "hence undeploying inbound endpoint"); @@ -77,6 +93,24 @@ public void destroy() { HTTPEndpointManager.getInstance().closeEndpoint(port); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + return !HTTPEndpointManager.getInstance().isEndpointRunning(name, port); + } + protected void handleException(String msg, Exception e) { log.error(msg, e); throw new SynapseException(msg, e); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/management/HTTPEndpointManager.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/management/HTTPEndpointManager.java index 93f779d276..4a9eb9f967 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/management/HTTPEndpointManager.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/http/management/HTTPEndpointManager.java @@ -481,4 +481,23 @@ public boolean isAnyInternalHttpApiEnabled() { public boolean isAnyInternalHttpsApiEnabled() { return internalHttpsApiEnabled; } + + /** + * Checks if a specified endpoint is running on a given port. + * This method retrieves the name of the endpoint listening on the given port and compares it + * with the provided name. If a match is found, it checks if the endpoint is actively running. + * + * @param name the name of the endpoint to check + * @param port the port number where the endpoint is expected to be running + * @return {@code true} if the endpoint with the specified name is running on the given port; + * {@code false} otherwise + */ + public boolean isEndpointRunning(String name, int port) { + + String epName = dataStore.getListeningEndpointName(port, SUPER_TENANT_DOMAIN_NAME); + if (epName != null && epName.equalsIgnoreCase(name)) { + return PassThroughInboundEndpointHandler.isEndpointRunning(port); + } + return false; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/https/InboundHttpsListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/https/InboundHttpsListener.java index 71c125722f..b28e0d6c25 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/https/InboundHttpsListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/https/InboundHttpsListener.java @@ -66,6 +66,20 @@ public InboundHttpsListener(InboundProcessorParams params) { @Override public void init() { + /* + * The activate/deactivate functionality for the HTTPS protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HTTPS listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } if (isPortUsedByAnotherApplication(port)) { log.warn("Port " + port + "used by inbound endpoint " + name + " is already used by another application " + "hence undeploying inbound endpoint"); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpssecurewebsocket/InboundHttpsSecureWebsocketListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpssecurewebsocket/InboundHttpsSecureWebsocketListener.java index ba59a1e124..f02cb9e44c 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpssecurewebsocket/InboundHttpsSecureWebsocketListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpssecurewebsocket/InboundHttpsSecureWebsocketListener.java @@ -17,12 +17,16 @@ */ package org.wso2.carbon.inbound.endpoint.protocol.httpssecurewebsocket; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.synapse.inbound.InboundProcessorParams; import org.wso2.carbon.inbound.endpoint.protocol.httpwebsocket.InboundHttpWebsocketListener; import org.wso2.carbon.inbound.endpoint.protocol.httpwebsocket.management.HttpWebsocketEndpointManager; public class InboundHttpsSecureWebsocketListener extends InboundHttpWebsocketListener { + private static final Log LOGGER = LogFactory.getLog(InboundHttpsSecureWebsocketListener.class); + public InboundHttpsSecureWebsocketListener(InboundProcessorParams params) { super(params); @@ -31,6 +35,20 @@ public InboundHttpsSecureWebsocketListener(InboundProcessorParams params) { @Override public void init() { - HttpWebsocketEndpointManager.getInstance().startSSLEndpoint(port, name, processorParams); + /* + * The activate/deactivate functionality for the HTTPS-WSS protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HTTPS-WSS listeners is implemented. + */ + if (startInPausedMode) { + LOGGER.info("Inbound endpoint [" + name + "] is currently suspended."); + } else { + HttpWebsocketEndpointManager.getInstance().startSSLEndpoint(port, name, processorParams); + } } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/InboundHttpWebsocketListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/InboundHttpWebsocketListener.java index 95d0c04068..de700dd88f 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/InboundHttpWebsocketListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/InboundHttpWebsocketListener.java @@ -31,6 +31,7 @@ public class InboundHttpWebsocketListener implements InboundRequestProcessor { protected final String name; protected int port; protected InboundProcessorParams processorParams; + protected boolean startInPausedMode; public InboundHttpWebsocketListener(InboundProcessorParams params) { @@ -43,12 +44,27 @@ public InboundHttpWebsocketListener(InboundProcessorParams params) { handleException("Validation failed for the port parameter " + portParam, e); } name = params.getName(); + startInPausedMode = params.startInPausedMode(); } @Override public void init() { - HttpWebsocketEndpointManager.getInstance().startEndpoint(port, name, processorParams); + /* + * The activate/deactivate functionality for the HTTP-WS protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for HTTP-WS listener is implemented. + */ + if (startInPausedMode) { + LOGGER.info("Inbound endpoint [" + name + "] is currently suspended."); + } else { + HttpWebsocketEndpointManager.getInstance().startEndpoint(port, name, processorParams); + } } @Override @@ -57,6 +73,24 @@ public void destroy() { HttpWebsocketEndpointManager.getInstance().closeEndpoint(port); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + + @Override + public boolean isDeactivated() { + + return !HttpWebsocketEndpointManager.getInstance().isEndpointRunning(name, port); + } + protected void handleException(String msg, Exception e) { LOGGER.error(msg, e); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/management/HttpWebsocketEndpointManager.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/management/HttpWebsocketEndpointManager.java index ec571eedb8..b2f99c89c2 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/management/HttpWebsocketEndpointManager.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/httpwebsocket/management/HttpWebsocketEndpointManager.java @@ -138,6 +138,11 @@ private boolean handleExistingEndpointOnSamePort(int port, String name) { } } + public boolean isEndpointRunning(String name, int port) { + String epName = dataStore.getListeningEndpointName(port, SUPER_TENANT_DOMAIN_NAME); + return epName.equalsIgnoreCase(name); + } + /** * Checks if the given port is available to use. * diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSPollingConsumer.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSPollingConsumer.java index ad32f505df..f28c2b84ba 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSPollingConsumer.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSPollingConsumer.java @@ -53,6 +53,7 @@ public class JMSPollingConsumer { private String name; private Properties jmsProperties; private boolean isConnected; + private boolean destroyedTriggered; private Long reconnectDuration; private long retryDuration; @@ -402,7 +403,12 @@ this flag will be checked occasionally and throw an InterruptedException (and re practice the Interrupted flag is set back to TRUE in this thread. */ } } - releaseResources(false); + if (destroyedTriggered) { + releaseResources(true); + destroyedTriggered = false; + } else { + releaseResources(false); + } } return null; } @@ -425,6 +431,7 @@ private void releaseResources(boolean forcefullyClose) { } public void destroy() { + destroyedTriggered = true; synchronized (jmsConnectionFactory) { if (messageConsumer != null) { jmsConnectionFactory.closeConsumer(messageConsumer, true); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSProcessor.java index 5048b2300b..d3372adeb0 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/jms/JMSProcessor.java @@ -46,6 +46,7 @@ public class JMSProcessor extends InboundRequestProcessorImpl implements TaskSta public JMSProcessor(InboundProcessorParams params) { this.name = params.getName(); + this.startInPausedMode = params.startInPausedMode(); this.jmsProperties = params.getProperties(); String inboundEndpointInterval = jmsProperties.getProperty(PollingConstants.INBOUND_ENDPOINT_INTERVAL); @@ -84,7 +85,24 @@ public JMSProcessor(InboundProcessorParams params) { * This will be called at the time of synapse artifact deployment. */ public void init() { + /* + * The activate/deactivate functionality for the JMS protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for JMS listener is implemented. + */ + if (startInPausedMode) { + return; + } log.info("Initializing inbound JMS listener for inbound endpoint " + name); + start(); + } + + private void start() { for (int consumers = 0; consumers < concurrentConsumers; consumers++) { JMSPollingConsumer jmsPollingConsumer = new JMSPollingConsumer(jmsProperties, interval, name); jmsPollingConsumer.registerHandler( @@ -121,10 +139,6 @@ public void setName(String name) { this.name = name; } - public void update() { - // This will not be called for inbound endpoints - } - /** * Remove inbound endpoints. * @@ -136,4 +150,20 @@ public void destroy(boolean removeTask) { destroy(); } } + + public void update() { + // Not used by JMSProcessor + } + + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/AbstractKafkaMessageListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/AbstractKafkaMessageListener.java index 334fc6447b..074ed0872a 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/AbstractKafkaMessageListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/AbstractKafkaMessageListener.java @@ -65,8 +65,7 @@ public String getName() { /** * Destroy consuming the messages */ - public void destroy() { - } + public abstract void destroy(); /** * Poll the messages from the zookeeper and injected to the sequence diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAMessageListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAMessageListener.java index 32ec2e14a9..dcfb3de6ff 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAMessageListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAMessageListener.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; public class KAFKAMessageListener extends AbstractKafkaMessageListener { @@ -142,6 +143,17 @@ public void injectMessageToESB(String name) { } } + @Override + public void destroy() { + if (Objects.nonNull(consumerConnector)) { + if (log.isDebugEnabled()) { + log.debug("Shutting down the Kafka consumer connect..."); + } + consumerConnector.shutdown(); + consumerConnector = null; + } + } + public void injectMessageToESB(String sequenceName, ConsumerIterator consumerIterator) { byte[] msg = consumerIterator.next().message(); injectHandler.invoke(msg, sequenceName); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAPollingConsumer.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAPollingConsumer.java index c619fb6cc4..ef07a331d6 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAPollingConsumer.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAPollingConsumer.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Date; import java.util.List; +import java.util.Objects; import java.util.Properties; public class KAFKAPollingConsumer { @@ -157,4 +158,10 @@ public Object poll() { public Properties getInboundProperties() { return kafkaProperties; } + + public void destroy() { + if (Objects.nonNull(messageListener)) { + messageListener.destroy(); + } + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAProcessor.java index bb2af7b849..1864fc7f21 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAProcessor.java @@ -77,12 +77,26 @@ public KAFKAProcessor(InboundProcessorParams params) { this.injectingSeq = params.getInjectingSeq(); this.onErrorSeq = params.getOnErrorSeq(); this.synapseEnvironment = params.getSynapseEnvironment(); + this.startInPausedMode = params.startInPausedMode(); } /** * This will be called at the time of synapse artifact deployment. */ public void init() { + /* + * The activate/deactivate functionality for the Kafka Inbound Endpoint is not currently implemented. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for Kafka listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("Initializing inbound KAFKA listener for destination " + name); try { pollingConsumer = new KAFKAPollingConsumer(kafkaProperties, interval, name); @@ -111,7 +125,7 @@ public void start() { } public void update() { - // This will not be called for inbound endpoints + // This is not called for Kafka Inbound Endpoint } public String getName() { @@ -125,17 +139,26 @@ public void setName(String name) { @Override public void destroy() { try { - if (pollingConsumer != null && pollingConsumer.messageListener != null - && pollingConsumer.messageListener.consumerConnector != null) { - pollingConsumer.messageListener.consumerConnector.shutdown(); - log.info("Shutdown the kafka consumer connector"); - } + pollingConsumer.destroy(); + log.info("Shutdown the kafka consumer connector"); } catch (Exception e) { log.error("Error while shutdown the consumer connector" + e.getMessage(), e); } super.destroy(); } + @Override + public boolean activate() { + + return false; + } + + @Override + public boolean deactivate() { + + return false; + } + /** * Remove inbound endpoints. * diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/SimpleKafkaMessageListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/SimpleKafkaMessageListener.java index d0f3bd7707..8d6f0f00ad 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/SimpleKafkaMessageListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/kafka/SimpleKafkaMessageListener.java @@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; public class SimpleKafkaMessageListener extends AbstractKafkaMessageListener { @@ -139,6 +140,16 @@ public boolean createKafkaConsumerConnector() throws Exception { public void start() throws Exception { } + public void destroy() { + if (Objects.nonNull(consumer)) { + if (log.isDebugEnabled()) { + log.debug("Shutting down the Kafka consumer connect..."); + } + consumer.close(); + consumer = null; + } + } + @Override public void injectMessageToESB(String name) { diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/mqtt/MqttListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/mqtt/MqttListener.java index 31849d2ea5..b0008b8d44 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/mqtt/MqttListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/mqtt/MqttListener.java @@ -77,6 +77,7 @@ public MqttListener(InboundProcessorParams params) { this.synapseEnvironment = params.getSynapseEnvironment(); this.mqttProperties = params.getProperties(); this.params = params; + this.startInPausedMode = params.startInPausedMode(); //assign default value if sequential mode parameter is not present this.sequential = true; @@ -153,6 +154,20 @@ public void destroy(boolean removeTask) { @Override public void init() { + /* + * The activate/deactivate functionality for the MQTT protocol is not currently implemented + * for Inbound Endpoints. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for MQTT listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("MQTT inbound endpoint " + name + " initializing ..."); initAsyncClient(); start(); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/rabbitmq/RabbitMQListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/rabbitmq/RabbitMQListener.java index 68e7027780..36b279bcc7 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/rabbitmq/RabbitMQListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/rabbitmq/RabbitMQListener.java @@ -26,6 +26,7 @@ import org.wso2.carbon.inbound.endpoint.common.InboundOneTimeTriggerRequestProcessor; import org.wso2.carbon.inbound.endpoint.protocol.PollingConstants; +import java.util.Objects; import java.util.Properties; /** @@ -47,6 +48,8 @@ public RabbitMQListener(InboundProcessorParams params) { this.name = params.getName(); this.injectingSeq = params.getInjectingSeq(); this.onErrorSeq = params.getOnErrorSeq(); + this.startInPausedMode = params.startInPausedMode(); + this.synapseEnvironment = params.getSynapseEnvironment(); this.rabbitmqProperties = params.getProperties(); @@ -72,12 +75,27 @@ public void destroy() { @Override public void destroy(boolean removeTask) { - rabbitMQConsumer.close(); + if (Objects.nonNull(rabbitMQConsumer)) { + rabbitMQConsumer.close(); + } super.destroy(removeTask); } @Override public void init() { + /* + * The activate/deactivate functionality for the RabbitMQ Inbound Endpoint is not currently implemented. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for RabbitMQ listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } log.info("RABBITMQ inbound endpoint " + name + " initializing ..."); rabbitMQConsumer = new RabbitMQConsumer(rabbitMQConnectionFactory, rabbitmqProperties, injectHandler); rabbitMQConsumer.setInboundName(name); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/securewebsocket/InboundSecureWebsocketListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/securewebsocket/InboundSecureWebsocketListener.java index 2ffa03c517..b85210a2d0 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/securewebsocket/InboundSecureWebsocketListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/securewebsocket/InboundSecureWebsocketListener.java @@ -45,11 +45,26 @@ public InboundSecureWebsocketListener(InboundProcessorParams params) { handleException("Validation failed for the port parameter " + portParam, e); } name = params.getName(); + this.startInPausedMode = params.startInPausedMode(); + } @Override public void init() { + /* + * The activate/deactivate functionality for the WSS Inbound Endpoint is not currently implemented. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for WSS listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } int offsetPort = port + PersistenceUtils.getPortOffset(processorParams.getProperties()); WebsocketEndpointManager.getInstance().startSSLEndpoint(offsetPort, name, processorParams); } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketListener.java index a9429e74c7..1ae1170995 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketListener.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketListener.java @@ -36,6 +36,7 @@ public class InboundWebsocketListener implements InboundRequestProcessor { private String name; private int port; private InboundProcessorParams processorParams; + protected boolean startInPausedMode; public InboundWebsocketListener(InboundProcessorParams params) { processorParams = params; @@ -47,10 +48,24 @@ public InboundWebsocketListener(InboundProcessorParams params) { handleException("Validation failed for the port parameter " + portParam, e); } name = params.getName(); + this.startInPausedMode = params.startInPausedMode(); } @Override public void init() { + /* + * The activate/deactivate functionality for the WS Inbound Endpoint is not currently implemented. + * + * Therefore, the following check has been added to immediately return if the "suspend" + * attribute is set to true in the inbound endpoint configuration. + * + * Note: This implementation is temporary and should be revisited and improved once + * the activate/deactivate capability for WS listener is implemented. + */ + if (startInPausedMode) { + log.info("Inbound endpoint [" + name + "] is currently suspended."); + return; + } int offsetPort = port + PersistenceUtils.getPortOffset(processorParams.getProperties()); WebsocketEndpointManager.getInstance().startEndpoint(offsetPort, name, processorParams); } @@ -62,6 +77,22 @@ public void destroy() { WebsocketEndpointManager.getInstance().closeEndpoint(offsetPort); } + @Override + public boolean activate() { + return false; + } + + @Override + public boolean deactivate() { + return false; + } + + @Override + public boolean isDeactivated() { + + return !WebsocketEndpointManager.getInstance().isEndpointRunning(name, port); + } + protected void handleException(String msg, Exception e) { log.error(msg, e); throw new SynapseException(msg, e); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketEndpointManager.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketEndpointManager.java index 8d81c4c079..cc4ed3958b 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketEndpointManager.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketEndpointManager.java @@ -26,6 +26,7 @@ import org.apache.synapse.SynapseException; import org.apache.synapse.inbound.InboundEndpoint; import org.apache.synapse.inbound.InboundProcessorParams; +import org.apache.synapse.transport.passthru.api.PassThroughInboundEndpointHandler; import org.wso2.carbon.inbound.endpoint.common.AbstractInboundEndpointManager; import org.wso2.carbon.inbound.endpoint.persistence.InboundEndpointInfoDTO; import org.wso2.carbon.inbound.endpoint.persistence.PersistenceUtils; @@ -309,4 +310,13 @@ public InboundWebsocketSourceHandler getSourceHandler() { public void setSourceHandler(InboundWebsocketSourceHandler sourceHandler) { this.sourceHandler = sourceHandler; } + + public boolean isEndpointRunning(String name, int port) { + + String epName = dataStore.getListeningEndpointName(port, SUPER_TENANT_DOMAIN_NAME); + if (epName != null && epName.equalsIgnoreCase(name)) { + return WebsocketEventExecutorManager.getInstance().isRegisteredExecutor(port); + } + return false; + } } diff --git a/components/mediation/tasks/org.wso2.micro.integrator.mediation.ntask/src/main/java/org/wso2/micro/integrator/mediation/ntask/NTaskTaskManager.java b/components/mediation/tasks/org.wso2.micro.integrator.mediation.ntask/src/main/java/org/wso2/micro/integrator/mediation/ntask/NTaskTaskManager.java index ec879d84ef..bbb05fc89d 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.mediation.ntask/src/main/java/org/wso2/micro/integrator/mediation/ntask/NTaskTaskManager.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.mediation.ntask/src/main/java/org/wso2/micro/integrator/mediation/ntask/NTaskTaskManager.java @@ -26,6 +26,7 @@ import org.wso2.micro.core.ServerStartupHandler; import org.wso2.micro.integrator.mediation.ntask.internal.NtaskService; import org.wso2.micro.integrator.ntask.core.TaskInfo; +import org.wso2.micro.integrator.ntask.core.TaskUtils; import org.wso2.micro.integrator.ntask.core.impl.LocalTaskActionListener; import org.wso2.micro.integrator.ntask.core.service.TaskService; @@ -34,6 +35,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -104,7 +106,10 @@ public boolean schedule(TaskDescription taskDescription) { if (logger.isDebugEnabled()) { logger.debug("Submitting task [ " + taskId(taskDescription) + " ] to the task manager."); } - taskManager.handleTask(taskInfo.getName()); + boolean scheduledInPausedMode = + Objects.nonNull(taskDescription.getProperty(TaskUtils.START_IN_PAUSED_MODE)) + && Boolean.parseBoolean((String) taskDescription.getProperty(TaskUtils.START_IN_PAUSED_MODE)); + taskManager.handleTask(taskInfo.getName(), scheduledInPausedMode); } removeTask(taskDescription); } @@ -624,8 +629,7 @@ private boolean checkTaskRunning(String taskName) { return false; } try { - return taskManager.getTaskState(taskName) - .equals(org.wso2.micro.integrator.ntask.core.TaskManager.TaskState.NORMAL); + return taskManager.isTaskRunning(taskName); } catch (Exception e) { logger.error("Cannot return task status [" + taskName + "]. Error: " + e.getLocalizedMessage(), e); diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java index 2feac69f5b..f1f3fe3679 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/scehduler/CoordinatedTaskScheduler.java @@ -22,7 +22,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.synapse.commons.util.MiscellaneousUtil; import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.inbound.InboundEndpoint; import org.apache.synapse.message.processor.MessageProcessor; +import org.apache.synapse.task.TaskDescription; +import org.apache.synapse.task.TaskDescriptionRepository; import org.wso2.micro.integrator.coordination.ClusterCoordinator; import org.wso2.micro.integrator.core.util.MicroIntegratorBaseUtils; import org.wso2.micro.integrator.ntask.common.TaskException; @@ -32,13 +35,13 @@ import org.wso2.micro.integrator.ntask.coordination.task.resolver.TaskLocationResolver; import org.wso2.micro.integrator.ntask.coordination.task.store.TaskStore; import org.wso2.micro.integrator.ntask.coordination.task.store.cleaner.TaskStoreCleaner; +import org.wso2.micro.integrator.ntask.core.TaskUtils; import org.wso2.micro.integrator.ntask.core.impl.standalone.ScheduledTaskManager; import org.wso2.micro.integrator.ntask.core.internal.DataHolder; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -162,7 +165,7 @@ private void pauseDeactivatedTasks() throws TaskCoordinationException { + "in this node or an invalid entry, hence ignoring it."); } }); - cleanUpMessageProcessors(pausedTasks); + notifyOnPause(pausedTasks); taskStore.updateTaskState(pausedTasks, CoordinatedTask.States.PAUSED); } @@ -173,13 +176,13 @@ private void pauseDeactivatedTasks() throws TaskCoordinationException { */ private void addFailedTasks() throws TaskCoordinationException { - List failedTasks = taskManager.getAdditionFailedTasks(); + List failedTasks = taskManager.getAdditionFailedTasks(); if (LOG.isDebugEnabled()) { LOG.debug("Following list of tasks were found in the failed list."); failedTasks.forEach(LOG::debug); } - for (String task : failedTasks) { - taskStore.addTaskIfNotExist(task); + for (ScheduledTaskManager.TaskEntry task : failedTasks) { + taskStore.addTaskIfNotExist(task.getName(), task.getState()); taskManager.removeTaskFromAdditionFailedTaskList(task); if (LOG.isDebugEnabled()) { LOG.debug("Successfully added the failed task [" + task + "]"); @@ -251,21 +254,36 @@ public synchronized void resolveUnassignedNotCompletedTasksAndUpdateStore() thro taskStore.updateAssignmentAndState(tasksToBeUpdated); } - private void cleanUpMessageProcessors(List pausedTasks) { - + private void notifyOnPause(List pausedTasks) { Set completedProcessors = new HashSet(); + Set completedInboundEndpoints = new HashSet(); + SynapseEnvironment synapseEnvironment = MicroIntegratorBaseUtils.getSynapseEnvironment(); + TaskDescriptionRepository taskRepo = synapseEnvironment.getTaskManager().getTaskDescriptionRepository(); pausedTasks.forEach(task -> { if (MiscellaneousUtil.isTaskOfMessageProcessor(task)) { String messageProcessorName = MiscellaneousUtil.getMessageProcessorName(task); if (!completedProcessors.contains(messageProcessorName)) { - SynapseEnvironment synapseEnvironment = MicroIntegratorBaseUtils.getSynapseEnvironment(); - MessageProcessor messageProcessor = synapseEnvironment.getSynapseConfiguration().getMessageProcessors(). - get(messageProcessorName); + MessageProcessor messageProcessor = synapseEnvironment.getSynapseConfiguration() + .getMessageProcessors().get(messageProcessorName); if (messageProcessor != null) { messageProcessor.cleanUpDeactivatedProcessors(); } completedProcessors.add(messageProcessorName); } + } else { + TaskDescription taskDescription = taskRepo.getTaskDescription(task); + if (taskDescription.getProperty(TaskUtils.TASK_OWNER_PROPERTY) == TaskUtils.TASK_BELONGS_TO_INBOUND_ENDPOINT) { + String inboundEndpointName = (String) taskDescription.getProperty(TaskUtils.TASK_OWNER_NAME); + + if (!completedInboundEndpoints.contains(inboundEndpointName)) { + InboundEndpoint inboundEndpoint = synapseEnvironment.getSynapseConfiguration() + .getInboundEndpoint(inboundEndpointName); + if (inboundEndpoint != null) { + inboundEndpoint.updateInboundEndpointState(true); + } + completedInboundEndpoints.add(inboundEndpointName); + } + } } }); } diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/TaskStore.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/TaskStore.java index b432f01e26..575367dedf 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/TaskStore.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/TaskStore.java @@ -141,7 +141,18 @@ public List getAllAssignedIncompleteTasks() throws TaskCoordina */ public void addTaskIfNotExist(String task) throws TaskCoordinationException { - rdmbsConnector.addTaskIfNotExist(task); + rdmbsConnector.addTaskIfNotExist(task, CoordinatedTask.States.NONE); + } + + /** + * Add the task. + * + * @param task - The coordinated task which needs to be added. + * @param state - Initial state of the task. + */ + public void addTaskIfNotExist(String task, CoordinatedTask.States state) throws TaskCoordinationException { + + rdmbsConnector.addTaskIfNotExist(task, state); } /** diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/RDMBSConnector.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/RDMBSConnector.java index bbeb8c2a4e..2a05dfc773 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/RDMBSConnector.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/RDMBSConnector.java @@ -352,16 +352,21 @@ public List getAllAssignedIncompleteTasks() throws TaskCoordina } } + public void addTaskIfNotExist(String taskName) throws TaskCoordinationException { + addTaskIfNotExist(taskName, CoordinatedTask.States.NONE); + } + /** * Add the task if doesn't exist already. * * @param taskName - The task which needs to be added. */ - public void addTaskIfNotExist(String taskName) throws TaskCoordinationException { + public void addTaskIfNotExist(String taskName, CoordinatedTask.States state) throws TaskCoordinationException { try (Connection connection = getConnection(); PreparedStatement preparedStatement = connection.prepareStatement( ADD_TASK)) { preparedStatement.setString(1, taskName); + preparedStatement.setString(2, state.name()); preparedStatement.executeUpdate(); if (LOG.isDebugEnabled()) { LOG.debug("Successfully added the task [" + taskName + "]."); diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/TaskQueryHelper.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/TaskQueryHelper.java index 518455e087..30ec1b8c79 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/TaskQueryHelper.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/store/connector/TaskQueryHelper.java @@ -38,9 +38,13 @@ public class TaskQueryHelper { + CoordinatedTask.States.NONE + "' WHEN '" + CoordinatedTask.States.DEACTIVATED + "'THEN '" + CoordinatedTask.States.PAUSED + "' ELSE " + TASK_STATE + " END )"; +// static final String ADD_TASK = +// "INSERT INTO " + TABLE_NAME + " ( " + TASK_NAME + ", " + DESTINED_NODE_ID + ", " + TASK_STATE + ") " +// + "VALUES (?,NULL,'" + CoordinatedTask.States.NONE + "')"; + static final String ADD_TASK = "INSERT INTO " + TABLE_NAME + " ( " + TASK_NAME + ", " + DESTINED_NODE_ID + ", " + TASK_STATE + ") " - + "VALUES (?,NULL,'" + CoordinatedTask.States.NONE + "')"; + + "VALUES (?,NULL,?)"; static final String UPDATE_ASSIGNMENT_AND_STATE = "UPDATE " + TABLE_NAME + " SET " + DESTINED_NODE_ID + " = ? , " + TASK_STATE + " = " + TASK_STATE_CONST diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskManager.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskManager.java index 7bd4c37616..d3e9222156 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskManager.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskManager.java @@ -17,6 +17,7 @@ */ package org.wso2.micro.integrator.ntask.core; +import org.apache.synapse.task.TaskManagerObserver; import org.wso2.micro.integrator.ntask.common.TaskException; import org.wso2.micro.integrator.ntask.core.impl.LocalTaskActionListener; @@ -60,6 +61,8 @@ public interface TaskManager { */ void handleTask(String taskName) throws TaskException; + void handleTask(String taskName, boolean scheduledInPausedMode) throws TaskException; + /** * Get all the coordinated tasks ( the tasks which need db interaction ) deployed in this node. * @@ -111,6 +114,8 @@ public interface TaskManager { boolean isDeactivated(String taskName) throws TaskException; + boolean isTaskRunning(String taskName) throws TaskException; + /** * Get task information. * diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskUtils.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskUtils.java index 10be00225e..6ae2dcfb8c 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskUtils.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/TaskUtils.java @@ -45,6 +45,14 @@ public class TaskUtils { public static final String TASK_STATE_PROPERTY = "TASK_STATE_PROPERTY"; + public static final String TASK_OWNER_PROPERTY = "taskOwner"; + + public static final String TASK_OWNER_NAME = "taskOwnerName"; + + public static final String TASK_BELONGS_TO_INBOUND_ENDPOINT = "InboundEndpoint"; + + public static final String START_IN_PAUSED_MODE = "startInPausedMode"; + private static SecretResolver secretResolver; public static Document convertToDocument(File file) throws TaskException { diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/AbstractQuartzTaskManager.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/AbstractQuartzTaskManager.java index da08974054..e4fc9f8a90 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/AbstractQuartzTaskManager.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/AbstractQuartzTaskManager.java @@ -208,7 +208,7 @@ protected synchronized void scheduleLocalTask(String taskName) throws TaskExcept this.scheduleLocalTask(taskName, paused); } - private synchronized void scheduleLocalTask(String taskName, boolean paused) throws TaskException { + protected synchronized void scheduleLocalTask(String taskName, boolean paused) throws TaskException { TaskInfo taskInfo = this.getTaskRepository().getTask(taskName); String taskGroup = this.getTenantTaskGroup(); diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/standalone/ScheduledTaskManager.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/standalone/ScheduledTaskManager.java index 6df06888d6..cb0577156e 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/standalone/ScheduledTaskManager.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/core/impl/standalone/ScheduledTaskManager.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.synapse.commons.util.MiscellaneousUtil; import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.inbound.InboundEndpoint; import org.apache.synapse.message.processor.MessageProcessor; import org.apache.synapse.task.TaskDescription; import org.wso2.micro.integrator.core.util.MicroIntegratorBaseUtils; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; /** * This class is responsible for handling / scheduling all tasks in Micro Integrator. @@ -53,7 +55,7 @@ public class ScheduledTaskManager extends AbstractQuartzTaskManager { /** * The list of tasks for which the addition failed. */ - private List additionFailedTasks = new ArrayList<>(); + private List additionFailedTasks = new ArrayList<>(); private List locallyRunningCoordinatedTasks = new ArrayList<>(); @@ -85,27 +87,41 @@ private boolean isMyTaskTypeRegistered() { */ public void handleTask(String taskName) throws TaskException { + handleTask(taskName, false); + } + + public void handleTask(String taskName, boolean scheduledInPausedMode) throws TaskException { if (isCoordinatedTask(taskName)) { if (log.isDebugEnabled()) { log.debug("Adding task [" + taskName + "] to the data base since this is a coordinated task."); } deployedCoordinatedTasks.add(taskName); + CoordinatedTask.States state; + if (scheduledInPausedMode) { + state = CoordinatedTask.States.PAUSED; + } else { + state = CoordinatedTask.States.NONE; + } try { - taskStore.addTaskIfNotExist(taskName); + taskStore.addTaskIfNotExist(taskName, state); } catch (TaskCoordinationException ex) { - additionFailedTasks.add(taskName); + additionFailedTasks.add(new TaskEntry(taskName, state)); throw new TaskException("Error adding task : " + taskName, TaskException.Code.DATABASE_ERROR, ex); } return; } - scheduleTask(taskName); + if (scheduledInPausedMode) { + scheduleTaskInPausedMode(taskName); + } else { + scheduleTask(taskName); + } } - public List getAdditionFailedTasks() { + public List getAdditionFailedTasks() { return new ArrayList<>(additionFailedTasks); } - public void removeTaskFromAdditionFailedTaskList(String taskName) { + public void removeTaskFromAdditionFailedTaskList(TaskEntry taskName) { additionFailedTasks.remove(taskName); } @@ -164,17 +180,11 @@ public void scheduleCoordinatedTask(String taskName) throws TaskException { try { if (taskStore.updateTaskState(taskName, CoordinatedTask.States.RUNNING, localNodeId)) { if (!isPreviouslyScheduled(taskName, getTenantTaskGroup())) { + // update the task repo to remove pause scheduleTask(taskName); } else { resumeLocalTask(taskName); - if (MiscellaneousUtil.isTaskOfMessageProcessor(taskName)) { - String messageProcessorName = MiscellaneousUtil.getMessageProcessorName(taskName); - MessageProcessor messageProcessor = synapseEnvironment.getSynapseConfiguration() - .getMessageProcessors().get(messageProcessorName); - if (messageProcessor != null) { - messageProcessor.resumeRemotely(); - } - } + notifyOnResume(taskName); } locallyRunningCoordinatedTasks.add(taskName); } else { @@ -188,6 +198,38 @@ public void scheduleCoordinatedTask(String taskName) throws TaskException { } } + /** + * Notifies the relevant components to resume operations associated with the specified task. + * + *

This method checks whether the given task is associated with a Message Processor or + * an Inbound Endpoint and triggers the necessary action to resume the respective component. + * If the task belongs to a Message Processor, the associated Message Processor is resumed + * remotely. If the task belongs to an Inbound Endpoint, its state is updated to active. + * + * @param taskName the name of the task to be resumed + */ + private void notifyOnResume(String taskName) { + if (MiscellaneousUtil.isTaskOfMessageProcessor(taskName)) { + String messageProcessorName = MiscellaneousUtil.getMessageProcessorName(taskName); + MessageProcessor messageProcessor = synapseEnvironment.getSynapseConfiguration() + .getMessageProcessors().get(messageProcessorName); + if (messageProcessor != null) { + messageProcessor.resumeRemotely(); + } + return; + } + TaskDescription taskDescription = + synapseEnvironment.getTaskManager().getTaskDescriptionRepository().getTaskDescription(taskName); + if (taskDescription.getProperty(TaskUtils.TASK_OWNER_PROPERTY) + == TaskUtils.TASK_BELONGS_TO_INBOUND_ENDPOINT) { + InboundEndpoint inboundEndpoint = synapseEnvironment.getSynapseConfiguration() + .getInboundEndpoint((String) taskDescription.getProperty(TaskUtils.TASK_OWNER_NAME)); + if (Objects.nonNull(inboundEndpoint)) { + inboundEndpoint.updateInboundEndpointState(false); + } + } + } + public List getLocallyRunningCoordinatedTasks() { return new ArrayList<>(locallyRunningCoordinatedTasks); } @@ -233,6 +275,17 @@ private void scheduleTask(String taskName) throws TaskException { } } + private void scheduleTaskInPausedMode(String taskName) throws TaskException { + if (this.isMyTaskTypeRegistered()) { + this.scheduleLocalTask(taskName, true); + } else { + throw new TaskException( + "Task type: '" + this.getTaskType() + "' is not registered in the current task node", + TaskException.Code.TASK_NODE_NOT_AVAILABLE); + } + } + + @Override public boolean deleteTask(String taskName) throws TaskException { @@ -290,13 +343,26 @@ public void registerTask(TaskInfo taskInfo) throws TaskException { public boolean isDeactivated(String taskName) throws TaskException { if (deployedCoordinatedTasks.contains(taskName)) { - boolean isDeactivated = !CoordinatedTask.States.RUNNING.equals(getCoordinatedTaskState(taskName)); + boolean isDeactivated = !CoordinatedTask.States.RUNNING.equals( getCoordinatedTaskState(taskName)); if (log.isDebugEnabled()) { log.debug("Task [" + taskName + "] is " + (isDeactivated ? "" : "not") + " in deactivated state."); } return isDeactivated; } - return getTaskState(taskName).equals(TaskState.PAUSED); + return !(getTaskState(taskName).equals(TaskState.NORMAL) || getTaskState(taskName).equals(TaskState.BLOCKED)); + } + + @Override + public boolean isTaskRunning(String taskName) throws TaskException { + + if (deployedCoordinatedTasks.contains(taskName)) { + boolean isRunning = CoordinatedTask.States.RUNNING.equals( getCoordinatedTaskState(taskName)); + if (log.isDebugEnabled()) { + log.debug("Task [" + taskName + "] is " + (isRunning ? "" : "not") + " in running state."); + } + return isRunning; + } + return getTaskState(taskName).equals(TaskState.NORMAL); } private CoordinatedTask.States getCoordinatedTaskState(String taskName) { @@ -361,4 +427,24 @@ private void resumeTask(String taskName) throws TaskException { TaskUtils.setTaskPaused(this.getTaskRepository(), taskName, false); } + public static class TaskEntry { + private String name; + private CoordinatedTask.States state; + + public TaskEntry(String name, CoordinatedTask.States state) { + this.name = name; + this.state = state; + } + + public String getName() { + + return name; + } + + public CoordinatedTask.States getState() { + + return state; + } + } + } diff --git a/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/Constants.java b/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/Constants.java index de916b2400..867a29b129 100644 --- a/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/Constants.java +++ b/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/Constants.java @@ -163,6 +163,7 @@ public class Constants { public static final String AUDIT_LOG_TYPE_LOG_LEVEL = "log_level"; public static final String AUDIT_LOG_TYPE_ROOT_LOG_LEVEL = "root_log_level"; public static final String AUDIT_LOG_TYPE_MESSAGE_PROCESSOR = "message_processor"; + public static final String AUDIT_LOG_TYPE_INBOUND_ENDPOINT = "inbound_endpoint"; public static final String AUDIT_LOG_TYPE_CARBON_APPLICATION = "carbon_application"; public static final String AUDIT_LOG_TYPE_CONNECTOR = "connector"; diff --git a/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/InboundEndpointResource.java b/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/InboundEndpointResource.java index a7d7f9065c..8c42cf4701 100644 --- a/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/InboundEndpointResource.java +++ b/components/org.wso2.micro.integrator.extensions/org.wso2.micro.integrator.management.apis/src/main/java/org/wso2/micro/integrator/management/apis/InboundEndpointResource.java @@ -32,6 +32,7 @@ import org.wso2.carbon.inbound.endpoint.internal.http.api.APIResource; import org.wso2.micro.integrator.management.apis.security.handler.SecurityUtils; import org.wso2.micro.integrator.security.user.api.UserStoreException; +import org.wso2.micro.core.util.AuditLogger; import java.io.IOException; @@ -43,7 +44,11 @@ import java.util.Objects; import java.util.stream.Collectors; +import static org.wso2.micro.integrator.management.apis.Constants.ACTIVE_STATUS; +import static org.wso2.micro.integrator.management.apis.Constants.INACTIVE_STATUS; +import static org.wso2.micro.integrator.management.apis.Constants.NAME; import static org.wso2.micro.integrator.management.apis.Constants.SEARCH_KEY; +import static org.wso2.micro.integrator.management.apis.Constants.STATUS; import static org.wso2.micro.integrator.management.apis.Constants.SYNAPSE_CONFIGURATION; import static org.wso2.micro.integrator.management.apis.Constants.USERNAME_PROPERTY; @@ -129,9 +134,14 @@ private void handlePost(MessageContext msgCtx, } JSONObject info = new JSONObject(); info.put(INBOUND_ENDPOINT_NAME, inboundName); - response = Utils.handleTracing(performedBy, Constants.AUDIT_LOG_TYPE_INBOUND_ENDPOINT_TRACE, - Constants.INBOUND_ENDPOINTS, info, - inboundEndpoint.getAspectConfiguration(), inboundName, axisMsgCtx); + if (payload.has(STATUS)) { + response = handleStatusUpdate(inboundEndpoint, performedBy, info, msgCtx, payload); + } else { + response = Utils.handleTracing(performedBy, Constants.AUDIT_LOG_TYPE_INBOUND_ENDPOINT_TRACE, + Constants.INBOUND_ENDPOINTS, info, + inboundEndpoint.getAspectConfiguration(), inboundName, axisMsgCtx); + } + } else { response = Utils.createJsonError("Specified inbound endpoint ('" + inboundName + "') not found", axisMsgCtx, Constants.BAD_REQUEST); @@ -165,6 +175,7 @@ private void setResponseBody(Collection inboundEndpointCollecti inboundObject.put(Constants.NAME, inboundEndpoint.getName()); inboundObject.put("protocol", inboundEndpoint.getProtocol()); + inboundObject.put(Constants.STATUS, getInboundEndpointState(inboundEndpoint)); jsonBody.getJSONArray(Constants.LIST).put(inboundObject); } @@ -204,6 +215,7 @@ private JSONObject convertInboundEndpointToJsonObject(InboundEndpoint inboundEnd inboundObject.put("protocol", inboundEndpoint.getProtocol()); inboundObject.put("sequence", inboundEndpoint.getInjectingSeq()); inboundObject.put("error", inboundEndpoint.getOnErrorSeq()); + inboundObject.put(Constants.STATUS, getInboundEndpointState(inboundEndpoint)); String statisticState = inboundEndpoint.getAspectConfiguration().isStatisticsEnable() ? Constants.ENABLED : Constants.DISABLED; inboundObject.put(Constants.STATS, statisticState); @@ -229,4 +241,71 @@ private JSONObject convertInboundEndpointToJsonObject(InboundEndpoint inboundEnd } return inboundObject; } + + /** + * Determines the current state of the specified inbound endpoint. + * + * @param inboundEndpoint The {@link InboundEndpoint} instance whose state is to be retrieved. + * @return A {@link String} representing the state of the inbound endpoint: + * - {@code INACTIVE_STATUS} if the inbound endpoint is deactivated. + * - {@code ACTIVE_STATUS} otherwise. + */ + private String getInboundEndpointState(InboundEndpoint inboundEndpoint) { + if (inboundEndpoint.isDeactivated()) { + return INACTIVE_STATUS; + } + return ACTIVE_STATUS; + } + + /** + * Handles the activation or deactivation of an inbound endpoint based on the provided status. + * + * @param performedBy The user performing the operation, used for audit logging. + * @param info A JSON object containing additional audit information. + * @param messageContext The current Synapse {@link MessageContext} for accessing the configuration. + * @param payload A {@link JsonObject} containing the inbound endpoint name and desired status. + * + * @return A {@link JSONObject} indicating the result of the operation. If successful, contains + * a confirmation message. If unsuccessful, contains an error message with appropriate + * HTTP error codes. + */ + private JSONObject handleStatusUpdate(InboundEndpoint inboundEndpoint, String performedBy, JSONObject info, + MessageContext messageContext, JsonObject payload) { + String name = payload.get(NAME).getAsString(); + String status = payload.get(STATUS).getAsString(); + + org.apache.axis2.context.MessageContext axis2MessageContext = + ((Axis2MessageContext) messageContext).getAxis2MessageContext(); + JSONObject jsonResponse = new JSONObject(); + if (inboundEndpoint == null) { + return Utils.createJsonError("Inbound Endpoint could not be found", + axis2MessageContext, Constants.NOT_FOUND); + } + + if (INACTIVE_STATUS.equalsIgnoreCase(status)) { + boolean success = inboundEndpoint.deactivate(); + if (success) { + jsonResponse.put(Constants.MESSAGE_JSON_ATTRIBUTE, name + " : is deactivated"); + AuditLogger.logAuditMessage(performedBy, Constants.AUDIT_LOG_TYPE_INBOUND_ENDPOINT, + Constants.AUDIT_LOG_ACTION_DISABLED, info); + } else { + jsonResponse = Utils.createJsonError("Failed to deactivate the inbound endpoint : " + name, + axis2MessageContext, Constants.INTERNAL_SERVER_ERROR); + } + } else if (ACTIVE_STATUS.equalsIgnoreCase(status)) { + boolean success = inboundEndpoint.activate(); + if (success) { + jsonResponse.put(Constants.MESSAGE_JSON_ATTRIBUTE, name + " : is activated"); + AuditLogger.logAuditMessage(performedBy, Constants.AUDIT_LOG_TYPE_MESSAGE_PROCESSOR, + Constants.AUDIT_LOG_ACTION_ENABLE, info); + } else { + jsonResponse = Utils.createJsonError("Failed to activate the inbound endpoint : " + name, + axis2MessageContext, Constants.INTERNAL_SERVER_ERROR); + } + } else { + jsonResponse = Utils.createJsonError("Provided state is not valid", axis2MessageContext, Constants.BAD_REQUEST); + } + + return jsonResponse; + } }