diff --git a/src/data_manager.rs b/src/data_manager.rs index 5559130..522c5fd 100644 --- a/src/data_manager.rs +++ b/src/data_manager.rs @@ -11,6 +11,9 @@ use tokio::sync::{ }; use tokio::task::JoinHandle; +/// Maximum number of concurrent readers allowed for a chunk +const MAX_CONCURRENT_READERS: usize = 10; + type Cache = Arc>>>; pub struct DataChunkRefImpl { @@ -127,21 +130,26 @@ impl DataManagerImpl { let db = self.db.clone(); let handle = tokio::spawn(async move { - // If someone has already acquired a DataChunkRefImpl of this chunk - // then we need to wait here until it's dropped - let permit = semaphore.acquire().await.unwrap(); - - if db - .read() + // Wait for all ongoing readers to drop DataChunkRefImpl (permit). + // Only if all permits are released, the chunk can be deleted + if let Ok(permit) = semaphore + .acquire_many(MAX_CONCURRENT_READERS as u32) .await - .delete_chunk_id(&chunk_id) - .is_ok() { - let mut cache = cache.write().await; - cache.remove(&chunk_id); + if db + .read() + .await + .delete_chunk_id(&chunk_id) + .is_ok() + { + let mut cache = cache.write().await; + cache.remove(&chunk_id); + } + + drop(permit); + } else { + println!("Failed to acquire semaphore for chunk deletion"); } - - drop(permit); }); Some(handle) @@ -159,7 +167,9 @@ impl DataManagerImpl { .for_each(|chunk_id| { cache.insert( *chunk_id, - Arc::new(Semaphore::new(1)), + Arc::new(Semaphore::new( + MAX_CONCURRENT_READERS, + )), ); }); @@ -210,7 +220,9 @@ impl DataManagerImpl { if let hash_map::Entry::Vacant(e) = cache.entry(chunk.id) { - e.insert(Arc::new(Semaphore::new(1))); + e.insert(Arc::new(Semaphore::new( + MAX_CONCURRENT_READERS, + ))); false } else { true