Skip to content

Commit

Permalink
Limit waiting for a single node when downloading blobs. (#3232) (#3243)
Browse files Browse the repository at this point in the history
## Motivation

This is a backport of #3232.

## Proposal

See #3232.

## Test Plan

CI

## Release Plan

- These changes should be released in a new SDK.

## Links

- Original PR: #3232
- [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
  • Loading branch information
afck authored Feb 4, 2025
1 parent 612f0d3 commit e117e2c
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 11 deletions.
3 changes: 3 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp
Don't include any messages in blocks, and don't make any decision whether to accept or reject

* `--restrict-chain-ids-to <RESTRICT_CHAIN_IDS_TO>` — A set of chains to restrict incoming messages from. By default, messages from all chains are accepted. To reject messages from all chains, specify an empty string
* `--blob-download-timeout-ms <BLOB_DOWNLOAD_TIMEOUT>` — The delay when downloading a blob, after which we try a second validator, in milliseconds

Default value: `1000`



Expand Down
1 change: 1 addition & 0 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ where
options.long_lived_services,
chain_ids,
name,
options.blob_download_timeout,
);

ClientContext {
Expand Down
8 changes: 8 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ pub struct ClientOptions {
/// an empty string.
#[arg(long, value_parser = util::parse_chain_set)]
pub restrict_chain_ids_to: Option<HashSet<ChainId>>,

/// The delay when downloading a blob, after which we try a second validator, in milliseconds.
#[arg(
long = "blob-download-timeout-ms",
default_value = "1000",
value_parser = util::parse_millis
)]
pub blob_download_timeout: Duration,
}

impl ClientOptions {
Expand Down
3 changes: 2 additions & 1 deletion linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#![allow(clippy::large_futures)]

use std::{collections::BTreeMap, sync::Arc};
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use async_trait::async_trait;
use futures::{lock::Mutex, FutureExt as _};
Expand Down Expand Up @@ -162,6 +162,7 @@ async fn test_chain_listener() -> anyhow::Result<()> {
false,
[chain_id0],
format!("Client node for {:.8}", chain_id0),
Duration::from_secs(1),
)),
};
let key_pair = KeyPair::generate_from(&mut rng);
Expand Down
19 changes: 16 additions & 3 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
num::NonZeroUsize,
ops::{Deref, DerefMut},
sync::{Arc, RwLock},
time::Duration,
};

use chain_state::ChainState;
Expand Down Expand Up @@ -195,11 +196,14 @@ where
storage: Storage,
/// Chain state for the managed chains.
chains: DashMap<ChainId, ChainState>,
/// The delay when downloading a blob, after which we try a second validator.
blob_download_timeout: Duration,
}

impl<P, S: Storage + Clone> Client<P, S> {
/// Creates a new `Client` with a new cache and notifiers.
#[instrument(level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)]
pub fn new(
validator_node_provider: P,
storage: S,
Expand All @@ -208,6 +212,7 @@ impl<P, S: Storage + Clone> Client<P, S> {
long_lived_services: bool,
tracked_chains: impl IntoIterator<Item = ChainId>,
name: impl Into<String>,
blob_download_timeout: Duration,
) -> Self {
let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
let state = WorkerState::new_for_client(
Expand All @@ -231,6 +236,7 @@ impl<P, S: Storage + Clone> Client<P, S> {
tracked_chains,
notifier: Arc::new(ChannelNotifier::default()),
storage,
blob_download_timeout,
}
}

Expand Down Expand Up @@ -290,6 +296,7 @@ impl<P, S: Storage + Clone> Client<P, S> {
max_pending_message_bundles: self.max_pending_message_bundles,
message_policy: self.message_policy.clone(),
cross_chain_message_delivery: self.cross_chain_message_delivery,
blob_download_timeout: self.blob_download_timeout,
},
}
}
Expand Down Expand Up @@ -492,6 +499,8 @@ pub struct ChainClientOptions {
pub message_policy: MessagePolicy,
/// Whether to block on cross-chain message delivery.
pub cross_chain_message_delivery: CrossChainMessageDelivery,
/// The delay when downloading a blob, after which we try a second validator.
pub blob_download_timeout: Duration,
}

/// Client to operate a chain by interacting with validators and the given local storage
Expand Down Expand Up @@ -1244,9 +1253,13 @@ where
if let Err(err) = self.process_certificate(certificate.clone(), vec![]).await {
match &err {
LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) => {
let blobs = RemoteNode::download_blobs(blob_ids, &nodes)
.await
.ok_or(err)?;
let blobs = RemoteNode::download_blobs(
blob_ids,
&nodes,
self.client.blob_download_timeout,
)
.await
.ok_or(err)?;
self.process_certificate(certificate.clone(), blobs).await?;
}
_ => {
Expand Down
28 changes: 22 additions & 6 deletions linera-core/src/remote_node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, fmt};
use std::{collections::HashMap, fmt, time::Duration};

use futures::{stream::FuturesUnordered, StreamExt};
use linera_base::{
Expand Down Expand Up @@ -246,12 +246,24 @@ impl<N: ValidatorNode> RemoteNode<N> {
}

#[instrument(level = "trace", skip(validators))]
async fn download_blob(validators: &[Self], blob_id: BlobId) -> Option<Blob> {
async fn download_blob(
validators: &[Self],
blob_id: BlobId,
timeout: Duration,
) -> Option<Blob> {
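// Try the validators in random order with staggered start times: the request to the
// validator at position `i` begins after a delay of `timeout * i * i`, and the first
// blob that any of them returns is used.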
let mut validators = validators.iter().collect::<Vec<_>>();
validators.shuffle(&mut rand::thread_rng());
for remote_node in validators {
if let Some(blob) = remote_node.try_download_blob(blob_id).await {
let mut stream = validators
.into_iter()
.zip(0..)
.map(|(remote_node, i)| async move {
tokio::time::sleep(timeout * i * i).await;
remote_node.try_download_blob(blob_id).await
})
.collect::<FuturesUnordered<_>>();
while let Some(maybe_blob) = stream.next().await {
if let Some(blob) = maybe_blob {
return Some(blob);
}
}
Expand All @@ -262,10 +274,14 @@ impl<N: ValidatorNode> RemoteNode<N> {
/// Each task tries the validators in random order, starting the request to each further
/// validator after an increasing delay, until one of them returns the blob. Returns `None`
/// if it couldn't find all blobs.
#[instrument(level = "trace", skip(validators))]
pub async fn download_blobs(blob_ids: &[BlobId], validators: &[Self]) -> Option<Vec<Blob>> {
pub async fn download_blobs(
blob_ids: &[BlobId],
validators: &[Self],
timeout: Duration,
) -> Option<Vec<Blob>> {
let mut stream = blob_ids
.iter()
.map(|blob_id| Self::download_blob(validators, *blob_id))
.map(|blob_id| Self::download_blob(validators, *blob_id, timeout))
.collect::<FuturesUnordered<_>>();
let mut blobs = Vec::new();
while let Some(maybe_blob) = stream.next().await {
Expand Down
2 changes: 2 additions & 0 deletions linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
num::NonZeroUsize,
sync::Arc,
time::Duration,
vec,
};

Expand Down Expand Up @@ -807,6 +808,7 @@ where
false,
[chain_id],
format!("Client node for {:.8}", chain_id),
Duration::from_secs(1),
));
Ok(builder.create_chain_client(
chain_id,
Expand Down
10 changes: 9 additions & 1 deletion linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@
#![recursion_limit = "256"]
#![deny(clippy::large_futures)]

use std::{borrow::Cow, collections::HashMap, env, path::PathBuf, sync::Arc, time::Instant};
use std::{
borrow::Cow,
collections::HashMap,
env,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};

use anyhow::{anyhow, bail, ensure, Context};
use async_trait::async_trait;
Expand Down Expand Up @@ -1167,6 +1174,7 @@ impl Job {
false,
vec![message_id.chain_id, chain_id],
"Temporary client for fetching the parent chain",
Duration::from_secs(1),
);

// Take the latest committee we know of.
Expand Down

0 comments on commit e117e2c

Please sign in to comment.