Skip to content

Commit

Permalink
Fix concurrency issue in aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
SanojPunchihewa committed Dec 3, 2024
1 parent 905d1bb commit 1104cbb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMilli
*/
public synchronized boolean addMessage(MessageContext synCtx) {
if (maxCount <= 0 || (maxCount > 0 && messages.size() < maxCount)) {
if (messages == null) {
return false;
}
messages.add(synCtx);
return true;
} else {
Expand Down Expand Up @@ -261,12 +258,16 @@ public void run() {
break;
}
if (getLock()) {
if (log.isDebugEnabled()) {
log.debug("Time : " + System.currentTimeMillis() + " and this aggregator " +
"expired at : " + expiryTimeMillis);
try {
if (log.isDebugEnabled()) {
log.debug("Time : " + System.currentTimeMillis() + " and this aggregator " +
"expired at : " + expiryTimeMillis);
}
synEnv.getExecutorService().execute(new AggregateTimeout(this));
break;
} finally {
releaseLock();
}
synEnv.getExecutorService().execute(new AggregateTimeout(this));
break;
}
}
}
Expand Down Expand Up @@ -312,10 +313,14 @@ public void run() {
}

public synchronized boolean getLock() {
return !locked;
if (!locked) {
locked = true;
return true;
}
return false;
}

public void releaseLock() {
public synchronized void releaseLock() {
locked = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
}
} else {
synLog.traceOrDebug("Unable to find aggregation correlation property");
return true;
return false;
}
// if there is an aggregate continue on aggregation
if (aggregate != null) {
Expand All @@ -471,7 +471,6 @@ private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) {
}
} else {
synLog.traceOrDebug("Unable to find an aggregate for this message - skip");
return true;
}
return false;
}
Expand Down

0 comments on commit 1104cbb

Please sign in to comment.