From 1c0e2efdec3f1583222446b5a497c52afa64f4f2 Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Tue, 19 Nov 2024 09:38:11 +0530 Subject: [PATCH] Add synapse expression support and option to set output to a variable --- .../xml/ScatterGatherMediatorFactory.java | 40 ++- .../xml/ScatterGatherMediatorSerializer.java | 5 + .../synapse/mediators/MediatorWorker.java | 2 +- .../mediators/base/SequenceMediator.java | 12 - .../synapse/mediators/eip/EIPUtils.java | 4 +- .../synapse/mediators/v2/ScatterGather.java | 322 ++++++++++++++---- ...catterGatherMediatorSerializationTest.java | 3 +- 7 files changed, 309 insertions(+), 79 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java index 3af458912e..81689a7edb 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java @@ -20,7 +20,9 @@ import org.apache.axiom.om.OMAttribute; import org.apache.axiom.om.OMElement; +import org.apache.commons.lang3.StringUtils; import org.apache.synapse.Mediator; +import org.apache.synapse.SynapseException; import org.apache.synapse.mediators.eip.Target; import org.apache.synapse.mediators.v2.ScatterGather; import org.jaxen.JaxenException; @@ -34,7 +36,7 @@ * different message contexts and aggregate the responses back. * *
- * <scatter-gather parallel-execution=(true | false)>
+ * <scatter-gather parallel-execution=(true | false) result-target=(body | variable) content-type=(JSON | XML)>
  *   <aggregation value="expression" condition="expression" timeout="long"
  *     min-messages="expression" max-messages="expression"/>
  *   <sequence>
@@ -59,6 +61,8 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {
     private static final QName ATT_MAX_MESSAGES = new QName("max-messages");
     private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence");
     private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");
+    private static final QName RESULT_TARGET_Q = new QName("result-target");
+    private static final QName CONTENT_TYPE_Q = new QName("content-type");
 
     private static final SequenceMediatorFactory fac = new SequenceMediatorFactory();
 
@@ -73,10 +77,36 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
         if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) {
             asynchronousExe = false;
         }
-
         mediator.setParallelExecution(asynchronousExe);
 
+        OMAttribute contentTypeAttr = elem.getAttribute(CONTENT_TYPE_Q);
+        if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) {
+            String msg = "The 'content-type' attribute is required for the configuration of a Scatter Gather mediator";
+            throw new SynapseException(msg);
+        } else {
+            if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
+                mediator.setContentType(ScatterGather.JSON_TYPE);
+            } else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
+                mediator.setContentType(ScatterGather.XML_TYPE);
+            } else {
+                String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'";
+                throw new SynapseException(msg);
+            }
+        }
+
+        OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q);
+        if (resultTargetAttr == null || StringUtils.isBlank(resultTargetAttr.getAttributeValue())) {
+            String msg = "The 'result-target' attribute is required for the configuration of a Scatter Gather mediator";
+            throw new SynapseException(msg);
+        } else {
+            mediator.setResultTarget(resultTargetAttr.getAttributeValue());
+        }
+
         Iterator sequenceListElements = elem.getChildrenWithName(SEQUENCE_Q);
+        if (!sequenceListElements.hasNext()) {
+            String msg = "A 'sequence' element is required for the configuration of a Scatter Gather mediator";
+            throw new SynapseException(msg);
+        }
         while (sequenceListElements.hasNext()) {
             OMElement sequence = (OMElement) sequenceListElements.next();
             if (sequence != null) {
@@ -97,6 +127,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
                 } catch (JaxenException e) {
                     handleException("Unable to load the aggregating expression", e);
                 }
+            } else {
+                String msg = "The 'value' attribute is required for the configuration of a Scatter Gather mediator";
+                throw new SynapseException(msg);
             }
 
             OMAttribute conditionExpr = aggregateElement.getAttribute(ATT_CONDITION);
@@ -123,6 +156,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
             if (maxMessages != null) {
                 mediator.setMaxMessagesToComplete(new ValueFactory().createValue("max-messages", aggregateElement));
             }
+        } else {
+            String msg = "The 'aggregation' element is required for the configuration of a Scatter Gather mediator";
+            throw new SynapseException(msg);
         }
         addAllCommentChildrenToList(elem, mediator.getCommentsList());
         return mediator;
diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java
index 78e3e8e44d..e63bad4c85 100755
--- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java
+++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java
@@ -43,6 +43,11 @@ public OMElement serializeSpecificMediator(Mediator m) {
 
         scatterGatherElement.addAttribute(fac.createOMAttribute(
                 "parallel-execution", nullNS, Boolean.toString(scatterGatherMediator.getParallelExecution())));
+        scatterGatherElement.addAttribute(fac.createOMAttribute(
+                "result-target", nullNS, scatterGatherMediator.getResultTarget()));
+        scatterGatherElement.addAttribute(fac.createOMAttribute(
+                "content-type", nullNS, scatterGatherMediator.getContentType()));
+
         OMElement aggregationElement = fac.createOMElement("aggregation", synNS);
 
         SynapsePathSerializer.serializePath(
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
index ae7fffa435..2d4258f342 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java
@@ -154,7 +154,7 @@ public void run() {
                 debugManager.advertiseMediationFlowTerminatePoint(synCtx);
                 debugManager.releaseMediationFlowLock();
             }
-            if (RuntimeStatisticCollector.isStatisticsEnabled()) {
+            if (RuntimeStatisticCollector.isStatisticsEnabled() && !isScatterMessage(synCtx)) {
                 this.statisticsCloseEventListener.invokeCloseEventEntry(synCtx);
             }
         }
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java
index 2c28222924..ccf33c72ec 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java
@@ -521,16 +521,4 @@ public void setComponentStatisticsId(ArtifactHolder holder) {
             StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.SEQUENCE, holder);
         }
     }
-
-    /**
-     * Check whether the message is a scatter message or not
-     *
-     * @param synCtx MessageContext
-     * @return true if the message is a scatter message
-     */
-    private static boolean isScatterMessage(MessageContext synCtx) {
-
-        Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
-        return isSkipContinuationState != null && isSkipContinuationState;
-    }
 }
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
index 09b76b80d5..afe114b67c 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java
@@ -189,7 +189,7 @@ public static void enrichEnvelope(SOAPEnvelope envelope, SOAPEnvelope enricher,
         }
     }
 
