Skip to content

Commit

Permalink
Add synapse expression support and option to set output to a variable
Browse files Browse the repository at this point in the history
  • Loading branch information
SanojPunchihewa committed Dec 6, 2024
1 parent dad9df5 commit b45b4ca
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.axiom.om.OMAttribute;
import org.apache.axiom.om.OMElement;
import org.apache.commons.lang3.StringUtils;
import org.apache.synapse.Mediator;
import org.apache.synapse.SynapseException;
import org.apache.synapse.mediators.eip.Target;
import org.apache.synapse.mediators.v2.ScatterGather;
import org.jaxen.JaxenException;
Expand All @@ -34,7 +36,7 @@
* different message contexts and aggregate the responses back.
*
* <pre>
* &lt;scatter-gather parallel-execution=(true | false)&gt;
* &lt;scatter-gather parallel-execution=(true | false) result-target=(body | variable) content-type=(JSON | XML)&gt;
* &lt;aggregation value="expression" condition="expression" timeout="long"
* min-messages="expression" max-messages="expression"/&gt;
* &lt;sequence&gt;
Expand All @@ -59,6 +61,8 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {
private static final QName ATT_MAX_MESSAGES = new QName("max-messages");
private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence");
private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");
private static final QName RESULT_TARGET_Q = new QName("result-target");
private static final QName CONTENT_TYPE_Q = new QName("content-type");

private static final SequenceMediatorFactory fac = new SequenceMediatorFactory();

Expand All @@ -73,10 +77,36 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) {
asynchronousExe = false;
}

mediator.setParallelExecution(asynchronousExe);

OMAttribute contentTypeAttr = elem.getAttribute(CONTENT_TYPE_Q);
if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) {
String msg = "The 'content-type' attribute is required for the configuration of a Scatter Gather mediator";
throw new SynapseException(msg);
} else {
if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ScatterGather.JSON_TYPE);
} else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ScatterGather.XML_TYPE);
} else {
String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'";
throw new SynapseException(msg);
}
}

OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q);
if (resultTargetAttr == null || StringUtils.isBlank(resultTargetAttr.getAttributeValue())) {
String msg = "The 'result-target' attribute is required for the configuration of a Scatter Gather mediator";
throw new SynapseException(msg);
} else {
mediator.setResultTarget(resultTargetAttr.getAttributeValue());
}

Iterator sequenceListElements = elem.getChildrenWithName(SEQUENCE_Q);
if (!sequenceListElements.hasNext()) {
String msg = "A 'sequence' element is required for the configuration of a Scatter Gather mediator";
throw new SynapseException(msg);
}
while (sequenceListElements.hasNext()) {
OMElement sequence = (OMElement) sequenceListElements.next();
if (sequence != null) {
Expand All @@ -97,6 +127,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
} catch (JaxenException e) {
handleException("Unable to load the aggregating expression", e);
}
} else {
String msg = "The 'value' attribute is required for the configuration of a Scatter Gather mediator";
throw new SynapseException(msg);
}

OMAttribute conditionExpr = aggregateElement.getAttribute(ATT_CONDITION);
Expand All @@ -123,6 +156,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
if (maxMessages != null) {
mediator.setMaxMessagesToComplete(new ValueFactory().createValue("max-messages", aggregateElement));
}
} else {
String msg = "The 'aggregation' element is required for the configuration of a Scatter Gather mediator";
throw new SynapseException(msg);
}
addAllCommentChildrenToList(elem, mediator.getCommentsList());
return mediator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public OMElement serializeSpecificMediator(Mediator m) {

scatterGatherElement.addAttribute(fac.createOMAttribute(
"parallel-execution", nullNS, Boolean.toString(scatterGatherMediator.getParallelExecution())));
scatterGatherElement.addAttribute(fac.createOMAttribute(
"result-target", nullNS, scatterGatherMediator.getResultTarget()));
scatterGatherElement.addAttribute(fac.createOMAttribute(
"content-type", nullNS, scatterGatherMediator.getContentType()));

OMElement aggregationElement = fac.createOMElement("aggregation", synNS);

SynapsePathSerializer.serializePath(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void run() {
debugManager.advertiseMediationFlowTerminatePoint(synCtx);
debugManager.releaseMediationFlowLock();
}
if (RuntimeStatisticCollector.isStatisticsEnabled()) {
if (RuntimeStatisticCollector.isStatisticsEnabled() && !isScatterMessage(synCtx)) {
this.statisticsCloseEventListener.invokeCloseEventEntry(synCtx);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,16 +521,4 @@ public void setComponentStatisticsId(ArtifactHolder holder) {
StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.SEQUENCE, holder);
}
}

/**
* Check whether the message is a scatter message or not
*
* @param synCtx MessageContext
* @return true if the message is a scatter message
*/
private static boolean isScatterMessage(MessageContext synCtx) {

Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
return isSkipContinuationState != null && isSkipContinuationState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public static void enrichEnvelope(SOAPEnvelope envelope, SOAPEnvelope enricher,
}
}

private static boolean isBody(OMElement body, OMElement enrichingElement) {
public static boolean isBody(OMElement body, OMElement enrichingElement) {
try {
return (body.getLocalName().equals(enrichingElement.getLocalName()) &&
body.getNamespace().getNamespaceURI().equals(enrichingElement.getNamespace().getNamespaceURI()));
Expand All @@ -198,7 +198,7 @@ private static boolean isBody(OMElement body, OMElement enrichingElement) {
}
}

private static boolean checkNotEmpty(List list) {
public static boolean checkNotEmpty(List list) {
return list != null && !list.isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.synapse.mediators.eip;

import org.apache.synapse.MessageContext;

/**
* This class is used to hold the shared data for a particular message set
* For an example we can use this to share some data across all the spawned spilt messages in iterate mediator
Expand All @@ -29,6 +31,17 @@ public class SharedDataHolder {
*/
private boolean isAggregationCompleted = false;

private MessageContext synCtx;

public SharedDataHolder() {

}

public SharedDataHolder(MessageContext synCtx) {

this.synCtx = synCtx;
}

/**
* Check whether aggregation has been completed.
*
Expand All @@ -45,4 +58,8 @@ public void markAggregationCompletion() {
isAggregationCompleted = true;
}

public MessageContext getSynCtx() {

return synCtx;
}
}
Loading

0 comments on commit b45b4ca

Please sign in to comment.