Skip to content

Commit

Permalink
feat: clean staled locks manually
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 1, 2024
1 parent 8410e7c commit 05b3764
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
6 changes: 5 additions & 1 deletion src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ impl Runner {

// Release locks and notify parent procedure.
guard.finish();
// Clean the staled locks.
self.manager_ctx.key_lock.clean();

// If this is the root procedure, clean up message cache.
if self.meta.parent_id.is_none() {
Expand Down Expand Up @@ -787,6 +789,7 @@ mod tests {
runner.manager_ctx = manager_ctx.clone();

runner.run().await;
assert!(manager_ctx.key_lock.is_empty());

// Check child procedures.
for child_id in children_ids {
Expand Down Expand Up @@ -1045,10 +1048,11 @@ mod tests {
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta.clone()));
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
runner.manager_ctx = manager_ctx.clone();

// Run the runner and execute the procedure.
runner.run().await;
assert!(manager_ctx.key_lock.is_empty());
let err = meta.state().error().unwrap().output_msg();
assert!(err.contains("subprocedure failed"), "{err}");
}
Expand Down
13 changes: 9 additions & 4 deletions src/common/procedure/src/local/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ where
pub async fn read(&self, key: K) -> OwnedRwLockReadGuard<()> {
let lock = {
let mut locks = self.inner.lock().unwrap();
Self::clean_up(&mut locks);
locks.entry(key.clone()).or_default().clone()
};

Expand All @@ -91,7 +90,6 @@ where
pub async fn write(&self, key: K) -> OwnedRwLockWriteGuard<()> {
let lock = {
let mut locks = self.inner.lock().unwrap();
Self::clean_up(&mut locks);
locks.entry(key.clone()).or_default().clone()
};

Expand All @@ -102,7 +100,6 @@ where
pub fn try_read(&self, key: K) -> Result<OwnedRwLockReadGuard<()>, TryLockError> {
let lock = {
let mut locks = self.inner.lock().unwrap();
Self::clean_up(&mut locks);
locks.entry(key.clone()).or_default().clone()
};

Expand All @@ -113,7 +110,6 @@ where
pub fn try_write(&self, key: K) -> Result<OwnedRwLockWriteGuard<()>, TryLockError> {
let lock = {
let mut locks = self.inner.lock().unwrap();
Self::clean_up(&mut locks);
locks.entry(key.clone()).or_default().clone()
};

Expand All @@ -129,6 +125,12 @@ where
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Clean up stale locks.
pub fn clean(&self) {
let mut locks = self.inner.lock().unwrap();
Self::clean_up(&mut locks);
}
}

#[cfg(test)]
Expand All @@ -155,5 +157,8 @@ mod tests {
}

assert_eq!(lock_key.len(), 2);

lock_key.clean();
assert!(lock_key.is_empty());
}
}

0 comments on commit 05b3764

Please sign in to comment.