Skip to content

Commit

Permalink
Merge pull request #2252 from SanojPunchihewa/foreach-v2
Browse files Browse the repository at this point in the history
Add new Foreach mediator
  • Loading branch information
SanojPunchihewa authored Dec 16, 2024
2 parents 8c971b5 + 8022931 commit 65e169c
Show file tree
Hide file tree
Showing 5 changed files with 1,073 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@

import org.apache.axiom.om.OMAttribute;
import org.apache.axiom.om.OMElement;
import org.apache.commons.lang3.StringUtils;
import org.apache.synapse.Mediator;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.mediators.builtin.CallMediator;
import org.apache.synapse.mediators.builtin.CalloutMediator;
import org.apache.synapse.mediators.builtin.ForEachMediator;
import org.apache.synapse.mediators.builtin.SendMediator;
import org.apache.synapse.mediators.eip.Target;
import org.apache.synapse.mediators.v2.ForEachMediatorV2;
import org.jaxen.JaxenException;

import javax.xml.namespace.QName;
Expand Down Expand Up @@ -57,13 +60,26 @@ public class ForEachMediatorFactory extends AbstractMediatorFactory {
private static final QName CONTINUE_IN_FAULT_Q
= new QName(XMLConfigConstants.NULL_NAMESPACE, "continueLoopOnFailure");

private static final QName ATT_COLLECTION = new QName("collection");
private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence");
private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");
private static final QName RESULT_TARGET_Q = new QName("result-target");
private static final QName RESULT_TYPE_Q = new QName("result-type");
private static final QName ATT_COUNTER_VARIABLE = new QName("counter-variable");

public QName getTagQName() {
return FOREACH_Q;
}

@Override
protected Mediator createSpecificMediator(OMElement elem,
Properties properties) {

OMAttribute collectionAttr = elem.getAttribute(ATT_COLLECTION);
if (collectionAttr != null && StringUtils.isNotBlank(collectionAttr.getAttributeValue())) {
return createForEachMediatorV2(elem, properties);
}

ForEachMediator mediator = new ForEachMediator();
processAuditStatus(mediator, elem);

Expand Down Expand Up @@ -126,4 +142,67 @@ private boolean validateSequence(SequenceMediator sequence) {
return true;
}

public Mediator createForEachMediatorV2(OMElement elem, Properties properties) {

boolean asynchronousExe = true;

ForEachMediatorV2 mediator = new ForEachMediatorV2();
processAuditStatus(mediator, elem);

OMAttribute parallelExecAttr = elem.getAttribute(PARALLEL_EXEC_Q);
if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) {
asynchronousExe = false;
}
mediator.setParallelExecution(asynchronousExe);

OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q);
if (resultTargetAttr != null && StringUtils.isNotBlank(resultTargetAttr.getAttributeValue())) {
OMAttribute contentTypeAttr = elem.getAttribute(RESULT_TYPE_Q);
if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) {
handleException("The 'result-type' attribute is required when the 'result-target' attribute is present");
} else {
if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ForEachMediatorV2.JSON_TYPE);
} else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ForEachMediatorV2.XML_TYPE);
} else {
handleException("The 'result-type' attribute should be either 'JSON' or 'XML'");
}
mediator.setResultTarget(resultTargetAttr.getAttributeValue());
}
}

OMAttribute counterVariableAttr = elem.getAttribute(ATT_COUNTER_VARIABLE);
if (counterVariableAttr != null && StringUtils.isNotBlank(counterVariableAttr.getAttributeValue())) {
if (asynchronousExe) {
handleException("The 'counter-variable' attribute is not allowed when parallel-execution is true");
}
mediator.setCounterVariable(counterVariableAttr.getAttributeValue());
}

