From 1b4379f549ccf39a59aad89fd9d1374a827c2739 Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Wed, 4 Dec 2024 07:10:58 +0530 Subject: [PATCH] Fix review comments --- .../xml/ScatterGatherMediatorFactory.java | 19 +- .../xml/ScatterGatherMediatorSerializer.java | 7 +- .../synapse/mediators/MediatorWorker.java | 32 +- .../mediators/eip/aggregator/Aggregate.java | 16 +- .../synapse/mediators/v2/ScatterGather.java | 287 +++++------------- ...catterGatherMediatorSerializationTest.java | 22 +- 6 files changed, 145 insertions(+), 238 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java index 81689a7edb..f6bbb54dc5 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java @@ -54,7 +54,7 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory { = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather"); private static final QName ELEMENT_AGGREGATE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation"); - private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value"); + private static final QName ATT_AGGREGATE_EXPRESSION = new QName("expression"); private static final QName ATT_CONDITION = new QName("condition"); private static final QName ATT_TIMEOUT = new QName("timeout"); private static final QName ATT_MIN_MESSAGES = new QName("min-messages"); @@ -62,6 +62,7 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory { private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence"); private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution"); private static final QName RESULT_TARGET_Q = new QName("result-target"); + private static final QName ROOT_ELEMENT_Q = new QName("root-element"); private static final QName CONTENT_TYPE_Q = new QName("content-type"); private static final SequenceMediatorFactory fac = new SequenceMediatorFactory(); @@ -87,7 +88,15 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) { if ("JSON".equals(contentTypeAttr.getAttributeValue())) { mediator.setContentType(ScatterGather.JSON_TYPE); } else if ("XML".equals(contentTypeAttr.getAttributeValue())) { - mediator.setContentType(ScatterGather.XML_TYPE); + OMAttribute rootElementAttr = elem.getAttribute(ROOT_ELEMENT_Q); + if (rootElementAttr != null && StringUtils.isNotBlank(rootElementAttr.getAttributeValue())) { + mediator.setRootElementName(rootElementAttr.getAttributeValue()); + mediator.setContentType(ScatterGather.XML_TYPE); + } else { + String msg = "The 'root-element' attribute is required for the configuration of a " + + "Scatter Gather mediator when the 'content-type' is 'XML'"; + throw new SynapseException(msg); + } } else { String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'"; throw new SynapseException(msg); @@ -119,16 +128,16 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) { OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q); if (aggregateElement != null) { - OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_VALUE_TO_AGGREGATE); + OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_AGGREGATE_EXPRESSION); if (aggregateExpr != null) { try { mediator.setAggregationExpression( - SynapsePathFactory.getSynapsePath(aggregateElement, ATT_VALUE_TO_AGGREGATE)); + SynapsePathFactory.getSynapsePath(aggregateElement, ATT_AGGREGATE_EXPRESSION)); } catch (JaxenException e) { handleException("Unable to load the aggregating expression", e); } } else { - String msg = "The 'value' attribute is required for the configuration of a Scatter Gather mediator"; + String msg = "The 'expression' attribute is required for the configuration of a Scatter Gather mediator"; throw new SynapseException(msg); } diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java index e63bad4c85..9585b59db6 100755 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java @@ -19,6 +19,7 @@ package org.apache.synapse.config.xml; import org.apache.axiom.om.OMElement; +import org.apache.commons.lang3.StringUtils; import org.apache.synapse.Mediator; import org.apache.synapse.mediators.eip.Target; import org.apache.synapse.mediators.v2.ScatterGather; @@ -47,11 +48,15 @@ public OMElement serializeSpecificMediator(Mediator m) { "result-target", nullNS, scatterGatherMediator.getResultTarget())); scatterGatherElement.addAttribute(fac.createOMAttribute( "content-type", nullNS, scatterGatherMediator.getContentType())); + if (StringUtils.isNotBlank(scatterGatherMediator.getRootElementName())) { + scatterGatherElement.addAttribute(fac.createOMAttribute( + "root-element", nullNS, scatterGatherMediator.getRootElementName())); + } OMElement aggregationElement = fac.createOMElement("aggregation", synNS); SynapsePathSerializer.serializePath( - scatterGatherMediator.getAggregationExpression(), aggregationElement, "value"); + scatterGatherMediator.getAggregationExpression(), aggregationElement, "expression"); if (scatterGatherMediator.getCorrelateExpression() != null) { SynapsePathSerializer.serializePath( diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java index 2d4258f342..706d792b91 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java @@ -97,24 +97,20 @@ public void run() { debugManager.advertiseMediationFlowStartPoint(synCtx); } + boolean result = seq.mediate(synCtx); // If this is a scatter message, then we need to use the continuation state and continue the mediation - if (isScatterMessage(synCtx)) { - boolean result = seq.mediate(synCtx); - if (result) { - SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx); - if (seqContinuationState == null) { - return; - } - SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState); - - FlowContinuableMediator mediator = - (FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition()); - - synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true); - mediator.mediate(synCtx, seqContinuationState); + if (isScatterMessage(synCtx) && result) { + SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx); + if (seqContinuationState == null) { + return; } - } else { - seq.mediate(synCtx); + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState); + + FlowContinuableMediator mediator = + (FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition()); + + synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true); + mediator.mediate(synCtx, seqContinuationState); } //((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard(); @@ -188,7 +184,7 @@ public void setStatisticsCloseEventListener(StatisticsCloseEventListener statist */ private static boolean isScatterMessage(MessageContext synCtx) { - Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); - return isSkipContinuationState != null && isSkipContinuationState; + Boolean isScatterMessage = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); + return isScatterMessage != null && isScatterMessage; } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java index 337a693d57..d650b454e2 100755 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.TimerTask; +import java.util.concurrent.locks.ReentrantLock; /** * An instance of this class is created to manage each aggregation group, and it holds @@ -53,7 +54,7 @@ public class Aggregate extends TimerTask { private AggregateMediator aggregateMediator = null; private ScatterGather scatterGatherMediator = null; private List messages = new ArrayList(); - private boolean locked = false; + private ReentrantLock lock = new ReentrantLock(); private boolean completed = false; private SynapseEnvironment synEnv = null; @@ -313,15 +314,15 @@ public void run() { } public synchronized boolean getLock() { - if (!locked) { - locked = true; - return true; - } - return false; + + return lock.tryLock(); } public synchronized void releaseLock() { - locked = false; + + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } } public boolean isCompleted() { @@ -331,5 +332,4 @@ public boolean isCompleted() { public void setCompleted(boolean completed) { this.completed = completed; } - } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java index c0e5e25f8f..7650c65422 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java @@ -20,18 +20,18 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; import com.google.gson.JsonSyntaxException; import org.apache.axiom.om.OMAbstractFactory; import org.apache.axiom.om.OMElement; import org.apache.axiom.om.OMNode; +import org.apache.axiom.soap.SOAP11Constants; +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axiom.soap.SOAPFactory; import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; import org.apache.axis2.context.OperationContext; import org.apache.http.protocol.HTTP; import org.apache.synapse.ContinuationState; -import org.apache.synapse.JSONObjectExtensionException; import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.Mediator; import org.apache.synapse.MessageContext; @@ -59,12 +59,10 @@ import org.apache.synapse.mediators.Value; import org.apache.synapse.mediators.base.SequenceMediator; import org.apache.synapse.mediators.eip.EIPConstants; -import org.apache.synapse.mediators.eip.EIPUtils; import org.apache.synapse.mediators.eip.SharedDataHolder; import org.apache.synapse.mediators.eip.Target; import org.apache.synapse.mediators.eip.aggregator.Aggregate; import org.apache.synapse.transport.passthru.util.RelayUtils; -import org.apache.synapse.util.JSONMergeUtils; import org.apache.synapse.util.MessageHelper; import java.io.ByteArrayInputStream; @@ -100,6 +98,7 @@ public class ScatterGather extends AbstractMediator implements ManagedLifecycle, private boolean parallelExecution = true; private Integer statisticReportingIndex; private String contentType; + private String rootElementName; private String resultTarget; private SynapseEnvironment synapseEnv; @@ -265,6 +264,7 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat // If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch // otherwise start aggregation if (isContinuationTriggeredFromMediatorWorker(synCtx)) { + synLog.traceOrDebug("Continuation is triggered from a mediator worker"); if (continuationState.hasChild()) { int subBranch = ((ReliantContinuationState) continuationState.getChildContState()).getSubBranch(); SequenceMediator branchSequence = targets.get(subBranch).getSequence(); @@ -280,6 +280,7 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat result = true; } } else { + synLog.traceOrDebug("Continuation is triggered from a callback"); // If the continuation is triggered from a callback, continue the mediation from the continuation state int subBranch = ((ReliantContinuationState) continuationState).getSubBranch(); @@ -312,9 +313,10 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { String correlationIdName = EIPConstants.AGGREGATE_CORRELATION + "." + id; Object correlationID = synCtx.getProperty(correlationIdName); - String correlation; + String correlation = (String) correlationID; + synLog.traceOrDebug("Aggregating messages started for correlation : " + correlation); - Object result = null; + boolean isAggregationConditionMet = false; // When the target sequences are not content aware, the message builder wont get triggered. // Therefore, we need to build the message to do the aggregation. try { @@ -324,71 +326,12 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { } if (correlateExpression != null) { - result = correlateExpression.objectValueOf(synCtx); - if (result instanceof List) { - if (((List) result).isEmpty()) { - handleException("Failed to evaluate correlate expression: " + correlateExpression.toString(), synCtx); - } - } - if (result instanceof JsonPrimitive) { - if (!((JsonPrimitive) result).getAsBoolean()) { - return false; - } - } - if (result instanceof Boolean) { - if (!(Boolean) result) { - return false; - } + String expressionResult = correlateExpression.stringValueOf(synCtx); + if ("true".equalsIgnoreCase(expressionResult)) { + isAggregationConditionMet = true; } } - if (result != null) { - while (aggregate == null) { - synchronized (lock) { - if (activeAggregates.containsKey(correlateExpression.toString())) { - aggregate = activeAggregates.get(correlateExpression.toString()); - if (aggregate != null) { - if (!aggregate.getLock()) { - aggregate = null; - } - } - } else { - if (synLog.isTraceOrDebugEnabled()) { - synLog.traceOrDebug("Creating new Aggregator - " + - (completionTimeoutMillis > 0 ? "expires in : " - + (completionTimeoutMillis / 1000) + "secs" : - "without expiry time")); - } - if (isAggregationCompleted(synCtx)) { - return false; - } - - Double minMsg = -1.0; - if (minMessagesToComplete != null) { - minMsg = Double.parseDouble(minMessagesToComplete.evaluateValue(synCtx)); - } - Double maxMsg = -1.0; - if (maxMessagesToComplete != null) { - maxMsg = Double.parseDouble(maxMessagesToComplete.evaluateValue(synCtx)); - } - - aggregate = new Aggregate( - synCtx.getEnvironment(), - correlateExpression.toString(), - completionTimeoutMillis, - minMsg.intValue(), - maxMsg.intValue(), this, synCtx.getFaultStack().peek()); - - if (completionTimeoutMillis > 0) { - synCtx.getConfiguration().getSynapseTimer(). - schedule(aggregate, completionTimeoutMillis); - } - aggregate.getLock(); - activeAggregates.put(correlateExpression.toString(), aggregate); - } - } - } - } else if (correlationID instanceof String) { - correlation = (String) correlationID; + if (correlateExpression == null || isAggregationConditionMet) { while (aggregate == null) { synchronized (lock) { if (activeAggregates.containsKey(correlation)) { @@ -397,8 +340,6 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { if (!aggregate.getLock()) { aggregate = null; } - } else { - break; } } else { if (synLog.isTraceOrDebugEnabled()) { @@ -446,9 +387,6 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { } } } - } else { - synLog.traceOrDebug("Unable to find aggregation correlation property"); - return false; } // if there is an aggregate continue on aggregation if (aggregate != null) { @@ -463,9 +401,7 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { } if (aggregate.isComplete(synLog)) { synLog.traceOrDebug("Aggregation completed"); - boolean onCompleteSeqResult = completeAggregate(aggregate); - synLog.traceOrDebug("End : Scatter Gather mediator"); - return onCompleteSeqResult; + return completeAggregate(aggregate); } else { aggregate.releaseLock(); } @@ -482,9 +418,7 @@ private boolean isAggregationCompleted(MessageContext synCtx) { if (aggregateTimeoutHolderObj != null) { SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj; if (sharedDataHolder.isAggregationCompleted()) { - if (log.isDebugEnabled()) { - log.debug("Received a response for already completed Aggregate"); - } + log.debug("Received a response for already completed Aggregate"); return true; } } @@ -498,10 +432,7 @@ public boolean completeAggregate(Aggregate aggregate) { if (wasComplete) { return false; } - - if (log.isDebugEnabled()) { - log.debug("Aggregation completed or timed out"); - } + log.debug("Aggregation completed or timed out"); // cancel the timer synchronized (this) { @@ -567,6 +498,7 @@ private boolean processAggregation(MessageContext messageContext, Aggregate aggr statisticReportingIndex, isContentAltering()); } + getLog(messageContext).traceOrDebug("End : Scatter Gather mediator"); boolean result = false; if (seqContinuationState != null) { SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(messageContext, seqContinuationState); @@ -646,145 +578,58 @@ private static List getMatchingElements(MessageContext messageContext, SynapsePa } } - private static void enrichEnvelope(MessageContext messageContext, SynapsePath expression) { - - OMElement enrichingElement; - List elementList = getMatchingElements(messageContext, expression); - if (EIPUtils.checkNotEmpty(elementList)) { - // attach at parent of the first result from the XPath, or to the SOAPBody - Object o = elementList.get(0); - if (o instanceof OMElement && - ((OMElement) o).getParent() != null && - ((OMElement) o).getParent() instanceof OMElement) { - enrichingElement = (OMElement) ((OMElement) o).getParent(); - OMElement body = messageContext.getEnvelope().getBody(); - if (!EIPUtils.isBody(body, enrichingElement)) { - OMElement nonBodyElem = enrichingElement; - enrichingElement = messageContext.getEnvelope().getBody(); - addChildren(elementList, enrichingElement); - while (!EIPUtils.isBody(body, (OMElement) nonBodyElem.getParent())) { - nonBodyElem = (OMElement) nonBodyElem.getParent(); - } - nonBodyElem.detach(); - } - } - } - } - private void setAggregatedMessageAsVariable(MessageContext originalMessageContext, Aggregate aggregate) { - Object variable = originalMessageContext.getVariable(resultTarget); - if (variable == null) { - variable = createNewVariable(originalMessageContext, aggregate); - originalMessageContext.setVariable(resultTarget, variable); - } + Object variable = null; if (Objects.equals(contentType, JSON_TYPE)) { - setJSONResultToVariable((JsonElement) variable, aggregate); - } else if (Objects.equals(contentType, XML_TYPE) && variable instanceof OMElement) { - setXMLResultToVariable((OMElement) variable, aggregate); + log.debug("Merging aggregated JSON responses to variable"); + variable = new JsonArray(); + setJSONResultToVariable((JsonArray) variable, aggregate); + } else if (Objects.equals(contentType, XML_TYPE)) { + log.debug("Merging aggregated XML responses to variable"); + variable = OMAbstractFactory.getOMFactory().createOMElement(new QName(rootElementName)); + setXMLResultToRootOMElement((OMElement) variable, aggregate); } else { - handleInvalidVariableType(variable, aggregate, originalMessageContext); + handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget + + " unknown content type : " + contentType, null, originalMessageContext); } + originalMessageContext.setVariable(resultTarget, variable); StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), originalMessageContext); } - private void handleInvalidVariableType(Object variable, Aggregate aggregate, MessageContext synCtx) { - - String expectedType = Objects.equals(contentType, JSON_TYPE) ? "JSON" : "OMElement"; - String actualType = variable != null ? variable.getClass().getName() : "null"; - handleException(aggregate, "Error merging aggregation results to variable: " + resultTarget + - " expected a " + expectedType + " but found " + actualType, null, synCtx); - } - - private void setJSONResultToVariable(JsonElement variable, Aggregate aggregate) { + private void setJSONResultToVariable(JsonArray variable, Aggregate aggregate) { - try { - for (MessageContext synCtx : aggregate.getMessages()) { - Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); - if (variable instanceof JsonArray) { - variable.getAsJsonArray().add((JsonElement) evaluatedResult); - } else if (variable instanceof JsonObject) { - JSONMergeUtils.extendJSONObject((JsonObject) variable, - JSONMergeUtils.ConflictStrategy.MERGE_INTO_ARRAY, - ((JsonObject) evaluatedResult).getAsJsonObject()); - } else { - handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget + - " expected a JSON type variable but found " + variable.getClass().getName(), null, synCtx); - } - } - } catch (JSONObjectExtensionException e) { - handleException(aggregate, "Error merging aggregation results to JSON Object : " + - aggregationExpression.toString(), e, aggregate.getLastMessage()); + for (MessageContext synCtx : aggregate.getMessages()) { + Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); + variable.add((JsonElement) evaluatedResult); } } - private void setXMLResultToVariable(OMElement variable, Aggregate aggregate) { + private void setXMLResultToRootOMElement(OMElement element, Aggregate aggregate) { for (MessageContext synCtx : aggregate.getMessages()) { List list = getMatchingElements(synCtx, aggregationExpression); - addChildren(list, variable); - } - } - - private Object createNewVariable(MessageContext synCtx, Aggregate aggregate) { - - if (Objects.equals(contentType, JSON_TYPE)) { - return new JsonArray(); - } else if (Objects.equals(contentType, XML_TYPE)) { - return OMAbstractFactory.getOMFactory().createOMElement(new QName(resultTarget)); - } else { - handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget + - " unknown content type : " + contentType, null, synCtx); - return null; + addChildren(list, element); } } private MessageContext getAggregatedMessage(Aggregate aggregate) { MessageContext newCtx = null; - JsonArray jsonArray = new JsonArray(); - - for (MessageContext synCtx : aggregate.getMessages()) { - if (newCtx == null) { - try { - newCtx = MessageHelper.cloneMessageContext(synCtx, true, false, true); - } catch (AxisFault axisFault) { - handleException(aggregate, "Error creating a copy of the message", axisFault, synCtx); - } - - if (log.isDebugEnabled()) { - log.debug("Generating Aggregated message from : " + newCtx.getEnvelope()); - } - if (Objects.equals(contentType, JSON_TYPE)) { - Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); - if (evaluatedResult instanceof JsonElement) { - jsonArray.add((JsonElement) evaluatedResult); - } else { - handleException(aggregate, "Error merging aggregation results as expression : " + - aggregationExpression.toString() + " did not resolve to a JSON value", null, synCtx); - } - } else { - enrichEnvelope(synCtx, aggregationExpression); - } - } else { + if (Objects.equals(contentType, JSON_TYPE)) { + JsonArray jsonArray = new JsonArray(); + log.debug("Merging aggregated JSON responses to body"); + for (MessageContext synCtx : aggregate.getMessages()) { try { if (log.isDebugEnabled()) { log.debug("Merging message : " + synCtx.getEnvelope() + " using expression : " + aggregationExpression); } - if (Objects.equals(contentType, JSON_TYPE)) { - Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); - if (evaluatedResult instanceof JsonElement) { - jsonArray.add((JsonElement) evaluatedResult); - } else { - jsonArray.add(evaluatedResult.toString()); - } + Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); + if (evaluatedResult instanceof JsonElement) { + jsonArray.add((JsonElement) evaluatedResult); } else { - enrichEnvelope(synCtx, aggregationExpression); - } - - if (log.isDebugEnabled()) { - log.debug("Merged result : " + newCtx.getEnvelope()); + jsonArray.add(evaluatedResult.toString()); } } catch (SynapseException e) { handleException(aggregate, "Error evaluating expression: " + aggregationExpression.toString(), e, synCtx); @@ -792,26 +637,52 @@ private MessageContext getAggregatedMessage(Aggregate aggregate) { handleException(aggregate, "Error reading JSON element: " + aggregationExpression.toString(), e, synCtx); } } - } - - StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx); - if (Objects.equals(contentType, JSON_TYPE)) { // setting the new JSON payload to the messageContext try { + newCtx = MessageHelper.cloneMessageContext(aggregate.getLastMessage(), false, false, true); + SOAPEnvelope newEnvelope = createNewSoapEnvelope(aggregate.getLastMessage().getEnvelope()); + newCtx.setEnvelope(newEnvelope); JsonUtil.getNewJsonPayload(((Axis2MessageContext) newCtx).getAxis2MessageContext(), new ByteArrayInputStream(jsonArray.toString().getBytes()), true, true); } catch (AxisFault axisFault) { - log.error("Error occurred while setting the new JSON payload to the msg context", axisFault); + handleException(aggregate, "Error occurred while setting the new JSON payload to the message context", + axisFault, newCtx); + } + } else if (Objects.equals(contentType, XML_TYPE)) { + log.debug("Merging aggregated XML responses to body"); + OMElement rootElement = OMAbstractFactory.getOMFactory().createOMElement(new QName(rootElementName)); + setXMLResultToRootOMElement(rootElement, aggregate); + try { + newCtx = MessageHelper.cloneMessageContext(aggregate.getLastMessage(), false, false, true); + SOAPEnvelope newEnvelope = createNewSoapEnvelope(aggregate.getLastMessage().getEnvelope()); + newEnvelope.getBody().addChild(rootElement); + newCtx.setEnvelope(newEnvelope); + } catch (AxisFault axisFault) { + handleException(aggregate, "Error creating a copy of the message", axisFault, aggregate.getLastMessage()); } - } else { // Removing the JSON stream after aggregated using XML path. // This will fix inconsistent behaviour in logging the payload. ((Axis2MessageContext) newCtx).getAxis2MessageContext() .removeProperty(org.apache.synapse.commons.json.Constants.ORG_APACHE_SYNAPSE_COMMONS_JSON_JSON_INPUT_STREAM); + } else { + handleException(aggregate, "Error aggregating results. Unknown content type : " + contentType, null, + aggregate.getLastMessage()); } + StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx); return newCtx; } + private SOAPEnvelope createNewSoapEnvelope(SOAPEnvelope envelope) { + + SOAPFactory fac; + if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(envelope.getBody().getNamespace().getNamespaceURI())) { + fac = OMAbstractFactory.getSOAP11Factory(); + } else { + fac = OMAbstractFactory.getSOAP12Factory(); + } + return fac.getDefaultEnvelope(); + } + public SynapsePath getCorrelateExpression() { return correlateExpression; @@ -932,6 +803,16 @@ public void setResultTarget(String resultTarget) { this.resultTarget = resultTarget; } + public String getRootElementName() { + + return rootElementName; + } + + public void setRootElementName(String rootElementName) { + + this.rootElementName = rootElementName; + } + private boolean isTargetBody() { return "body".equalsIgnoreCase(resultTarget); diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java index 67da918d99..9ecbd3f6f8 100644 --- a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java +++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java @@ -34,11 +34,27 @@ public ScatterGatherMediatorSerializationTest() { scatterGatherMediatorSerializer = new ScatterGatherMediatorSerializer(); } - public void testScatterGatherSerialization() { + public void testScatterGatherXMLTypeSerialization() { String inputXML = "" + - "" + + "content-type=\"XML\" root-element=\"result\" parallel-execution=\"true\">" + + "" + + "{ \"pet\": { " + + "\"name\": \"pet1\", \"type\": \"dog\" }, " + + "\"status\": \"success\" }" + + "" + + "" + + "" + + ""; + + assertTrue(serialization(inputXML, scatterGatherMediatorFactory, scatterGatherMediatorSerializer)); + } + + public void testScatterGatherJSONTypeSerialization() { + + String inputXML = "" + + "" + "{ \"pet\": { " + "\"name\": \"pet1\", \"type\": \"dog\" }, " + "\"status\": \"success\" }" +