Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the storage service reading. #3011

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions linera-storage-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ anyhow.workspace = true
async-lock.workspace = true
bcs.workspace = true
clap.workspace = true
futures.workspace = true
linera-base.workspace = true
linera-version.workspace = true
linera-views.workspace = true
Expand Down
46 changes: 32 additions & 14 deletions linera-storage-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{mem, sync::Arc};

use async_lock::{Semaphore, SemaphoreGuard};
use futures::future::join_all;
use linera_base::ensure;
#[cfg(with_metrics)]
use linera_views::metering::MeteredStore;
Expand Down Expand Up @@ -96,7 +97,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
if num_chunks == 0 {
Ok(value)
} else {
Self::read_entries(&mut client, message_index, num_chunks).await
self.read_entries(message_index, num_chunks).await
}
}

Expand Down Expand Up @@ -161,7 +162,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
Ok(values)
} else {
Self::read_entries(&mut client, message_index, num_chunks).await
self.read_entries(message_index, num_chunks).await
}
}

Expand Down Expand Up @@ -192,7 +193,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
if num_chunks == 0 {
Ok(keys)
} else {
Self::read_entries(&mut client, message_index, num_chunks).await
self.read_entries(message_index, num_chunks).await
}
}

Expand Down Expand Up @@ -227,7 +228,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
.collect::<Vec<_>>();
Ok(key_values)
} else {
Self::read_entries(&mut client, message_index, num_chunks).await
self.read_entries(message_index, num_chunks).await
}
}
}
Expand Down Expand Up @@ -351,21 +352,38 @@ impl ServiceStoreClientInternal {
}
}

async fn read_single_entry(
&self,
message_index: i64,
index: i32,
) -> Result<Vec<u8>, ServiceStoreError> {
let channel = self.channel.clone();
let query = RequestSpecificChunk {
message_index,
index,
};
let request = tonic::Request::new(query);
let mut client = StoreProcessorClient::new(channel);
deuszx marked this conversation as resolved.
Show resolved Hide resolved
let response = client.process_specific_chunk(request).await?;
let response = response.into_inner();
let ReplySpecificChunk { chunk } = response;
Ok(chunk)
}

async fn read_entries<S: DeserializeOwned>(
client: &mut StoreProcessorClient<Channel>,
&self,
message_index: i64,
num_chunks: i32,
) -> Result<S, ServiceStoreError> {
let mut value = Vec::new();
let mut handles = Vec::new();
println!("read_entries: message_index={message_index} num_chunks={num_chunks}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this is a debug print?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, forgot to push.

for index in 0..num_chunks {
let query = RequestSpecificChunk {
message_index,
index,
};
let request = tonic::Request::new(query);
let response = client.process_specific_chunk(request).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious what this was for

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calls to read_value, read_multi_values, find_keys_by_prefix and find_key_values_by_prefix can return something of size higher than the one of the maximal size of GRPC. So, we need to split the read into several chunks which are processed one by one.

let response = response.into_inner();
let ReplySpecificChunk { chunk } = response;
let handle = self.read_single_entry(message_index, index);
handles.push(handle);
}
let mut value = Vec::new();
for chunk in join_all(handles).await {
let chunk = chunk?;
value.extend(chunk);
}
Ok(bcs::from_bytes(&value)?)
Expand Down
25 changes: 17 additions & 8 deletions linera-storage-service/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,16 @@ enum ServiceStoreServerInternal {
RocksDb(RocksDbStore),
}

#[derive(Default)]
struct BigRead {
num_processed_chunks: usize,
chunks: Vec<Vec<u8>>,
}

#[derive(Default)]
struct PendingBigReads {
index: i64,
chunks_by_index: BTreeMap<i64, Vec<Vec<u8>>>,
big_reads: BTreeMap<i64, BigRead>,
}

struct ServiceStoreServer {
Expand Down Expand Up @@ -219,9 +225,11 @@ impl ServiceStoreServer {
let mut pending_big_reads = self.pending_big_reads.write().await;
let message_index = pending_big_reads.index;
pending_big_reads.index += 1;
pending_big_reads
.chunks_by_index
.insert(message_index, chunks);
let big_read = BigRead {
num_processed_chunks: 0,
chunks,
};
pending_big_reads.big_reads.insert(message_index, big_read);
(message_index, num_chunks)
}
}
Expand Down Expand Up @@ -457,13 +465,14 @@ impl StoreProcessor for ServiceStoreServer {
index,
} = request;
let mut pending_big_reads = self.pending_big_reads.write().await;
let Some(entry) = pending_big_reads.chunks_by_index.get(&message_index) else {
let Some(entry) = pending_big_reads.big_reads.get_mut(&message_index) else {
return Err(Status::not_found("process_specific_chunk"));
};
let index = index as usize;
let chunk = entry[index].clone();
if entry.len() == index + 1 {
pending_big_reads.chunks_by_index.remove(&message_index);
let chunk = entry.chunks[index].clone();
entry.num_processed_chunks += 1;
if entry.chunks.len() == entry.num_processed_chunks {
pending_big_reads.big_reads.remove(&message_index);
}
let response = ReplySpecificChunk { chunk };
Ok(Response::new(response))
Expand Down
Loading