diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java index 3af458912e..81689a7edb 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java @@ -20,7 +20,9 @@ import org.apache.axiom.om.OMAttribute; import org.apache.axiom.om.OMElement; +import org.apache.commons.lang3.StringUtils; import org.apache.synapse.Mediator; +import org.apache.synapse.SynapseException; import org.apache.synapse.mediators.eip.Target; import org.apache.synapse.mediators.v2.ScatterGather; import org.jaxen.JaxenException; @@ -34,7 +36,7 @@ * different message contexts and aggregate the responses back. * *
- * <scatter-gather parallel-execution=(true | false)> + * <scatter-gather parallel-execution=(true | false) result-target=(body | variable) content-type=(JSON | XML)> * <aggregation value="expression" condition="expression" timeout="long" * min-messages="expression" max-messages="expression"/> * <sequence> @@ -59,6 +61,8 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory { private static final QName ATT_MAX_MESSAGES = new QName("max-messages"); private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence"); private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution"); + private static final QName RESULT_TARGET_Q = new QName("result-target"); + private static final QName CONTENT_TYPE_Q = new QName("content-type"); private static final SequenceMediatorFactory fac = new SequenceMediatorFactory(); @@ -73,10 +77,36 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) { if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) { asynchronousExe = false; } - mediator.setParallelExecution(asynchronousExe); + OMAttribute contentTypeAttr = elem.getAttribute(CONTENT_TYPE_Q); + if (contentTypeAttr == null || StringUtils.isBlank(contentTypeAttr.getAttributeValue())) { + String msg = "The 'content-type' attribute is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); + } else { + if ("JSON".equals(contentTypeAttr.getAttributeValue())) { + mediator.setContentType(ScatterGather.JSON_TYPE); + } else if ("XML".equals(contentTypeAttr.getAttributeValue())) { + mediator.setContentType(ScatterGather.XML_TYPE); + } else { + String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'"; + throw new SynapseException(msg); + } + } + + OMAttribute resultTargetAttr = elem.getAttribute(RESULT_TARGET_Q); + if (resultTargetAttr == null || StringUtils.isBlank(resultTargetAttr.getAttributeValue())) { + String msg = "The 'result-target' attribute is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); + } else { + mediator.setResultTarget(resultTargetAttr.getAttributeValue()); + } + Iterator sequenceListElements = elem.getChildrenWithName(SEQUENCE_Q); + if (!sequenceListElements.hasNext()) { + String msg = "A 'sequence' element is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); + } while (sequenceListElements.hasNext()) { OMElement sequence = (OMElement) sequenceListElements.next(); if (sequence != null) { @@ -97,6 +127,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) { } catch (JaxenException e) { handleException("Unable to load the aggregating expression", e); } + } else { + String msg = "The 'value' attribute is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); } OMAttribute conditionExpr = aggregateElement.getAttribute(ATT_CONDITION); @@ -123,6 +156,9 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) { if (maxMessages != null) { mediator.setMaxMessagesToComplete(new ValueFactory().createValue("max-messages", aggregateElement)); } + } else { + String msg = "The 'aggregation' element is required for the configuration of a Scatter Gather mediator"; + throw new SynapseException(msg); } addAllCommentChildrenToList(elem, mediator.getCommentsList()); return mediator; diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java index 78e3e8e44d..e63bad4c85 100755 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java @@ -43,6 +43,11 @@ public OMElement serializeSpecificMediator(Mediator m) { scatterGatherElement.addAttribute(fac.createOMAttribute( "parallel-execution", nullNS, Boolean.toString(scatterGatherMediator.getParallelExecution()))); + scatterGatherElement.addAttribute(fac.createOMAttribute( + "result-target", nullNS, scatterGatherMediator.getResultTarget())); + scatterGatherElement.addAttribute(fac.createOMAttribute( + "content-type", nullNS, scatterGatherMediator.getContentType())); + OMElement aggregationElement = fac.createOMElement("aggregation", synNS); SynapsePathSerializer.serializePath( diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java index ae7fffa435..2d4258f342 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java @@ -154,7 +154,7 @@ public void run() { debugManager.advertiseMediationFlowTerminatePoint(synCtx); debugManager.releaseMediationFlowLock(); } - if (RuntimeStatisticCollector.isStatisticsEnabled()) { + if (RuntimeStatisticCollector.isStatisticsEnabled() && !isScatterMessage(synCtx)) { this.statisticsCloseEventListener.invokeCloseEventEntry(synCtx); } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java index 2c28222924..ccf33c72ec 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java @@ -521,16 +521,4 @@ public void setComponentStatisticsId(ArtifactHolder holder) { StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.SEQUENCE, holder); } } - - /** - * Check whether the message is a scatter message or not - * - * @param synCtx MessageContext - * @return true if the message is a scatter message - */ - private static boolean isScatterMessage(MessageContext synCtx) { - - Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); - return isSkipContinuationState != null && isSkipContinuationState; - } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java index 09b76b80d5..afe114b67c 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/EIPUtils.java @@ -189,7 +189,7 @@ public static void enrichEnvelope(SOAPEnvelope envelope, SOAPEnvelope enricher, } } - private static boolean isBody(OMElement body, OMElement enrichingElement) { + public static boolean isBody(OMElement body, OMElement enrichingElement) { try { return (body.getLocalName().equals(enrichingElement.getLocalName()) && body.getNamespace().getNamespaceURI().equals(enrichingElement.getNamespace().getNamespaceURI())); @@ -198,7 +198,7 @@ private static boolean isBody(OMElement body, OMElement enrichingElement) { } } - private static boolean checkNotEmpty(List list) { + public static boolean checkNotEmpty(List list) { return list != null && !list.isEmpty(); } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java index f87c0fffef..e0f782005f 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/SharedDataHolder.java @@ -18,6 +18,8 @@ package org.apache.synapse.mediators.eip; +import org.apache.synapse.MessageContext; + /** * This class is used to hold the shared data for a particular message set * For an example we can use this to share some data across all the spawned spilt messages in iterate mediator @@ -29,6 +31,17 @@ public class SharedDataHolder { */ private boolean isAggregationCompleted = false; + private MessageContext synCtx; + + public SharedDataHolder() { + + } + + public SharedDataHolder(MessageContext synCtx) { + + this.synCtx = synCtx; + } + /** * Check whether aggregation has been completed. * @@ -45,4 +58,8 @@ public void markAggregationCompletion() { isAggregationCompleted = true; } + public MessageContext getSynCtx() { + + return synCtx; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java index 589ab2b7d5..876183b7fe 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java @@ -20,11 +20,18 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import com.google.gson.JsonSyntaxException; +import org.apache.axiom.om.OMAbstractFactory; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMNode; import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; import org.apache.axis2.context.OperationContext; +import org.apache.http.protocol.HTTP; import org.apache.synapse.ContinuationState; +import org.apache.synapse.JSONObjectExtensionException; import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.Mediator; import org.apache.synapse.MessageContext; @@ -39,6 +46,7 @@ import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder; import org.apache.synapse.aspects.flow.statistics.util.StatisticDataCollectionHelper; +import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants; import org.apache.synapse.commons.json.JsonUtil; import org.apache.synapse.config.xml.SynapsePath; import org.apache.synapse.continuation.ContinuationStackManager; @@ -56,10 +64,8 @@ import org.apache.synapse.mediators.eip.Target; import org.apache.synapse.mediators.eip.aggregator.Aggregate; import org.apache.synapse.transport.passthru.util.RelayUtils; +import org.apache.synapse.util.JSONMergeUtils; import org.apache.synapse.util.MessageHelper; -import org.apache.synapse.util.xpath.SynapseJsonPath; -import org.apache.synapse.util.xpath.SynapseXPath; -import org.jaxen.JaxenException; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -69,12 +75,19 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.Timer; +import javax.xml.namespace.QName; import javax.xml.stream.XMLStreamException; +import static org.apache.synapse.SynapseConstants.XML_CONTENT_TYPE; +import static org.apache.synapse.transport.passthru.PassThroughConstants.JSON_CONTENT_TYPE; + public class ScatterGather extends AbstractMediator implements ManagedLifecycle, FlowContinuableMediator { + public static final String JSON_TYPE = "JSON"; + public static final String XML_TYPE = "XML"; private final Object lock = new Object(); private final MapactiveAggregates = Collections.synchronizedMap(new HashMap<>()); private String id; @@ -86,6 +99,9 @@ public class ScatterGather extends AbstractMediator implements ManagedLifecycle, private SynapsePath aggregationExpression = null; private boolean parallelExecution = true; private Integer statisticReportingIndex; + private String contentType; + private String resultTarget; + private SynapseEnvironment synapseEnv; public ScatterGather() { @@ -122,8 +138,18 @@ public boolean mediate(MessageContext synCtx) { } } - synCtx.setProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id : - EIPConstants.EIP_SHARED_DATA_HOLDER, new SharedDataHolder()); + MessageContext orginalMessageContext = null; + if (!isTargetBody()) { + try { + // Clone the original MessageContext and save it to continue the flow using it when the scatter gather + // output is set to a variable + orginalMessageContext = MessageHelper.cloneMessageContext(synCtx); + } catch (AxisFault e) { + handleException("Error cloning the message context", e, synCtx); + } + } + + synCtx.setProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id, new SharedDataHolder(orginalMessageContext)); Iterator iter = targets.iterator(); int i = 0; while (iter.hasNext()) { @@ -144,17 +170,21 @@ public boolean mediate(MessageContext synCtx) { if (opCtx != null) { opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP"); } + synCtx.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true); return aggregationResult; } - public void init(SynapseEnvironment se) { + public void init(SynapseEnvironment synapseEnv) { + this.synapseEnv = synapseEnv; for (Target target : targets) { ManagedLifecycle seq = target.getSequence(); if (seq != null) { - seq.init(se); + seq.init(synapseEnv); } } + // Registering the mediator for enabling continuation + synapseEnv.updateCallMediatorCount(true); } public void destroy() { @@ -165,6 +195,8 @@ public void destroy() { seq.destroy(); } } + // Unregistering the mediator for continuation + synapseEnv.updateCallMediatorCount(false); } /** @@ -186,14 +218,9 @@ private MessageContext getClonedMessageContext(MessageContext synCtx, int messag // Set the SCATTER_MESSAGES property to the cloned message context which will be used by the MediatorWorker // to continue the mediation from the continuation state newCtx.setProperty(SynapseConstants.SCATTER_MESSAGES, true); - if (id != null) { - newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id, synCtx.getMessageID()); - newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id, messageSequence + - EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount); - } else { - newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE, messageSequence + - EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount); - } + newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id, synCtx.getMessageID()); + newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id, messageSequence + + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount); } catch (AxisFault axisFault) { handleException("Error cloning the message context", axisFault, synCtx); } @@ -282,28 +309,35 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { Aggregate aggregate = null; - String correlationIdName = (id != null ? EIPConstants.AGGREGATE_CORRELATION + "." + id : - EIPConstants.AGGREGATE_CORRELATION); + String correlationIdName = EIPConstants.AGGREGATE_CORRELATION + "." + id; Object correlationID = synCtx.getProperty(correlationIdName); String correlation; Object result = null; + // When the target sequences are not content aware, the message builder wont get triggered. + // Therefore, we need to build the message to do the aggregation. + try { + RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext()); + } catch (IOException | XMLStreamException e) { + handleException("Error building the message", e, synCtx); + } + if (correlateExpression != null) { - try { - result = correlateExpression instanceof SynapseXPath ? correlateExpression.evaluate(synCtx) : - ((SynapseJsonPath) correlateExpression).evaluate(synCtx); - } catch (JaxenException e) { - handleException("Unable to execute the XPATH over the message", e, synCtx); - } + result = correlateExpression.objectValueOf(synCtx); if (result instanceof List) { if (((List) result).isEmpty()) { handleException("Failed to evaluate correlate expression: " + correlateExpression.toString(), synCtx); } } + if (result instanceof JsonPrimitive) { + if (!((JsonPrimitive) result).getAsBoolean()) { + return false; + } + } if (result instanceof Boolean) { if (!(Boolean) result) { - return true; + return false; } } } @@ -444,9 +478,7 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { private boolean isAggregationCompleted(MessageContext synCtx) { - Object aggregateTimeoutHolderObj = - synCtx.getProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id : - EIPConstants.EIP_SHARED_DATA_HOLDER); + Object aggregateTimeoutHolderObj = synCtx.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id); if (aggregateTimeoutHolderObj != null) { SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj; @@ -481,8 +513,7 @@ public boolean completeAggregate(Aggregate aggregate) { MessageContext lastMessage = aggregate.getLastMessage(); if (lastMessage != null) { Object aggregateTimeoutHolderObj = - lastMessage.getProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id : - EIPConstants.EIP_SHARED_DATA_HOLDER); + lastMessage.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id); if (aggregateTimeoutHolderObj != null) { SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj; @@ -497,40 +528,222 @@ public boolean completeAggregate(Aggregate aggregate) { return false; } - MessageContext newSynCtx = getAggregatedMessage(aggregate); + if (isTargetBody()) { + MessageContext newSynCtx = getAggregatedMessage(aggregate); + return processAggregation(newSynCtx, aggregate); + } else { + MessageContext originalMessageContext = getOriginalMessageContext(aggregate); + if (originalMessageContext != null) { + setAggregatedMessageAsVariable(originalMessageContext, aggregate); + return processAggregation(originalMessageContext, aggregate); + } else { + handleException(aggregate, "Error retrieving the original message context", null, aggregate.getLastMessage()); + return false; + } + } + } + + private boolean processAggregation(MessageContext messageContext, Aggregate aggregate) { - if (newSynCtx == null) { + if (messageContext == null) { log.warn("An aggregation of messages timed out with no aggregated messages", null); return false; } aggregate.clear(); activeAggregates.remove(aggregate.getCorrelation()); - newSynCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false); - SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(newSynCtx); - boolean result = false; + + if (isTargetBody()) { + // Set content type to the aggregated message + setContentType(messageContext); + } else { + // Update the continuation state to current mediator position as we are using the original message context + ContinuationStackManager.updateSeqContinuationState(messageContext, getMediatorPosition()); + } + + SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(messageContext); + messageContext.setProperty(StatisticsConstants.CONTINUE_STATISTICS_FLOW, true); if (RuntimeStatisticCollector.isStatisticsEnabled()) { - CloseEventCollector.closeEntryEvent(newSynCtx, getMediatorName(), ComponentType.MEDIATOR, + CloseEventCollector.closeEntryEvent(messageContext, getMediatorName(), ComponentType.MEDIATOR, statisticReportingIndex, isContentAltering()); } + boolean result = false; if (seqContinuationState != null) { - SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(newSynCtx, seqContinuationState); - result = sequenceMediator.mediate(newSynCtx, seqContinuationState); + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(messageContext, seqContinuationState); + result = sequenceMediator.mediate(messageContext, seqContinuationState); if (RuntimeStatisticCollector.isStatisticsEnabled()) { - sequenceMediator.reportCloseStatistics(newSynCtx, null); + sequenceMediator.reportCloseStatistics(messageContext, null); } } - CloseEventCollector.closeEventsAfterScatterGather(newSynCtx); + CloseEventCollector.closeEventsAfterScatterGather(messageContext); return result; } + /** + * Return the original message context using the SharedDataHolder. + * + * @param aggregate Aggregate object + * @return original message context + */ + private MessageContext getOriginalMessageContext(Aggregate aggregate) { + + MessageContext lastMessage = aggregate.getLastMessage(); + if (lastMessage != null) { + Object aggregateHolderObj = lastMessage.getProperty(EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id); + if (aggregateHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateHolderObj; + return sharedDataHolder.getSynCtx(); + } + } + return null; + } + + private void setContentType(MessageContext synCtx) { + + org.apache.axis2.context.MessageContext a2mc = ((Axis2MessageContext) synCtx).getAxis2MessageContext(); + if (Objects.equals(contentType, JSON_TYPE)) { + a2mc.setProperty(Constants.Configuration.MESSAGE_TYPE, JSON_CONTENT_TYPE); + a2mc.setProperty(Constants.Configuration.CONTENT_TYPE, JSON_CONTENT_TYPE); + setContentTypeHeader(JSON_CONTENT_TYPE, a2mc); + } else { + a2mc.setProperty(Constants.Configuration.MESSAGE_TYPE, XML_CONTENT_TYPE); + a2mc.setProperty(Constants.Configuration.CONTENT_TYPE, XML_CONTENT_TYPE); + setContentTypeHeader(XML_CONTENT_TYPE, a2mc); + } + a2mc.removeProperty("NO_ENTITY_BODY"); + } + + private void setContentTypeHeader(Object resultValue, org.apache.axis2.context.MessageContext axis2MessageCtx) { + + axis2MessageCtx.setProperty(org.apache.axis2.Constants.Configuration.CONTENT_TYPE, resultValue); + Object o = axis2MessageCtx.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS); + Map headers = (Map) o; + if (headers != null) { + headers.put(HTTP.CONTENT_TYPE, resultValue); + } + } + + private static void addChildren(List list, OMElement element) { + + for (Object item : list) { + if (item instanceof OMElement) { + element.addChild((OMElement) item); + } + } + } + + private static List getMatchingElements(MessageContext messageContext, SynapsePath expression) { + + Object o = expression.objectValueOf(messageContext); + if (o instanceof OMNode) { + List list = new ArrayList(); + list.add(o); + return list; + } else if (o instanceof List) { + return (List) o; + } else { + return new ArrayList(); + } + } + + private static void enrichEnvelope(MessageContext messageContext, SynapsePath expression) { + + OMElement enrichingElement; + List elementList = getMatchingElements(messageContext, expression); + if (EIPUtils.checkNotEmpty(elementList)) { + // attach at parent of the first result from the XPath, or to the SOAPBody + Object o = elementList.get(0); + if (o instanceof OMElement && + ((OMElement) o).getParent() != null && + ((OMElement) o).getParent() instanceof OMElement) { + enrichingElement = (OMElement) ((OMElement) o).getParent(); + OMElement body = messageContext.getEnvelope().getBody(); + if (!EIPUtils.isBody(body, enrichingElement)) { + OMElement nonBodyElem = enrichingElement; + enrichingElement = messageContext.getEnvelope().getBody(); + addChildren(elementList, enrichingElement); + while (!EIPUtils.isBody(body, (OMElement) nonBodyElem.getParent())) { + nonBodyElem = (OMElement) nonBodyElem.getParent(); + } + nonBodyElem.detach(); + } + } + } + } + + private void setAggregatedMessageAsVariable(MessageContext originalMessageContext, Aggregate aggregate) { + + Object variable = originalMessageContext.getVariable(resultTarget); + if (variable == null) { + variable = createNewVariable(originalMessageContext, aggregate); + originalMessageContext.setVariable(resultTarget, variable); + } + if (Objects.equals(contentType, JSON_TYPE)) { + setJSONResultToVariable((JsonElement) variable, aggregate); + } else if (Objects.equals(contentType, XML_TYPE) && variable instanceof OMElement) { + setXMLResultToVariable((OMElement) variable, aggregate); + } else { + handleInvalidVariableType(variable, aggregate, originalMessageContext); + } + StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), originalMessageContext); + } + + private void handleInvalidVariableType(Object variable, Aggregate aggregate, MessageContext synCtx) { + + String expectedType = Objects.equals(contentType, JSON_TYPE) ? "JSON" : "OMElement"; + String actualType = variable != null ? variable.getClass().getName() : "null"; + handleException(aggregate, "Error merging aggregation results to variable: " + resultTarget + + " expected a " + expectedType + " but found " + actualType, null, synCtx); + } + + private void setJSONResultToVariable(JsonElement variable, Aggregate aggregate) { + + try { + for (MessageContext synCtx : aggregate.getMessages()) { + Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); + if (variable instanceof JsonArray) { + variable.getAsJsonArray().add((JsonElement) evaluatedResult); + } else if (variable instanceof JsonObject) { + JSONMergeUtils.extendJSONObject((JsonObject) variable, + JSONMergeUtils.ConflictStrategy.MERGE_INTO_ARRAY, + ((JsonObject) evaluatedResult).getAsJsonObject()); + } else { + handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget + + " expected a JSON type variable but found " + variable.getClass().getName(), null, synCtx); + } + } + } catch (JSONObjectExtensionException e) { + handleException(aggregate, "Error merging aggregation results to JSON Object : " + + aggregationExpression.toString(), e, aggregate.getLastMessage()); + } + } + + private void setXMLResultToVariable(OMElement variable, Aggregate aggregate) { + + for (MessageContext synCtx : aggregate.getMessages()) { + List list = getMatchingElements(synCtx, aggregationExpression); + addChildren(list, variable); + } + } + + private Object createNewVariable(MessageContext synCtx, Aggregate aggregate) { + + if (Objects.equals(contentType, JSON_TYPE)) { + return new JsonArray(); + } else if (Objects.equals(contentType, XML_TYPE)) { + return OMAbstractFactory.getOMFactory().createOMElement(new QName(resultTarget)); + } else { + handleException(aggregate, "Error merging aggregation results to variable : " + resultTarget + + " unknown content type : " + contentType, null, synCtx); + return null; + } + } + private MessageContext getAggregatedMessage(Aggregate aggregate) { MessageContext newCtx = null; JsonArray jsonArray = new JsonArray(); - JsonElement result; - boolean isJSONAggregation = aggregationExpression instanceof SynapseJsonPath; for (MessageContext synCtx : aggregate.getMessages()) { if (newCtx == null) { @@ -543,58 +756,51 @@ private MessageContext getAggregatedMessage(Aggregate aggregate) { if (log.isDebugEnabled()) { log.debug("Generating Aggregated message from : " + newCtx.getEnvelope()); } - if (isJSONAggregation) { - jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression)); - } else { - try { - EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx, (SynapseXPath) aggregationExpression); - } catch (JaxenException e) { - handleException(aggregate, "Error merging aggregation results using XPath : " + - aggregationExpression.toString(), e, synCtx); + if (Objects.equals(contentType, JSON_TYPE)) { + Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); + if (evaluatedResult instanceof JsonElement) { + jsonArray.add((JsonElement) evaluatedResult); + } else { + handleException(aggregate, "Error merging aggregation results as expression : " + + aggregationExpression.toString() + " did not resolve to a JSON value", null, synCtx); } + } else { + enrichEnvelope(synCtx, aggregationExpression); } } else { try { if (log.isDebugEnabled()) { - log.debug("Merging message : " + synCtx.getEnvelope() + " using XPath : " + + log.debug("Merging message : " + synCtx.getEnvelope() + " using expression : " + aggregationExpression); } - // When the target sequences are not content aware, the message builder wont get triggered. - // Therefore, we need to build the message to do the aggregation. - RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext()); - if (isJSONAggregation) { - jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression)); + if (Objects.equals(contentType, JSON_TYPE)) { + Object evaluatedResult = aggregationExpression.objectValueOf(synCtx); + if (evaluatedResult instanceof JsonElement) { + jsonArray.add((JsonElement) evaluatedResult); + } else { + jsonArray.add(evaluatedResult.toString()); + } } else { - EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx.getEnvelope(), synCtx, (SynapseXPath) - aggregationExpression); + enrichEnvelope(synCtx, aggregationExpression); } if (log.isDebugEnabled()) { log.debug("Merged result : " + newCtx.getEnvelope()); } - } catch (JaxenException e) { - handleException(aggregate, "Error merging aggregation results using XPath : " + - aggregationExpression.toString(), e, synCtx); } catch (SynapseException e) { handleException(aggregate, "Error evaluating expression: " + aggregationExpression.toString(), e, synCtx); } catch (JsonSyntaxException e) { handleException(aggregate, "Error reading JSON element: " + aggregationExpression.toString(), e, synCtx); - } catch (IOException e) { - handleException(aggregate, "IO Error occurred while building the message", e, synCtx); - } catch (XMLStreamException e) { - handleException(aggregate, "XML Error occurred while building the message", e, synCtx); } } } - result = jsonArray; - StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx); - if (isJSONAggregation) { + if (Objects.equals(contentType, JSON_TYPE)) { // setting the new JSON payload to the messageContext try { JsonUtil.getNewJsonPayload(((Axis2MessageContext) newCtx).getAxis2MessageContext(), new - ByteArrayInputStream(result.toString().getBytes()), true, true); + ByteArrayInputStream(jsonArray.toString().getBytes()), true, true); } catch (AxisFault axisFault) { log.error("Error occurred while setting the new JSON payload to the msg context", axisFault); } @@ -615,7 +821,6 @@ public SynapsePath getCorrelateExpression() { public void setCorrelateExpression(SynapsePath correlateExpression) { this.correlateExpression = correlateExpression; - this.id = null; } public long getCompletionTimeoutMillis() { @@ -700,11 +905,36 @@ public boolean isContentAltering() { private void handleException(Aggregate aggregate, String msg, Exception exception, MessageContext msgContext) { aggregate.clear(); - activeAggregates.clear(); + activeAggregates.remove(aggregate.getCorrelation()); if (exception != null) { super.handleException(msg, exception, msgContext); } else { super.handleException(msg, msgContext); } } + + public String getContentType() { + + return contentType; + } + + public void setContentType(String contentType) { + + this.contentType = contentType; + } + + public String getResultTarget() { + + return resultTarget; + } + + public void setResultTarget(String resultTarget) { + + this.resultTarget = resultTarget; + } + + private boolean isTargetBody() { + + return "body".equalsIgnoreCase(resultTarget); + } } diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java index a8572a5256..67da918d99 100644 --- a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java +++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java @@ -36,7 +36,8 @@ public ScatterGatherMediatorSerializationTest() { public void testScatterGatherSerialization() { - String inputXML = " " + + String inputXML = " " + " " + " { \"pet\": { " + "\"name\": \"pet1\", \"type\": \"dog\" }, " +