OMAttribute collectionAttr = elem.getAttribute(ATT_COLLECTION);
if (collectionAttr == null || StringUtils.isBlank(collectionAttr.getAttributeValue())) {
handleException("The 'collection' attribute is required for the configuration of a Foreach mediator");
} else {
try {
mediator.setCollectionExpression(SynapsePathFactory.getSynapsePath(elem, ATT_COLLECTION));
} catch (JaxenException e) {
handleException("Unable to build the Foreach Mediator. Invalid expression "
+ collectionAttr.getAttributeValue(), e);
}
}

OMElement sequenceElement = elem.getFirstChildWithName(SEQUENCE_Q);
if (sequenceElement == null) {
handleException("A 'sequence' element is required for the configuration of a Foreach mediator");
} else {
Target target = new Target();
SequenceMediatorFactory fac = new SequenceMediatorFactory();
target.setSequence(fac.createAnonymousSequence(sequenceElement, properties));
target.setAsynchronous(asynchronousExe);
mediator.setTarget(target);
}
addAllCommentChildrenToList(elem, mediator.getCommentsList());
return mediator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.axiom.om.OMElement;
import org.apache.synapse.Mediator;
import org.apache.synapse.mediators.builtin.ForEachMediator;
import org.apache.synapse.mediators.v2.ForEachMediatorV2;

