From 5c4816b0547dffc73d4ff64edf47235e6f1379dd Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 14 Mar 2025 09:21:26 +0200 Subject: [PATCH 1/5] feat: Allow configuring the downloader when creating a blobs protocol handler we don't allow passing in the entire downloader, but all config options. --- src/net_protocol.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 0eaca5781..15bae2d01 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize}; use tracing::debug; use crate::{ - downloader::Downloader, + downloader::{self, Downloader}, provider::EventSender, store::GcConfig, util::{ @@ -142,11 +142,18 @@ impl BlobBatches { } } +#[derive(Debug, Default)] +pub struct DownloaderConfig { + pub concurrency: downloader::ConcurrencyLimits, + pub retry: downloader::RetryConfig, +} + /// Builder for the Blobs protocol handler #[derive(Debug)] pub struct Builder { store: S, events: Option, + downloader: Option, rt: Option, } @@ -163,6 +170,12 @@ impl Builder { self } + /// Set a custom downloader configuration. + pub fn downloader(mut self, config: DownloaderConfig) -> Self { + self.downloader = Some(config); + self + } + /// Build the Blobs protocol handler. /// You need to provide a the endpoint. pub fn build(self, endpoint: &Endpoint) -> Blobs { @@ -170,7 +183,14 @@ impl Builder { .rt .map(Rt::Handle) .unwrap_or_else(|| Rt::Owned(LocalPool::default())); - let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone()); + let DownloaderConfig { concurrency, retry } = self.downloader.unwrap_or_default(); + let downloader = Downloader::with_config( + self.store.clone(), + endpoint.clone(), + rt.clone(), + concurrency, + retry, + ); Blobs::new( self.store, rt, @@ -187,6 +207,7 @@ impl Blobs { Builder { store, events: None, + downloader: None, rt: None, } } From c263c2a7d36a3fc309163ce1623bf728ddf5cea7 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 14 Mar 2025 09:40:26 +0200 Subject: [PATCH 2/5] Allow getting the downloader config so we can test it --- src/downloader.rs | 34 ++++++++++++++++++++++++++++++++-- src/net_protocol.rs | 12 +++--------- tests/rpc.rs | 25 ++++++++++++++++++++++++- 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/src/downloader.rs b/src/downloader.rs index 87f0462b3..cfc3af72f 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -141,7 +141,7 @@ pub enum GetOutput { } /// Concurrency limits for the [`Downloader`]. -#[derive(Debug)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ConcurrencyLimits { /// Maximum number of requests the service performs concurrently. pub max_concurrent_requests: usize, @@ -193,7 +193,7 @@ impl ConcurrencyLimits { } /// Configuration for retry behavior of the [`Downloader`]. -#[derive(Debug)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct RetryConfig { /// Maximum number of retry attempts for a node that failed to dial or failed with IO errors. pub max_retries_per_node: u32, @@ -325,6 +325,15 @@ impl Future for DownloadHandle { } } +/// All numerical config options for the downloader. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct Config { + /// Concurrency limits for the downloader. + pub concurrency: ConcurrencyLimits, + /// Retry configuration for the downloader. + pub retry: RetryConfig, +} + /// Handle for the download services. #[derive(Clone, Debug)] pub struct Downloader { @@ -374,6 +383,15 @@ impl Downloader { } } + /// Get the current configuration. + pub async fn get_config(&self) -> anyhow::Result { + let (tx, rx) = oneshot::channel(); + let msg = Message::GetConfig { tx }; + self.msg_tx.send(msg).await?; + let config = rx.await?; + Ok(config) + } + /// Queue a download. pub async fn queue(&self, request: DownloadRequest) -> DownloadHandle { let kind = request.kind; @@ -441,6 +459,11 @@ enum Message { /// Cancel an intent. The associated request will be cancelled when the last intent is /// cancelled. CancelIntent { id: IntentId, kind: DownloadKind }, + /// Get the config + GetConfig { + #[debug(skip)] + tx: oneshot::Sender, + }, } #[derive(derive_more::Debug)] @@ -668,6 +691,13 @@ impl, D: DialerT> Service { self.queue.unpark_hash(hash); } } + Message::GetConfig { tx } => { + let config = Config { + concurrency: self.concurrency_limits, + retry: self.retry_config, + }; + tx.send(config).ok(); + } } } diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 15bae2d01..2e7b3c3eb 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -142,18 +142,12 @@ impl BlobBatches { } } -#[derive(Debug, Default)] -pub struct DownloaderConfig { - pub concurrency: downloader::ConcurrencyLimits, - pub retry: downloader::RetryConfig, -} - /// Builder for the Blobs protocol handler #[derive(Debug)] pub struct Builder { store: S, events: Option, - downloader: Option, + downloader: Option, rt: Option, } @@ -171,7 +165,7 @@ impl Builder { } /// Set a custom downloader configuration. - pub fn downloader(mut self, config: DownloaderConfig) -> Self { + pub fn downloader(mut self, config: downloader::Config) -> Self { self.downloader = Some(config); self } @@ -183,7 +177,7 @@ impl Builder { .rt .map(Rt::Handle) .unwrap_or_else(|| Rt::Owned(LocalPool::default())); - let DownloaderConfig { concurrency, retry } = self.downloader.unwrap_or_default(); + let downloader::Config { concurrency, retry } = self.downloader.unwrap_or_default(); let downloader = Downloader::with_config( self.store.clone(), endpoint.clone(), diff --git a/tests/rpc.rs b/tests/rpc.rs index 7dc12e7b2..4cff50169 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -1,7 +1,7 @@ #![cfg(feature = "test")] use std::{net::SocketAddr, path::PathBuf, vec}; -use iroh_blobs::net_protocol::Blobs; +use iroh_blobs::{downloader, net_protocol::Blobs}; use quic_rpc::client::QuinnConnector; use tempfile::TempDir; use testresult::TestResult; @@ -85,3 +85,26 @@ async fn quinn_rpc_large() -> TestResult<()> { assert_eq!(data, &data2[..]); Ok(()) } + +#[tokio::test] +async fn downloader_config() -> TestResult<()> { + let _ = tracing_subscriber::fmt::try_init(); + let endpoint = iroh::Endpoint::builder().bind().await?; + let store = iroh_blobs::store::mem::Store::default(); + let expected = downloader::Config { + concurrency: downloader::ConcurrencyLimits { + max_concurrent_requests: usize::MAX, + max_concurrent_requests_per_node: usize::MAX, + max_open_connections: usize::MAX, + max_concurrent_dials_per_hash: usize::MAX, + }, + retry: downloader::RetryConfig { + max_retries_per_node: u32::MAX, + initial_retry_delay: std::time::Duration::from_secs(1), + }, + }; + let blobs = Blobs::builder(store).downloader(expected).build(&endpoint); + let actual = blobs.downloader().get_config().await?; + assert_eq!(expected, actual); + Ok(()) +} From 3a5ea84ab776b43695710203e823370863c682e6 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 14 Mar 2025 10:25:01 +0200 Subject: [PATCH 3/5] A bunch of annoying plumbing just to be able to test that the config is properly applied. - downloader handle has the config - downloader handle gets an Arc --- src/downloader.rs | 74 ++++++++++++++++-------------------------- src/downloader/test.rs | 14 ++++++-- src/net_protocol.rs | 5 ++- tests/rpc.rs | 4 +-- 4 files changed, 43 insertions(+), 54 deletions(-) diff --git a/src/downloader.rs b/src/downloader.rs index cfc3af72f..22453d528 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -335,12 +335,19 @@ pub struct Config { } /// Handle for the download services. -#[derive(Clone, Debug)] +#[derive(Debug, Clone)] pub struct Downloader { + inner: Arc, +} + +#[derive(Debug)] +struct Inner { /// Next id to use for a download intent. - next_id: Arc, + next_id: AtomicU64, /// Channel to communicate with the service. msg_tx: mpsc::Sender, + /// Configuration for the downloader. + config: Arc, } impl Downloader { @@ -349,53 +356,46 @@ impl Downloader { where S: Store, { - Self::with_config(store, endpoint, rt, Default::default(), Default::default()) + Self::with_config(store, endpoint, rt, Default::default()) } /// Create a new Downloader with custom [`ConcurrencyLimits`] and [`RetryConfig`]. - pub fn with_config( - store: S, - endpoint: Endpoint, - rt: LocalPoolHandle, - concurrency_limits: ConcurrencyLimits, - retry_config: RetryConfig, - ) -> Self + pub fn with_config(store: S, endpoint: Endpoint, rt: LocalPoolHandle, config: Config) -> Self where S: Store, { + let config = Arc::new(config); let me = endpoint.node_id().fmt_short(); let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY); let dialer = Dialer::new(endpoint); - + let config2 = config.clone(); let create_future = move || { let getter = get::IoGetter { store: store.clone(), }; - - let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx); + let service = Service::new(getter, dialer, config2, msg_rx); service.run().instrument(error_span!("downloader", %me)) }; rt.spawn_detached(create_future); Self { - next_id: Arc::new(AtomicU64::new(0)), - msg_tx, + inner: Arc::new(Inner { + next_id: AtomicU64::new(0), + msg_tx, + config, + }), } } /// Get the current configuration. - pub async fn get_config(&self) -> anyhow::Result { - let (tx, rx) = oneshot::channel(); - let msg = Message::GetConfig { tx }; - self.msg_tx.send(msg).await?; - let config = rx.await?; - Ok(config) + pub fn config(&self) -> &Config { + &self.inner.config } /// Queue a download. pub async fn queue(&self, request: DownloadRequest) -> DownloadHandle { let kind = request.kind; - let intent_id = IntentId(self.next_id.fetch_add(1, Ordering::SeqCst)); + let intent_id = IntentId(self.inner.next_id.fetch_add(1, Ordering::SeqCst)); let (sender, receiver) = oneshot::channel(); let handle = DownloadHandle { id: intent_id, @@ -409,7 +409,7 @@ impl Downloader { }; // if this fails polling the handle will fail as well since the sender side of the oneshot // will be dropped - if let Err(send_err) = self.msg_tx.send(msg).await { + if let Err(send_err) = self.inner.msg_tx.send(msg).await { let msg = send_err.0; debug!(?msg, "download not sent"); } @@ -425,7 +425,7 @@ impl Downloader { receiver: _, } = handle; let msg = Message::CancelIntent { id, kind }; - if let Err(send_err) = self.msg_tx.send(msg).await { + if let Err(send_err) = self.inner.msg_tx.send(msg).await { let msg = send_err.0; debug!(?msg, "cancel not sent"); } @@ -437,7 +437,7 @@ impl Downloader { /// downloads. Use [`Self::queue`] to queue a download. pub async fn nodes_have(&mut self, hash: Hash, nodes: Vec) { let msg = Message::NodesHave { hash, nodes }; - if let Err(send_err) = self.msg_tx.send(msg).await { + if let Err(send_err) = self.inner.msg_tx.send(msg).await { let msg = send_err.0; debug!(?msg, "nodes have not been sent") } @@ -459,11 +459,6 @@ enum Message { /// Cancel an intent. The associated request will be cancelled when the last intent is /// cancelled. CancelIntent { id: IntentId, kind: DownloadKind }, - /// Get the config - GetConfig { - #[debug(skip)] - tx: oneshot::Sender, - }, } #[derive(derive_more::Debug)] @@ -590,19 +585,13 @@ struct Service { progress_tracker: ProgressTracker, } impl, D: DialerT> Service { - fn new( - getter: G, - dialer: D, - concurrency_limits: ConcurrencyLimits, - retry_config: RetryConfig, - msg_rx: mpsc::Receiver, - ) -> Self { + fn new(getter: G, dialer: D, config: Arc, msg_rx: mpsc::Receiver) -> Self { Service { getter, dialer, msg_rx, - concurrency_limits, - retry_config, + concurrency_limits: config.concurrency, + retry_config: config.retry, connected_nodes: Default::default(), retry_node_state: Default::default(), providers: Default::default(), @@ -691,13 +680,6 @@ impl, D: DialerT> Service { self.queue.unpark_hash(hash); } } - Message::GetConfig { tx } => { - let config = Config { - concurrency: self.concurrency_limits, - retry: self.retry_config, - }; - tx.send(config).ok(); - } } } diff --git a/src/downloader/test.rs b/src/downloader/test.rs index 0b5ea1f79..3b452f35a 100644 --- a/src/downloader/test.rs +++ b/src/downloader/test.rs @@ -47,16 +47,24 @@ impl Downloader { let (msg_tx, msg_rx) = mpsc::channel(super::SERVICE_CHANNEL_CAPACITY); let lp = LocalPool::default(); + let config = Arc::new(Config { + concurrency: concurrency_limits, + retry: retry_config, + }); + let config2 = config.clone(); lp.spawn_detached(move || async move { // we want to see the logs of the service - let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx); + let service = Service::new(getter, dialer, config2, msg_rx); service.run().await }); ( Downloader { - next_id: Arc::new(AtomicU64::new(0)), - msg_tx, + inner: Arc::new(Inner { + next_id: AtomicU64::new(0), + msg_tx, + config, + }), }, lp, ) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 2e7b3c3eb..ce15c59f8 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -177,13 +177,12 @@ impl Builder { .rt .map(Rt::Handle) .unwrap_or_else(|| Rt::Owned(LocalPool::default())); - let downloader::Config { concurrency, retry } = self.downloader.unwrap_or_default(); + let downloader_config = self.downloader.unwrap_or_default(); let downloader = Downloader::with_config( self.store.clone(), endpoint.clone(), rt.clone(), - concurrency, - retry, + downloader_config, ); Blobs::new( self.store, diff --git a/tests/rpc.rs b/tests/rpc.rs index 4cff50169..ee7503fe5 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -104,7 +104,7 @@ async fn downloader_config() -> TestResult<()> { }, }; let blobs = Blobs::builder(store).downloader(expected).build(&endpoint); - let actual = blobs.downloader().get_config().await?; - assert_eq!(expected, actual); + let actual = blobs.downloader().config(); + assert_eq!(&expected, actual); Ok(()) } From a05b2c5b1b213d93af107ce98aa96de9c32ff09f Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 14 Mar 2025 10:58:10 +0200 Subject: [PATCH 4/5] Don't use new rust features --- src/net_protocol.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 0b5c24f04..084561be5 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -172,14 +172,18 @@ impl Builder { /// Set custom [`ConcurrencyLimits`] to use. pub fn concurrency_limits(mut self, concurrency_limits: ConcurrencyLimits) -> Self { - let downloader_config = self.downloader_config.get_or_insert_default(); + let downloader_config = self + .downloader_config + .get_or_insert_with(|| Default::default()); downloader_config.concurrency = concurrency_limits; self } /// Set a custom [`RetryConfig`] to use. pub fn retry_config(mut self, retry_config: RetryConfig) -> Self { - let downloader_config = self.downloader_config.get_or_insert_default(); + let downloader_config = self + .downloader_config + .get_or_insert_with(|| Default::default()); downloader_config.retry = retry_config; self } From b3c060c7d1e9a2a3e874cbe9faba5649da15e014 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 14 Mar 2025 16:28:34 +0200 Subject: [PATCH 5/5] clippy --- src/net_protocol.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 084561be5..a0f6e6574 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -172,18 +172,14 @@ impl Builder { /// Set custom [`ConcurrencyLimits`] to use. pub fn concurrency_limits(mut self, concurrency_limits: ConcurrencyLimits) -> Self { - let downloader_config = self - .downloader_config - .get_or_insert_with(|| Default::default()); + let downloader_config = self.downloader_config.get_or_insert_with(Default::default); downloader_config.concurrency = concurrency_limits; self } /// Set a custom [`RetryConfig`] to use. pub fn retry_config(mut self, retry_config: RetryConfig) -> Self { - let downloader_config = self - .downloader_config - .get_or_insert_with(|| Default::default()); + let downloader_config = self.downloader_config.get_or_insert_with(Default::default); downloader_config.retry = retry_config; self }