-    private static boolean isBody(OMElement body, OMElement enrichingElement) {
+    public static boolean isBody(OMElement body, OMElement enrichingElement) {
         try {
             return (body.getLocalName().equals(enrichingElement.getLocalName()) &&
                     body.getNamespace().getNamespaceURI().equals(enrichingElement.getNamespace().getNamespaceURI()));
@@ -198,7 +198,7 @@ private static boolean isBody(OMElement body, OMElement enrichingElement) {
         }
     }
 
-    private static boolean checkNotEmpty(List list) {
+    public static boolean checkNotEmpty(List list) {
         return list != null && !list.isEmpty();
     }
 
diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
index 589ab2b7d5..5d7ec22462 100644
--- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
+++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java
@@ -20,11 +20,17 @@
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 import com.google.gson.JsonSyntaxException;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMNode;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.context.OperationContext;
+import org.apache.http.protocol.HTTP;
 import org.apache.synapse.ContinuationState;
+import org.apache.synapse.JSONObjectExtensionException;
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
@@ -39,6 +45,7 @@
 import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
 import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder;
 import org.apache.synapse.aspects.flow.statistics.util.StatisticDataCollectionHelper;
+import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants;
 import org.apache.synapse.commons.json.JsonUtil;
 import org.apache.synapse.config.xml.SynapsePath;
 import org.apache.synapse.continuation.ContinuationStackManager;
@@ -56,10 +63,8 @@
 import org.apache.synapse.mediators.eip.Target;
 import org.apache.synapse.mediators.eip.aggregator.Aggregate;
 import org.apache.synapse.transport.passthru.util.RelayUtils;
+import org.apache.synapse.util.JSONMergeUtils;
 import org.apache.synapse.util.MessageHelper;
-import org.apache.synapse.util.xpath.SynapseJsonPath;
-import org.apache.synapse.util.xpath.SynapseXPath;
-import org.jaxen.JaxenException;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -69,12 +74,18 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Timer;
 import javax.xml.stream.XMLStreamException;
 
+import static org.apache.synapse.SynapseConstants.XML_CONTENT_TYPE;
+import static org.apache.synapse.transport.passthru.PassThroughConstants.JSON_CONTENT_TYPE;
+
 public class ScatterGather extends AbstractMediator implements ManagedLifecycle, FlowContinuableMediator {
 
+    public static final String JSON_TYPE = "JSON";
+    public static final String XML_TYPE = "XML";
     private final Object lock = new Object();
     private final Map activeAggregates = Collections.synchronizedMap(new HashMap<>());
     private String id;
@@ -86,6 +97,10 @@ public class ScatterGather extends AbstractMediator implements ManagedLifecycle,
     private SynapsePath aggregationExpression = null;
     private boolean parallelExecution = true;
     private Integer statisticReportingIndex;
+    private String contentType;
+    private String resultTarget;
+    private MessageContext originalMessageContext;
+    private SynapseEnvironment synapseEnv;
 
     public ScatterGather() {
 
@@ -122,6 +137,14 @@ public boolean mediate(MessageContext synCtx) {
             }
         }
 
+        if (!isTargetBody()) {
+            try {
+                originalMessageContext = MessageHelper.cloneMessageContext(synCtx);
+            } catch (AxisFault e) {
+                handleException("Error cloning the message context", e, synCtx);
+            }
+        }
+
         synCtx.setProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id :
                 EIPConstants.EIP_SHARED_DATA_HOLDER, new SharedDataHolder());
         Iterator iter = targets.iterator();
@@ -144,17 +167,21 @@ public boolean mediate(MessageContext synCtx) {
         if (opCtx != null) {
             opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP");
         }
+        synCtx.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);
         return aggregationResult;
     }
 
-    public void init(SynapseEnvironment se) {
+    public void init(SynapseEnvironment synapseEnv) {
 
+        this.synapseEnv = synapseEnv;
         for (Target target : targets) {
             ManagedLifecycle seq = target.getSequence();
             if (seq != null) {
-                seq.init(se);
+                seq.init(synapseEnv);
             }
         }
+        // Registering the mediator for enabling continuation
+        synapseEnv.updateCallMediatorCount(true);
     }
 
     public void destroy() {
@@ -165,6 +192,8 @@ public void destroy() {
                 seq.destroy();
             }
         }
+        // Unregistering the mediator for continuation
+        synapseEnv.updateCallMediatorCount(false);
     }
 
     /**
@@ -289,21 +318,29 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
         String correlation;
 
         Object result = null;
+        // When the target sequences are not content aware, the message builder wont get triggered.
+        // Therefore, we need to build the message to do the aggregation.
+        try {
+            RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext());
+        } catch (IOException | XMLStreamException e) {
+            handleException("Error building the message", e, synCtx);
+        }
+
         if (correlateExpression != null) {
-            try {
-                result = correlateExpression instanceof SynapseXPath ? correlateExpression.evaluate(synCtx) :
-                        ((SynapseJsonPath) correlateExpression).evaluate(synCtx);
-            } catch (JaxenException e) {
-                handleException("Unable to execute the XPATH over the message", e, synCtx);
-            }
+            result = correlateExpression.objectValueOf(synCtx);
             if (result instanceof List) {
                 if (((List) result).isEmpty()) {
                     handleException("Failed to evaluate correlate expression: " + correlateExpression.toString(), synCtx);
                 }
             }
+            if (result instanceof JsonPrimitive) {
+                if (!((JsonPrimitive) result).getAsBoolean()) {
+                    return false;
+                }
+            }
             if (result instanceof Boolean) {
                 if (!(Boolean) result) {
-                    return true;
+                    return false;
                 }
             }
         }
@@ -497,40 +534,185 @@ public boolean completeAggregate(Aggregate aggregate) {
             return false;
         }
 
-        MessageContext newSynCtx = getAggregatedMessage(aggregate);
+        if (isTargetBody()) {
+            MessageContext newSynCtx = getAggregatedMessage(aggregate);
 
-        if (newSynCtx == null) {
-            log.warn("An aggregation of messages timed out with no aggregated messages", null);
-            return false;
+            if (newSynCtx == null) {
+                log.warn("An aggregation of messages timed out with no aggregated messages", null);
+                return false;
+            }
+            aggregate.clear();
+            activeAggregates.remove(aggregate.getCorrelation());
+
+            // Set content type to the aggregated message
+            setContentType(newSynCtx);
+
+            newSynCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false);
+            SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(newSynCtx);
+            boolean result = false;
+
+            // Set CONTINUE_STATISTICS_FLOW to avoid mark event collection as finished before the aggregation is completed
+            newSynCtx.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);
+            if (RuntimeStatisticCollector.isStatisticsEnabled()) {
+                CloseEventCollector.closeEntryEvent(newSynCtx, getMediatorName(), ComponentType.MEDIATOR,
+                        statisticReportingIndex, isContentAltering());
+            }
+
+            if (seqContinuationState != null) {
+                SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(newSynCtx, seqContinuationState);
+                result = sequenceMediator.mediate(newSynCtx, seqContinuationState);
+                if (RuntimeStatisticCollector.isStatisticsEnabled()) {
+                    sequenceMediator.reportCloseStatistics(newSynCtx, null);
+                }
+            }
+            CloseEventCollector.closeEventsAfterScatterGather(newSynCtx);
+            return result;
+        } else {
+            setAggregatedMessageAsVariable(originalMessageContext, aggregate);
+
+            aggregate.clear();
+            activeAggregates.remove(aggregate.getCorrelation());
+            // Update the continuation state to current mediator position as we are using the original message context
+            ContinuationStackManager.updateSeqContinuationState(originalMessageContext, getMediatorPosition());
+            SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(originalMessageContext);
+            boolean result = false;
+
+            // Set CONTINUE_STATISTICS_FLOW to avoid mark event collection as finished before the aggregation is completed
+            originalMessageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true);
+            if (RuntimeStatisticCollector.isStatisticsEnabled()) {
+                CloseEventCollector.closeEntryEvent(originalMessageContext, getMediatorName(), ComponentType.MEDIATOR,
+                        statisticReportingIndex, isContentAltering());
+            }
+
+            if (seqContinuationState != null) {
+                SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(originalMessageContext, seqContinuationState);
+                result = sequenceMediator.mediate(originalMessageContext, seqContinuationState);
+                if (RuntimeStatisticCollector.isStatisticsEnabled()) {
+                    sequenceMediator.reportCloseStatistics(originalMessageContext, null);
+                }
+            }
+            CloseEventCollector.closeEventsAfterScatterGather(originalMessageContext);
+            // Clear the original message context to avoid memory leak
+            originalMessageContext = null;
+            return result;
         }
-        aggregate.clear();
-        activeAggregates.remove(aggregate.getCorrelation());
-        newSynCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false);
-        SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(newSynCtx);
-        boolean result = false;
-
-        if (RuntimeStatisticCollector.isStatisticsEnabled()) {
-            CloseEventCollector.closeEntryEvent(newSynCtx, getMediatorName(), ComponentType.MEDIATOR,
-                    statisticReportingIndex, isContentAltering());
+    }
+
+    private void setContentType(MessageContext synCtx) {
+
+        org.apache.axis2.context.MessageContext a2mc = ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+        if (Objects.equals(contentType, JSON_TYPE)) {
+            a2mc.setProperty(Constants.Configuration.MESSAGE_TYPE, JSON_CONTENT_TYPE);
+            a2mc.setProperty(Constants.Configuration.CONTENT_TYPE, JSON_CONTENT_TYPE);
+            setContentTypeHeader(JSON_CONTENT_TYPE, a2mc);
+        } else {
+            a2mc.setProperty(Constants.Configuration.MESSAGE_TYPE, XML_CONTENT_TYPE);
+            a2mc.setProperty(Constants.Configuration.CONTENT_TYPE, XML_CONTENT_TYPE);
+            setContentTypeHeader(XML_CONTENT_TYPE, a2mc);
         }
+        a2mc.removeProperty("NO_ENTITY_BODY");
+    }
 
-        if (seqContinuationState != null) {
-            SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(newSynCtx, seqContinuationState);
-            result = sequenceMediator.mediate(newSynCtx, seqContinuationState);
-            if (RuntimeStatisticCollector.isStatisticsEnabled()) {
-                sequenceMediator.reportCloseStatistics(newSynCtx, null);
+    private void setContentTypeHeader(Object resultValue, org.apache.axis2.context.MessageContext axis2MessageCtx) {
+
+        axis2MessageCtx.setProperty(org.apache.axis2.Constants.Configuration.CONTENT_TYPE, resultValue);
+        Object o = axis2MessageCtx.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
+        Map headers = (Map) o;
+        if (headers != null) {
+            headers.put(HTTP.CONTENT_TYPE, resultValue);
+        }
+    }
+
+    private static void addChildren(List list, OMElement element) {
+
+        for (Object item : list) {
+            if (item instanceof OMElement) {
+                element.addChild((OMElement) item);
+            }
+        }
+    }
+
+    private static List getMatchingElements(MessageContext messageContext, SynapsePath expression) {
+
+        Object o = expression.objectValueOf(messageContext);
+        if (o instanceof OMNode) {
+            List list = new ArrayList();
+            list.add(o);
+            return list;
+        } else if (o instanceof List) {
+            return (List) o;
+        } else {
+            return new ArrayList();
+        }
+    }
+
+    private static void enrichEnvelope(MessageContext messageContext, SynapsePath expression) {
+
+        OMElement enrichingElement;
+        List elementList = getMatchingElements(messageContext, expression);
+        if (EIPUtils.checkNotEmpty(elementList)) {
+            // attach at parent of the first result from the XPath, or to the SOAPBody
+            Object o = elementList.get(0);
+            if (o instanceof OMElement &&
+                    ((OMElement) o).getParent() != null &&
+                    ((OMElement) o).getParent() instanceof OMElement) {
+                enrichingElement = (OMElement) ((OMElement) o).getParent();
+                OMElement body = messageContext.getEnvelope().getBody();
+                if (!EIPUtils.isBody(body, enrichingElement)) {
+                    OMElement nonBodyElem = enrichingElement;
+                    enrichingElement = messageContext.getEnvelope().getBody();
+                    addChildren(elementList, enrichingElement);
+                    while (!EIPUtils.isBody(body, (OMElement) nonBodyElem.getParent())) {
+                        nonBodyElem = (OMElement) nonBodyElem.getParent();
+                    }
+                    nonBodyElem.detach();
+                }
+            }
+        }
+    }
+
+    private void setAggregatedMessageAsVariable(MessageContext originalMessageContext, Aggregate aggregate) {
+
+        Object variable = originalMessageContext.getVariable(resultTarget);
+        for (MessageContext synCtx : aggregate.getMessages()) {
+            try {
+                if (Objects.equals(contentType, JSON_TYPE)) {
+                    Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
+                    if (variable instanceof JsonArray) {
+                        ((JsonElement) variable).getAsJsonArray().add((JsonElement) evaluatedResult);
+                    } else if (variable instanceof JsonObject) {
+                            JSONMergeUtils.extendJSONObject((JsonObject) variable,
+                                    JSONMergeUtils.ConflictStrategy.MERGE_INTO_ARRAY,
+                                    ((JsonObject) evaluatedResult).getAsJsonObject());
+                    } else {
+                        handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget +
+                                " expected a JSON type variable but found " + variable.getClass().getName(), null, synCtx);
+                    }
+                } else {
+                    if (variable instanceof OMElement) {
+                        List list = getMatchingElements(synCtx, aggregationExpression);
+                        addChildren(list, (OMElement) variable);
+                    } else {
+                        handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget +
+                                " expected an OMElement type variable but found " + variable.getClass().getName(), null, synCtx);
+                    }
+                }
+            } catch (SynapseException e) {
+                handleException(aggregate, "Error evaluating expression: " + aggregationExpression.toString(), e, synCtx);
+            } catch (JsonSyntaxException e) {
+                handleException(aggregate, "Error reading JSON element: " + aggregationExpression.toString(), e, synCtx);
+            } catch (JSONObjectExtensionException e) {
+                handleException(aggregate, "Error merging aggregation results to JSON Object : " +
+                        aggregationExpression.toString(), e, synCtx);
             }
         }
-        CloseEventCollector.closeEventsAfterScatterGather(newSynCtx);
-        return result;
+        StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), originalMessageContext);
     }
 
     private MessageContext getAggregatedMessage(Aggregate aggregate) {
 
         MessageContext newCtx = null;
         JsonArray jsonArray = new JsonArray();
-        JsonElement result;
-        boolean isJSONAggregation = aggregationExpression instanceof SynapseJsonPath;
 
         for (MessageContext synCtx : aggregate.getMessages()) {
             if (newCtx == null) {
@@ -543,58 +725,51 @@ private MessageContext getAggregatedMessage(Aggregate aggregate) {
                 if (log.isDebugEnabled()) {
                     log.debug("Generating Aggregated message from : " + newCtx.getEnvelope());
                 }
-                if (isJSONAggregation) {
-                    jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression));
-                } else {
-                    try {
-                        EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx, (SynapseXPath) aggregationExpression);
-                    } catch (JaxenException e) {
-                        handleException(aggregate, "Error merging aggregation results using XPath : " +
-                                aggregationExpression.toString(), e, synCtx);
+                if (Objects.equals(contentType, JSON_TYPE)) {
+                    Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
+                    if (evaluatedResult instanceof JsonElement) {
+                        jsonArray.add((JsonElement) evaluatedResult);
+                    } else {
+                        handleException(aggregate, "Error merging aggregation results as expression : " +
+                                aggregationExpression.toString() + " did not resolve to a JSON value", null, synCtx);
                     }
+                } else {
+                    enrichEnvelope(synCtx, aggregationExpression);
                 }
             } else {
                 try {
                     if (log.isDebugEnabled()) {
-                        log.debug("Merging message : " + synCtx.getEnvelope() + " using XPath : " +
+                        log.debug("Merging message : " + synCtx.getEnvelope() + " using expression : " +
                                 aggregationExpression);
                     }
-                    // When the target sequences are not content aware, the message builder wont get triggered.
-                    // Therefore, we need to build the message to do the aggregation.
-                    RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext());
-                    if (isJSONAggregation) {
-                        jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression));
+                    if (Objects.equals(contentType, JSON_TYPE)) {
+                        Object evaluatedResult = aggregationExpression.objectValueOf(synCtx);
+                        if (evaluatedResult instanceof JsonElement) {
+                            jsonArray.add((JsonElement) evaluatedResult);
+                        } else {
+                            jsonArray.add(evaluatedResult.toString());
+                        }
                     } else {
-                        EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx.getEnvelope(), synCtx, (SynapseXPath)
-                                aggregationExpression);
+                        enrichEnvelope(synCtx, aggregationExpression);
                     }
 
                     if (log.isDebugEnabled()) {
                         log.debug("Merged result : " + newCtx.getEnvelope());
                     }
-                } catch (JaxenException e) {
-                    handleException(aggregate, "Error merging aggregation results using XPath : " +
-                            aggregationExpression.toString(), e, synCtx);
                 } catch (SynapseException e) {
                     handleException(aggregate, "Error evaluating expression: " + aggregationExpression.toString(), e, synCtx);
                 } catch (JsonSyntaxException e) {
                     handleException(aggregate, "Error reading JSON element: " + aggregationExpression.toString(), e, synCtx);
-                } catch (IOException e) {
-                    handleException(aggregate, "IO Error occurred while building the message", e, synCtx);
-                } catch (XMLStreamException e) {
-                    handleException(aggregate, "XML Error occurred while building the message", e, synCtx);
                 }
             }
         }
 
-        result = jsonArray;
-
         StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx);
-        if (isJSONAggregation) {
+        if (Objects.equals(contentType, JSON_TYPE)) {
             // setting the new JSON payload to the messageContext
             try {
                 JsonUtil.getNewJsonPayload(((Axis2MessageContext) newCtx).getAxis2MessageContext(), new
-                        ByteArrayInputStream(result.toString().getBytes()), true, true);
+                        ByteArrayInputStream(jsonArray.toString().getBytes()), true, true);
             } catch (AxisFault axisFault) {
                 log.error("Error occurred while setting the new JSON payload to the msg context", axisFault);
             }
@@ -707,4 +882,29 @@ private void handleException(Aggregate aggregate, String msg, Exception exceptio
             super.handleException(msg, msgContext);
         }
     }
+
+    public String getContentType() {
+
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+
+        this.contentType = contentType;
+    }
+
+    public String getResultTarget() {
+
+        return resultTarget;
+    }
+
+    public void setResultTarget(String resultTarget) {
+
+        this.resultTarget = resultTarget;
+    }
+
+    private boolean isTargetBody() {
+
+        return "body".equalsIgnoreCase(resultTarget);
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java
index a8572a5256..67da918d99 100644
--- a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java
+++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java
@@ -36,7 +36,8 @@ public ScatterGatherMediatorSerializationTest() {
 
     public void testScatterGatherSerialization() {
 
-        String inputXML = "" +
+        String inputXML = "" +
                 "" +
                 "{                    \"pet\": {                        " +
                 "\"name\": \"pet1\",                        \"type\": \"dog\"                    },                    " +