From 802293151cdc364ce609af49702362d5886f66d0 Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Thu, 5 Dec 2024 11:47:24 +0530 Subject: [PATCH] Add new Foreach mediator --- .../config/xml/ForEachMediatorFactory.java | 79 ++ .../config/xml/ForEachMediatorSerializer.java | 89 +- .../eip/aggregator/ForEachAggregate.java | 148 ++++ .../mediators/v2/ForEachMediatorV2.java | 757 ++++++++++++++++++ .../util/xpath/SynapseExpressionUtils.java | 27 + 5 files changed, 1073 insertions(+), 27 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/ForEachAggregate.java create mode 100644 modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ForEachMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ForEachMediatorFactory.java index ee91de8795..23ff62722a 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ForEachMediatorFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ForEachMediatorFactory.java @@ -20,6 +20,7 @@ import org.apache.axiom.om.OMAttribute; import org.apache.axiom.om.OMElement; +import org.apache.commons.lang3.StringUtils; import org.apache.synapse.Mediator; import org.apache.synapse.SynapseConstants; import org.apache.synapse.mediators.base.SequenceMediator; @@ -27,6 +28,8 @@ import org.apache.synapse.mediators.builtin.CalloutMediator; import org.apache.synapse.mediators.builtin.ForEachMediator; import org.apache.synapse.mediators.builtin.SendMediator; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.v2.ForEachMediatorV2; import org.jaxen.JaxenException; import javax.xml.namespace.QName; @@ -57,6 +60,13 @@ public class ForEachMediatorFactory extends AbstractMediatorFactory { private static final QName CONTINUE_IN_FAULT_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "continueLoopOnFailure"); + private static final QName ATT_COLLECTION = new QName("collection"); + private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence"); + private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution"); + private static final QName RESULT_TARGET_Q = new QName("result-target"); + private static final QName RESULT_TYPE_Q = new QName("result-type"); + private static final QName ATT_COUNTER_VARIABLE = new QName("counter-variable"); + public QName getTagQName() { return FOREACH_Q; } @@ -64,6 +74,12 @@ public QName getTagQName() { @Override protected Mediator createSpecificMediator(OMElement elem, Properties properties) { + + OMAttribute collectionAttr = elem.getAttribute(ATT_COLLECTION); + if (collectionAttr != null && StringUtils.isNotBlank(collectionAttr.getAttributeValue())) { + return createForEachMediatorV2(elem, properties); + } + ForEachMediator mediator = new ForEachMediator(); processAuditStatus(mediator, elem); @@ -126,4 +142,67 @@ private boolean validateSequence(SequenceMediator sequence) { return true; } + public Mediator createForEachMediatorV2(OMElement elem, Properties properties) { + + boolean asynchronousExe = true; + + ForEachMediatorV2 mediator = new ForEachMediatorV2(); + processAuditStatus(mediator, elem); + + OMAttribute parallelExecAttr = elem.getAttribute(PARALLEL_EXEC_Q); + if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) { + asynchronousExe = false; + } + mediator.setParallelExecution(asynchronousExe); + + OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q); + if (resultTargetAttr != null && StringUtils.isNotBlank(resultTargetAttr.getAttributeValue())) { + OMAttribute contentTypeAttr = elem.getAttribute(RESULT_TYPE_Q); + if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) { + handleException("The 'result-type' attribute is required when the 'result-target' attribute is present"); + } else { + if ("JSON".equals(contentTypeAttr.getAttributeValue())) { + mediator.setContentType(ForEachMediatorV2.JSON_TYPE); + } else if ("XML".equals(contentTypeAttr.getAttributeValue())) { + mediator.setContentType(ForEachMediatorV2.XML_TYPE); + } else { + handleException("The 'result-type' attribute should be either 'JSON' or 'XML'"); + } + mediator.setResultTarget(resultTargetAttr.getAttributeValue()); + } + } + + OMAttribute counterVariableAttr = elem.getAttribute(ATT_COUNTER_VARIABLE); + if (counterVariableAttr != null && StringUtils.isNotBlank(counterVariableAttr.getAttributeValue())) { + if (asynchronousExe) { + handleException("The 'counter-variable' attribute is not allowed when parallel-execution is true"); + } + mediator.setCounterVariable(counterVariableAttr.getAttributeValue()); + } + + OMAttribute collectionAttr = elem.getAttribute(ATT_COLLECTION); + if (collectionAttr == null || StringUtils.isBlank(collectionAttr.getAttributeValue())) { + handleException("The 'collection' attribute is required for the configuration of a Foreach mediator"); + } else { + try { + mediator.setCollectionExpression(SynapsePathFactory.getSynapsePath(elem, ATT_COLLECTION)); + } catch (JaxenException e) { + handleException("Unable to build the Foreach Mediator. Invalid expression " + + collectionAttr.getAttributeValue(), e); + } + } + + OMElement sequenceElement = elem.getFirstChildWithName(SEQUENCE_Q); + if (sequenceElement == null) { + handleException("A 'sequence' element is required for the configuration of a Foreach mediator"); + } else { + Target target = new Target(); + SequenceMediatorFactory fac = new SequenceMediatorFactory(); + target.setSequence(fac.createAnonymousSequence(sequenceElement, properties)); + target.setAsynchronous(asynchronousExe); + mediator.setTarget(target); + } + addAllCommentChildrenToList(elem, mediator.getCommentsList()); + return mediator; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ForEachMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ForEachMediatorSerializer.java index a699a920bf..ba17edf9af 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ForEachMediatorSerializer.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ForEachMediatorSerializer.java @@ -22,6 +22,7 @@ import org.apache.axiom.om.OMElement; import org.apache.synapse.Mediator; import org.apache.synapse.mediators.builtin.ForEachMediator; +import org.apache.synapse.mediators.v2.ForEachMediatorV2; /** *

Serialize for each mediator as below :

@@ -42,39 +43,73 @@ public String getMediatorClassName() { @Override protected OMElement serializeSpecificMediator(Mediator m) { - if (!(m instanceof ForEachMediator)) { - handleException("Unsupported mediator passed in for serialization : " + - m.getType()); - } + if (m instanceof ForEachMediator) { + OMElement forEachElem = fac.createOMElement("foreach", synNS); + saveTracingState(forEachElem, m); - OMElement forEachElem = fac.createOMElement("foreach", synNS); - saveTracingState(forEachElem, m); + ForEachMediator forEachMed = (ForEachMediator) m; - ForEachMediator forEachMed = (ForEachMediator) m; + if (forEachMed.getId() != null) { + forEachElem.addAttribute("id", forEachMed.getId(), nullNS); + } - if (forEachMed.getId() != null) { - forEachElem.addAttribute("id", forEachMed.getId(), nullNS); - } + if (forEachMed.getExpression() != null) { + SynapsePathSerializer.serializePath(forEachMed.getExpression(), + forEachElem, "expression"); + } else { + handleException("Missing expression of the ForEach which is required."); + } - if (forEachMed.getExpression() != null) { - SynapsePathSerializer.serializePath(forEachMed.getExpression(), - forEachElem, "expression"); - } else { - handleException("Missing expression of the ForEach which is required."); - } + if (forEachMed.getSequenceRef() != null) { + forEachElem.addAttribute("sequence", forEachMed.getSequenceRef(), null); + } else if (forEachMed.getSequence() != null) { + SequenceMediatorSerializer seqSerializer = new SequenceMediatorSerializer(); + OMElement seqElement = seqSerializer.serializeAnonymousSequence( + null, forEachMed.getSequence()); + seqElement.setLocalName("sequence"); + forEachElem.addChild(seqElement); + } - if (forEachMed.getSequenceRef() != null) { - forEachElem.addAttribute("sequence", forEachMed.getSequenceRef(), null); - } else if (forEachMed.getSequence() != null) { - SequenceMediatorSerializer seqSerializer = new SequenceMediatorSerializer(); - OMElement seqElement = seqSerializer.serializeAnonymousSequence( - null, forEachMed.getSequence()); - seqElement.setLocalName("sequence"); - forEachElem.addChild(seqElement); - } + serializeComments(forEachElem, forEachMed.getCommentsList()); + + return forEachElem; + } else if (m instanceof ForEachMediatorV2) { + OMElement forEachElem = fac.createOMElement("foreach", synNS); + saveTracingState(forEachElem, m); - serializeComments(forEachElem, forEachMed.getCommentsList()); + ForEachMediatorV2 forEachMediatorV2 = (ForEachMediatorV2) m; - return forEachElem; + if (forEachMediatorV2.getCollectionExpression() != null) { + SynapsePathSerializer.serializePath(forEachMediatorV2.getCollectionExpression(), + forEachElem, "collection"); + } else { + handleException("Missing collection of the ForEach which is required."); + } + forEachElem.addAttribute(fac.createOMAttribute( + "parallel-execution", nullNS, Boolean.toString(forEachMediatorV2.getParallelExecution()))); + if (forEachMediatorV2.getResultTarget() != null) { + forEachElem.addAttribute(fac.createOMAttribute( + "result-target", nullNS, forEachMediatorV2.getResultTarget())); + forEachElem.addAttribute(fac.createOMAttribute( + "result-type", nullNS, forEachMediatorV2.getContentType())); + } + if (forEachMediatorV2.getCounterVariable() != null) { + forEachElem.addAttribute(fac.createOMAttribute( + "counter-variable", nullNS, forEachMediatorV2.getCounterVariable())); + } + if (forEachMediatorV2.getTarget() != null) { + if (forEachMediatorV2.getTarget() != null && forEachMediatorV2.getTarget().getSequence() != null) { + SequenceMediatorSerializer serializer = new SequenceMediatorSerializer(); + serializer.serializeAnonymousSequence(forEachElem, forEachMediatorV2.getTarget().getSequence()); + } + } else { + handleException("Missing sequence element of the ForEach which is required."); + } + serializeComments(forEachElem, forEachMediatorV2.getCommentsList()); + return forEachElem; + } else { + handleException("Unsupported mediator passed in for serialization : " + m.getType()); + return null; + } } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/ForEachAggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/ForEachAggregate.java new file mode 100644 index 0000000000..8865cb02d8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/ForEachAggregate.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.mediators.eip.aggregator; + +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseLog; +import org.apache.synapse.mediators.eip.EIPConstants; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An instance of this class is created to manage each aggregation group, and it holds + * the aggregation properties and the messages collected during aggregation. This class also + * times out itself after the timeout expires it + */ +public class ForEachAggregate { + + private final String forLoopMediatorId; + private final ReentrantLock lock = new ReentrantLock(); + private List messages = new ArrayList<>(); + private boolean completed = false; + private String correlation = null; + + public ForEachAggregate(String correlation, String forLoopMediatorId) { + + this.correlation = correlation; + this.forLoopMediatorId = forLoopMediatorId; + } + + /** + * Add a message to the aggregate's message list + * + * @param synCtx message to be added into this aggregation group + * @return true if the message was added or false if not + */ + public synchronized boolean addMessage(MessageContext synCtx) { + + if (messages == null) { + return false; + } + messages.add(synCtx); + return true; + } + + /** + * Has this aggregation group completed? + * + * @param synLog the Synapse log to use + * @return boolean true if aggregation is complete + */ + public synchronized boolean isComplete(SynapseLog synLog) { + + if (!completed) { + // if any messages have been collected, check if the completion criteria is met + if (!messages.isEmpty()) { + // get total messages for this group, from the first message we have collected + MessageContext mc = messages.get(0); + Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + "." + forLoopMediatorId); + + if (prop instanceof String) { + String[] msgSequence = prop.toString().split( + EIPConstants.MESSAGE_SEQUENCE_DELEMITER); + int total = Integer.parseInt(msgSequence[1]); + + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug(messages.size() + + " messages of " + total + " collected in current foreach aggregation"); + } + if (messages.size() >= total) { + synLog.traceOrDebug("Foreach iterations complete"); + return true; + } + } + } else { + synLog.traceOrDebug("No messages collected in current foreach aggregation"); + } + } else { + synLog.traceOrDebug( + "Foreach iteration already completed - this message will not be processed in aggregation"); + } + return false; + } + + public MessageContext getLastMessage() { + + return messages.get(messages.size() - 1); + } + + public synchronized List getMessages() { + + return new ArrayList<>(messages); + } + + public void setMessages(List messages) { + + this.messages = messages; + } + + public String getCorrelation() { + + return correlation; + } + + public void clear() { + + messages = null; + } + + public synchronized boolean getLock() { + + return lock.tryLock(); + } + + public void releaseLock() { + + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + + public boolean isCompleted() { + + return completed; + } + + public void setCompleted(boolean completed) { + + this.completed = completed; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java new file mode 100644 index 0000000000..2ca6deede3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ForEachMediatorV2.java @@ -0,0 +1,757 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.mediators.v2; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonNull; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import org.apache.axiom.om.OMAbstractFactory; +import org.apache.axiom.om.OMContainer; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMNode; +import org.apache.axiom.soap.SOAP11Constants; +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axiom.soap.SOAPFactory; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.OperationContext; +import org.apache.synapse.ContinuationState; +import org.apache.synapse.ManagedLifecycle; +import org.apache.synapse.Mediator; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; +import org.apache.synapse.SynapseLog; +import org.apache.synapse.aspects.AspectConfiguration; +import org.apache.synapse.aspects.ComponentType; +import org.apache.synapse.aspects.flow.statistics.StatisticIdentityGenerator; +import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; +import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder; +import org.apache.synapse.aspects.flow.statistics.util.StatisticDataCollectionHelper; +import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants; +import org.apache.synapse.commons.json.JsonUtil; +import org.apache.synapse.config.xml.SynapsePath; +import org.apache.synapse.continuation.ContinuationStackManager; +import org.apache.synapse.continuation.SeqContinuationState; +import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.mediators.AbstractMediator; +import org.apache.synapse.mediators.FlowContinuableMediator; +import org.apache.synapse.mediators.base.SequenceMediator; +import org.apache.synapse.mediators.eip.EIPConstants; +import org.apache.synapse.mediators.eip.SharedDataHolder; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.eip.aggregator.ForEachAggregate; +import org.apache.synapse.transport.passthru.util.RelayUtils; +import org.apache.synapse.util.MessageHelper; +import org.apache.synapse.util.xpath.SynapseExpression; +import org.apache.synapse.util.xpath.SynapseExpressionUtils; +import org.apache.synapse.util.xpath.SynapseXPath; +import org.jaxen.JaxenException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; + +public class ForEachMediatorV2 extends AbstractMediator implements ManagedLifecycle, FlowContinuableMediator { + + public static final String JSON_TYPE = "JSON"; + public static final String XML_TYPE = "XML"; + private final Object lock = new Object(); + private final Map activeAggregates = Collections.synchronizedMap(new HashMap<>()); + private final String id; + private SynapsePath collectionExpression = null; + private Target target; + private boolean parallelExecution = true; + private Integer statisticReportingIndex; + private String contentType; + private String resultTarget = null; + private String counterVariableName = null; + private SynapseEnvironment synapseEnv; + + public ForEachMediatorV2() { + + id = String.valueOf(new Random().nextLong()); + } + + /** + * Check whether the message is a foreach message or not + * + * @param synCtx MessageContext + * @return true if the message is a foreach message + */ + private static boolean isContinuationTriggeredFromMediatorWorker(MessageContext synCtx) { + + Boolean isContinuationTriggeredMediatorWorker = + (Boolean) synCtx.getProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER); + return isContinuationTriggeredMediatorWorker != null && isContinuationTriggeredMediatorWorker; + } + + @Override + public boolean mediate(MessageContext synCtx) { + + boolean aggregationResult = false; + + SynapseLog synLog = getLog(synCtx); + + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Start : Foreach mediator"); + + if (synLog.isTraceTraceEnabled()) { + synLog.traceTrace("Message : " + synCtx.getEnvelope()); + } + } + + try { + // Clone the original MessageContext and save it to continue the flow + MessageContext clonedMessageContext = MessageHelper.cloneMessageContext(synCtx); + synCtx.setProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id, new SharedDataHolder(clonedMessageContext)); + + Object collection = collectionExpression.objectValueOf(synCtx); + + if (collection instanceof JsonArray) { + int msgNumber = 0; + JsonArray list = (JsonArray) collection; + if (list.isEmpty()) { + log.info("No elements found for the expression : " + collectionExpression); + return true; + } + int msgCount = list.size(); + for (Object item : list) { + MessageContext iteratedMsgCtx = getIteratedMessage(synCtx, msgNumber++, msgCount, item); + ContinuationStackManager.addReliantContinuationState(iteratedMsgCtx, 0, getMediatorPosition()); + boolean result = target.mediate(iteratedMsgCtx); + if (!parallelExecution && result) { + aggregationResult = aggregateMessages(iteratedMsgCtx, synLog); + } + } + } else if (collection instanceof List) { + int msgNumber = 0; + List list = (List) collection; + if (list.isEmpty()) { + log.info("No elements found for the expression : " + collectionExpression); + return true; + } + int msgCount = list.size(); + for (Object item : list) { + MessageContext iteratedMsgCtx = getIteratedMessage(synCtx, msgNumber++, msgCount, item); + ContinuationStackManager.addReliantContinuationState(iteratedMsgCtx, 0, getMediatorPosition()); + boolean result = target.mediate(iteratedMsgCtx); + if (!parallelExecution && result) { + aggregationResult = aggregateMessages(iteratedMsgCtx, synLog); + } + } + } else { + handleException("Expression " + collectionExpression + " did not resolve to a valid array", synCtx); + } + } catch (AxisFault e) { + handleException("Error executing Foreach mediator", e, synCtx); + } + + OperationContext opCtx + = ((Axis2MessageContext) synCtx).getAxis2MessageContext().getOperationContext(); + if (opCtx != null) { + opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP"); + } + synCtx.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true); + return aggregationResult; + } + + private MessageContext getIteratedMessage(MessageContext synCtx, int msgNumber, int msgCount, Object node) throws AxisFault { + + MessageContext newCtx = MessageHelper.cloneMessageContext(synCtx, false, false); + // Adding an empty envelope since JsonUtil.getNewJsonPayload requires an envelope + SOAPEnvelope newEnvelope = createNewSoapEnvelope(synCtx.getEnvelope()); + newCtx.setEnvelope(newEnvelope); + if (node instanceof OMNode) { + if (newEnvelope.getBody() != null) { + newEnvelope.getBody().addChild((OMNode) node); + } + } else { + JsonUtil.getNewJsonPayload(((Axis2MessageContext) newCtx).getAxis2MessageContext(), node.toString(), true, + true); + } + newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id, synCtx.getMessageID()); + newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id, msgNumber + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + msgCount); + // Set the SCATTER_MESSAGES property to the cloned message context which will be used by the MediatorWorker + // to continue the mediation from the continuation state + newCtx.setProperty(SynapseConstants.SCATTER_MESSAGES, true); + if (!parallelExecution && counterVariableName != null) { + newCtx.setVariable(counterVariableName, msgNumber); + } + ((Axis2MessageContext) newCtx).getAxis2MessageContext().setServerSide( + ((Axis2MessageContext) synCtx).getAxis2MessageContext().isServerSide()); + return newCtx; + } + + private SOAPEnvelope createNewSoapEnvelope(SOAPEnvelope envelope) { + + SOAPFactory fac; + if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(envelope.getBody().getNamespace().getNamespaceURI())) { + fac = OMAbstractFactory.getSOAP11Factory(); + } else { + fac = OMAbstractFactory.getSOAP12Factory(); + } + return fac.getDefaultEnvelope(); + } + + public void init(SynapseEnvironment synapseEnv) { + + this.synapseEnv = synapseEnv; + ManagedLifecycle seq = target.getSequence(); + if (seq != null) { + seq.init(synapseEnv); + } + // Registering the mediator for enabling continuation + synapseEnv.updateCallMediatorCount(true); + } + + public void destroy() { + + ManagedLifecycle seq = target.getSequence(); + if (seq != null) { + seq.destroy(); + } + // Unregistering the mediator for continuation + synapseEnv.updateCallMediatorCount(false); + } + + public Target getTarget() { + + return target; + } + + public void setTarget(Target target) { + + this.target = target; + } + + @Override + public boolean mediate(MessageContext synCtx, ContinuationState continuationState) { + + SynapseLog synLog = getLog(synCtx); + + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Foreach mediator : Mediating from ContinuationState"); + } + + boolean result; + // If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch + // otherwise start aggregation + if (isContinuationTriggeredFromMediatorWorker(synCtx)) { + synLog.traceOrDebug("Continuation is triggered from a mediator worker"); + if (continuationState.hasChild()) { + SequenceMediator branchSequence = target.getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getChildContState().getPosition()); + + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); + } + } else { + result = true; + } + } else { + synLog.traceOrDebug("Continuation is triggered from a callback"); + // If the continuation is triggered from a callback, continue the mediation from the continuation state + SequenceMediator branchSequence = target.getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + if (!continuationState.hasChild()) { + result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); + } else { + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition()); + + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); + } + } + // If the mediation is completed, remove the child continuation state from the stack, so the aggregation + // will continue the mediation from the parent continuation state + ContinuationStackManager.removeReliantContinuationState(synCtx); + } + if (result) { + return aggregateMessages(synCtx, synLog); + } + return false; + } + + private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { + + ForEachAggregate aggregate = null; + String correlationIdName = EIPConstants.AGGREGATE_CORRELATION + "." + id; + + Object correlationID = synCtx.getProperty(correlationIdName); + String correlation; + + // When the target sequences are not content aware, the message builder won't get triggered. + // Therefore, we need to build the message to do the aggregation. + try { + RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext()); + } catch (IOException | XMLStreamException e) { + handleException("Error building the message", e, synCtx); + } + if (correlationID instanceof String) { + correlation = (String) correlationID; + synLog.traceOrDebug("Aggregating messages started for correlation : " + correlation); + while (aggregate == null) { + synchronized (lock) { + if (activeAggregates.containsKey(correlation)) { + aggregate = activeAggregates.get(correlation); + if (aggregate != null) { + if (!aggregate.getLock()) { + aggregate = null; + } + } else { + break; + } + } else { + if (isAggregationCompleted(synCtx)) { + return false; + } + synLog.traceOrDebug("Creating new ForeachAggregator"); + aggregate = new ForEachAggregate(correlation, id); + aggregate.getLock(); + activeAggregates.put(correlation, aggregate); + } + } + } + } else { + synLog.traceOrDebug("Unable to find aggregation correlation property"); + return false; + } + // if there is an aggregate continue on aggregation + if (aggregate != null) { + boolean collected = aggregate.addMessage(synCtx); + if (synLog.isTraceOrDebugEnabled()) { + if (collected) { + synLog.traceOrDebug("Collected a message during aggregation"); + if (synLog.isTraceTraceEnabled()) { + synLog.traceTrace("Collected message : " + synCtx); + } + } + } + if (aggregate.isComplete(synLog)) { + synLog.traceOrDebug("Aggregation completed"); + return completeAggregate(aggregate); + } else { + aggregate.releaseLock(); + } + } else { + synLog.traceOrDebug("Unable to find an aggregate for this message - skip"); + } + return false; + } + + private boolean isAggregationCompleted(MessageContext synCtx) { + + Object aggregateHolderObj = synCtx.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id); + + if (aggregateHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateHolderObj; + if (sharedDataHolder.isAggregationCompleted()) { + if (log.isDebugEnabled()) { + log.debug("Received a response for already completed Aggregate"); + } + return true; + } + } + return false; + } + + private boolean completeAggregate(ForEachAggregate aggregate) { + + boolean markedCompletedNow = false; + boolean wasComplete = aggregate.isCompleted(); + if (wasComplete) { + return false; + } + if (log.isDebugEnabled()) { + log.debug("Aggregation completed for the correlation : " + aggregate.getCorrelation() + + " in the ForEach mediator"); + } + + synchronized (this) { + if (!aggregate.isCompleted()) { + aggregate.setCompleted(true); + MessageContext lastMessage = aggregate.getLastMessage(); + if (lastMessage != null) { + Object aggregateHolderObj = lastMessage.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id); + if (aggregateHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateHolderObj; + sharedDataHolder.markAggregationCompletion(); + } + } + markedCompletedNow = true; + } + } + + if (!markedCompletedNow) { + return false; + } + + MessageContext originalMessageContext = getOriginalMessageContext(aggregate); + + if (originalMessageContext != null) { + if (updateOriginalContent()) { + updateOriginalPayload(originalMessageContext, aggregate); + } else { + setAggregatedMessageAsVariable(originalMessageContext, aggregate); + } + StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), originalMessageContext); + aggregate.clear(); + activeAggregates.remove(aggregate.getCorrelation()); + // Update the continuation state to current mediator position as we are using the original message context + ContinuationStackManager.updateSeqContinuationState(originalMessageContext, getMediatorPosition()); + SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(originalMessageContext); + + getLog(originalMessageContext).traceOrDebug("End : Foreach mediator"); + boolean result = false; + + // Set CONTINUE_STATISTICS_FLOW to avoid mark event collection as finished before the aggregation is completed + originalMessageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true); + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + CloseEventCollector.closeEntryEvent(originalMessageContext, getMediatorName(), ComponentType.MEDIATOR, + statisticReportingIndex, isContentAltering()); + } + + if (seqContinuationState != null) { + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(originalMessageContext, seqContinuationState); + result = sequenceMediator.mediate(originalMessageContext, seqContinuationState); + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + sequenceMediator.reportCloseStatistics(originalMessageContext, null); + } + } + CloseEventCollector.closeEventsAfterScatterGather(originalMessageContext); + return result; + } else { + handleException(aggregate, "Error retrieving the original message context", null, aggregate.getLastMessage()); + return false; + } + } + + private MessageContext getOriginalMessageContext(ForEachAggregate aggregate) { + + MessageContext lastMessage = aggregate.getLastMessage(); + if (lastMessage != null) { + Object aggregateHolderObj = lastMessage.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id); + if (aggregateHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateHolderObj; + return sharedDataHolder.getSynCtx(); + } + } + return null; + } + + private void setAggregatedMessageAsVariable(MessageContext originalMessageContext, ForEachAggregate aggregate) { + + Object variable = null; + if (Objects.equals(contentType, JSON_TYPE)) { + log.debug("Merging aggregated JSON responses to variable"); + // fill JSON array with null + variable = new JsonArray(); + Collections.nCopies(aggregate.getMessages().size(), JsonNull.INSTANCE).forEach(((JsonArray) variable)::add); + setJSONResultToVariable((JsonArray) variable, aggregate); + } else if (Objects.equals(contentType, XML_TYPE)) { + log.debug("Merging aggregated XML responses to variable"); + variable = OMAbstractFactory.getOMFactory().createOMElement(new QName(resultTarget)); + setXMLResultToVariable((OMElement) variable, aggregate); + } else { + handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget + + " unknown content type : " + contentType, null, originalMessageContext); + } + originalMessageContext.setVariable(resultTarget, variable); + } + + private void setJSONResultToVariable(JsonArray variable, ForEachAggregate aggregate) { + + for (MessageContext synCtx : aggregate.getMessages()) { + Object prop = synCtx.getProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id); + String[] msgSequence = prop.toString().split(EIPConstants.MESSAGE_SEQUENCE_DELEMITER); + JsonElement jsonElement = null; + try { + Object result = new SynapseExpression("payload").objectValueOf(synCtx); + if (result instanceof JsonElement) { + jsonElement = (JsonElement) result; + } + } catch (JaxenException e) { + log.warn("Error extracting the JSON payload for iteration : " + msgSequence[0]); + } + variable.set(Integer.parseInt(msgSequence[0]), jsonElement); + } + } + + private void setXMLResultToVariable(OMElement variable, ForEachAggregate aggregate) { + + List list = getXMLPayloadsAsList(aggregate); + for (OMNode node : list) { + variable.addChild(node); + } + } + + private void updateOriginalPayload(MessageContext originalMessageContext, ForEachAggregate aggregate) { + + Object collection = this.collectionExpression.objectValueOf(originalMessageContext); + + if (collection instanceof JsonArray) { + try { + log.debug("Updating original JSON array with iteration results"); + //Read the complete JSON payload from the synCtx + String jsonPayload = JsonUtil.jsonPayloadToString(((Axis2MessageContext) originalMessageContext).getAxis2MessageContext()); + DocumentContext parsedJsonPayload = JsonPath.parse(jsonPayload); + JsonArray jsonArray = (JsonArray) collection; + for (MessageContext synCtx : aggregate.getMessages()) { + Object prop = synCtx.getProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id); + String[] msgSequence = prop.toString().split(EIPConstants.MESSAGE_SEQUENCE_DELEMITER); + JsonElement jsonElement = null; + Object result = new SynapseExpression("payload").objectValueOf(synCtx); + if (result instanceof JsonElement) { + jsonElement = (JsonElement) result; + } + jsonArray.set(Integer.parseInt(msgSequence[0]), jsonElement); + } + JsonPath jsonPath = getJsonPathFromExpression(this.collectionExpression.getExpression()); + JsonElement jsonPayloadElement; + if (isWholeContent(jsonPath)) { + jsonPayloadElement = jsonArray; + } else { + jsonPayloadElement = parsedJsonPayload.set(jsonPath, jsonArray).json(); + } + if (isCollectionReferencedByVariable(this.collectionExpression)) { + String variableName = getVariableName(this.collectionExpression); + originalMessageContext.setVariable(variableName, jsonPayloadElement); + } else { + JsonUtil.getNewJsonPayload(((Axis2MessageContext) originalMessageContext).getAxis2MessageContext(), + jsonPayloadElement.toString(), true, true); + } + } catch (AxisFault axisFault) { + handleException("Error updating the json stream after foreach transformation", axisFault, originalMessageContext); + } catch (JaxenException e) { + handleException("Error extracting the JSON payload after iteration", e, originalMessageContext); + } + } else if (collection instanceof List) { + try { + log.debug("Updating original XML array with iteration results"); + List results = getXMLPayloadsAsList(aggregate); + if (isCollectionReferencedByVariable(this.collectionExpression)) { + String variableName = getVariableName(this.collectionExpression); + updateXMLCollection(originalMessageContext.getVariable(variableName), results); + } else if (SynapseExpressionUtils.isVariableXPathExpression(this.collectionExpression.getExpression())) { + // Collection is referenced by a variable and xpath as "${xpath('someXPathExpression', 'someVariable')}" + String variableName = SynapseExpressionUtils. + getVariableFromVariableXPathExpression(this.collectionExpression.getExpression()); + String xpath = SynapseExpressionUtils. + getXPathFromVariableXPathExpression(this.collectionExpression.getExpression()); + SynapseXPath synapseXPath = new SynapseXPath(xpath); + Object oldCollectionNodes = synapseXPath.evaluate(originalMessageContext.getVariable(variableName)); + updateXMLCollection(oldCollectionNodes, results); + } else { + // Extract the xpath value inside xpath() function from the expression + String xpath = this.collectionExpression.getExpression(). + substring(7, this.collectionExpression.getExpression().length() - 2); + SynapseXPath synapseXPath = new SynapseXPath(xpath); + Object oldCollectionNodes = synapseXPath.evaluate(originalMessageContext); + updateXMLCollection(oldCollectionNodes, results); + } + } catch (JaxenException e) { + handleException(aggregate, "Error updating the original XML array", e, originalMessageContext); + } + } + } + + private List getXMLPayloadsAsList(ForEachAggregate aggregate) { + + List results = new ArrayList<>(Collections.nCopies(aggregate.getMessages().size(), null)); + for (MessageContext synCtx : aggregate.getMessages()) { + Object prop = synCtx.getProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id); + String[] msgSequence = prop.toString().split(EIPConstants.MESSAGE_SEQUENCE_DELEMITER); + results.set(Integer.parseInt(msgSequence[0]), synCtx.getEnvelope().getBody().getFirstElement()); + } + return results; + } + + private void updateXMLCollection(Object oldCollectionNodes, List results) { + + OMContainer parent = null; + // This is an XML collection. Hence detach the elements from the original collection and attach the new elements + if (oldCollectionNodes instanceof OMNode) { + parent = ((OMNode) oldCollectionNodes).getParent(); + ((OMNode) oldCollectionNodes).detach(); + } else if (oldCollectionNodes instanceof List) { + List oList = (List) oldCollectionNodes; + if (!oList.isEmpty()) { + parent = (((OMNode) oList.get(0)).getParent()); + } + for (Object elem : oList) { + if (elem instanceof OMNode) { + ((OMNode) elem).detach(); + } + } + } + if (parent != null) { + for (OMNode result : results) { + parent.addChild(result); + } + } + } + + @Override + public Integer reportOpenStatistics(MessageContext messageContext, boolean isContentAltering) { + + statisticReportingIndex = OpenEventCollector.reportFlowContinuableEvent(messageContext, getMediatorName(), + ComponentType.MEDIATOR, getAspectConfiguration(), isContentAltering() || isContentAltering); + return statisticReportingIndex; + } + + @Override + public void reportCloseStatistics(MessageContext messageContext, Integer currentIndex) { + + // Do nothing here as the close event is reported in the completeAggregate method + } + + @Override + public void setComponentStatisticsId(ArtifactHolder holder) { + + if (getAspectConfiguration() == null) { + configure(new AspectConfiguration(getMediatorName())); + } + String sequenceId = + StatisticIdentityGenerator.getIdForFlowContinuableMediator(getMediatorName(), ComponentType.MEDIATOR, holder); + getAspectConfiguration().setUniqueId(sequenceId); + target.setStatisticIdForMediators(holder); + StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.MEDIATOR, holder); + } + + @Override + public boolean isContentAltering() { + + return true; + } + + private void handleException(ForEachAggregate aggregate, String msg, Exception exception, MessageContext msgContext) { + + aggregate.clear(); + activeAggregates.remove(aggregate.getCorrelation()); + if (exception != null) { + super.handleException(msg, exception, msgContext); + } else { + super.handleException(msg, msgContext); + } + } + + public String getContentType() { + + return contentType; + } + + public void setContentType(String contentType) { + + this.contentType = contentType; + } + + public String getResultTarget() { + + return resultTarget; + } + + public void setResultTarget(String resultTarget) { + + this.resultTarget = resultTarget; + } + + public SynapsePath getCollectionExpression() { + + return collectionExpression; + } + + public void setCollectionExpression(SynapsePath collectionExpression) { + + this.collectionExpression = collectionExpression; + } + + public boolean getParallelExecution() { + + return this.parallelExecution; + } + + public void setParallelExecution(boolean parallelExecution) { + + this.parallelExecution = parallelExecution; + } + + private boolean updateOriginalContent() { + + return resultTarget == null; + } + + public String getId() { + + return id; + } + + private String getVariableName(SynapsePath expression) { + + return expression.getExpression().split("\\.")[1]; + } + + private boolean isCollectionReferencedByVariable(SynapsePath expression) { + + return expression.getExpression().startsWith("var."); + } + + private JsonPath getJsonPathFromExpression(String expression) { + + String jsonPath = expression; + if (jsonPath.startsWith("payload")) { + jsonPath = jsonPath.replace("payload", "$"); + } else if (jsonPath.startsWith("var.")) { + // Remove the "var." prefix and variable name and replace it with "$" for JSON path + jsonPath = expression.replaceAll("var\\.\\w+\\.(\\w+)", "\\$.$1").replaceAll("var\\.\\w+", "\\$"); + } + return JsonPath.compile(jsonPath); + } + + private boolean isWholeContent(JsonPath jsonPath) { + + return "$".equals(jsonPath.getPath().trim()) || "$.".equals(jsonPath.getPath().trim()); + } + + public String getCounterVariable() { + + return counterVariableName; + } + + public void setCounterVariable(String counterVariableName) { + + this.counterVariableName = counterVariableName; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/util/xpath/SynapseExpressionUtils.java b/modules/core/src/main/java/org/apache/synapse/util/xpath/SynapseExpressionUtils.java index b26b509a8a..0ac4c212bf 100644 --- a/modules/core/src/main/java/org/apache/synapse/util/xpath/SynapseExpressionUtils.java +++ b/modules/core/src/main/java/org/apache/synapse/util/xpath/SynapseExpressionUtils.java @@ -28,6 +28,9 @@ */ public class SynapseExpressionUtils { + private static final String regex = "xpath\\s*\\(\\s*'([^']+)'\\s*,\\s*'([^']+)'\\s*\\)"; + private static final Pattern pattern = Pattern.compile(regex); + /** * Checks whether the synapse expression is content aware * @@ -63,4 +66,28 @@ public static boolean isSynapseExpressionContentAware(String synapseExpression) } return false; } + + public static boolean isVariableXPathExpression(String synapseExpression) { + + Matcher matcher = pattern.matcher(synapseExpression); + return matcher.find(); + } + + public static String getVariableFromVariableXPathExpression(String synapseExpression) { + + Matcher matcher = pattern.matcher(synapseExpression); + if (matcher.find()) { + return matcher.group(2); + } + return null; + } + + public static String getXPathFromVariableXPathExpression(String synapseExpression) { + + Matcher matcher = pattern.matcher(synapseExpression); + if (matcher.find()) { + return matcher.group(1); + } + return null; + } }