From 3abbb5a318fa88f0ae2934715b3ba5b0dfe59708 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Thu, 5 Dec 2024 13:43:15 +0100 Subject: [PATCH 1/4] Upgrade the scheme of database reading. --- Cargo.lock | 1 + linera-storage-service/Cargo.toml | 1 + linera-storage-service/src/client.rs | 48 +++++++++++++++++++--------- linera-storage-service/src/server.rs | 25 ++++++++++----- 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0994a82242e..cf2a454eb93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4692,6 +4692,7 @@ dependencies = [ "cfg_aliases", "clap", "criterion", + "futures", "linera-base", "linera-storage-service", "linera-version", diff --git a/linera-storage-service/Cargo.toml b/linera-storage-service/Cargo.toml index de066aa7a48..4a63535dd8c 100644 --- a/linera-storage-service/Cargo.toml +++ b/linera-storage-service/Cargo.toml @@ -26,6 +26,7 @@ anyhow.workspace = true async-lock.workspace = true bcs.workspace = true clap.workspace = true +futures.workspace = true linera-base.workspace = true linera-version.workspace = true linera-views.workspace = true diff --git a/linera-storage-service/src/client.rs b/linera-storage-service/src/client.rs index e3332dfad39..28c88bf5255 100644 --- a/linera-storage-service/src/client.rs +++ b/linera-storage-service/src/client.rs @@ -4,6 +4,7 @@ use std::{mem, sync::Arc}; use async_lock::{Semaphore, SemaphoreGuard}; +use futures::future::join_all; use linera_base::ensure; #[cfg(with_metrics)] use linera_views::metering::MeteredStore; @@ -96,7 +97,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal { if num_chunks == 0 { Ok(value) } else { - Self::read_entries(&mut client, message_index, num_chunks).await + self.read_entries(message_index, num_chunks).await } } @@ -161,7 +162,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal { let values = values.into_iter().map(|x| x.value).collect::>(); Ok(values) } else { - Self::read_entries(&mut client, message_index, num_chunks).await + self.read_entries(message_index, num_chunks).await } } @@ -192,7 +193,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal { if num_chunks == 0 { Ok(keys) } else { - Self::read_entries(&mut client, message_index, num_chunks).await + self.read_entries(message_index, num_chunks).await } } @@ -227,7 +228,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal { .collect::>(); Ok(key_values) } else { - Self::read_entries(&mut client, message_index, num_chunks).await + self.read_entries(message_index, num_chunks).await } } } @@ -351,23 +352,40 @@ impl ServiceStoreClientInternal { } } + async fn read_single_entry( + &self, + message_index: i64, + index: i32, + ) -> Result, ServiceStoreError> { + let channel = self.channel.clone(); + let query = RequestSpecificChunk { + message_index, + index, + }; + let request = tonic::Request::new(query); + let mut client = StoreProcessorClient::new(channel); + let response = client.process_specific_chunk(request).await?; + let response = response.into_inner(); + let ReplySpecificChunk { chunk } = response; + Ok(chunk) + } + async fn read_entries( - client: &mut StoreProcessorClient, + &self, message_index: i64, num_chunks: i32, ) -> Result { - let mut value = Vec::new(); + let mut handles = Vec::new(); + println!("read_entries: message_index={message_index} num_chunks={num_chunks}"); for index in 0..num_chunks { - let query = RequestSpecificChunk { - message_index, - index, - }; - let request = tonic::Request::new(query); - let response = client.process_specific_chunk(request).await?; - let response = response.into_inner(); - let ReplySpecificChunk { chunk } = response; - value.extend(chunk); + let handle = self.read_single_entry(message_index, index); + handles.push(handle); } + let values: Vec> = join_all(handles) + .await + .into_iter() + .collect::>()?; + let value = values.into_iter().flatten().collect::>(); Ok(bcs::from_bytes(&value)?) } } diff --git a/linera-storage-service/src/server.rs b/linera-storage-service/src/server.rs index b7b2465cd6d..0590d67d246 100644 --- a/linera-storage-service/src/server.rs +++ b/linera-storage-service/src/server.rs @@ -45,10 +45,16 @@ enum ServiceStoreServerInternal { RocksDb(RocksDbStore), } +#[derive(Default)] +struct BigRead { + num_read: usize, + chunks: Vec>, +} + #[derive(Default)] struct PendingBigReads { index: i64, - chunks_by_index: BTreeMap>>, + big_reads: BTreeMap, } struct ServiceStoreServer { @@ -219,9 +225,11 @@ impl ServiceStoreServer { let mut pending_big_reads = self.pending_big_reads.write().await; let message_index = pending_big_reads.index; pending_big_reads.index += 1; - pending_big_reads - .chunks_by_index - .insert(message_index, chunks); + let big_read = BigRead { + num_read: 0, + chunks, + }; + pending_big_reads.big_reads.insert(message_index, big_read); (message_index, num_chunks) } } @@ -457,13 +465,14 @@ impl StoreProcessor for ServiceStoreServer { index, } = request; let mut pending_big_reads = self.pending_big_reads.write().await; - let Some(entry) = pending_big_reads.chunks_by_index.get(&message_index) else { + let Some(entry) = pending_big_reads.big_reads.get_mut(&message_index) else { return Err(Status::not_found("process_specific_chunk")); }; let index = index as usize; - let chunk = entry[index].clone(); - if entry.len() == index + 1 { - pending_big_reads.chunks_by_index.remove(&message_index); + let chunk = entry.chunks[index].clone(); + entry.num_read += 1; + if entry.chunks.len() == entry.num_read { + pending_big_reads.big_reads.remove(&message_index); } let response = ReplySpecificChunk { chunk }; Ok(Response::new(response)) From 7aa4ba0bb3e9a7773bf55bce611859a693ec3962 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Thu, 5 Dec 2024 14:42:19 +0100 Subject: [PATCH 2/4] Update to the Cargo.lock --- examples/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/Cargo.lock b/examples/Cargo.lock index baca9449146..ec1e2f34f13 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -3436,6 +3436,7 @@ dependencies = [ "bcs", "cfg_aliases", "clap", + "futures", "linera-base", "linera-version", "linera-views", From 9d6d656b1625fd96e00cf7e6512000acefbf7e10 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 6 Dec 2024 09:08:30 +0100 Subject: [PATCH 3/4] Some renaming. --- linera-storage-service/src/server.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/linera-storage-service/src/server.rs b/linera-storage-service/src/server.rs index 0590d67d246..b08afd64022 100644 --- a/linera-storage-service/src/server.rs +++ b/linera-storage-service/src/server.rs @@ -47,7 +47,7 @@ enum ServiceStoreServerInternal { #[derive(Default)] struct BigRead { - num_read: usize, + num_processed_chunks: usize, chunks: Vec>, } @@ -226,7 +226,7 @@ impl ServiceStoreServer { let message_index = pending_big_reads.index; pending_big_reads.index += 1; let big_read = BigRead { - num_read: 0, + num_processed_chunks: 0, chunks, }; pending_big_reads.big_reads.insert(message_index, big_read); @@ -470,8 +470,8 @@ impl StoreProcessor for ServiceStoreServer { }; let index = index as usize; let chunk = entry.chunks[index].clone(); - entry.num_read += 1; - if entry.chunks.len() == entry.num_read { + entry.num_processed_chunks += 1; + if entry.chunks.len() == entry.num_processed_chunks { pending_big_reads.big_reads.remove(&message_index); } let response = ReplySpecificChunk { chunk }; From d63985594ac506a106bd9714c3c3fe1709089cd4 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 6 Dec 2024 21:31:01 +0100 Subject: [PATCH 4/4] Avoid the two collect statements. --- linera-storage-service/src/client.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/linera-storage-service/src/client.rs b/linera-storage-service/src/client.rs index 28c88bf5255..3df20326fe9 100644 --- a/linera-storage-service/src/client.rs +++ b/linera-storage-service/src/client.rs @@ -381,11 +381,11 @@ impl ServiceStoreClientInternal { let handle = self.read_single_entry(message_index, index); handles.push(handle); } - let values: Vec> = join_all(handles) - .await - .into_iter() - .collect::>()?; - let value = values.into_iter().flatten().collect::>(); + let mut value = Vec::new(); + for chunk in join_all(handles).await { + let chunk = chunk?; + value.extend(chunk); + } Ok(bcs::from_bytes(&value)?) } }