diff --git a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java index 4058874e53..edcdbeb1b0 100644 --- a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java @@ -626,4 +626,6 @@ public enum ENDPOINT_TIMEOUT_TYPE { ENDPOINT_TIMEOUT, GLOBAL_TIMEOUT, HTTP_CONNE public static final String ANALYTICS_METADATA = "ANALYTICS_METADATA"; + public static final String SCATTER_MESSAGES = "SCATTER_MESSAGES"; + public static final String CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER = "CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER"; } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java index 4c7dfc361e..d6b197bd45 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java @@ -148,4 +148,17 @@ public static void tryEndFlow(MessageContext messageContext, String componentNam // closeFlowForcefully(messageContext); } } + + /** + * This method will close the event collector and finish the flow when a Scatter Gather mediator is used. + * + * @param messageContext synapse message context. + */ + public static void closeEventsAfterScatterGather(MessageContext messageContext) { + + if (isOpenTelemetryEnabled()) { + OpenTelemetryManagerHolder.getOpenTelemetryManager().getHandler() + .handleScatterGatherFinishEvent(messageContext); + } + } } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java index 16522cbeb3..10b861417b 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java @@ -45,4 +45,11 @@ public interface CloseEventHandler { * @param synCtx Message context. */ void handleTryEndFlow(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx); + + /** + * Handles a close flow event. + * + * @param synCtx Message context. + */ + void handleScatterGatherFinishEvent(MessageContext synCtx); } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java index cbf5bf1218..5a78b2cb1a 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java @@ -430,6 +430,25 @@ private void handleCallbackFinishEvent(MessageContext messageContext) { } } + public void handleScatterGatherFinishEvent(MessageContext messageContext) { + TracingScope tracingScope = tracingScopeManager.getTracingScope(messageContext); + synchronized (tracingScope.getSpanStore()) { + cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext); + cleanUpActiveSpans(tracingScope.getSpanStore(), messageContext); + SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper(); + tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext); + tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId()); + } + } + + private void cleanUpActiveSpans(SpanStore spanStore, MessageContext messageContext) { + List activeSpanWrappers = spanStore.getActiveSpanWrappers(); + for (int i = activeSpanWrappers.size() - 1; i > 0; i--) { + SpanWrapper spanWrapper = activeSpanWrappers.get(i); + spanStore.finishSpan(spanWrapper, messageContext); + } + } + @Override public void handleStateStackInsertion(MessageContext synCtx, String seqName, SequenceType seqType) { TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java index 49b3a3ff27..cda214f563 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java @@ -103,7 +103,8 @@ public class MediatorFactoryFinder implements XMLToObjectMapper { ForEachMediatorFactory.class, JSONTransformMediatorFactory.class, NTLMMediatorFactory.class, - VariableMediatorFactory.class + VariableMediatorFactory.class, + ScatterGatherMediatorFactory.class }; private final static MediatorFactoryFinder instance = new MediatorFactoryFinder(); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java index 174d90511c..eb12240762 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java @@ -77,7 +77,8 @@ public class MediatorSerializerFinder { ForEachMediatorSerializer.class, JSONTransformMediatorSerializer.class, NTLMMediatorSerializer.class, - VariableMediatorSerializer.class + VariableMediatorSerializer.class, + ScatterGatherMediatorSerializer.class }; private final static MediatorSerializerFinder instance = new MediatorSerializerFinder(); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java new file mode 100644 index 0000000000..f6bbb54dc5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config.xml; + +import org.apache.axiom.om.OMAttribute; +import org.apache.axiom.om.OMElement; +import org.apache.commons.lang3.StringUtils; +import org.apache.synapse.Mediator; +import org.apache.synapse.SynapseException; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.v2.ScatterGather; +import org.jaxen.JaxenException; + +import java.util.Iterator; +import java.util.Properties; +import javax.xml.namespace.QName; + +/** + * The <scatter-gather> mediator is used to copy messages in Synapse to similar messages but with + * different message contexts and aggregate the responses back. + * + *
+ * <scatter-gather parallel-execution=(true | false) result-target=(body | variable) content-type=(JSON | XML)>
+ *   <aggregation value="expression" condition="expression" timeout="long"
+ *     min-messages="expression" max-messages="expression"/>
+ *   <sequence>
+ *     (mediator)+
+ *   </sequence>+
+ * </scatter-gather>
+ * 
+ */ +public class ScatterGatherMediatorFactory extends AbstractMediatorFactory { + + /** + * This will hold the QName of the clone mediator element in the xml configuration + */ + private static final QName SCATTER_GATHER_Q + = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather"); + private static final QName ELEMENT_AGGREGATE_Q + = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation"); + private static final QName ATT_AGGREGATE_EXPRESSION = new QName("expression"); + private static final QName ATT_CONDITION = new QName("condition"); + private static final QName ATT_TIMEOUT = new QName("timeout"); + private static final QName ATT_MIN_MESSAGES = new QName("min-messages"); + private static final QName ATT_MAX_MESSAGES = new QName("max-messages"); + private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence"); + private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution"); + private static final QName RESULT_TARGET_Q = new QName("result-target"); + private static final QName ROOT_ELEMENT_Q = new QName("root-element"); + private static final QName CONTENT_TYPE_Q = new QName("content-type"); + + private static final SequenceMediatorFactory fac = new SequenceMediatorFactory(); + + public Mediator createSpecificMediator(OMElement elem, Properties properties) { + + boolean asynchronousExe = true; + + ScatterGather mediator = new ScatterGather(); + processAuditStatus(mediator, elem); + + OMAttribute parallelExecAttr = elem.getAttribute(PARALLEL_EXEC_Q); + if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) { + asynchronousExe = false; + } + mediator.setParallelExecution(asynchronousExe); + + OMAttribute contentTypeAttr = elem.getAttribute(CONTENT_TYPE_Q); + if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) { + String msg = "The 'content-type' attribute is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); + } else { + if ("JSON".equals(contentTypeAttr.getAttributeValue())) { + mediator.setContentType(ScatterGather.JSON_TYPE); + } else if ("XML".equals(contentTypeAttr.getAttributeValue())) { + OMAttribute rootElementAttr = elem.getAttribute(ROOT_ELEMENT_Q); + if (rootElementAttr != null && StringUtils.isNotBlank(rootElementAttr.getAttributeValue())) { + mediator.setRootElementName(rootElementAttr.getAttributeValue()); + mediator.setContentType(ScatterGather.XML_TYPE); + } else { + String msg = "The 'root-element' attribute is required for the configuration of a " + + "Scatter Gather mediator when the 'content-type' is 'XML'"; + throw new SynapseException(msg); + } + } else { + String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'"; + throw new SynapseException(msg); + } + } + + OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q); + if (resultTargetAttr == null || StringUtils.isBlank(resultTargetAttr.getAttributeValue())) { + String msg = "The 'result-target' attribute is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); + } else { + mediator.setResultTarget(resultTargetAttr.getAttributeValue()); + } + + Iterator sequenceListElements = elem.getChildrenWithName(SEQUENCE_Q); + if (!sequenceListElements.hasNext()) { + String msg = "A 'sequence' element is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); + } + while (sequenceListElements.hasNext()) { + OMElement sequence = (OMElement) sequenceListElements.next(); + if (sequence != null) { + Target target = new Target(); + target.setSequence(fac.createAnonymousSequence(sequence, properties)); + target.setAsynchronous(asynchronousExe); + mediator.addTarget(target); + } + } + + OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q); + if (aggregateElement != null) { + OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_AGGREGATE_EXPRESSION); + if (aggregateExpr != null) { + try { + mediator.setAggregationExpression( + SynapsePathFactory.getSynapsePath(aggregateElement, ATT_AGGREGATE_EXPRESSION)); + } catch (JaxenException e) { + handleException("Unable to load the aggregating expression", e); + } + } else { + String msg = "The 'expression' attribute is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); + } + + OMAttribute conditionExpr = aggregateElement.getAttribute(ATT_CONDITION); + if (conditionExpr != null) { + try { + mediator.setCorrelateExpression( + SynapsePathFactory.getSynapsePath(aggregateElement, ATT_CONDITION)); + } catch (JaxenException e) { + handleException("Unable to load the condition expression", e); + } + } + + OMAttribute completeTimeout = aggregateElement.getAttribute(ATT_TIMEOUT); + if (completeTimeout != null) { + mediator.setCompletionTimeoutMillis(Long.parseLong(completeTimeout.getAttributeValue())); + } + + OMAttribute minMessages = aggregateElement.getAttribute(ATT_MIN_MESSAGES); + if (minMessages != null) { + mediator.setMinMessagesToComplete(new ValueFactory().createValue("min-messages", aggregateElement)); + } + + OMAttribute maxMessages = aggregateElement.getAttribute(ATT_MAX_MESSAGES); + if (maxMessages != null) { + mediator.setMaxMessagesToComplete(new ValueFactory().createValue("max-messages", aggregateElement)); + } + } else { + String msg = "The 'aggregation' element is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); + } + addAllCommentChildrenToList(elem, mediator.getCommentsList()); + return mediator; + } + + /** + * This method will implement the getTagQName method of the MediatorFactory interface + * + * @return QName of the clone element in xml configuration + */ + public QName getTagQName() { + + return SCATTER_GATHER_Q; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java new file mode 100755 index 0000000000..9585b59db6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config.xml; + +import org.apache.axiom.om.OMElement; +import org.apache.commons.lang3.StringUtils; +import org.apache.synapse.Mediator; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.v2.ScatterGather; + +/** + * Serializer for {@link ScatterGather} instances. + */ +public class ScatterGatherMediatorSerializer extends AbstractMediatorSerializer { + + public OMElement serializeSpecificMediator(Mediator m) { + + ScatterGather scatterGatherMediator = null; + if (!(m instanceof ScatterGather)) { + handleException("Unsupported mediator passed in for serialization : " + m.getType()); + } else { + scatterGatherMediator = (ScatterGather) m; + } + + assert scatterGatherMediator != null; + OMElement scatterGatherElement = fac.createOMElement("scatter-gather", synNS); + saveTracingState(scatterGatherElement, scatterGatherMediator); + + scatterGatherElement.addAttribute(fac.createOMAttribute( + "parallel-execution", nullNS, Boolean.toString(scatterGatherMediator.getParallelExecution()))); + scatterGatherElement.addAttribute(fac.createOMAttribute( + "result-target", nullNS, scatterGatherMediator.getResultTarget())); + scatterGatherElement.addAttribute(fac.createOMAttribute( + "content-type", nullNS, scatterGatherMediator.getContentType())); + if (StringUtils.isNotBlank(scatterGatherMediator.getRootElementName())) { + scatterGatherElement.addAttribute(fac.createOMAttribute( + "root-element", nullNS, scatterGatherMediator.getRootElementName())); + } + + OMElement aggregationElement = fac.createOMElement("aggregation", synNS); + + SynapsePathSerializer.serializePath( + scatterGatherMediator.getAggregationExpression(), aggregationElement, "expression"); + + if (scatterGatherMediator.getCorrelateExpression() != null) { + SynapsePathSerializer.serializePath( + scatterGatherMediator.getAggregationExpression(), aggregationElement, "condition"); + } + + if (scatterGatherMediator.getCompletionTimeoutMillis() != 0) { + aggregationElement.addAttribute(fac.createOMAttribute( + "timeout", nullNS, Long.toString(scatterGatherMediator.getCompletionTimeoutMillis()))); + } + if (scatterGatherMediator.getMinMessagesToComplete() != null) { + new ValueSerializer().serializeValue( + scatterGatherMediator.getMinMessagesToComplete(), "min-messages", aggregationElement); + } + if (scatterGatherMediator.getMaxMessagesToComplete() != null) { + new ValueSerializer().serializeValue( + scatterGatherMediator.getMaxMessagesToComplete(), "max-messages", aggregationElement); + } + scatterGatherElement.addChild(aggregationElement); + + for (Target target : scatterGatherMediator.getTargets()) { + if (target != null && target.getSequence() != null) { + SequenceMediatorSerializer serializer = new SequenceMediatorSerializer(); + serializer.serializeAnonymousSequence(scatterGatherElement, target.getSequence()); + } + } + serializeComments(scatterGatherElement, scatterGatherMediator.getCommentsList()); + + return scatterGatherElement; + } + + public String getMediatorClassName() { + + return ScatterGather.class.getName(); + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java index 1afe6f142c..706d792b91 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java @@ -19,13 +19,24 @@ package org.apache.synapse.mediators; -import org.apache.synapse.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.synapse.FaultHandler; +import org.apache.synapse.Mediator; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; +import org.apache.synapse.SynapseException; +import org.apache.synapse.aspects.ComponentType; import org.apache.synapse.aspects.flow.statistics.StatisticsCloseEventListener; +import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; +import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants; import org.apache.synapse.carbonext.TenantInfoConfigurator; +import org.apache.synapse.continuation.ContinuationStackManager; +import org.apache.synapse.continuation.SeqContinuationState; import org.apache.synapse.debug.SynapseDebugManager; +import org.apache.synapse.mediators.base.SequenceMediator; import org.apache.synapse.util.logging.LoggingUtils; /** @@ -86,7 +97,21 @@ public void run() { debugManager.advertiseMediationFlowStartPoint(synCtx); } - seq.mediate(synCtx); + boolean result = seq.mediate(synCtx); + // If this is a scatter message, then we need to use the continuation state and continue the mediation + if (isScatterMessage(synCtx) && result) { + SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx); + if (seqContinuationState == null) { + return; + } + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState); + + FlowContinuableMediator mediator = + (FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition()); + + synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true); + mediator.mediate(synCtx, seqContinuationState); + } //((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard(); } catch (SynapseException syne) { @@ -125,7 +150,7 @@ public void run() { debugManager.advertiseMediationFlowTerminatePoint(synCtx); debugManager.releaseMediationFlowLock(); } - if (RuntimeStatisticCollector.isStatisticsEnabled()) { + if (RuntimeStatisticCollector.isStatisticsEnabled() && !isScatterMessage(synCtx)) { this.statisticsCloseEventListener.invokeCloseEventEntry(synCtx); } } @@ -150,4 +175,16 @@ private void warn(boolean traceOn, String msg, MessageContext msgContext) { public void setStatisticsCloseEventListener(StatisticsCloseEventListener statisticsCloseEventListener) { this.statisticsCloseEventListener = statisticsCloseEventListener; } + + /** + * Check whether the message is a scatter message or not + * + * @param synCtx MessageContext + * @return true if the message is a scatter message + */ + private static boolean isScatterMessage(MessageContext synCtx) { + + Boolean isScatterMessage = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); + return isScatterMessage != null && isScatterMessage; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java index f8b6b6f9f5..55870f16b2 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java @@ -189,7 +189,7 @@ public static void enrichEnvelope(SOAPEnvelope envelope, SOAPEnvelope enricher, } } - private static boolean isBody(OMElement body, OMElement enrichingElement) { + public static boolean isBody(OMElement body, OMElement enrichingElement) { try { return (body.getLocalName().equals(enrichingElement.getLocalName()) && body.getNamespace().getNamespaceURI().equals(enrichingElement.getNamespace().getNamespaceURI())); @@ -198,7 +198,7 @@ private static boolean isBody(OMElement body, OMElement enrichingElement) { } } - private static boolean checkNotEmpty(List list) { + public static boolean checkNotEmpty(List list) { return list != null && !list.isEmpty(); } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java index f87c0fffef..e0f782005f 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java @@ -18,6 +18,8 @@ package org.apache.synapse.mediators.eip; +import org.apache.synapse.MessageContext; + /** * This class is used to hold the shared data for a particular message set * For an example we can use this to share some data across all the spawned spilt messages in iterate mediator @@ -29,6 +31,17 @@ public class SharedDataHolder { */ private boolean isAggregationCompleted = false; + private MessageContext synCtx; + + public SharedDataHolder() { + + } + + public SharedDataHolder(MessageContext synCtx) { + + this.synCtx = synCtx; + } + /** * Check whether aggregation has been completed. * @@ -45,4 +58,8 @@ public void markAggregationCompletion() { isAggregationCompleted = true; } + public MessageContext getSynCtx() { + + return synCtx; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java index 3a2f621511..d650b454e2 100755 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java @@ -26,10 +26,12 @@ import org.apache.synapse.SynapseLog; import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.mediators.eip.EIPConstants; +import org.apache.synapse.mediators.v2.ScatterGather; import java.util.ArrayList; import java.util.List; import java.util.TimerTask; +import java.util.concurrent.locks.ReentrantLock; /** * An instance of this class is created to manage each aggregation group, and it holds @@ -50,8 +52,9 @@ public class Aggregate extends TimerTask { private String correlation = null; /** The AggregateMediator that should be invoked on completion of the aggregation */ private AggregateMediator aggregateMediator = null; + private ScatterGather scatterGatherMediator = null; private List messages = new ArrayList(); - private boolean locked = false; + private ReentrantLock lock = new ReentrantLock(); private boolean completed = false; private SynapseEnvironment synEnv = null; @@ -87,6 +90,24 @@ public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMilli this.aggregateMediator = mediator; } + public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMillis, int min, + int max, ScatterGather scatterGatherMediator, FaultHandler faultHandler) { + + this.synEnv = synEnv; + this.correlation = corelation; + if (timeoutMillis > 0) { + expiryTimeMillis = System.currentTimeMillis() + timeoutMillis; + } + if (min > 0) { + minCount = min; + } + if (max > 0) { + maxCount = max; + } + this.faultHandler = faultHandler; + this.scatterGatherMediator = scatterGatherMediator; + } + /** * Add a message to the interlan message list * @@ -118,9 +139,15 @@ public synchronized boolean isComplete(SynapseLog synLog) { // get total messages for this group, from the first message we have collected MessageContext mc = messages.get(0); - Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + - (aggregateMediator.getId() != null ? "." + aggregateMediator.getId() : "")); - + Object prop; + if (aggregateMediator != null) { + prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + + (aggregateMediator.getId() != null ? "." + aggregateMediator.getId() : "")); + } else { + prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + + (scatterGatherMediator.getId() != null ? "." + scatterGatherMediator.getId() : "")); + } + if (prop != null && prop instanceof String) { String[] msgSequence = prop.toString().split( EIPConstants.MESSAGE_SEQUENCE_DELEMITER); @@ -232,12 +259,16 @@ public void run() { break; } if (getLock()) { - if (log.isDebugEnabled()) { - log.debug("Time : " + System.currentTimeMillis() + " and this aggregator " + - "expired at : " + expiryTimeMillis); + try { + if (log.isDebugEnabled()) { + log.debug("Time : " + System.currentTimeMillis() + " and this aggregator " + + "expired at : " + expiryTimeMillis); + } + synEnv.getExecutorService().execute(new AggregateTimeout(this)); + break; + } finally { + releaseLock(); } - synEnv.getExecutorService().execute(new AggregateTimeout(this)); - break; } } } @@ -264,8 +295,13 @@ private class AggregateTimeout implements Runnable { public void run() { MessageContext messageContext = aggregate.getLastMessage(); try { - log.warn("Aggregate mediator timeout occurred."); - aggregateMediator.completeAggregate(aggregate); + if (aggregateMediator != null) { + log.warn("Aggregate mediator timeout occurred."); + aggregateMediator.completeAggregate(aggregate); + } else { + log.warn("Scatter Gather mediator timeout occurred."); + scatterGatherMediator.completeAggregate(aggregate); + } } catch (Exception ex) { if (faultHandler != null && messageContext != null) { faultHandler.handleFault(messageContext, ex); @@ -278,11 +314,15 @@ public void run() { } public synchronized boolean getLock() { - return !locked; + + return lock.tryLock(); } - public void releaseLock() { - locked = false; + public synchronized void releaseLock() { + + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } } public boolean isCompleted() { @@ -292,5 +332,4 @@ public boolean isCompleted() { public void setCompleted(boolean completed) { this.completed = completed; } - } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java new file mode 100644 index 0000000000..5dbb70b9c9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java @@ -0,0 +1,827 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.mediators.v2; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonSyntaxException; +import org.apache.axiom.om.OMAbstractFactory; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMNode; +import org.apache.axiom.om.util.AXIOMUtil; +import org.apache.axiom.soap.SOAP11Constants; +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axiom.soap.SOAPFactory; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.OperationContext; +import org.apache.http.protocol.HTTP; +import org.apache.synapse.ContinuationState; +import org.apache.synapse.ManagedLifecycle; +import org.apache.synapse.Mediator; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; +import org.apache.synapse.SynapseException; +import org.apache.synapse.SynapseLog; +import org.apache.synapse.aspects.AspectConfiguration; +import org.apache.synapse.aspects.ComponentType; +import org.apache.synapse.aspects.flow.statistics.StatisticIdentityGenerator; +import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; +import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder; +import org.apache.synapse.aspects.flow.statistics.util.StatisticDataCollectionHelper; +import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants; +import org.apache.synapse.commons.json.JsonUtil; +import org.apache.synapse.config.xml.SynapsePath; +import org.apache.synapse.continuation.ContinuationStackManager; +import org.apache.synapse.continuation.ReliantContinuationState; +import org.apache.synapse.continuation.SeqContinuationState; +import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.mediators.AbstractMediator; +import org.apache.synapse.mediators.FlowContinuableMediator; +import org.apache.synapse.mediators.Value; +import org.apache.synapse.mediators.base.SequenceMediator; +import org.apache.synapse.mediators.eip.EIPConstants; +import org.apache.synapse.mediators.eip.SharedDataHolder; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.eip.aggregator.Aggregate; +import org.apache.synapse.transport.passthru.util.RelayUtils; +import org.apache.synapse.util.MessageHelper; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Timer; +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; + +import static org.apache.synapse.SynapseConstants.XML_CONTENT_TYPE; +import static org.apache.synapse.transport.passthru.PassThroughConstants.JSON_CONTENT_TYPE; + +public class ScatterGather extends AbstractMediator implements ManagedLifecycle, FlowContinuableMediator { + + public static final String JSON_TYPE = "JSON"; + public static final String XML_TYPE = "XML"; + private final Object lock = new Object(); + private final Map activeAggregates = Collections.synchronizedMap(new HashMap<>()); + private String id; + private List targets = new ArrayList<>(); + private long completionTimeoutMillis = 0; + private Value maxMessagesToComplete; + private Value minMessagesToComplete; + private SynapsePath correlateExpression = null; + private SynapsePath aggregationExpression = null; + private boolean parallelExecution = true; + private Integer statisticReportingIndex; + private String contentType; + private String rootElementName; + private String resultTarget; + private SynapseEnvironment synapseEnv; + + public ScatterGather() { + + id = String.valueOf(new Random().nextLong()); + } + + public void setParallelExecution(boolean parallelExecution) { + + this.parallelExecution = parallelExecution; + } + + public boolean getParallelExecution() { + + return this.parallelExecution; + } + + public String getId() { + + return id; + } + + @Override + public boolean mediate(MessageContext synCtx) { + + boolean aggregationResult = false; + + SynapseLog synLog = getLog(synCtx); + + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Start : Scatter Gather mediator"); + + if (synLog.isTraceTraceEnabled()) { + synLog.traceTrace("Message : " + synCtx.getEnvelope()); + } + } + + MessageContext orginalMessageContext = null; + if (!isTargetBody()) { + try { + // Clone the original MessageContext and save it to continue the flow using it when the scatter gather + // output is set to a variable + orginalMessageContext = MessageHelper.cloneMessageContext(synCtx); + } catch (AxisFault e) { + handleException("Error cloning the message context", e, synCtx); + } + } + + synCtx.setProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id, new SharedDataHolder(orginalMessageContext)); + Iterator iter = targets.iterator(); + int i = 0; + while (iter.hasNext()) { + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Submitting " + (i + 1) + " of " + targets.size() + + " messages for " + (parallelExecution ? "parallel processing" : "sequential processing")); + } + + MessageContext clonedMsgCtx = getClonedMessageContext(synCtx, i++, targets.size()); + ContinuationStackManager.addReliantContinuationState(clonedMsgCtx, i - 1, getMediatorPosition()); + boolean result = iter.next().mediate(clonedMsgCtx); + if (!parallelExecution && result) { + aggregationResult = aggregateMessages(clonedMsgCtx, synLog); + } + } + OperationContext opCtx + = ((Axis2MessageContext) synCtx).getAxis2MessageContext().getOperationContext(); + if (opCtx != null) { + opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP"); + } + synCtx.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true); + return aggregationResult; + } + + public void init(SynapseEnvironment synapseEnv) { + + this.synapseEnv = synapseEnv; + for (Target target : targets) { + ManagedLifecycle seq = target.getSequence(); + if (seq != null) { + seq.init(synapseEnv); + } + } + // Registering the mediator for enabling continuation + synapseEnv.updateCallMediatorCount(true); + } + + public void destroy() { + + for (Target target : targets) { + ManagedLifecycle seq = target.getSequence(); + if (seq != null) { + seq.destroy(); + } + } + // Unregistering the mediator for continuation + synapseEnv.updateCallMediatorCount(false); + } + + /** + * Clone the provided message context as a new message, and set the aggregation ID and the message sequence count + * + * @param synCtx - MessageContext which is subjected to the cloning + * @param messageSequence - the position of this message of the cloned set + * @param messageCount - total of cloned copies + * @return MessageContext the cloned message context + */ + private MessageContext getClonedMessageContext(MessageContext synCtx, int messageSequence, int messageCount) { + + MessageContext newCtx = null; + try { + newCtx = MessageHelper.cloneMessageContext(synCtx); + // Set isServerSide property in the cloned message context + ((Axis2MessageContext) newCtx).getAxis2MessageContext().setServerSide( + ((Axis2MessageContext) synCtx).getAxis2MessageContext().isServerSide()); + // Set the SCATTER_MESSAGES property to the cloned message context which will be used by the MediatorWorker + // to continue the mediation from the continuation state + newCtx.setProperty(SynapseConstants.SCATTER_MESSAGES, true); + newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id, synCtx.getMessageID()); + newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id, messageSequence + + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount); + } catch (AxisFault axisFault) { + handleException("Error cloning the message context", axisFault, synCtx); + } + return newCtx; + } + + public List getTargets() { + + return targets; + } + + public void setTargets(List targets) { + + this.targets = targets; + } + + public void addTarget(Target target) { + + this.targets.add(target); + } + + public SynapsePath getAggregationExpression() { + + return aggregationExpression; + } + + public void setAggregationExpression(SynapsePath aggregationExpression) { + + this.aggregationExpression = aggregationExpression; + } + + @Override + public boolean mediate(MessageContext synCtx, ContinuationState continuationState) { + + SynapseLog synLog = getLog(synCtx); + + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Scatter Gather mediator : Mediating from ContinuationState"); + } + + boolean result; + // If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch + // otherwise start aggregation + if (isContinuationTriggeredFromMediatorWorker(synCtx)) { + synLog.traceOrDebug("Continuation is triggered from a mediator worker"); + if (continuationState.hasChild()) { + int subBranch = ((ReliantContinuationState) continuationState.getChildContState()).getSubBranch(); + SequenceMediator branchSequence = targets.get(subBranch).getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getChildContState().getPosition()); + + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); + } + } else { + result = true; + } + } else { + synLog.traceOrDebug("Continuation is triggered from a callback"); + // If the continuation is triggered from a callback, continue the mediation from the continuation state + int subBranch = ((ReliantContinuationState) continuationState).getSubBranch(); + + SequenceMediator branchSequence = targets.get(subBranch).getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + if (!continuationState.hasChild()) { + result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); + } else { + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition()); + + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); + } + } + // If the mediation is completed, remove the child continuation state from the stack, so the aggregation + // will continue the mediation from the parent continuation state + ContinuationStackManager.removeReliantContinuationState(synCtx); + } + if (result) { + return aggregateMessages(synCtx, synLog); + } + return false; + } + + private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { + + Aggregate aggregate = null; + String correlationIdName = EIPConstants.AGGREGATE_CORRELATION + "." + id; + + Object correlationID = synCtx.getProperty(correlationIdName); + String correlation = (String) correlationID; + synLog.traceOrDebug("Aggregating messages started for correlation : " + correlation); + + boolean isAggregationConditionMet = false; + // When the target sequences are not content aware, the message builder wont get triggered. + // Therefore, we need to build the message to do the aggregation. + try { + RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext()); + } catch (IOException | XMLStreamException e) { + handleException("Error building the message", e, synCtx); + } + + if (correlateExpression != null) { + String expressionResult = correlateExpression.stringValueOf(synCtx); + if ("true".equalsIgnoreCase(expressionResult)) { + isAggregationConditionMet = true; + } + } + if (correlateExpression == null || isAggregationConditionMet) { + while (aggregate == null) { + synchronized (lock) { + if (activeAggregates.containsKey(correlation)) { + aggregate = activeAggregates.get(correlation); + if (aggregate != null) { + if (!aggregate.getLock()) { + aggregate = null; + } + } + } else { + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Creating new Aggregator - " + + (completionTimeoutMillis > 0 ? "expires in : " + + (completionTimeoutMillis / 1000) + "secs" : + "without expiry time")); + } + if (isAggregationCompleted(synCtx)) { + return false; + } + + Double minMsg = -1.0; + if (minMessagesToComplete != null) { + minMsg = Double.parseDouble(minMessagesToComplete.evaluateValue(synCtx)); + } + Double maxMsg = -1.0; + if (maxMessagesToComplete != null) { + maxMsg = Double.parseDouble(maxMessagesToComplete.evaluateValue(synCtx)); + } + aggregate = new Aggregate( + synCtx.getEnvironment(), + correlation, + completionTimeoutMillis, + minMsg.intValue(), + maxMsg.intValue(), this, synCtx.getFaultStack().peek()); + + if (completionTimeoutMillis > 0) { + synchronized (aggregate) { + if (!aggregate.isCompleted()) { + try { + synCtx.getConfiguration().getSynapseTimer(). + schedule(aggregate, completionTimeoutMillis); + } catch (IllegalStateException e) { + log.warn("Synapse timer already cancelled. Resetting Synapse timer"); + synCtx.getConfiguration().setSynapseTimer(new Timer(true)); + synCtx.getConfiguration().getSynapseTimer(). + schedule(aggregate, completionTimeoutMillis); + } + } + } + } + aggregate.getLock(); + activeAggregates.put(correlation, aggregate); + } + } + } + } + // if there is an aggregate continue on aggregation + if (aggregate != null) { + boolean collected = aggregate.addMessage(synCtx); + if (synLog.isTraceOrDebugEnabled()) { + if (collected) { + synLog.traceOrDebug("Collected a message during aggregation"); + if (synLog.isTraceTraceEnabled()) { + synLog.traceTrace("Collected message : " + synCtx); + } + } + } + if (aggregate.isComplete(synLog)) { + synLog.traceOrDebug("Aggregation completed"); + return completeAggregate(aggregate); + } else { + aggregate.releaseLock(); + } + } else { + synLog.traceOrDebug("Unable to find an aggregate for this message - skip"); + } + return false; + } + + private boolean isAggregationCompleted(MessageContext synCtx) { + + Object aggregateTimeoutHolderObj = synCtx.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id); + + if (aggregateTimeoutHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj; + if (sharedDataHolder.isAggregationCompleted()) { + log.debug("Received a response for already completed Aggregate"); + return true; + } + } + return false; + } + + public boolean completeAggregate(Aggregate aggregate) { + + boolean markedCompletedNow = false; + boolean wasComplete = aggregate.isCompleted(); + if (wasComplete) { + return false; + } + log.debug("Aggregation completed or timed out"); + + // cancel the timer + synchronized (this) { + if (!aggregate.isCompleted()) { + aggregate.cancel(); + aggregate.setCompleted(true); + + MessageContext lastMessage = aggregate.getLastMessage(); + if (lastMessage != null) { + Object aggregateTimeoutHolderObj = + lastMessage.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id); + + if (aggregateTimeoutHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj; + sharedDataHolder.markAggregationCompletion(); + } + } + markedCompletedNow = true; + } + } + + if (!markedCompletedNow) { + return false; + } + + if (isTargetBody()) { + MessageContext newSynCtx = getAggregatedMessage(aggregate); + return processAggregation(newSynCtx, aggregate); + } else { + MessageContext originalMessageContext = getOriginalMessageContext(aggregate); + if (originalMessageContext != null) { + setAggregatedMessageAsVariable(originalMessageContext, aggregate); + return processAggregation(originalMessageContext, aggregate); + } else { + handleException(aggregate, "Error retrieving the original message context", null, aggregate.getLastMessage()); + return false; + } + } + } + + private boolean processAggregation(MessageContext messageContext, Aggregate aggregate) { + + if (messageContext == null) { + log.warn("An aggregation of messages timed out with no aggregated messages", null); + return false; + } + aggregate.clear(); + activeAggregates.remove(aggregate.getCorrelation()); + + if (isTargetBody()) { + // Set content type to the aggregated message + setContentType(messageContext); + } else { + // Update the continuation state to current mediator position as we are using the original message context + ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition()); + } + + SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext); + messageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true); + + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + CloseEventCollector.closeEntryEvent(messageContext, getMediatorName(), ComponentType.MEDIATOR, + statisticReportingIndex, isContentAltering()); + } + + getLog(messageContext).traceOrDebug("End : Scatter Gather mediator"); + boolean result = false; + if (seqContinuationState != null) { + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(messageContext, seqContinuationState); + result = sequenceMediator.mediate(messageContext, seqContinuationState); + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + sequenceMediator.reportCloseStatistics(messageContext, null); + } + } + CloseEventCollector.closeEventsAfterScatterGather(messageContext); + return result; + } + + /** + * Return the original message context using the SharedDataHolder. + * + * @param aggregate Aggregate object + * @return original message context + */ + private MessageContext getOriginalMessageContext(Aggregate aggregate) { + + MessageContext lastMessage = aggregate.getLastMessage(); + if (lastMessage != null) { + Object aggregateHolderObj = lastMessage.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id); + if (aggregateHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateHolderObj; + return sharedDataHolder.getSynCtx(); + } + } + return null; + } + + private void setContentType(MessageContext synCtx) { + + org.apache.axis2.context.MessageContext a2mc = ((Axis2MessageContext) synCtx).getAxis2MessageContext(); + if (Objects.equals(contentType, JSON_TYPE)) { + a2mc.setProperty(Constants.Configuration.MESSAGE_TYPE, JSON_CONTENT_TYPE); + a2mc.setProperty(Constants.Configuration.CONTENT_TYPE, JSON_CONTENT_TYPE); + setContentTypeHeader(JSON_CONTENT_TYPE, a2mc); + } else { + a2mc.setProperty(Constants.Configuration.MESSAGE_TYPE, XML_CONTENT_TYPE); + a2mc.setProperty(Constants.Configuration.CONTENT_TYPE, XML_CONTENT_TYPE); + setContentTypeHeader(XML_CONTENT_TYPE, a2mc); + } + a2mc.removeProperty("NO_ENTITY_BODY"); + } + + private void setContentTypeHeader(Object resultValue, org.apache.axis2.context.MessageContext axis2MessageCtx) { + + axis2MessageCtx.setProperty(org.apache.axis2.Constants.Configuration.CONTENT_TYPE, resultValue); + Object o = axis2MessageCtx.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS); + Map headers = (Map) o; + if (headers != null) { + headers.put(HTTP.CONTENT_TYPE, resultValue); + } + } + + private static void addChildren(List list, OMElement element) { + + for (Object item : list) { + if (item instanceof OMElement) { + element.addChild((OMElement) item); + } + } + } + + private static List getMatchingElements(MessageContext messageContext, SynapsePath expression) { + + Object o = expression.objectValueOf(messageContext); + if (o instanceof OMNode) { + List list = new ArrayList(); + list.add(o); + return list; + } else if (o instanceof List) { + return (List) o; + } else { + return new ArrayList(); + } + } + + private void setAggregatedMessageAsVariable(MessageContext originalMessageContext, Aggregate aggregate) { + + Object variable = null; + if (Objects.equals(contentType, JSON_TYPE)) { + log.debug("Merging aggregated JSON responses to variable"); + variable = new JsonArray(); + setJSONResultToVariable((JsonArray) variable, aggregate); + } else if (Objects.equals(contentType, XML_TYPE)) { + log.debug("Merging aggregated XML responses to variable"); + variable = OMAbstractFactory.getOMFactory().createOMElement(new QName(rootElementName)); + setXMLResultToRootOMElement((OMElement) variable, aggregate); + } else { + handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget + + " unknown content type : " + contentType, null, originalMessageContext); + } + originalMessageContext.setVariable(resultTarget, variable); + StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), originalMessageContext); + } + + private void setJSONResultToVariable(JsonArray variable, Aggregate aggregate) { + + for (MessageContext synCtx : aggregate.getMessages()) { + Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); + variable.add((JsonElement) evaluatedResult); + } + } + + private void setXMLResultToRootOMElement(OMElement element, Aggregate aggregate) { + + try { + for (MessageContext synCtx : aggregate.getMessages()) { + OMElement cloneResult = AXIOMUtil.stringToOM(aggregationExpression.stringValueOf(synCtx)); + cloneResult.buildWithAttachments(); + element.addChild(cloneResult); + } + } catch (XMLStreamException e) { + handleException(aggregate, "Error reading XML element: " + aggregationExpression.toString(), e, + aggregate.getLastMessage()); + } + } + + private MessageContext getAggregatedMessage(Aggregate aggregate) { + + MessageContext newCtx = null; + if (Objects.equals(contentType, JSON_TYPE)) { + JsonArray jsonArray = new JsonArray(); + log.debug("Merging aggregated JSON responses to body"); + for (MessageContext synCtx : aggregate.getMessages()) { + try { + if (log.isDebugEnabled()) { + log.debug("Merging message : " + synCtx.getEnvelope() + " using expression : " + + aggregationExpression); + } + Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); + if (evaluatedResult instanceof JsonElement) { + jsonArray.add((JsonElement) evaluatedResult); + } else { + jsonArray.add(evaluatedResult.toString()); + } + } catch (SynapseException e) { + handleException(aggregate, "Error evaluating expression: " + aggregationExpression.toString(), e, synCtx); + } catch (JsonSyntaxException e) { + handleException(aggregate, "Error reading JSON element: " + aggregationExpression.toString(), e, synCtx); + } + } + // setting the new JSON payload to the messageContext + try { + newCtx = MessageHelper.cloneMessageContext(aggregate.getLastMessage(), false, false, true); + SOAPEnvelope newEnvelope = createNewSoapEnvelope(aggregate.getLastMessage().getEnvelope()); + newCtx.setEnvelope(newEnvelope); + JsonUtil.getNewJsonPayload(((Axis2MessageContext) newCtx).getAxis2MessageContext(), new + ByteArrayInputStream(jsonArray.toString().getBytes()), true, true); + } catch (AxisFault axisFault) { + handleException(aggregate, "Error occurred while setting the new JSON payload to the message context", + axisFault, newCtx); + } + } else if (Objects.equals(contentType, XML_TYPE)) { + log.debug("Merging aggregated XML responses to body"); + OMElement rootElement = OMAbstractFactory.getOMFactory().createOMElement(new QName(rootElementName)); + setXMLResultToRootOMElement(rootElement, aggregate); + try { + newCtx = MessageHelper.cloneMessageContext(aggregate.getLastMessage(), false, false, true); + SOAPEnvelope newEnvelope = createNewSoapEnvelope(aggregate.getLastMessage().getEnvelope()); + newEnvelope.getBody().addChild(rootElement); + newCtx.setEnvelope(newEnvelope); + } catch (AxisFault axisFault) { + handleException(aggregate, "Error creating a copy of the message", axisFault, aggregate.getLastMessage()); + } + // Removing the JSON stream after aggregated using XML path. + // This will fix inconsistent behaviour in logging the payload. + ((Axis2MessageContext) newCtx).getAxis2MessageContext() + .removeProperty(org.apache.synapse.commons.json.Constants.ORG_APACHE_SYNAPSE_COMMONS_JSON_JSON_INPUT_STREAM); + } else { + handleException(aggregate, "Error aggregating results. Unknown content type : " + contentType, null, + aggregate.getLastMessage()); + } + StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx); + return newCtx; + } + + private SOAPEnvelope createNewSoapEnvelope(SOAPEnvelope envelope) { + + SOAPFactory fac; + if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(envelope.getBody().getNamespace().getNamespaceURI())) { + fac = OMAbstractFactory.getSOAP11Factory(); + } else { + fac = OMAbstractFactory.getSOAP12Factory(); + } + return fac.getDefaultEnvelope(); + } + + public SynapsePath getCorrelateExpression() { + + return correlateExpression; + } + + public void setCorrelateExpression(SynapsePath correlateExpression) { + + this.correlateExpression = correlateExpression; + } + + public long getCompletionTimeoutMillis() { + + return completionTimeoutMillis; + } + + public void setCompletionTimeoutMillis(long completionTimeoutMillis) { + + this.completionTimeoutMillis = completionTimeoutMillis; + } + + public Value getMinMessagesToComplete() { + + return minMessagesToComplete; + } + + public void setMinMessagesToComplete(Value minMessagesToComplete) { + + this.minMessagesToComplete = minMessagesToComplete; + } + + public Value getMaxMessagesToComplete() { + + return maxMessagesToComplete; + } + + public void setMaxMessagesToComplete(Value maxMessagesToComplete) { + + this.maxMessagesToComplete = maxMessagesToComplete; + } + + /** + * Check whether the message is a scatter message or not + * + * @param synCtx MessageContext + * @return true if the message is a scatter message + */ + private static boolean isContinuationTriggeredFromMediatorWorker(MessageContext synCtx) { + + Boolean isContinuationTriggeredMediatorWorker = + (Boolean) synCtx.getProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER); + return isContinuationTriggeredMediatorWorker != null && isContinuationTriggeredMediatorWorker; + } + + @Override + public Integer reportOpenStatistics(MessageContext messageContext, boolean isContentAltering) { + + statisticReportingIndex = OpenEventCollector.reportFlowContinuableEvent(messageContext, getMediatorName(), + ComponentType.MEDIATOR, getAspectConfiguration(), isContentAltering() || isContentAltering); + return statisticReportingIndex; + } + + @Override + public void reportCloseStatistics(MessageContext messageContext, Integer currentIndex) { + + // Do nothing here as the close event is reported in the completeAggregate method + } + + @Override + public void setComponentStatisticsId(ArtifactHolder holder) { + + if (getAspectConfiguration() == null) { + configure(new AspectConfiguration(getMediatorName())); + } + String sequenceId = + StatisticIdentityGenerator.getIdForFlowContinuableMediator(getMediatorName(), ComponentType.MEDIATOR, holder); + getAspectConfiguration().setUniqueId(sequenceId); + for (Target target : targets) { + target.setStatisticIdForMediators(holder); + } + + StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.MEDIATOR, holder); + } + + @Override + public boolean isContentAltering() { + + return true; + } + + private void handleException(Aggregate aggregate, String msg, Exception exception, MessageContext msgContext) { + + aggregate.clear(); + activeAggregates.remove(aggregate.getCorrelation()); + if (exception != null) { + super.handleException(msg, exception, msgContext); + } else { + super.handleException(msg, msgContext); + } + } + + public String getContentType() { + + return contentType; + } + + public void setContentType(String contentType) { + + this.contentType = contentType; + } + + public String getResultTarget() { + + return resultTarget; + } + + public void setResultTarget(String resultTarget) { + + this.resultTarget = resultTarget; + } + + public String getRootElementName() { + + return rootElementName; + } + + public void setRootElementName(String rootElementName) { + + this.rootElementName = rootElementName; + } + + private boolean isTargetBody() { + + return "body".equalsIgnoreCase(resultTarget); + } +} diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java new file mode 100644 index 0000000000..9ecbd3f6f8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config.xml; + +/** + * Factory and Serializer tests for the ScatterGatherMediator + */ + +public class ScatterGatherMediatorSerializationTest extends AbstractTestCase { + + private ScatterGatherMediatorFactory scatterGatherMediatorFactory; + private ScatterGatherMediatorSerializer scatterGatherMediatorSerializer; + + public ScatterGatherMediatorSerializationTest() { + + super(ScatterGatherMediatorSerializer.class.getName()); + scatterGatherMediatorFactory = new ScatterGatherMediatorFactory(); + scatterGatherMediatorSerializer = new ScatterGatherMediatorSerializer(); + } + + public void testScatterGatherXMLTypeSerialization() { + + String inputXML = "" + + "" + + "{ \"pet\": { " + + "\"name\": \"pet1\", \"type\": \"dog\" }, " + + "\"status\": \"success\" }" + + "" + + "" + + "" + + ""; + + assertTrue(serialization(inputXML, scatterGatherMediatorFactory, scatterGatherMediatorSerializer)); + } + + public void testScatterGatherJSONTypeSerialization() { + + String inputXML = "" + + "" + + "{ \"pet\": { " + + "\"name\": \"pet1\", \"type\": \"dog\" }, " + + "\"status\": \"success\" }" + + "" + + "" + + "" + + ""; + + assertTrue(serialization(inputXML, scatterGatherMediatorFactory, scatterGatherMediatorSerializer)); + } +}