Fix Issue: Ensure proper locking in WorkflowSweeper to prevent race conditions #214
core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java
@@ -74,24 +79,25 @@ public CompletableFuture<Void> sweepAsync(String workflowId) {

    public void sweep(String workflowId) {
        WorkflowModel workflow = null;
        StopWatch watch = new StopWatch();
Hi @rq-dbrady, instead of using a stopwatch, can we try to acquire a lock here and, if successful, run the repair, or otherwise exit the sweep logic?
Currently it follows this logic:
- Create the stopwatch.
- Set the workflow context.
- Acquire the lock (on failure, return from sweep).
- Get the workflow from the store.
- Run repair.
- Start the stopwatch.
- Decide.
- Remove from the decider queue.
- Release the lock and stop the stopwatch.
Are you proposing that we acquire the lock at the start of the method and then start the stopwatch? Something like:
public void sweep(String workflowId) {
    WorkflowModel workflow = null;
    StopWatch watch = new StopWatch();
    if (!executionLockService.acquireLock(workflowId)) {
        return; // lock not acquired, so skip this sweep
    }
    watch.start();
    // Do repair logic
    // Decide etc ...
}
Hi @rq-dbrady, yes. The first thing we should do is try to acquire the lock, then start a stopwatch, run decide, stop the stopwatch, and release the lock. Optionally, we don't need a stopwatch to measure execution time; we can use System.currentTimeMillis().
@manan164, applied the changes in the suggested order:
- Acquire lock
- Run repair
- Start timer
- Decide
- End timer
- Release lock
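The order agreed on in this thread can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: the class SweepOrderSketch, its acquireLock and releaseLock helpers, and the steps list are hypothetical stand-ins (a per-workflow ReentrantLock takes the place of the real execution lock service, and the repair and decide steps are only recorded by name).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class SweepOrderSketch {
    // Hypothetical stand-in for the execution lock service: one lock per workflow id.
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Records the order in which the steps ran, for inspection only.
    final List<String> steps = new ArrayList<>();

    boolean acquireLock(String workflowId) {
        // tryLock() returns immediately instead of blocking when another thread holds the lock.
        return locks.computeIfAbsent(workflowId, id -> new ReentrantLock()).tryLock();
    }

    void releaseLock(String workflowId) {
        ReentrantLock lock = locks.get(workflowId);
        if (lock != null && lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }

    // Returns true if the sweep ran, false if the lock could not be acquired.
    boolean sweep(String workflowId) {
        if (!acquireLock(workflowId)) {
            return false; // nothing has been read or mutated yet
        }
        try {
            steps.add("acquireLock");
            steps.add("repair");                        // placeholder for the repair step
            long start = System.currentTimeMillis();    // start timer
            steps.add("decide");                        // placeholder for the decide step
            long elapsedMs = System.currentTimeMillis() - start; // end timer
            steps.add("timed");
            return true;
        } finally {
            // Runs only when the lock was acquired, since the early return is outside the try.
            releaseLock(workflowId);
            steps.add("releaseLock");
        }
    }
}
```

Releasing the lock in a finally block is a choice made for this sketch, so that an exception during repair or decide does not leave the workflow locked.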
0c37dca to 76df8d7
76df8d7 to 42a708f
Pull Request type
NOTE: Please remember to run ./gradlew spotlessApply to fix any format violations.
Changes in this PR
Issue: #213
Correct locking mechanism in WorkflowSweeper to prevent race conditions
Previously, the WorkflowSweeper class had a potential race condition due to the order of operations in the sweep method. The workflow was fetched from the executionDaoFacade before the lock was acquired, and the subsequent verifyAndRepair operation could mutate its state. This sequence left a small window (roughly 50 to 100 µs) in which a workflow could be in two different states across different threads, causing inconsistencies and failures in workflow listeners or completion checks.
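The stale-read window described above can be illustrated with a small deterministic simulation. This is only a sketch under stated assumptions: StaleReadSketch, its store field, and both methods are hypothetical names, the workflow state is reduced to a status string, and the "other thread" is modelled as a Runnable invoked at the point where its update would land.

```java
import java.util.concurrent.atomic.AtomicReference;

class StaleReadSketch {
    // Hypothetical stand-in for the workflow store: holds only a status string.
    static final AtomicReference<String> store = new AtomicReference<>("RUNNING");
    static final Object lock = new Object();

    // Old order: fetch first, lock afterwards.
    // An update that lands between the fetch and the lock is never seen.
    static String fetchThenLock(Runnable otherThread) {
        String snapshot = store.get(); // read before holding the lock
        otherThread.run();             // simulated concurrent update inside the window
        synchronized (lock) {
            return snapshot;           // later steps would act on this stale state
        }
    }

    // New order: lock first, then fetch.
    // The other update completes either before the lock is taken or after it is released,
    // so the state read under the lock is current.
    static String lockThenFetch(Runnable otherThread) {
        otherThread.run();             // simulated update that finished before the lock was taken
        synchronized (lock) {
            return store.get();        // read while holding the lock
        }
    }
}
```

With a simulated writer that sets the status to COMPLETED, fetchThenLock still returns the earlier RUNNING snapshot, while lockThenFetch returns COMPLETED.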
Changes made:
- decideWithLock method.
- sweep method to ensure the workflow is locked before any operations are performed on it.
Observed issues:
New implementation in sweep method:
This fix ensures atomicity in operations on workflows, preventing the race conditions previously observed.