/**
* <p>Serialize for each mediator as below : </p>
Expand All @@ -42,39 +43,73 @@ public String getMediatorClassName() {

@Override
protected OMElement serializeSpecificMediator(Mediator m) {
if (!(m instanceof ForEachMediator)) {
handleException("Unsupported mediator passed in for serialization : " +
m.getType());
}
if (m instanceof ForEachMediator) {
OMElement forEachElem = fac.createOMElement("foreach", synNS);
saveTracingState(forEachElem, m);

OMElement forEachElem = fac.createOMElement("foreach", synNS);
saveTracingState(forEachElem, m);
ForEachMediator forEachMed = (ForEachMediator) m;

ForEachMediator forEachMed = (ForEachMediator) m;
if (forEachMed.getId() != null) {
forEachElem.addAttribute("id", forEachMed.getId(), nullNS);
}

if (forEachMed.getId() != null) {
forEachElem.addAttribute("id", forEachMed.getId(), nullNS);
}
if (forEachMed.getExpression() != null) {
SynapsePathSerializer.serializePath(forEachMed.getExpression(),
forEachElem, "expression");
} else {
handleException("Missing expression of the ForEach which is required.");
}

if (forEachMed.getExpression() != null) {
SynapsePathSerializer.serializePath(forEachMed.getExpression(),
forEachElem, "expression");
} else {
handleException("Missing expression of the ForEach which is required.");
}
if (forEachMed.getSequenceRef() != null) {
forEachElem.addAttribute("sequence", forEachMed.getSequenceRef(), null);
} else if (forEachMed.getSequence() != null) {
SequenceMediatorSerializer seqSerializer = new SequenceMediatorSerializer();
OMElement seqElement = seqSerializer.serializeAnonymousSequence(
null, forEachMed.getSequence());
seqElement.setLocalName("sequence");
forEachElem.addChild(seqElement);
}

if (forEachMed.getSequenceRef() != null) {
forEachElem.addAttribute("sequence", forEachMed.getSequenceRef(), null);
} else if (forEachMed.getSequence() != null) {
SequenceMediatorSerializer seqSerializer = new SequenceMediatorSerializer();
OMElement seqElement = seqSerializer.serializeAnonymousSequence(
null, forEachMed.getSequence());
seqElement.setLocalName("sequence");
forEachElem.addChild(seqElement);
}
serializeComments(forEachElem, forEachMed.getCommentsList());

return forEachElem;
} else if (m instanceof ForEachMediatorV2) {
OMElement forEachElem = fac.createOMElement("foreach", synNS);
saveTracingState(forEachElem, m);

serializeComments(forEachElem, forEachMed.getCommentsList());
ForEachMediatorV2 forEachMediatorV2 = (ForEachMediatorV2) m;

return forEachElem;
if (forEachMediatorV2.getCollectionExpression() != null) {
SynapsePathSerializer.serializePath(forEachMediatorV2.getCollectionExpression(),
forEachElem, "collection");
} else {
handleException("Missing collection of the ForEach which is required.");
}
forEachElem.addAttribute(fac.createOMAttribute(
"parallel-execution", nullNS, Boolean.toString(forEachMediatorV2.getParallelExecution())));
if (forEachMediatorV2.getResultTarget() != null) {
forEachElem.addAttribute(fac.createOMAttribute(
"result-target", nullNS, forEachMediatorV2.getResultTarget()));
forEachElem.addAttribute(fac.createOMAttribute(
"result-type", nullNS, forEachMediatorV2.getContentType()));
}
if (forEachMediatorV2.getCounterVariable() != null) {
forEachElem.addAttribute(fac.createOMAttribute(
"counter-variable", nullNS, forEachMediatorV2.getCounterVariable()));
}
if (forEachMediatorV2.getTarget() != null) {
if (forEachMediatorV2.getTarget() != null && forEachMediatorV2.getTarget().getSequence() != null) {
SequenceMediatorSerializer serializer = new SequenceMediatorSerializer();
serializer.serializeAnonymousSequence(forEachElem, forEachMediatorV2.getTarget().getSequence());
}
} else {
handleException("Missing sequence element of the ForEach which is required.");
}
serializeComments(forEachElem, forEachMediatorV2.getCommentsList());
return forEachElem;
} else {
handleException("Unsupported mediator passed in for serialization : " + m.getType());
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.mediators.eip.aggregator;

import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseLog;
import org.apache.synapse.mediators.eip.EIPConstants;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/**
* An instance of this class is created to manage each aggregation group, and it holds
* the aggregation properties and the messages collected during aggregation. This class also
* times out itself after the timeout expires it
*/
public class ForEachAggregate {

private final String forLoopMediatorId;
private final ReentrantLock lock = new ReentrantLock();
private List<MessageContext> messages = new ArrayList<>();
private boolean completed = false;
private String correlation = null;

public ForEachAggregate(String correlation, String forLoopMediatorId) {

this.correlation = correlation;
this.forLoopMediatorId = forLoopMediatorId;
}

/**
* Add a message to the aggregate's message list
*
* @param synCtx message to be added into this aggregation group
* @return true if the message was added or false if not
*/
public synchronized boolean addMessage(MessageContext synCtx) {

if (messages == null) {
return false;
}
messages.add(synCtx);
return true;
}

/**
* Has this aggregation group completed?
*
* @param synLog the Synapse log to use
* @return boolean true if aggregation is complete
*/
public synchronized boolean isComplete(SynapseLog synLog) {

if (!completed) {
// if any messages have been collected, check if the completion criteria is met
if (!messages.isEmpty()) {
// get total messages for this group, from the first message we have collected
MessageContext mc = messages.get(0);
Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + "." + forLoopMediatorId);

if (prop instanceof String) {
String[] msgSequence = prop.toString().split(
EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
int total = Integer.parseInt(msgSequence[1]);

if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug(messages.size() +
" messages of " + total + " collected in current foreach aggregation");
}
if (messages.size() >= total) {
synLog.traceOrDebug("Foreach iterations complete");
return true;
}
}
} else {
synLog.traceOrDebug("No messages collected in current foreach aggregation");
}
} else {
synLog.traceOrDebug(
"Foreach iteration already completed - this message will not be processed in aggregation");
}
return false;
}

public MessageContext getLastMessage() {

return messages.get(messages.size() - 1);
}

public synchronized List<MessageContext> getMessages() {

return new ArrayList<>(messages);
}

public void setMessages(List<MessageContext> messages) {

this.messages = messages;
}

public String getCorrelation() {

return correlation;
}

public void clear() {

messages = null;
}

public synchronized boolean getLock() {

return lock.tryLock();
}

public void releaseLock() {

if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}

public boolean isCompleted() {

return completed;
}

public void setCompleted(boolean completed) {

this.completed = completed;
}
}
Loading

0 comments on commit 65e169c

Please sign in to comment.