Update the storage service reading. #3011

Open · wants to merge 4 commits into base: main

Changes from 2 commits
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; not rendered by default.

1 change: 1 addition & 0 deletions examples/Cargo.lock

Generated file; not rendered by default.
1 change: 1 addition & 0 deletions linera-storage-service/Cargo.toml

@@ -26,6 +26,7 @@ anyhow.workspace = true
 async-lock.workspace = true
 bcs.workspace = true
 clap.workspace = true
+futures.workspace = true
 linera-base.workspace = true
 linera-version.workspace = true
 linera-views.workspace = true
48 changes: 33 additions & 15 deletions linera-storage-service/src/client.rs

@@ -4,6 +4,7 @@
 use std::{mem, sync::Arc};

 use async_lock::{Semaphore, SemaphoreGuard};
+use futures::future::join_all;
 use linera_base::ensure;
 #[cfg(with_metrics)]
 use linera_views::metering::MeteredStore;
@@ -96,7 +97,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
         if num_chunks == 0 {
             Ok(value)
         } else {
-            Self::read_entries(&mut client, message_index, num_chunks).await
+            self.read_entries(message_index, num_chunks).await
         }
     }

@@ -161,7 +162,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
             let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
             Ok(values)
         } else {
-            Self::read_entries(&mut client, message_index, num_chunks).await
+            self.read_entries(message_index, num_chunks).await
         }
     }

@@ -192,7 +193,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
         if num_chunks == 0 {
             Ok(keys)
         } else {
-            Self::read_entries(&mut client, message_index, num_chunks).await
+            self.read_entries(message_index, num_chunks).await
         }
     }

@@ -227,7 +228,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
                 .collect::<Vec<_>>();
             Ok(key_values)
         } else {
-            Self::read_entries(&mut client, message_index, num_chunks).await
+            self.read_entries(message_index, num_chunks).await
         }
     }
 }
@@ -351,23 +352,40 @@ impl ServiceStoreClientInternal {
         }
     }

+    async fn read_single_entry(
+        &self,
+        message_index: i64,
+        index: i32,
+    ) -> Result<Vec<u8>, ServiceStoreError> {
+        let channel = self.channel.clone();
+        let query = RequestSpecificChunk {
+            message_index,
+            index,
+        };
+        let request = tonic::Request::new(query);
+        let mut client = StoreProcessorClient::new(channel);
+        let response = client.process_specific_chunk(request).await?;
+        let response = response.into_inner();
+        let ReplySpecificChunk { chunk } = response;
+        Ok(chunk)
+    }
+
     async fn read_entries<S: DeserializeOwned>(
-        client: &mut StoreProcessorClient<Channel>,
+        &self,
         message_index: i64,
         num_chunks: i32,
     ) -> Result<S, ServiceStoreError> {
-        let mut value = Vec::new();
+        let mut handles = Vec::new();
+        println!("read_entries: message_index={message_index} num_chunks={num_chunks}");
Review comment (Contributor): I'm assuming this is a debug print?
Reply (author): Ah, sorry, forgot to push.

         for index in 0..num_chunks {
-            let query = RequestSpecificChunk {
-                message_index,
-                index,
-            };
-            let request = tonic::Request::new(query);
-            let response = client.process_specific_chunk(request).await?;
Review comment (Contributor): Curious what this was for.
Reply (author): The calls to read_value, read_multi_values, find_keys_by_prefix, and find_key_values_by_prefix can return payloads larger than the maximal gRPC message size, so we need to split the read into several chunks that are processed one by one. (See the chunk-splitting sketch after this file's diff.)

-            let response = response.into_inner();
-            let ReplySpecificChunk { chunk } = response;
-            value.extend(chunk);
+            let handle = self.read_single_entry(message_index, index);
+            handles.push(handle);
         }
+        let values: Vec<Vec<u8>> = join_all(handles)
+            .await
+            .into_iter()
+            .collect::<Result<_, _>>()?;
+        let value = values.into_iter().flatten().collect::<Vec<_>>();
Review comment (@ma2bd, Dec 6, 2024): Ideally we would collect only once. Not sure how to do it with combinators, but a for-loop should work.
Reply (author): OK, replaced with a for loop. Indeed, I saw some potential issues with the two collect statements, but we seem to prefer functional code, which is why I used it. (See the single-collect sketch after this file's diff.)

         Ok(bcs::from_bytes(&value)?)
     }
 }
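The review exchange above motivates the design: replies to read_value, read_multi_values, find_keys_by_prefix, and find_key_values_by_prefix can exceed the gRPC message-size limit, so oversized replies are split into chunks that the client fetches individually. A minimal sketch of the splitting side, assuming a 4 MiB bound (tonic's default decode limit); the constant and function name are illustrative, not this crate's actual API:

    // Hypothetical helper: split a serialized reply into gRPC-sized chunks.
    // MAX_CHUNK_SIZE is an assumption; the crate may use a different bound.
    const MAX_CHUNK_SIZE: usize = 4 * 1024 * 1024;

    fn split_into_chunks(payload: &[u8]) -> Vec<Vec<u8>> {
        payload
            .chunks(MAX_CHUNK_SIZE)
            .map(|slice| slice.to_vec())
            .collect()
    }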
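The single-collect variant that ma2bd asked for (and that the author says a later commit adopts) could look roughly like this sketch, assuming handles is the Vec of futures built in the loop, each resolving to Result<Vec<u8>, ServiceStoreError>:

    // Accumulate chunks into one buffer with a for loop instead of
    // collecting into Vec<Vec<u8>> and then flattening.
    let mut value = Vec::new();
    for result in join_all(handles).await {
        value.extend(result?); // fail fast on the first chunk error
    }
    Ok(bcs::from_bytes(&value)?)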
25 changes: 17 additions & 8 deletions linera-storage-service/src/server.rs

@@ -45,10 +45,16 @@ enum ServiceStoreServerInternal {
     RocksDb(RocksDbStore),
 }

+#[derive(Default)]
+struct BigRead {
+    num_read: usize,
Review comment (Contributor): When I read this, I was confused about whether this was the expected total number of reads or something else, so maybe num_processed_chunks or something like that would be a better name?
Reply (author): OK, done.

+    chunks: Vec<Vec<u8>>,
+}
+
 #[derive(Default)]
 struct PendingBigReads {
     index: i64,
-    chunks_by_index: BTreeMap<i64, Vec<Vec<u8>>>,
+    big_reads: BTreeMap<i64, BigRead>,
 }

 struct ServiceStoreServer {

@@ -219,9 +225,11 @@ impl ServiceStoreServer {
         let mut pending_big_reads = self.pending_big_reads.write().await;
         let message_index = pending_big_reads.index;
         pending_big_reads.index += 1;
-        pending_big_reads
-            .chunks_by_index
-            .insert(message_index, chunks);
+        let big_read = BigRead {
+            num_read: 0,
+            chunks,
+        };
+        pending_big_reads.big_reads.insert(message_index, big_read);
         (message_index, num_chunks)
     }
 }
@@ -457,13 +465,14 @@ impl StoreProcessor for ServiceStoreServer {
             index,
         } = request;
         let mut pending_big_reads = self.pending_big_reads.write().await;
-        let Some(entry) = pending_big_reads.chunks_by_index.get(&message_index) else {
+        let Some(entry) = pending_big_reads.big_reads.get_mut(&message_index) else {
             return Err(Status::not_found("process_specific_chunk"));
         };
         let index = index as usize;
-        let chunk = entry[index].clone();
-        if entry.len() == index + 1 {
-            pending_big_reads.chunks_by_index.remove(&message_index);
+        let chunk = entry.chunks[index].clone();
+        entry.num_read += 1;
+        if entry.chunks.len() == entry.num_read {
+            pending_big_reads.big_reads.remove(&message_index);
         }
         let response = ReplySpecificChunk { chunk };
         Ok(Response::new(response))
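Taken together, the server-side bookkeeping registers the chunks of an oversized reply under a fresh message index, serves chunks on demand, and evicts the entry once every chunk has been read. A condensed, self-contained sketch of that logic — field and type names follow the diff, but the register and read_chunk helpers are illustrative and the gRPC plumbing is omitted:

    use std::collections::BTreeMap;

    #[derive(Default)]
    struct BigRead {
        num_read: usize,      // chunks served so far
        chunks: Vec<Vec<u8>>, // the split-up reply payload
    }

    #[derive(Default)]
    struct PendingBigReads {
        index: i64,
        big_reads: BTreeMap<i64, BigRead>,
    }

    impl PendingBigReads {
        /// Register an oversized reply; returns its message index.
        fn register(&mut self, chunks: Vec<Vec<u8>>) -> i64 {
            let message_index = self.index;
            self.index += 1;
            self.big_reads
                .insert(message_index, BigRead { num_read: 0, chunks });
            message_index
        }

        /// Serve one chunk; drop the entry after the last chunk is read.
        fn read_chunk(&mut self, message_index: i64, index: usize) -> Option<Vec<u8>> {
            let entry = self.big_reads.get_mut(&message_index)?;
            let chunk = entry.chunks.get(index)?.clone();
            entry.num_read += 1;
            if entry.num_read == entry.chunks.len() {
                self.big_reads.remove(&message_index);
            }
            Some(chunk)
        }
    }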