Skip to content

Commit

Permalink
Add scatter gather mediator
Browse files Browse the repository at this point in the history
  • Loading branch information
SanojPunchihewa committed Dec 6, 2024
1 parent 4ca35d8 commit 0ccc7e5
Show file tree
Hide file tree
Showing 13 changed files with 1,108 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -626,4 +626,6 @@ public enum ENDPOINT_TIMEOUT_TYPE { ENDPOINT_TIMEOUT, GLOBAL_TIMEOUT, HTTP_CONNE

public static final String ANALYTICS_METADATA = "ANALYTICS_METADATA";

public static final String SCATTER_MESSAGES = "SCATTER_MESSAGES";
public static final String CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER = "CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER";
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,17 @@ public static void tryEndFlow(MessageContext messageContext, String componentNam
// closeFlowForcefully(messageContext);
}
}

/**
* This method will close the event collector and finish the flow when a Scatter Gather mediator is used.
*
* @param messageContext synapse message context.
*/
public static void closeEventsAfterScatterGather(MessageContext messageContext) {

if (isOpenTelemetryEnabled()) {
OpenTelemetryManagerHolder.getOpenTelemetryManager().getHandler()
.handleScatterGatherFinishEvent(messageContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ public interface CloseEventHandler {
* @param synCtx Message context.
*/
void handleTryEndFlow(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx);

/**
* Handles a close flow event.
*
* @param synCtx Message context.
*/
void handleScatterGatherFinishEvent(MessageContext synCtx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,16 @@ private void handleCallbackFinishEvent(MessageContext messageContext) {
}
}

public void handleScatterGatherFinishEvent(MessageContext messageContext) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(messageContext);
synchronized (tracingScope.getSpanStore()) {
cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext);
SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper();
tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext);
tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
}
}

@Override
public void handleStateStackInsertion(MessageContext synCtx, String seqName, SequenceType seqType) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public class MediatorFactoryFinder implements XMLToObjectMapper {
ForEachMediatorFactory.class,
JSONTransformMediatorFactory.class,
NTLMMediatorFactory.class,
VariableMediatorFactory.class
VariableMediatorFactory.class,
ScatterGatherMediatorFactory.class
};

private final static MediatorFactoryFinder instance = new MediatorFactoryFinder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public class MediatorSerializerFinder {
ForEachMediatorSerializer.class,
JSONTransformMediatorSerializer.class,
NTLMMediatorSerializer.class,
VariableMediatorSerializer.class
VariableMediatorSerializer.class,
ScatterGatherMediatorSerializer.class
};

private final static MediatorSerializerFinder instance = new MediatorSerializerFinder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.config.xml;

import org.apache.axiom.om.OMAttribute;
import org.apache.axiom.om.OMElement;
import org.apache.synapse.Mediator;
import org.apache.synapse.mediators.eip.Target;
import org.apache.synapse.mediators.v2.ScatterGather;
import org.jaxen.JaxenException;

import java.util.Iterator;
import java.util.Properties;
import javax.xml.namespace.QName;

/**
* The <scatter-gather> mediator is used to copy messages in Synapse to similar messages but with
* different message contexts and aggregate the responses back.
*
* <pre>
* &lt;scatter-gather parallel-execution=(true | false)&gt;
* &lt;aggregation value-to-aggregate="expression" condition="expression" timeout="long"
* min-messages="expression" max-messages="expression"/&gt;
* &lt;target&gt;
* &lt;sequence&gt;
* (mediator)+
* &lt;/sequence&gt;
* &lt;/target&gt;+
* &lt;/scatter-gather&gt;
* </pre>
*/
public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {

/**
* This will hold the QName of the clone mediator element in the xml configuration
*/
private static final QName SCATTER_GATHER_Q
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather");
private static final QName ELEMENT_AGGREGATE_Q
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation");
private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value-to-aggregate");
private static final QName ATT_CONDITION = new QName("condition");
private static final QName ATT_TIMEOUT = new QName("timeout");
private static final QName ATT_MIN_MESSAGES = new QName("min-messages");
private static final QName ATT_MAX_MESSAGES = new QName("max-messages");
private static final QName TARGET_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "target");
private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");

public Mediator createSpecificMediator(OMElement elem, Properties properties) {

boolean asynchronousExe = true;

ScatterGather mediator = new ScatterGather();
processAuditStatus(mediator, elem);

OMAttribute parallelExecAttr = elem.getAttribute(PARALLEL_EXEC_Q);
if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) {
asynchronousExe = false;
}

mediator.setParallelExecution(asynchronousExe);

Iterator targetElements = elem.getChildrenWithName(TARGET_Q);
while (targetElements.hasNext()) {
Target target = TargetFactory.createTarget((OMElement) targetElements.next(), properties);
target.setAsynchronous(asynchronousExe);
mediator.addTarget(target);
}

OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q);
if (aggregateElement != null) {
OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_VALUE_TO_AGGREGATE);
if (aggregateExpr != null) {
try {
mediator.setAggregationExpression(
SynapsePathFactory.getSynapsePath(aggregateElement, ATT_VALUE_TO_AGGREGATE));
} catch (JaxenException e) {
handleException("Unable to load the aggregating expression", e);
}
}

OMAttribute conditionExpr = aggregateElement.getAttribute(ATT_CONDITION);
if (conditionExpr != null) {
try {
mediator.setCorrelateExpression(
SynapsePathFactory.getSynapsePath(aggregateElement, ATT_CONDITION));
} catch (JaxenException e) {
handleException("Unable to load the condition expression", e);
}
}

