Skip to content

chore: upgrade to iroh and iroh-blobs @ v0.32.0 #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions content-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ missing_debug_implementations = "warn"
unused-async = "warn"

[workspace.dependencies]
iroh = "0.31"
iroh-base = "0.31"
iroh-blobs = { version = "0.31", features = ["rpc"] }
iroh = { version ="0.32", features = ["discovery-pkarr-dht"] }
iroh-base = "0.32"
iroh-blobs = { version = "0.32", features = ["rpc"] }
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> {
let connection = iroh_endpoint
.connect(tracker, iroh_mainline_content_discovery::protocol::ALPN)
.await?;
iroh_mainline_content_discovery::announce(connection, signed_announce).await?;
iroh_mainline_content_discovery::announce_iroh(connection, signed_announce).await?;
}
}
if !args.quic_tracker.is_empty() {
Expand All @@ -82,7 +82,7 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> {
for tracker in args.quic_tracker {
println!("announcing via quic to {:?}: {}", tracker, content);
let connection = quinn_endpoint.connect(tracker, "localhost")?.await?;
iroh_mainline_content_discovery::announce(connection, signed_announce).await?;
iroh_mainline_content_discovery::announce_quinn(connection, signed_announce).await?;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ hex = "0.4.3"

# Optional features for the client functionality
tracing = { version = "0.1", optional = true }
iroh-quinn = { version = "0.12", optional = true }
iroh-quinn = { version = "0.13", optional = true }
mainline = { version = "2.0.0", optional = true, features = ["async"] }
anyhow = { version = "1", features = ["backtrace"], optional = true }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std"], optional = true }
Expand Down
72 changes: 62 additions & 10 deletions content-discovery/iroh-mainline-content-discovery/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,30 @@ use crate::protocol::{
/// `tracker` is the node id of the tracker to announce to. It must understand the [TRACKER_ALPN] protocol.
/// `content` is the content to announce.
/// `kind` is the kind of the announcement. We can claim to have the complete data or only some of it.
pub async fn announce(
pub async fn announce_quinn(
connection: iroh_quinn::Connection,
signed_announce: SignedAnnounce,
) -> anyhow::Result<()> {
let (mut send, mut recv) = connection.open_bi().await?;
tracing::debug!("opened bi stream");
let request = Request::Announce(signed_announce);
let request = postcard::to_stdvec(&request)?;
tracing::debug!("sending announce");
send.write_all(&request).await?;
send.finish()?;
let _response = recv.read_to_end(REQUEST_SIZE_LIMIT).await?;
Ok(())
}

/// Announce to a tracker.
///
/// You can only announce content you yourself claim to have, to avoid spamming other nodes.
///
/// `endpoint` is the iroh endpoint to use for announcing.
/// `tracker` is the node id of the tracker to announce to. It must understand the [TRACKER_ALPN] protocol.
/// `content` is the content to announce.
/// `kind` is the kind of the announcement. We can claim to have the complete data or only some of it.
pub async fn announce_iroh(
connection: iroh::endpoint::Connection,
signed_announce: SignedAnnounce,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -80,7 +103,7 @@ async fn query_socket_one(
args: Query,
) -> anyhow::Result<Vec<SignedAnnounce>> {
let connection = endpoint.connect(addr).await?;
let result = query(connection, args).await?;
let result = query_quinn(connection, args).await?;
Ok(result.hosts)
}

Expand All @@ -90,7 +113,7 @@ async fn query_iroh_one(
args: Query,
) -> anyhow::Result<Vec<SignedAnnounce>> {
let connection = endpoint.connect(node_id, ALPN).await?;
let result = query(connection, args).await?;
let result = query_iroh(connection, args).await?;
Ok(result.hosts)
}

Expand Down Expand Up @@ -185,9 +208,29 @@ pub fn announce_dht(
}

/// Assume an existing connection to a tracker and query it for peers for some content.
pub async fn query(
pub async fn query_iroh(
connection: iroh::endpoint::Connection,
args: Query,
) -> anyhow::Result<QueryResponse> {
tracing::info!("connected to {:?}", connection.remote_node_id()?);
let (mut send, mut recv) = connection.open_bi().await?;
tracing::info!("opened bi stream");
let request = Request::Query(args);
let request = postcard::to_stdvec(&request)?;
tracing::info!("sending query");
send.write_all(&request).await?;
send.finish()?;
let response = recv.read_to_end(REQUEST_SIZE_LIMIT).await?;
let response = postcard::from_bytes::<Response>(&response)?;
Ok(match response {
Response::QueryResponse(response) => response,
})
}

/// Assume an existing connection to a tracker and query it for peers for some content.
pub async fn query_quinn(
connection: iroh_quinn::Connection,
args: Query,
) -> anyhow::Result<QueryResponse> {
tracing::info!("connected to {:?}", connection.remote_address());
let (mut send, mut recv) = connection.open_bi().await?;
Expand Down Expand Up @@ -283,14 +326,23 @@ pub async fn connect(
tracker: &TrackerId,
local_ipv4_addr: SocketAddrV4,
local_ipv6_addr: SocketAddrV6,
) -> anyhow::Result<iroh::endpoint::Connection> {
) -> anyhow::Result<Connection> {
match tracker {
TrackerId::Quinn(tracker) => connect_socket(*tracker, local_ipv4_addr.into()).await,
TrackerId::Iroh(tracker) => connect_iroh(*tracker, local_ipv4_addr, local_ipv6_addr).await,
TrackerId::Quinn(tracker) => Ok(Connection::Quinn(
connect_socket(*tracker, local_ipv4_addr.into()).await?,
)),
TrackerId::Iroh(tracker) => Ok(Connection::Iroh(
connect_iroh(*tracker, local_ipv4_addr, local_ipv6_addr).await?,
)),
TrackerId::Udp(_) => anyhow::bail!("can not connect to udp tracker"),
}
}

pub enum Connection {
Iroh(iroh::endpoint::Connection),
Quinn(iroh_quinn::Connection),
}

/// Create a iroh endpoint and connect to a tracker using the [crate::protocol::ALPN] protocol.
async fn connect_iroh(
tracker: NodeId,
Expand All @@ -307,13 +359,13 @@ async fn connect_iroh(
Ok(connection)
}

/// Create a quinn endpoint and connect to a tracker using the [crate::protocol::ALPN] protocol.
/// Create a quinn endpoint and connect to a tracker using the [crate] protocol.
async fn connect_socket(
tracker: SocketAddr,
local_addr: SocketAddr,
) -> anyhow::Result<iroh::endpoint::Connection> {
) -> anyhow::Result<iroh_quinn::Connection> {
let endpoint = create_quinn_client(local_addr, vec![ALPN.to_vec()], false)?;
tracing::info!("trying to connect to tracker at {:?}", tracker);
tracing::info!("trying t?o )connect to tracker at {:?}", tracker);
let connection = endpoint.connect(tracker, "localhost")?.await?;
Ok(connection)
}
Expand Down
3 changes: 2 additions & 1 deletion content-discovery/iroh-mainline-tracker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ iroh-blobs = { workspace = true }
mainline = { version = "2.0.0", features = ["async"] }
pkarr = { version = "1.0.1", features = ["async"] }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std"] }
iroh-quinn = "0.12"
iroh-quinn = "0.13"
rand = "0.8"
rcgen = "0.12.0"
redb = "1.5.0"
rustls = "0.21"
rustls-pki-types = "1.11"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.107"
tempfile = "3.4"
Expand Down
2 changes: 1 addition & 1 deletion content-discovery/iroh-mainline-tracker/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub fn log_connection_attempt(
path: &Option<PathBuf>,
host: &NodeId,
t0: Instant,
outcome: &anyhow::Result<iroh_quinn::Connection>,
outcome: &anyhow::Result<iroh::endpoint::Connection>,
) -> anyhow::Result<()> {
if let Some(path) = path {
let now = SystemTime::now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use rand::Rng;
/// This is just reading the size header and then immediately closing the connection.
/// It can be used to check if a peer has any data at all.
pub async fn unverified_size(
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
) -> anyhow::Result<(u64, Stats)> {
let request = iroh_blobs::protocol::GetRequest::new(
Expand All @@ -42,7 +42,7 @@ pub async fn unverified_size(
/// This asks for the last chunk of the blob and validates the response.
/// Note that this does not validate that the peer has all the data.
pub async fn verified_size(
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
) -> anyhow::Result<(u64, Stats)> {
tracing::debug!("Getting verified size of {}", hash.to_hex());
Expand Down Expand Up @@ -81,7 +81,7 @@ pub async fn verified_size(
}

pub async fn get_hash_seq_and_sizes(
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
max_size: u64,
) -> anyhow::Result<(HashSeq, Arc<[u64]>)> {
Expand Down Expand Up @@ -135,7 +135,7 @@ pub async fn get_hash_seq_and_sizes(

/// Probe for a single chunk of a blob.
pub async fn chunk_probe(
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
chunk: ChunkNum,
) -> anyhow::Result<Stats> {
Expand Down
12 changes: 5 additions & 7 deletions content-discovery/iroh-mainline-tracker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
};

use clap::Parser;
use iroh::{discovery::pkarr::dht::DhtDiscovery, endpoint::get_remote_node_id, Endpoint, NodeId};
use iroh::{discovery::pkarr::dht::DhtDiscovery, Endpoint, NodeId};
use iroh_blobs::util::fs::load_secret_key;
use iroh_mainline_content_discovery::protocol::ALPN;
use iroh_mainline_tracker::{
Expand All @@ -24,8 +24,6 @@ use iroh_mainline_tracker::{

use crate::args::Args;

use iroh_mainline_tracker::tracker::get_alpn;

static VERBOSE: AtomicBool = AtomicBool::new(false);

fn set_verbose(verbose: bool) {
Expand Down Expand Up @@ -82,11 +80,11 @@ async fn create_endpoint(

/// Accept an incoming connection and extract the client-provided [`NodeId`] and ALPN protocol.
pub async fn accept_conn(
mut conn: iroh_quinn::Connecting,
) -> anyhow::Result<(NodeId, String, iroh_quinn::Connection)> {
let alpn = get_alpn(&mut conn).await?;
mut conn: iroh::endpoint::Connecting,
) -> anyhow::Result<(NodeId, String, iroh::endpoint::Connection)> {
let alpn = String::from_utf8(conn.alpn().await?)?;
let conn = conn.await?;
let peer_id = get_remote_node_id(&conn)?;
let peer_id = conn.remote_node_id()?;
Ok((peer_id, alpn, conn))
}

Expand Down
Loading