diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c4df22395c..e96f386686 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -246,13 +246,13 @@ jobs:
       # TODO: We have a bunch of platform-dependent code so should
       # probably run this job on the full platform matrix
       - name: clippy check (all features)
-        run: cargo clippy --workspace --all-features --all-targets --bins --tests --benches
+        run: cargo clippy --workspace --all-features --all-targets --lib --bins --tests --benches --examples

       - name: clippy check (no features)
-        run: cargo clippy --workspace --no-default-features --lib --bins --tests
+        run: cargo clippy --workspace --no-default-features --all-targets --lib --bins --tests --benches --examples

       - name: clippy check (default features)
-        run: cargo clippy --workspace --all-targets
+        run: cargo clippy --workspace --all-targets --lib --bins --tests --benches --examples

   msrv:
     if: "github.event_name != 'pull_request' || ! contains(github.event.pull_request.labels.*.name, 'flaky-test')"
@@ -295,7 +295,7 @@
       branch: ${{ github.ref }}
       max_workers: 4
       netsim_branch: "main"
-      sim_paths: "sims/iroh/iroh.json,sims/integration"
+      sim_paths: "sims/iroh_v2/iroh.json,sims/integration_v2"
       pr_number: ${{ github.event.pull_request.number || '' }}

   codespell:
diff --git a/.github/workflows/netsim.yml b/.github/workflows/netsim.yml
index b7a79cd1d1..cbd08149e4 100644
--- a/.github/workflows/netsim.yml
+++ b/.github/workflows/netsim.yml
@@ -39,7 +39,7 @@ jobs:
       branch: "main"
       max_workers: 1
       netsim_branch: "main"
-      sim_paths: "sims/iroh,sims/integration"
+      sim_paths: "sims/iroh_v2,sims/integration_v2"
       pr_number: ""
       publish_metrics: true
       build_profile: "optimized-release"
@@ -53,7 +53,7 @@ jobs:
       branch: ${{inputs.branch}}
       max_workers: 1
       netsim_branch: ${{inputs.netsim_branch}}
-      sim_paths: "sims/iroh"
+      sim_paths: "sims/iroh_v2"
       pr_number: ${{inputs.pr_number}}
       publish_metrics: false
       build_profile: "optimized-release"
diff --git a/.github/workflows/netsim_runner.yaml b/.github/workflows/netsim_runner.yaml
index ae7b20d08c..df168f2803 100644
--- a/.github/workflows/netsim_runner.yaml
+++ b/.github/workflows/netsim_runner.yaml
@@ -133,7 +133,7 @@ jobs:
       - name: Copy binaries to right location
         run: |
           cp target/${{inputs.build_profile}}/examples/* ../chuck/netsim/bins/
-          cp target/${{inputs.build_profile}}/iroh ../chuck/netsim/bins/iroh
+          cp target/${{inputs.build_profile}}/examples/transfer ../chuck/netsim/bins/iroh-transfer
           cp target/${{inputs.build_profile}}/iroh-relay ../chuck/netsim/bins/iroh-relay
           cp ../chuck/target/release/chuck ../chuck/netsim/bins/chuck
diff --git a/Cargo.lock b/Cargo.lock
index 409947558a..99275d046a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2499,6 +2499,7 @@ dependencies = [
  "nested_enum_utils",
  "num_cpus",
  "parking_lot",
+ "parse-size",
  "postcard",
  "proptest",
  "quic-rpc",
@@ -3655,6 +3656,12 @@ dependencies = [
  "windows-targets 0.52.6",
 ]

+[[package]]
+name = "parse-size"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "944553dd59c802559559161f9816429058b869003836120e262e8caec061b7ae"
+
 [[package]]
 name = "paste"
 version = "1.0.15"
diff --git a/iroh-net/bench/src/lib.rs b/iroh-net/bench/src/lib.rs
index a591581d26..93e0c91e51 100644
--- a/iroh-net/bench/src/lib.rs
+++ b/iroh-net/bench/src/lib.rs
@@ -21,7 +21,7 @@ pub mod s2n;
 pub mod stats;

 #[derive(Parser, Debug, Clone, Copy)]
-#[clap(name = "bulk")]
+#[clap(name = "iroh-net-bench")]
 pub enum Commands {
     Iroh(Opt),
     #[cfg(not(any(target_os = "freebsd", target_os = "openbsd", target_os = "netbsd")))]
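[Editor's note: the netsim_runner.yaml hunk above swaps the old `iroh` CLI binary for the `transfer` example, shipped as `bins/iroh-transfer`. The build step itself is not part of this patch; assuming the workflow builds the iroh crate's examples with the `examples` feature enabled, an invocation along these lines would produce the file the copy step expects (profile name comes from the `build_profile` input; the exact flags are an assumption, not shown in this diff):

    cargo build -p iroh --profile optimized-release --example transfer --features examples
]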
"freebsd", target_os = "openbsd", target_os = "netbsd")))] diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 91b2350171..392256c730 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -62,6 +62,7 @@ ref-cast = "1.0.23" # Examples clap = { version = "4", features = ["derive"], optional = true } indicatif = { version = "0.17", features = ["tokio"], optional = true } +parse-size = { version = "=1.0.0", optional = true } # pinned version to avoid bumping msrv to 1.81 # Documentation tests url = { version = "2.5.0", features = ["serde"] } @@ -74,7 +75,7 @@ test = [] discovery-pkarr-dht = ["iroh-net/discovery-pkarr-dht"] test-utils = ["iroh-net/test-utils"] -examples = ["dep:clap", "dep:indicatif"] +examples = ["dep:clap", "dep:indicatif", "dep:parse-size"] [dev-dependencies] anyhow = { version = "1" } @@ -101,3 +102,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"] [[example]] name = "rpc" required-features = ["examples"] + +[[example]] +name = "transfer" +required-features = ["examples"] diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs new file mode 100644 index 0000000000..9225155b21 --- /dev/null +++ b/iroh/examples/transfer.rs @@ -0,0 +1,324 @@ +use std::{ + str::FromStr, + time::{Duration, Instant}, +}; + +use anyhow::{Context, Result}; +use bytes::Bytes; +use clap::{Parser, Subcommand}; +use futures_lite::StreamExt; +use indicatif::HumanBytes; +use iroh_net::{ + key::SecretKey, ticket::NodeTicket, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, +}; +use tracing::info; + +// Transfer ALPN that we are using to communicate over the `Endpoint` +const TRANSFER_ALPN: &[u8] = b"n0/iroh/transfer/example/0"; + +#[derive(Parser, Debug)] +#[command(name = "transfer")] +struct Cli { + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand, Debug)] +enum Commands { + Provide { + #[clap(long, default_value = "1G", value_parser = parse_byte_size)] + size: u64, + #[clap(long)] + relay_url: Option, + }, + Fetch { + #[arg(index = 1)] + ticket: String, + #[clap(long)] + relay_url: Option, + }, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let cli = Cli::parse(); + + match &cli.command { + Commands::Provide { size, relay_url } => provide(*size, relay_url.clone()).await?, + Commands::Fetch { ticket, relay_url } => fetch(ticket, relay_url.clone()).await?, + } + + Ok(()) +} + +async fn provide(size: u64, relay_url: Option) -> anyhow::Result<()> { + let secret_key = SecretKey::generate(); + let relay_mode = match relay_url { + Some(relay_url) => { + let relay_url = RelayUrl::from_str(&relay_url)?; + let relay_map = RelayMap::from_url(relay_url); + RelayMode::Custom(relay_map) + } + None => RelayMode::Default, + }; + let endpoint = Endpoint::builder() + .secret_key(secret_key) + .alpns(vec![TRANSFER_ALPN.to_vec()]) + .relay_mode(relay_mode) + .bind() + .await?; + + let node_id = endpoint.node_id(); + + for local_endpoint in endpoint + .direct_addresses() + .next() + .await + .context("no endpoints")? + { + println!("\t{}", local_endpoint.addr) + } + + let relay_url = endpoint + .home_relay() + .expect("should be connected to a relay server"); + let local_addrs = endpoint + .direct_addresses() + .next() + .await + .context("no endpoints")? 
+        .into_iter()
+        .map(|endpoint| endpoint.addr)
+        .collect::<Vec<_>>();
+
+    let node_addr = NodeAddr::from_parts(node_id, Some(relay_url), local_addrs);
+    let ticket = NodeTicket::new(node_addr);
+
+    println!("NodeTicket: {}", ticket);
+
+    // accept incoming connections, returns a normal QUIC connection
+    while let Some(incoming) = endpoint.accept().await {
+        let connecting = match incoming.accept() {
+            Ok(connecting) => connecting,
+            Err(err) => {
+                tracing::warn!("incoming connection failed: {err:#}");
+                // we can carry on in these cases:
+                // this can be caused by retransmitted datagrams
+                continue;
+            }
+        };
+        let conn = connecting.await?;
+        let node_id = iroh_net::endpoint::get_remote_node_id(&conn)?;
+        info!(
+            "new connection from {node_id} with ALPN {} (coming from {})",
+            String::from_utf8_lossy(TRANSFER_ALPN),
+            conn.remote_address()
+        );
+
+        // spawn a task to handle reading and writing off of the connection
+        tokio::spawn(async move {
+            // accept a bi-directional QUIC connection
+            // use the `quinn` APIs to send and recv content
+            let (mut send, mut recv) = conn.accept_bi().await?;
+            tracing::debug!("accepted bi stream, waiting for data...");
+            let message = recv.read_to_end(100).await?;
+            let message = String::from_utf8(message)?;
+            println!("received: {message}");
+
+            send_data_on_stream(&mut send, size).await?;
+
+            // We sent the last message, so wait for the client to close the connection once
+            // it received this message.
+            let res = tokio::time::timeout(Duration::from_secs(3), async move {
+                let closed = conn.closed().await;
+                if !matches!(closed, quinn::ConnectionError::ApplicationClosed(_)) {
+                    println!("node {node_id} disconnected with an error: {closed:#}");
+                }
+            })
+            .await;
+            if res.is_err() {
+                println!("node {node_id} did not disconnect within 3 seconds");
+            }
+            Ok::<_, anyhow::Error>(())
+        });
+    }
+
+    // stop with SIGINT (ctrl-c)
+    Ok(())
+}
+
+async fn fetch(ticket: &str, relay_url: Option<String>) -> anyhow::Result<()> {
+    let ticket: NodeTicket = ticket.parse()?;
+    let secret_key = SecretKey::generate();
+    let relay_mode = match relay_url {
+        Some(relay_url) => {
+            let relay_url = RelayUrl::from_str(&relay_url)?;
+            let relay_map = RelayMap::from_url(relay_url);
+            RelayMode::Custom(relay_map)
+        }
+        None => RelayMode::Default,
+    };
+    let endpoint = Endpoint::builder()
+        .secret_key(secret_key)
+        .alpns(vec![TRANSFER_ALPN.to_vec()])
+        .relay_mode(relay_mode)
+        .bind()
+        .await?;
+
+    let start = Instant::now();
+
+    let me = endpoint.node_id();
+    println!("node id: {me}");
+    println!("node listening addresses:");
+    for local_endpoint in endpoint
+        .direct_addresses()
+        .next()
+        .await
+        .context("no endpoints")?
+    {
+        println!("\t{}", local_endpoint.addr)
+    }
+
+    let relay_url = endpoint
+        .home_relay()
+        .expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
+    println!("node relay server url: {relay_url}\n");
+
+    // Attempt to connect, over the given ALPN.
+    // Returns a Quinn connection.
+    let conn = endpoint
+        .connect(ticket.node_addr().clone(), TRANSFER_ALPN)
+        .await?;
+    info!("connected");
+
+    // Use the Quinn API to send and recv content.
+    let (mut send, mut recv) = conn.open_bi().await?;
+
+    let message = format!("{me} is saying 'hello!'");
+    send.write_all(message.as_bytes()).await?;
+
+    // Call `finish` to signal no more data will be sent on this stream.
+    send.finish()?;
+
+    let (len, time_to_first_byte, num_chunks) = drain_stream(&mut recv, false).await?;
+
+    // We received the last message: close all connections and allow for the close
+    // message to be sent.
+    endpoint.close(0u8.into(), b"bye").await?;
+
+    // Ensure the client has closed the connection
+    let res = tokio::time::timeout(Duration::from_secs(3), async move {
+        let closed = conn.closed().await;
+        if !matches!(closed, quinn::ConnectionError::LocallyClosed) {
+            println!("node disconnected with an error: {closed:#}");
+        }
+    })
+    .await;
+    if res.is_err() {
+        println!("node did not disconnect within 3 seconds");
+    }
+
+    let duration = start.elapsed();
+    println!(
+        "Received {} in {:.4}s with time to first byte {}s in {} chunks",
+        HumanBytes(len as u64),
+        duration.as_secs_f64(),
+        time_to_first_byte.as_secs_f64(),
+        num_chunks
+    );
+    println!(
+        "Transferred {} in {:.4}s, {}/s",
+        HumanBytes(len as u64),
+        duration.as_secs_f64(),
+        HumanBytes((len as f64 / duration.as_secs_f64()) as u64)
+    );
+
+    Ok(())
+}
+
+async fn drain_stream(
+    stream: &mut iroh_net::endpoint::RecvStream,
+    read_unordered: bool,
+) -> Result<(usize, Duration, u64)> {
+    let mut read = 0;
+
+    let download_start = Instant::now();
+    let mut first_byte = true;
+    let mut time_to_first_byte = download_start.elapsed();
+
+    let mut num_chunks: u64 = 0;
+
+    if read_unordered {
+        while let Some(chunk) = stream.read_chunk(usize::MAX, false).await? {
+            if first_byte {
+                time_to_first_byte = download_start.elapsed();
+                first_byte = false;
+            }
+            read += chunk.bytes.len();
+            num_chunks += 1;
+        }
+    } else {
+        // These are 32 buffers, for reading approximately 32kB at once
+        #[rustfmt::skip]
+        let mut bufs = [
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+        ];
+
+        while let Some(n) = stream.read_chunks(&mut bufs[..]).await? {
+            if first_byte {
+                time_to_first_byte = download_start.elapsed();
+                first_byte = false;
+            }
+            read += bufs.iter().take(n).map(|buf| buf.len()).sum::<usize>();
+            num_chunks += 1;
+        }
+    }
+
+    Ok((read, time_to_first_byte, num_chunks))
+}
+
+async fn send_data_on_stream(
+    stream: &mut iroh_net::endpoint::SendStream,
+    stream_size: u64,
+) -> Result<()> {
+    const DATA: &[u8] = &[0xAB; 1024 * 1024];
+    let bytes_data = Bytes::from_static(DATA);
+
+    let full_chunks = stream_size / (DATA.len() as u64);
+    let remaining = (stream_size % (DATA.len() as u64)) as usize;
+
+    for _ in 0..full_chunks {
+        stream
+            .write_chunk(bytes_data.clone())
+            .await
+            .context("failed sending data")?;
+    }
+
+    if remaining != 0 {
+        stream
+            .write_chunk(bytes_data.slice(0..remaining))
+            .await
+            .context("failed sending data")?;
+    }
+
+    stream.finish().context("failed finishing stream")?;
+    stream
+        .stopped()
+        .await
+        .context("failed to wait for stream to be stopped")?;
+
+    Ok(())
+}
+
+fn parse_byte_size(s: &str) -> Result<u64> {
+    let cfg = parse_size::Config::new().with_binary();
+    cfg.parse_size(s).map_err(|e| anyhow::anyhow!(e))
+}
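[Editor's note: a minimal sketch of trying the new example locally. The subcommands and flags follow from the clap definitions above; `--size` is parsed by `parse-size` configured for binary units, so "1G" means 1 GiB (2^30 bytes). Assuming a workspace checkout:

    # terminal 1: serve `--size` bytes of 0xAB data and print a NodeTicket
    cargo run -p iroh --example transfer --features examples -- provide --size 1G

    # terminal 2: fetch using the ticket printed by the provider
    cargo run -p iroh --example transfer --features examples -- fetch <NodeTicket>

The fetch side prints the transfer size, duration, time to first byte, and throughput once the stream is drained.]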