OMAttribute completeTimeout = aggregateElement.getAttribute(ATT_TIMEOUT);
if (completeTimeout != null) {
mediator.setCompletionTimeoutMillis(Long.parseLong(completeTimeout.getAttributeValue()));
}

OMAttribute minMessages = aggregateElement.getAttribute(ATT_MIN_MESSAGES);
if (minMessages != null) {
mediator.setMinMessagesToComplete(new ValueFactory().createValue("min-messages", aggregateElement));
}

OMAttribute maxMessages = aggregateElement.getAttribute(ATT_MAX_MESSAGES);
if (maxMessages != null) {
mediator.setMaxMessagesToComplete(new ValueFactory().createValue("max-messages", aggregateElement));
}
}
addAllCommentChildrenToList(elem, mediator.getCommentsList());
return mediator;
}

/**
* This method will implement the getTagQName method of the MediatorFactory interface
*
* @return QName of the clone element in xml configuration
*/
public QName getTagQName() {

return SCATTER_GATHER_Q;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.config.xml;

import org.apache.axiom.om.OMElement;
import org.apache.synapse.Mediator;
import org.apache.synapse.mediators.eip.Target;
import org.apache.synapse.mediators.v2.ScatterGather;

/**
* Serializer for {@link ScatterGather} instances.
*/
public class ScatterGatherMediatorSerializer extends AbstractMediatorSerializer {

public OMElement serializeSpecificMediator(Mediator m) {

ScatterGather scatterGatherMediator = null;
if (!(m instanceof ScatterGather)) {
handleException("Unsupported mediator passed in for serialization : " + m.getType());
} else {
scatterGatherMediator = (ScatterGather) m;
}

assert scatterGatherMediator != null;
OMElement scatterGatherElement = fac.createOMElement("scatter-gather", synNS);
saveTracingState(scatterGatherElement, scatterGatherMediator);

scatterGatherElement.addAttribute(fac.createOMAttribute(
"parallel-execution", nullNS, Boolean.toString(scatterGatherMediator.getParallelExecution())));
OMElement aggregationElement = fac.createOMElement("aggregation", synNS);

SynapsePathSerializer.serializePath(
scatterGatherMediator.getAggregationExpression(), aggregationElement, "value-to-aggregate");

if (scatterGatherMediator.getCorrelateExpression() != null) {
SynapsePathSerializer.serializePath(
scatterGatherMediator.getAggregationExpression(), aggregationElement, "condition");
}

if (scatterGatherMediator.getCompletionTimeoutMillis() != 0) {
aggregationElement.addAttribute(fac.createOMAttribute(
"timeout", nullNS, Long.toString(scatterGatherMediator.getCompletionTimeoutMillis())));
}
if (scatterGatherMediator.getMinMessagesToComplete() != null) {
new ValueSerializer().serializeValue(
scatterGatherMediator.getMinMessagesToComplete(), "min-messages", aggregationElement);
}
if (scatterGatherMediator.getMaxMessagesToComplete() != null) {
new ValueSerializer().serializeValue(
scatterGatherMediator.getMaxMessagesToComplete(), "max-messages", aggregationElement);
}
scatterGatherElement.addChild(aggregationElement);

for (Target target : scatterGatherMediator.getTargets()) {
if (target != null) {
scatterGatherElement.addChild(TargetSerializer.serializeTarget(target));
}
}
serializeComments(scatterGatherElement, scatterGatherMediator.getCommentsList());

return scatterGatherElement;
}

public String getMediatorClassName() {

return ScatterGather.class.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,24 @@

package org.apache.synapse.mediators;

import org.apache.synapse.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.FaultHandler;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.aspects.ComponentType;
import org.apache.synapse.aspects.flow.statistics.StatisticsCloseEventListener;
import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector;
import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector;
import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants;
import org.apache.synapse.carbonext.TenantInfoConfigurator;
import org.apache.synapse.continuation.ContinuationStackManager;
import org.apache.synapse.continuation.SeqContinuationState;
import org.apache.synapse.debug.SynapseDebugManager;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.util.logging.LoggingUtils;

/**
Expand Down Expand Up @@ -86,7 +97,25 @@ public void run() {
debugManager.advertiseMediationFlowStartPoint(synCtx);
}

seq.mediate(synCtx);
// If this is a scatter message, then we need to use the continuation state and continue the mediation
if (isScatterMessage(synCtx)) {
boolean result = seq.mediate(synCtx);
if (result) {
SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx);
if (seqContinuationState == null) {
return;
}
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState);

FlowContinuableMediator mediator =
(FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition());

synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true);
mediator.mediate(synCtx, seqContinuationState);
}
} else {
seq.mediate(synCtx);
}
//((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard();

} catch (SynapseException syne) {
Expand Down Expand Up @@ -150,4 +179,16 @@ private void warn(boolean traceOn, String msg, MessageContext msgContext) {
public void setStatisticsCloseEventListener(StatisticsCloseEventListener statisticsCloseEventListener) {
this.statisticsCloseEventListener = statisticsCloseEventListener;
}

/**
* Check whether the message is a scatter message or not
*
* @param synCtx MessageContext
* @return true if the message is a scatter message
*/
private static boolean isScatterMessage(MessageContext synCtx) {

Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
return isSkipContinuationState != null && isSkipContinuationState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -521,4 +521,16 @@ public void setComponentStatisticsId(ArtifactHolder holder) {
StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.SEQUENCE, holder);
}
}

/**
* Check whether the message is a scatter message or not
*
* @param synCtx MessageContext
* @return true if the message is a scatter message
*/
private static boolean isScatterMessage(MessageContext synCtx) {

Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
return isSkipContinuationState != null && isSkipContinuationState;
}
}
Loading

0 comments on commit 0ccc7e5

Please sign in to comment.