Skip to content

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
SanojPunchihewa committed Dec 6, 2024
1 parent a85c622 commit f28de29
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,21 @@ public void handleScatterGatherFinishEvent(MessageContext messageContext) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(messageContext);
synchronized (tracingScope.getSpanStore()) {
cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext);
cleanUpActiveSpans(tracingScope.getSpanStore(), messageContext);
SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper();
tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext);
tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
}
}

private void cleanUpActiveSpans(SpanStore spanStore, MessageContext messageContext) {
List<SpanWrapper> activeSpanWrappers = spanStore.getActiveSpanWrappers();
for (int i = activeSpanWrappers.size() - 1; i > 0; i--) {
SpanWrapper spanWrapper = activeSpanWrappers.get(i);
spanStore.finishSpan(spanWrapper, messageContext);
}
}

@Override
public void handleStateStackInsertion(MessageContext synCtx, String seqName, SequenceType seqType) {
TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,15 @@ public class ScatterGatherMediatorFactory extends AbstractMediatorFactory {
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather");
private static final QName ELEMENT_AGGREGATE_Q
= new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation");
private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value");
private static final QName ATT_AGGREGATE_EXPRESSION = new QName("expression");
private static final QName ATT_CONDITION = new QName("condition");
private static final QName ATT_TIMEOUT = new QName("timeout");
private static final QName ATT_MIN_MESSAGES = new QName("min-messages");
private static final QName ATT_MAX_MESSAGES = new QName("max-messages");
private static final QName SEQUENCE_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sequence");
private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution");
private static final QName RESULT_TARGET_Q = new QName("result-target");
private static final QName ROOT_ELEMENT_Q = new QName("root-element");
private static final QName CONTENT_TYPE_Q = new QName("content-type");

private static final SequenceMediatorFactory fac = new SequenceMediatorFactory();
Expand All @@ -87,7 +88,15 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {
if ("JSON".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ScatterGather.JSON_TYPE);
} else if ("XML".equals(contentTypeAttr.getAttributeValue())) {
mediator.setContentType(ScatterGather.XML_TYPE);
OMAttribute rootElementAttr = elem.getAttribute(ROOT_ELEMENT_Q);
if (rootElementAttr != null && StringUtils.isNotBlank(rootElementAttr.getAttributeValue())) {
mediator.setRootElementName(rootElementAttr.getAttributeValue());
mediator.setContentType(ScatterGather.XML_TYPE);
} else {
String msg = "The 'root-element' attribute is required for the configuration of a " +
"Scatter Gather mediator when the 'content-type' is 'XML'";
throw new SynapseException(msg);
}
} else {
String msg = "The 'content-type' attribute should be either 'JSON' or 'XML'";
throw new SynapseException(msg);
Expand Down Expand Up @@ -119,16 +128,16 @@ public Mediator createSpecificMediator(OMElement elem, Properties properties) {

OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q);
if (aggregateElement != null) {
OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_VALUE_TO_AGGREGATE);
OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_AGGREGATE_EXPRESSION);
if (aggregateExpr != null) {
try {
mediator.setAggregationExpression(
SynapsePathFactory.getSynapsePath(aggregateElement, ATT_VALUE_TO_AGGREGATE));
SynapsePathFactory.getSynapsePath(aggregateElement, ATT_AGGREGATE_EXPRESSION));
} catch (JaxenException e) {
handleException("Unable to load the aggregating expression", e);
}
} else {
String msg = "The 'value' attribute is required for the configuration of a Scatter Gather mediator";
String msg = "The 'expression' attribute is required for the configuration of a Scatter Gather mediator";
throw new SynapseException(msg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.synapse.config.xml;

import org.apache.axiom.om.OMElement;
import org.apache.commons.lang3.StringUtils;
import org.apache.synapse.Mediator;
import org.apache.synapse.mediators.eip.Target;
import org.apache.synapse.mediators.v2.ScatterGather;
Expand Down Expand Up @@ -47,11 +48,15 @@ public OMElement serializeSpecificMediator(Mediator m) {
"result-target", nullNS, scatterGatherMediator.getResultTarget()));
scatterGatherElement.addAttribute(fac.createOMAttribute(
"content-type", nullNS, scatterGatherMediator.getContentType()));
if (StringUtils.isNotBlank(scatterGatherMediator.getRootElementName())) {
scatterGatherElement.addAttribute(fac.createOMAttribute(
"root-element", nullNS, scatterGatherMediator.getRootElementName()));
}

OMElement aggregationElement = fac.createOMElement("aggregation", synNS);

SynapsePathSerializer.serializePath(
scatterGatherMediator.getAggregationExpression(), aggregationElement, "value");
scatterGatherMediator.getAggregationExpression(), aggregationElement, "expression");

if (scatterGatherMediator.getCorrelateExpression() != null) {
SynapsePathSerializer.serializePath(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,24 +97,20 @@ public void run() {
debugManager.advertiseMediationFlowStartPoint(synCtx);
}

boolean result = seq.mediate(synCtx);
// If this is a scatter message, then we need to use the continuation state and continue the mediation
if (isScatterMessage(synCtx)) {
boolean result = seq.mediate(synCtx);
if (result) {
SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx);
if (seqContinuationState == null) {
return;
}
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState);

FlowContinuableMediator mediator =
(FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition());

synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true);
mediator.mediate(synCtx, seqContinuationState);
if (isScatterMessage(synCtx) && result) {
SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx);
if (seqContinuationState == null) {
return;
}
} else {
seq.mediate(synCtx);
SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState);

FlowContinuableMediator mediator =
(FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition());

synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true);
mediator.mediate(synCtx, seqContinuationState);
}
//((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard();

Expand Down Expand Up @@ -188,7 +184,7 @@ public void setStatisticsCloseEventListener(StatisticsCloseEventListener statist
*/
private static boolean isScatterMessage(MessageContext synCtx) {

Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
return isSkipContinuationState != null && isSkipContinuationState;
Boolean isScatterMessage = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES);
return isScatterMessage != null && isScatterMessage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;

/**
* An instance of this class is created to manage each aggregation group, and it holds
Expand All @@ -53,7 +54,7 @@ public class Aggregate extends TimerTask {
private AggregateMediator aggregateMediator = null;
private ScatterGather scatterGatherMediator = null;
private List<MessageContext> messages = new ArrayList<MessageContext>();
private boolean locked = false;
private ReentrantLock lock = new ReentrantLock();
private boolean completed = false;
private SynapseEnvironment synEnv = null;

Expand Down Expand Up @@ -313,15 +314,15 @@ public void run() {
}

public synchronized boolean getLock() {
if (!locked) {
locked = true;
return true;
}
return false;

return lock.tryLock();
}

public synchronized void releaseLock() {
locked = false;

if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}

public boolean isCompleted() {
Expand All @@ -331,5 +332,4 @@ public boolean isCompleted() {
public void setCompleted(boolean completed) {
this.completed = completed;
}

}
Loading

0 comments on commit f28de29

Please sign in to comment.