Skip to content

Commit

Permalink
Add Support for Activating and Deactivating File Inbound Endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
sajinieKavindya committed Dec 13, 2024
1 parent 4e16060 commit e5073be
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.synapse.commons.handlers.MessagingHandler;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.mediators.Value;
import org.apache.synapse.registry.Registry;
import org.apache.synapse.util.xpath.SynapseXPath;
import org.jaxen.JaxenException;
import org.wso2.securevault.SecretResolver;
Expand All @@ -41,6 +42,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.regex.Matcher;
Expand All @@ -59,27 +61,35 @@ public class InboundEndpoint implements AspectConfigurable, ManagedLifecycle {
private boolean isSuspend;
private String injectingSeq;
private String onErrorSeq;
private boolean startInPausedMode;
private Map<String, String> parametersMap = new LinkedHashMap<String, String>();
private Map<String, String> parameterKeyMap = new LinkedHashMap<String, String>();
private List<MessagingHandler> handlers = new ArrayList();
private String fileName;
private SynapseEnvironment synapseEnvironment;
private InboundRequestProcessor inboundRequestProcessor;
private Registry registry;
/**
* This property determines whether the inbound endpoint should preserve its state
* across server restarts or configuration updates.
* */
private boolean preserveState = false;
/** car file name which this endpoint deployed from */
private String artifactContainerName;
/** Whether the deployed inbound endpoint is edited via the management console */
private boolean isEdited;
private AspectConfiguration aspectConfiguration;
/** regex for any vault expression */
private static final String secureVaultRegex = "\\{(.*?):vault-lookup\\('(.*?)'\\)\\}";
private static final String REG_INBOUND_ENDPOINT_BASE_PATH = "/repository/components/org.apache.synapse.inbound/";
private static final String INBOUND_ENDPOINT_STATE = "INBOUND_ENDPOINT_STATE";

public void init(SynapseEnvironment se) {
log.info("Initializing Inbound Endpoint: " + getName());
synapseEnvironment = se;
if(isSuspend){
log.info("Inbound endpoint " + name + " is currently suspended.");
return;
}
registry = se.getSynapseConfiguration().getRegistry();
setPreserveState();
startInPausedMode = startInPausedMode();
inboundRequestProcessor = getInboundRequestProcessor();
if (inboundRequestProcessor != null) {
try {
Expand All @@ -91,10 +101,11 @@ public void init(SynapseEnvironment se) {
}
} else {
String msg = "Inbound Request processor not found for Inbound EP : " + name +
" Protocol: " + protocol + " Class" + classImpl;
" Protocol: " + protocol + " Class" + classImpl;
log.error(msg);
throw new SynapseException(msg);
}

}

/**
Expand Down Expand Up @@ -140,6 +151,7 @@ private InboundProcessorParams populateParams() {
inboundProcessorParams.setInjectingSeq(injectingSeq);
inboundProcessorParams.setOnErrorSeq(onErrorSeq);
inboundProcessorParams.setSynapseEnvironment(synapseEnvironment);
inboundProcessorParams.setStartInPausedMode(startInPausedMode);

Properties props = Utils.paramsToProperties(parametersMap);
//replacing values by secure vault
Expand All @@ -163,6 +175,24 @@ private void resolveSystemSecureVaultProperties(Properties props) {
}
}

/**
* Configures the `preserveState` property for the inbound endpoint.
* <p>
* This method checks if the parameter {@code INBOUND_ENDPOINT_PRESERVE_STATE} is defined.
* If the parameter exists, its value is parsed as a boolean and assigned to the `preserveState` variable.
* </p>
*
* @see InboundEndpointConstants#INBOUND_ENDPOINT_PRESERVE_STATE
*/
private void setPreserveState() {
if (getParameter(InboundEndpointConstants.INBOUND_ENDPOINT_PRESERVE_STATE) != null) {
preserveState = Boolean.parseBoolean(getParameter(InboundEndpointConstants.INBOUND_ENDPOINT_PRESERVE_STATE));
}
if (!preserveState) {
deleteInboundEndpointStateInRegistry();
}
}

/**
* Remove inbound endpoints.
* <p>
Expand All @@ -179,6 +209,9 @@ public void destroy(boolean removeTask) {
} else {
inboundRequestProcessor.destroy();
}
if (!preserveState) {
deleteInboundEndpointStateInRegistry();
}
} catch (Exception e) {
log.error("Unable to destroy Inbound endpoint", e);
}
Expand Down Expand Up @@ -232,6 +265,87 @@ public void setOnErrorSeq(String onErrorSeq) {
this.onErrorSeq = onErrorSeq;
}

/**
* Activates the inbound endpoint.
* <p>
* This method synchronizes access to ensure thread safety while activating the inbound endpoint.
* It calls the underlying {@link InboundRequestProcessor} to perform the activation logic.
* If the activation is successful, updates the inbound endpoint's state in the registry
* to {@link InboundEndpointState#ACTIVE}
* </p>
*/
public synchronized boolean activate() {

if (Objects.isNull(this.inboundRequestProcessor)) {
log.error("Unable to activate the Inbound Endpoint [" + getName() + "] because "
+ "no associated inbound request processor was found!");
}

log.info("Activating the Inbound Endpoint: " + getName());
String errorMessage = "Failed to activate the Inbound Endpoint: " + getName();
try {
if (this.inboundRequestProcessor.activate()) {
log.info("Inbound Endpoint [" + getName() + "] is successfully activated.");
setInboundEndpointStateInRegistry(InboundEndpointState.ACTIVE);
return true;
} else {
log.error(errorMessage);
}
} catch (Exception e) {
log.error(errorMessage, e);
}
return false;
}

/**
* Deactivates the inbound endpoint.
* <p>
* This method synchronizes access to ensure thread safety while deactivating the inbound endpoint.
* It calls the underlying {@link InboundRequestProcessor} to perform the deactivation logic.
* If the deactivation is successful, the method updates the inbound endpoint's state in the
* registry to {@link InboundEndpointState#INACTIVE}.
* </p>
*/
public synchronized boolean deactivate() {

if (Objects.isNull(this.inboundRequestProcessor)) {
log.error("Unable to deactivate the Inbound Endpoint [" + getName() + "] because "
+ "no associated inbound request processor was found!");
}

log.info("Deactivating the Inbound Endpoint: " + getName());
String errorMessage = "Failed to deactivate the Inbound Endpoint: " + getName();
try {
if (this.inboundRequestProcessor.deactivate()) {
log.info("Inbound Endpoint [" + getName() + "] is successfully deactivated.");
setInboundEndpointStateInRegistry(InboundEndpointState.INACTIVE);
return true;
} else {
log.error(errorMessage);
}
} catch (Exception e) {
log.error(errorMessage, e);
}
return false;
}

/**
* Checks whether the inbound endpoint is deactivated.
* <p>
* This method delegates the check to the underlying {@link InboundRequestProcessor},
* which determines the deactivation state of the inbound endpoint.
* </p>
*
* @return {@code true} if the inbound endpoint is deactivated; {@code false} otherwise.
*/
public boolean isDeactivated() {

if (Objects.isNull(this.inboundRequestProcessor)) {
return true;
}
return inboundRequestProcessor.isDeactivated();
}

public String getFileName() {
return fileName;
}
Expand Down Expand Up @@ -353,4 +467,123 @@ public void addHandler(MessagingHandler handler) {

this.handlers.add(handler);
}

/**
* Updates the state of the Inbound Endpoint in the registry.
*
* <p>This method ensures that the state of the Inbound Endpoint is persisted in
* the registry for future reference. If the registry is unavailable and state
* preservation is enabled, a warning is logged, and the state will not be updated.
*
* @param state the {@link InboundEndpointState} to be saved in the registry
*/
private void setInboundEndpointStateInRegistry(InboundEndpointState state) {
if (Objects.isNull(registry) && preserveState) {
log.warn("Registry not available! The state of the Inbound Endpoint will not be saved.");
return;
}
registry.newNonEmptyResource(REG_INBOUND_ENDPOINT_BASE_PATH + getName(), false, "text/plain",
state.toString(), INBOUND_ENDPOINT_STATE);
}

/**
* Deletes the state of the Inbound Endpoint from the registry.
*
* <p>This method removes the registry entry corresponding to the current
* Inbound Endpoint's state, if it exists. If the registry is unavailable,
* the operation is skipped.
*/
private void deleteInboundEndpointStateInRegistry() {
if (Objects.isNull(registry)) {
return;
}
if (registry.getResourceProperties(REG_INBOUND_ENDPOINT_BASE_PATH + getName()) != null) {
registry.delete(REG_INBOUND_ENDPOINT_BASE_PATH + getName());
}
}

/**
* Determines whether the inbound endpoint should start in paused mode.
*
* This method evaluates the `preserveState` flag and the current state of the inbound endpoint
* to decide if the endpoint should start in a paused state.
*
* - If `preserveState` is false or if the current state in the registry is {@link InboundEndpointState#INITIAL},
* it returns the value of `suspend` attribute in the inbound endpoint configuration`.
* - Otherwise, it checks if the current state is {@link InboundEndpointState#INACTIVE}
* and returns `true` if it is, indicating the endpoint should start in paused mode.
*
* @return {@code true} if the inbound endpoint should start in paused mode, {@code false} otherwise.
*/
private boolean startInPausedMode() {

if (!preserveState) {
return isSuspend();
}
if (getInboundEndpointStateFromRegistry() == InboundEndpointState.INITIAL) {
return isSuspend();
}
return (getInboundEndpointStateFromRegistry() == InboundEndpointState.INACTIVE);
}

/**
* Retrieves the current state of the inbound endpoint from the registry.
*
* This method checks the registry for the state of the inbound endpoint associated with
* the provided name. It first fetches the resource properties of the inbound endpoint from
* the registry. If no properties are found, the method assumes the state is {@link InboundEndpointState#INITIAL}.
*
* If the state is present, it determines whether the state is {@link InboundEndpointState#ACTIVE}.
* or {@link InboundEndpointState#INACTIVE}.
*
* @return The current state of the inbound endpoint, as either {@link InboundEndpointState#ACTIVE},
* {@link InboundEndpointState#INACTIVE}, or {@link InboundEndpointState#INITIAL} if not explicitly set.
*/
private InboundEndpointState getInboundEndpointStateFromRegistry() {
Properties resourceProperties = null;
if (Objects.nonNull(registry)) {
resourceProperties = registry.getResourceProperties(REG_INBOUND_ENDPOINT_BASE_PATH + getName());
}

if (resourceProperties == null) {
return InboundEndpointState.INITIAL;
}

String state = resourceProperties.getProperty(INBOUND_ENDPOINT_STATE);
if (InboundEndpointState.ACTIVE.toString().equalsIgnoreCase(state)) {
return InboundEndpointState.ACTIVE;
}
return InboundEndpointState.INACTIVE;
}

private enum InboundEndpointState {
INITIAL, ACTIVE, INACTIVE
}

/**
* Updates the state of the inbound endpoint to either paused or active based on the given parameter.
* <p>
* This method attempts to change the state of the inbound endpoint and ensures that the state in
* the registry matches the expected behavior. If the operation fails to align the actual state
* with the requested state, a warning is logged to indicate the potential for inconsistent behavior.
*
* @param pause {@code true} to pause the inbound endpoint, setting its state to {@code INACTIVE};
* {@code false} to resume the inbound endpoint, setting its state to {@code ACTIVE}.
*/
public void updateInboundEndpointState(boolean pause) {

if (Objects.isNull(inboundRequestProcessor)) {
log.error("Unable to update the state of the Inbound Endpoint [" + getName() + "] as it does not exist!");
}
if (pause && inboundRequestProcessor.isDeactivated()) {
setInboundEndpointStateInRegistry(InboundEndpointState.INACTIVE);
} else if (!pause && !inboundRequestProcessor.isDeactivated()){
setInboundEndpointStateInRegistry(InboundEndpointState.ACTIVE);
} else {
log.warn("The inbound endpoint [" + name + "] was requested to change its state to "
+ (pause ? "pause" : "resume") + ", but the operation did not complete successfully "
+ "as the actual state does not match the expected state.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class InboundEndpointConstants {
public static final String INBOUND_ENDPOINT_PROTOCOL = "protocol";
public static final String INBOUND_ENDPOINT_CLASS = "class";
public static final String INBOUND_ENDPOINT_SUSPEND = "suspend";
public static final String INBOUND_ENDPOINT_PRESERVE_STATE = "preserve.state";
public static final String INBOUND_ENDPOINT_SEQUENCE = "sequence";
public static final String INBOUND_ENDPOINT_ERROR_SEQUENCE = "onError";
public static final String INBOUND_ENDPOINT_PARAMETERS = "parameters";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.synapse.core.SynapseEnvironment;

import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
Expand All @@ -36,6 +35,17 @@ public class InboundProcessorParams {
private String onErrorSeq;
private SynapseEnvironment synapseEnvironment;
private List<MessagingHandler> handlers;
private boolean startInPausedMode;

public boolean startInPausedMode() {

return startInPausedMode;
}

public void setStartInPausedMode(boolean startInPausedMode) {

this.startInPausedMode = startInPausedMode;
}

/**
* Get the name of the inbound endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ public interface InboundRequestProcessor {
public void init();

public void destroy();

public boolean activate();

public boolean deactivate();

public boolean isDeactivated();
}
Loading

0 comments on commit e5073be

Please sign in to comment.