From a85c6227c076d4d11eba21f30496e2cb7930175d Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Mon, 2 Dec 2024 19:00:00 +0530 Subject: [PATCH] Fix concurrency issue in aggregation --- .../mediators/eip/aggregator/Aggregate.java | 25 +++++++++++-------- .../synapse/mediators/v2/ScatterGather.java | 3 +-- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java index cbd7998886..337a693d57 100755 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java @@ -115,9 +115,6 @@ public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMilli */ public synchronized boolean addMessage(MessageContext synCtx) { if (maxCount <= 0 || (maxCount > 0 && messages.size() < maxCount)) { - if (messages == null) { - return false; - } messages.add(synCtx); return true; } else { @@ -261,12 +258,16 @@ public void run() { break; } if (getLock()) { - if (log.isDebugEnabled()) { - log.debug("Time : " + System.currentTimeMillis() + " and this aggregator " + - "expired at : " + expiryTimeMillis); + try { + if (log.isDebugEnabled()) { + log.debug("Time : " + System.currentTimeMillis() + " and this aggregator " + + "expired at : " + expiryTimeMillis); + } + synEnv.getExecutorService().execute(new AggregateTimeout(this)); + break; + } finally { + releaseLock(); } - synEnv.getExecutorService().execute(new AggregateTimeout(this)); - break; } } } @@ -312,10 +313,14 @@ public void run() { } public synchronized boolean getLock() { - return !locked; + if (!locked) { + locked = true; + return true; + } + return false; } - public void releaseLock() { + public synchronized void releaseLock() { locked = false; } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java index 876183b7fe..c0e5e25f8f 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java @@ -448,7 +448,7 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { } } else { synLog.traceOrDebug("Unable to find aggregation correlation property"); - return true; + return false; } // if there is an aggregate continue on aggregation if (aggregate != null) { @@ -471,7 +471,6 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { } } else { synLog.traceOrDebug("Unable to find an aggregate for this message - skip"); - return true; } return false; }