Skip to content

Commit

Permalink
Refactor scatter-gather synapse syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
SanojPunchihewa committed Nov 19, 2024
1 parent ba26d36 commit b61a0c8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@
*
* <pre>
* &lt;scatter-gather parallel-execution=(true | false)&gt;
* &lt;aggregation value-to-aggregate="expression" condition="expression" timeout="long"
* &lt;aggregation value="expression" condition="expression" timeout="long"
* min-messages="expression" max-messages="expression"/&gt;
* &lt;target&gt;
* &lt;sequence&gt;
* (mediator)+
* &lt;/sequence&gt;
* &lt;/target&gt;+
* &lt;sequence&gt;
* (mediator)+
* &lt;/sequence&gt;+
* &lt;/scatter-gather&gt;
* </pre>
*/
Expand All @@ -54,14 +52,16 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather");
private static final QName ELEMENT_AGGREGATE_Q
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation");
private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value-to-aggregate");
private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value");
private static final QName ATT_CONDITION = new QName("condition");
private static final QName ATT_TIMEOUT = new QName("timeout");
private static final QName ATT_MIN_MESSAGES = new QName("min-messages");
private static final QName ATT_MAX_MESSAGES = new QName("max-messages");
private static final QName TARGET_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "target");
private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence");
private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");

private static final SequenceMediatorFactory fac = new SequenceMediatorFactory();

public Mediator createSpecificMediator(OMElement elem, Properties properties) {

boolean asynchronousExe = true;
Expand All @@ -76,11 +76,15 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {

mediator.setParallelExecution(asynchronousExe);

Iterator targetElements = elem.getChildrenWithName(TARGET_Q);
while (targetElements.hasNext()) {
Target target = TargetFactory.createTarget((OMElement) targetElements.next(), properties);
target.setAsynchronous(asynchronousExe);
mediator.addTarget(target);
Iterator sequenceListElements = elem.getChildrenWithName(SEQUENCE_Q);
while (sequenceListElements.hasNext()) {
OMElement sequence = (OMElement) sequenceListElements.next();
if (sequence != null) {
Target target = new Target();
target.setSequence(fac.createAnonymousSequence(sequence, properties));
target.setAsynchronous(asynchronousExe);
mediator.addTarget(target);
}
}

OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public OMElement serializeSpecificMediator(Mediator m) {
OMElement aggregationElement = fac.createOMElement("aggregation", synNS);

SynapsePathSerializer.serializePath(
scatterGatherMediator.getAggregationExpression(), aggregationElement, "value-to-aggregate");
scatterGatherMediator.getAggregationExpression(), aggregationElement, "value");

if (scatterGatherMediator.getCorrelateExpression() != null) {
SynapsePathSerializer.serializePath(
Expand All @@ -68,8 +68,9 @@ public OMElement serializeSpecificMediator(Mediator m) {
scatterGatherElement.addChild(aggregationElement);

for (Target target : scatterGatherMediator.getTargets()) {
if (target != null) {
scatterGatherElement.addChild(TargetSerializer.serializeTarget(target));
if (target != null && target.getSequence() != null) {
SequenceMediatorSerializer serializer = new SequenceMediatorSerializer();
serializer.serializeAnonymousSequence(scatterGatherElement, target.getSequence());
}
}
serializeComments(scatterGatherElement, scatterGatherMediator.getCommentsList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ public ScatterGatherMediatorSerializationTest() {
public void testScatterGatherSerialization() {

String inputXML = "<scatter-gather xmlns=\"http://ws.apache.org/ns/synapse\" parallel-execution=\"true\">" +
"<aggregation value-to-aggregate=\"json-eval($)\" /><target><sequence>" +
"<aggregation value=\"json-eval($)\" /><sequence>" +
"<payloadFactory media-type=\"json\"><format>{ \"pet\": { " +
"\"name\": \"pet1\", \"type\": \"dog\" }, " +
"\"status\": \"success\" }</format><args/></payloadFactory><log level=\"custom\">" +
"<property name=\"Message\" value=\"==== DONE scatter target 1 ====\"/></log></sequence></target>" +
"<target><sequence><call><endpoint><http method=\"GET\" uri-template=\"http://localhost:5454/api/pet2\"/>" +
"<property name=\"Message\" value=\"==== DONE scatter target 1 ====\"/></log></sequence>" +
"<sequence><call><endpoint><http method=\"GET\" uri-template=\"http://localhost:5454/api/pet2\"/>" +
"</endpoint></call><log level=\"custom\"><property name=\"Message\" value=\"==== DONE scatter target 2 ====\"/>" +
"</log></sequence></target></scatter-gather>";
"</log></sequence></scatter-gather>";

assertTrue(serialization(inputXML, scatterGatherMediatorFactory, scatterGatherMediatorSerializer));
}
Expand Down

0 comments on commit b61a0c8

Please sign in to comment.