Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

add a DHT mode param #298

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::num::NonZeroU16;
use std::path::PathBuf;
use structopt::StructOpt;

use ipfs::{Ipfs, IpfsOptions, IpfsTypes, UninitializedIpfs};
use ipfs::{DhtMode, Ipfs, IpfsOptions, IpfsTypes, UninitializedIpfs};
use ipfs_http::{config, v0};

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -134,8 +134,14 @@ fn main() {
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop");

rt.block_on(async move {
let opts: IpfsOptions =
IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false, None);
let opts: IpfsOptions = IpfsOptions::new(
home.clone().into(),
keypair,
Vec::new(),
false,
None,
DhtMode::Client,
);

let (ipfs, task): (Ipfs<ipfs::TestTypes>, _) = UninitializedIpfs::new(opts, None)
.await
Expand Down
45 changes: 28 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use self::dag::IpldDag;
pub use self::error::Error;
use self::ipns::Ipns;
pub use self::p2p::pubsub::{PubsubMessage, SubscriptionStream};
use self::p2p::{create_swarm, SwarmOptions, TSwarm};
use self::p2p::{create_swarm, TSwarm};
pub use self::p2p::{Connection, ConnectionTarget};
pub use self::path::IpfsPath;
pub use self::repo::RepoTypes;
Expand Down Expand Up @@ -75,19 +75,31 @@ impl RepoTypes for TestTypes {
type TDataStore = repo::mem::MemDataStore;
}

#[derive(Clone, Copy, Debug, PartialEq)]
/// The way the IPFS node operates within the Kademlia DHT.
pub enum DhtMode {
/// As a client, able to discover peers and content.
Client,
/// As a server, able to both discover peers and data
/// and also provide them to the network's DHT.
Server,
}

/// Ipfs options
#[derive(Clone)]
pub struct IpfsOptions {
/// The path of the ipfs repo.
pub ipfs_path: PathBuf,
/// The keypair used with libp2p.
pub keypair: Keypair,
pub keypair: DebuggableKeypair<Keypair>,
/// Nodes dialed during startup.
pub bootstrap: Vec<(Multiaddr, PeerId)>,
/// Enables mdns for peer discovery when true.
pub mdns: bool,
/// Custom Kademlia protocol name.
pub kad_protocol: Option<String>,
/// DHT mode.
pub dht_mode: DhtMode,
}

impl fmt::Debug for IpfsOptions {
Expand All @@ -97,7 +109,7 @@ impl fmt::Debug for IpfsOptions {
fmt.debug_struct("IpfsOptions")
.field("ipfs_path", &self.ipfs_path)
.field("bootstrap", &self.bootstrap)
.field("keypair", &DebuggableKeypair(&self.keypair))
.field("keypair", &self.keypair)
.field("mdns", &self.mdns)
.field("kad_protocol", &self.kad_protocol)
.finish()
Expand All @@ -109,18 +121,19 @@ impl IpfsOptions {
pub fn inmemory_with_generated_keys() -> Self {
Self {
ipfs_path: std::env::temp_dir().into(),
keypair: Keypair::generate_ed25519(),
keypair: DebuggableKeypair(Keypair::generate_ed25519()),
mdns: Default::default(),
bootstrap: Default::default(),
kad_protocol: Default::default(),
dht_mode: DhtMode::Client,
}
}
}

/// Workaround for libp2p::identity::Keypair missing a Debug impl, works with references and owned
/// keypairs.
#[derive(Clone)]
struct DebuggableKeypair<I: Borrow<Keypair>>(I);
pub struct DebuggableKeypair<I: Borrow<Keypair>>(I);

impl<I: Borrow<Keypair>> fmt::Debug for DebuggableKeypair<I> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -147,13 +160,15 @@ impl IpfsOptions {
bootstrap: Vec<(Multiaddr, PeerId)>,
mdns: bool,
kad_protocol: Option<String>,
dht_mode: DhtMode,
) -> Self {
Self {
ipfs_path,
keypair,
keypair: DebuggableKeypair(keypair),
bootstrap,
mdns,
kad_protocol,
dht_mode,
}
}
}
Expand Down Expand Up @@ -194,7 +209,7 @@ impl Default for IpfsOptions {
.join("rust-ipfs")
.join("config.json");
let config = ConfigFile::new(config_path).unwrap();
let keypair = config.secio_key_pair();
let keypair = DebuggableKeypair(config.secio_key_pair());
let bootstrap = config.bootstrap();

IpfsOptions {
Expand All @@ -203,6 +218,7 @@ impl Default for IpfsOptions {
bootstrap,
mdns: true,
kad_protocol: None,
dht_mode: DhtMode::Client,
}
}
}
Expand All @@ -221,8 +237,8 @@ impl<Types: IpfsTypes> Clone for Ipfs<Types> {
#[derive(Debug)]
pub struct IpfsInner<Types: IpfsTypes> {
pub span: Span,
options: IpfsOptions,
repo: Repo<Types>,
keys: DebuggableKeypair<Keypair>,
to_task: Sender<IpfsEvent>,
}

Expand Down Expand Up @@ -267,7 +283,6 @@ enum IpfsEvent {
pub struct UninitializedIpfs<Types: IpfsTypes> {
repo: Repo<Types>,
span: Span,
keys: Keypair,
options: IpfsOptions,
repo_events: Receiver<RepoEvent>,
}
Expand All @@ -282,13 +297,11 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
pub async fn new(options: IpfsOptions, span: Option<Span>) -> Self {
let repo_options = RepoOptions::from(&options);
let (repo, repo_events) = create_repo(repo_options);
let keys = options.keypair.clone();
let span = span.unwrap_or_else(|| tracing::trace_span!("ipfs"));

UninitializedIpfs {
repo,
span,
keys,
options,
repo_events,
}
Expand All @@ -306,24 +319,22 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
let UninitializedIpfs {
repo,
span,
keys,
repo_events,
..
options,
} = self;

repo.init().await?;

let (to_task, receiver) = channel::<IpfsEvent>(1);

let ipfs = Ipfs(Arc::new(IpfsInner {
options: options.clone(),
span,
repo,
keys: DebuggableKeypair(keys),
to_task,
}));

let swarm_options = SwarmOptions::from(&self.options);
let swarm = create_swarm(swarm_options, ipfs.clone()).await;
let swarm = create_swarm(ipfs.clone()).await;

let fut = IpfsFuture {
repo_events: repo_events.fuse(),
Expand Down Expand Up @@ -537,7 +548,7 @@ impl<Types: IpfsTypes> Ipfs<Types> {
.send(IpfsEvent::GetAddresses(tx))
.await?;
let addresses = rx.await?;
Ok((self.keys.get_ref().public(), addresses))
Ok((self.options.keypair.get_ref().public(), addresses))
})
.await
}
Expand Down
25 changes: 12 additions & 13 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::pubsub::Pubsub;
use super::swarm::{Connection, ConnectionTarget, Disconnector, SwarmApi};
use crate::p2p::SwarmOptions;
use crate::repo::BlockPut;
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
use crate::{Ipfs, IpfsTypes};
Expand Down Expand Up @@ -336,27 +335,28 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour

impl<Types: IpfsTypes> Behaviour<Types> {
/// Create a Kademlia behaviour with the IPFS bootstrap nodes.
pub async fn new(options: SwarmOptions, ipfs: Ipfs<Types>) -> Self {
info!("net: starting with peer id {}", options.peer_id);
pub async fn new(ipfs: Ipfs<Types>) -> Self {
let peer_id = ipfs.options.keypair.get_ref().public().into_peer_id();
info!("net: starting with peer id {}", peer_id);

let mdns = if options.mdns {
let mdns = if ipfs.options.mdns {
Some(Mdns::new().expect("Failed to create mDNS service"))
} else {
None
}
.into();

let store = MemoryStore::new(options.peer_id.to_owned());
let store = MemoryStore::new(peer_id.to_owned());

let mut kad_config = KademliaConfig::default();
kad_config.disjoint_query_paths(true);
kad_config.set_query_timeout(std::time::Duration::from_secs(300));
if let Some(protocol) = options.kad_protocol {
kad_config.set_protocol_name(protocol.into_bytes());
if let Some(ref protocol) = ipfs.options.kad_protocol {
kad_config.set_protocol_name(protocol.clone().into_bytes());
}
let mut kademlia = Kademlia::with_config(options.peer_id.to_owned(), store, kad_config);
let mut kademlia = Kademlia::with_config(peer_id.to_owned(), store, kad_config);

for (addr, peer_id) in &options.bootstrap {
for (addr, peer_id) in &ipfs.options.bootstrap {
kademlia.add_address(peer_id, addr.to_owned());
}

Expand All @@ -365,9 +365,9 @@ impl<Types: IpfsTypes> Behaviour<Types> {
let identify = Identify::new(
"/ipfs/0.1.0".into(),
"rust-ipfs".into(),
options.keypair.public(),
ipfs.options.keypair.get_ref().public(),
);
let pubsub = Pubsub::new(options.peer_id);
let pubsub = Pubsub::new(peer_id);
let swarm = SwarmApi::default();

Behaviour {
Expand Down Expand Up @@ -480,8 +480,7 @@ impl<Types: IpfsTypes> Behaviour<Types> {

/// Create a IPFS behaviour with the IPFS bootstrap nodes.
pub async fn build_behaviour<TIpfsTypes: IpfsTypes>(
options: SwarmOptions,
ipfs: Ipfs<TIpfsTypes>,
) -> Behaviour<TIpfsTypes> {
Behaviour::new(options, ipfs).await
Behaviour::new(ipfs).await
}
41 changes: 5 additions & 36 deletions src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! P2P handling for IPFS nodes.
use crate::{Ipfs, IpfsOptions, IpfsTypes};
use libp2p::identity::Keypair;
use crate::{Ipfs, IpfsTypes};
use libp2p::Swarm;
use libp2p::{Multiaddr, PeerId};
use tracing::Span;

mod behaviour;
Expand All @@ -14,46 +12,17 @@ pub use swarm::{Connection, ConnectionTarget};

pub type TSwarm<T> = Swarm<behaviour::Behaviour<T>>;

pub struct SwarmOptions {
pub keypair: Keypair,
pub peer_id: PeerId,
pub bootstrap: Vec<(Multiaddr, PeerId)>,
pub mdns: bool,
pub kad_protocol: Option<String>,
}

impl From<&IpfsOptions> for SwarmOptions {
fn from(options: &IpfsOptions) -> Self {
let keypair = options.keypair.clone();
let peer_id = keypair.public().into_peer_id();
let bootstrap = options.bootstrap.clone();
let mdns = options.mdns;
let kad_protocol = options.kad_protocol.clone();

SwarmOptions {
keypair,
peer_id,
bootstrap,
mdns,
kad_protocol,
}
}
}

/// Creates a new IPFS swarm.
pub async fn create_swarm<TIpfsTypes: IpfsTypes>(
options: SwarmOptions,
ipfs: Ipfs<TIpfsTypes>,
) -> TSwarm<TIpfsTypes> {
let peer_id = options.peer_id.clone();
pub async fn create_swarm<TIpfsTypes: IpfsTypes>(ipfs: Ipfs<TIpfsTypes>) -> TSwarm<TIpfsTypes> {
let peer_id = ipfs.options.keypair.get_ref().public().into_peer_id();

// Set up an encrypted TCP transport over the Mplex protocol.
let transport = transport::build_transport(options.keypair.clone());
let transport = transport::build_transport(ipfs.options.keypair.get_ref().clone());

let swarm_span = ipfs.0.span.clone();

// Create a Kademlia behaviour
let behaviour = behaviour::build_behaviour(options, ipfs).await;
let behaviour = behaviour::build_behaviour(ipfs).await;

// Create a Swarm
let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)
Expand Down