diff --git a/Cargo.lock b/Cargo.lock
index 0994a82242e..cf2a454eb93 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4692,6 +4692,7 @@ dependencies = [
  "cfg_aliases",
  "clap",
  "criterion",
+ "futures",
  "linera-base",
  "linera-storage-service",
  "linera-version",
diff --git a/examples/Cargo.lock b/examples/Cargo.lock
index baca9449146..ec1e2f34f13 100644
--- a/examples/Cargo.lock
+++ b/examples/Cargo.lock
@@ -3436,6 +3436,7 @@ dependencies = [
  "bcs",
  "cfg_aliases",
  "clap",
+ "futures",
  "linera-base",
  "linera-version",
  "linera-views",
diff --git a/linera-storage-service/Cargo.toml b/linera-storage-service/Cargo.toml
index de066aa7a48..4a63535dd8c 100644
--- a/linera-storage-service/Cargo.toml
+++ b/linera-storage-service/Cargo.toml
@@ -26,6 +26,7 @@ anyhow.workspace = true
 async-lock.workspace = true
 bcs.workspace = true
 clap.workspace = true
+futures.workspace = true
 linera-base.workspace = true
 linera-version.workspace = true
 linera-views.workspace = true
diff --git a/linera-storage-service/src/client.rs b/linera-storage-service/src/client.rs
index e3332dfad39..3df20326fe9 100644
--- a/linera-storage-service/src/client.rs
+++ b/linera-storage-service/src/client.rs
@@ -4,6 +4,7 @@
 use std::{mem, sync::Arc};
 
 use async_lock::{Semaphore, SemaphoreGuard};
+use futures::future::join_all;
 use linera_base::ensure;
 #[cfg(with_metrics)]
 use linera_views::metering::MeteredStore;
@@ -96,7 +97,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
         if num_chunks == 0 {
             Ok(value)
         } else {
-            Self::read_entries(&mut client, message_index, num_chunks).await
+            self.read_entries(message_index, num_chunks).await
         }
     }
 
@@ -161,7 +162,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
             let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
             Ok(values)
         } else {
-            Self::read_entries(&mut client, message_index, num_chunks).await
+            self.read_entries(message_index, num_chunks).await
         }
     }
 
@@ -192,7 +193,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
         if num_chunks == 0 {
             Ok(keys)
         } else {
-            Self::read_entries(&mut client, message_index, num_chunks).await
+            self.read_entries(message_index, num_chunks).await
         }
     }
 
@@ -227,7 +228,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
                 .collect::<Vec<_>>();
             Ok(key_values)
         } else {
-            Self::read_entries(&mut client, message_index, num_chunks).await
+            self.read_entries(message_index, num_chunks).await
         }
     }
 }
@@ -351,21 +352,37 @@ impl ServiceStoreClientInternal {
         }
     }
 
+    async fn read_single_entry(
+        &self,
+        message_index: i64,
+        index: i32,
+    ) -> Result<Vec<u8>, ServiceStoreError> {
+        let channel = self.channel.clone();
+        let query = RequestSpecificChunk {
+            message_index,
+            index,
+        };
+        let request = tonic::Request::new(query);
+        let mut client = StoreProcessorClient::new(channel);
+        let response = client.process_specific_chunk(request).await?;
+        let response = response.into_inner();
+        let ReplySpecificChunk { chunk } = response;
+        Ok(chunk)
+    }
+
     async fn read_entries<S: DeserializeOwned>(
-        client: &mut StoreProcessorClient<Channel>,
+        &self,
         message_index: i64,
         num_chunks: i32,
     ) -> Result<S, ServiceStoreError> {
-        let mut value = Vec::new();
+        let mut handles = Vec::new();
         for index in 0..num_chunks {
-            let query = RequestSpecificChunk {
-                message_index,
-                index,
-            };
-            let request = tonic::Request::new(query);
-            let response = client.process_specific_chunk(request).await?;
-            let response = response.into_inner();
-            let ReplySpecificChunk { chunk } = response;
+            let handle = self.read_single_entry(message_index, index);
+            handles.push(handle);
+        }
+        let mut value = Vec::new();
+        for chunk in join_all(handles).await {
+            let chunk = chunk?;
             value.extend(chunk);
         }
         Ok(bcs::from_bytes(&value)?)
diff --git a/linera-storage-service/src/server.rs b/linera-storage-service/src/server.rs
index b7b2465cd6d..b08afd64022 100644
--- a/linera-storage-service/src/server.rs
+++ b/linera-storage-service/src/server.rs
@@ -45,10 +45,16 @@ enum ServiceStoreServerInternal {
     RocksDb(RocksDbStore),
 }
 
+#[derive(Default)]
+struct BigRead {
+    num_processed_chunks: usize,
+    chunks: Vec<Vec<u8>>,
+}
+
 #[derive(Default)]
 struct PendingBigReads {
     index: i64,
-    chunks_by_index: BTreeMap<i64, Vec<Vec<u8>>>,
+    big_reads: BTreeMap<i64, BigRead>,
 }
 
 struct ServiceStoreServer {
@@ -219,9 +225,11 @@ impl ServiceStoreServer {
         let mut pending_big_reads = self.pending_big_reads.write().await;
         let message_index = pending_big_reads.index;
         pending_big_reads.index += 1;
-        pending_big_reads
-            .chunks_by_index
-            .insert(message_index, chunks);
+        let big_read = BigRead {
+            num_processed_chunks: 0,
+            chunks,
+        };
+        pending_big_reads.big_reads.insert(message_index, big_read);
         (message_index, num_chunks)
     }
 }
@@ -457,13 +465,14 @@ impl StoreProcessor for ServiceStoreServer {
             index,
         } = request;
         let mut pending_big_reads = self.pending_big_reads.write().await;
-        let Some(entry) = pending_big_reads.chunks_by_index.get(&message_index) else {
+        let Some(entry) = pending_big_reads.big_reads.get_mut(&message_index) else {
             return Err(Status::not_found("process_specific_chunk"));
         };
         let index = index as usize;
-        let chunk = entry[index].clone();
-        if entry.len() == index + 1 {
-            pending_big_reads.chunks_by_index.remove(&message_index);
+        let chunk = entry.chunks[index].clone();
+        entry.num_processed_chunks += 1;
+        if entry.chunks.len() == entry.num_processed_chunks {
+            pending_big_reads.big_reads.remove(&message_index);
         }
         let response = ReplySpecificChunk { chunk };
         Ok(Response::new(response))
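
For reference, a minimal, self-contained sketch (not taken from this patch) of the `futures::future::join_all` pattern that `read_entries` now uses. The `fetch_chunk` helper below is a hypothetical stand-in for `read_single_entry`, which in the patch clones the tonic channel, builds its own `StoreProcessorClient`, and performs one `process_specific_chunk` round trip per chunk.

```rust
use futures::future::join_all;

// Hypothetical stand-in for the per-chunk gRPC round trip; it just fabricates
// a recognizable two-byte chunk so the example is runnable on its own.
async fn fetch_chunk(message_index: i64, index: i32) -> Result<Vec<u8>, String> {
    Ok(vec![index as u8, message_index as u8])
}

async fn read_all_chunks(message_index: i64, num_chunks: i32) -> Result<Vec<u8>, String> {
    // Build one future per chunk; nothing is polled until join_all drives them.
    let handles: Vec<_> = (0..num_chunks)
        .map(|index| fetch_chunk(message_index, index))
        .collect();
    // join_all polls the futures concurrently and yields results in input order,
    // so concatenating them reproduces what the old sequential loop built.
    let mut value = Vec::new();
    for chunk in join_all(handles).await {
        value.extend(chunk?);
    }
    Ok(value)
}

fn main() {
    let value = futures::executor::block_on(read_all_chunks(7, 3)).unwrap();
    assert_eq!(value, vec![0, 7, 1, 7, 2, 7]);
}
```

Because each chunk future owns its own client built from a cloned channel, the futures do not contend on a single `&mut` connection, and since `join_all` preserves input order the reassembled buffer matches the sequential version. On the server side, chunk requests can now arrive out of order, so "the last index was requested" no longer implies "all chunks were delivered"; that is why the patch introduces `BigRead::num_processed_chunks` and removes the pending entry only once every chunk has been served.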