Skip to content

Commit

Permalink
Allow multiple readers to acquire a read-only permit (DataChunkRefImpl)
Browse files Browse the repository at this point in the history
  • Loading branch information
Goshawk committed Aug 13, 2024
1 parent 59ca4c4 commit f83cb73
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions src/data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use tokio::sync::{
};
use tokio::task::JoinHandle;

/// Maximum number of concurrent readers allowed for a chunk
const MAX_CONCURRENT_READERS: usize = 10;

type Cache = Arc<RwLock<HashMap<ChunkId, Arc<Semaphore>>>>;

pub struct DataChunkRefImpl<T: StorageEngine> {
Expand Down Expand Up @@ -127,21 +130,26 @@ impl<T: StorageEngine> DataManagerImpl<T> {
let db = self.db.clone();

let handle = tokio::spawn(async move {
// If someone has already acquired a DataChunkRefImpl of this chunk
// then we need to wait here until it's dropped
let permit = semaphore.acquire().await.unwrap();

if db
.read()
// Wait for all ongoing readers to drop DataChunkRefImpl (permit).
// Only if all permits are released, the chunk can be deleted
if let Ok(permit) = semaphore
.acquire_many(MAX_CONCURRENT_READERS as u32)
.await
.delete_chunk_id(&chunk_id)
.is_ok()
{
let mut cache = cache.write().await;
cache.remove(&chunk_id);
if db
.read()
.await
.delete_chunk_id(&chunk_id)
.is_ok()
{
let mut cache = cache.write().await;
cache.remove(&chunk_id);
}

drop(permit);
} else {
println!("Failed to acquire semaphore for chunk deletion");
}

drop(permit);
});

Some(handle)
Expand All @@ -159,7 +167,9 @@ impl<T: StorageEngine> DataManagerImpl<T> {
.for_each(|chunk_id| {
cache.insert(
*chunk_id,
Arc::new(Semaphore::new(1)),
Arc::new(Semaphore::new(
MAX_CONCURRENT_READERS,
)),
);
});

Expand Down Expand Up @@ -210,7 +220,9 @@ impl<T: StorageEngine> DataManagerImpl<T> {
if let hash_map::Entry::Vacant(e) =
cache.entry(chunk.id)
{
e.insert(Arc::new(Semaphore::new(1)));
e.insert(Arc::new(Semaphore::new(
MAX_CONCURRENT_READERS,
)));
false
} else {
true
Expand Down

0 comments on commit f83cb73

Please sign in to comment.