Skip to content

Commit

Permalink
feat: Accept Arc to allow shared blockstore
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Jul 24, 2024
1 parent 3494715 commit a47eafb
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ categories = [

[dependencies]
asynchronous-codec = "0.7"
blockstore = "0.5"
blockstore = "0.6"
bytes = "1"
cid = "0.11"
fnv = "1.0.5"
Expand Down
7 changes: 4 additions & 3 deletions examples/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//! cargo run --example=node -- -l 9898 bafkreiczsrdrvoybcevpzqmblh3my5fu6ui3tgag3jm3hsxvvhaxhswpyu
//! ```
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
Expand Down Expand Up @@ -71,12 +72,12 @@ async fn main() -> Result<()> {

let _guard = init_tracing();

let store = InMemoryBlockstore::new();
let blockstore = Arc::new(InMemoryBlockstore::new());
for preload_string in args.preload_blockstore_string {
let block = StringBlock(preload_string);
let cid = block.cid()?;
info!("inserted {cid} with content '{}'", block.0);
store.put_keyed(&cid, block.data()).await?;
blockstore.put_keyed(&cid, block.data()).await?;
}

let mut swarm = SwarmBuilder::with_new_identity()
Expand All @@ -91,7 +92,7 @@ async fn main() -> Result<()> {
"/ipfs/id/1.0.0".to_string(),
key.public(),
)),
bitswap: beetswap::Behaviour::new(store),
bitswap: beetswap::Behaviour::new(blockstore),
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();
Expand Down
18 changes: 11 additions & 7 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@ use crate::{Behaviour, Error, Result};
/// # Example
///
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use blockstore::InMemoryBlockstore;
/// # fn new() -> beetswap::Behaviour<64, InMemoryBlockstore<64>> {
/// beetswap::Behaviour::builder(InMemoryBlockstore::new())
/// beetswap::Behaviour::builder(Arc::new(InMemoryBlockstore::new()))
/// .build()
/// # }
pub struct BehaviourBuilder<const S: usize, B>
where
B: Blockstore + 'static,
{
protocol_prefix: Option<String>,
blockstore: B,
blockstore: Arc<B>,
client: ClientConfig,
multihasher: MultihasherTable<S>,
}
Expand All @@ -33,7 +34,7 @@ where
B: Blockstore + 'static,
{
/// Creates a new builder for [`Behaviour`].
pub(crate) fn new(blockstore: B) -> Self {
pub(crate) fn new(blockstore: Arc<B>) -> Self {
BehaviourBuilder {
protocol_prefix: None,
blockstore,
Expand All @@ -55,10 +56,11 @@ where
/// # Example
///
/// ```rust
/// # use std::sync::Arc;
/// # use blockstore::InMemoryBlockstore;
/// # fn new() -> beetswap::Result<beetswap::Behaviour<64, InMemoryBlockstore<64>>> {
/// # Ok(
/// beetswap::Behaviour::builder(InMemoryBlockstore::new())
/// beetswap::Behaviour::builder(Arc::new(InMemoryBlockstore::new()))
/// .protocol_prefix("/celestia/celestia")?
/// .build()
/// # )
Expand All @@ -78,9 +80,10 @@ where
/// # Example
///
/// ```rust
/// # use std::sync::Arc;
/// # use blockstore::InMemoryBlockstore;
/// # fn new() -> beetswap::Behaviour<64, InMemoryBlockstore<64>> {
/// beetswap::Behaviour::builder(InMemoryBlockstore::new())
/// beetswap::Behaviour::builder(Arc::new(InMemoryBlockstore::new()))
/// .client_set_send_dont_have(false)
/// .build()
/// # }
Expand Down Expand Up @@ -111,7 +114,7 @@ where

/// Build a [`Behaviour`].
pub fn build(self) -> Behaviour<S, B> {
let blockstore = Arc::new(self.blockstore);
let blockstore = self.blockstore;
let multihasher = Arc::new(self.multihasher);
let protocol_prefix = self.protocol_prefix.as_deref();

Expand All @@ -133,7 +136,8 @@ mod tests {
#[test]
fn invalid_protocol_prefix() {
assert!(matches!(
BehaviourBuilder::<64, _>::new(InMemoryBlockstore::<64>::new()).protocol_prefix("foo"),
BehaviourBuilder::<64, _>::new(Arc::new(InMemoryBlockstore::<64>::new()))
.protocol_prefix("foo"),
Err(Error::InvalidProtocolPrefix(_))
));
}
Expand Down
32 changes: 19 additions & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,9 @@ mod tests {
#[tokio::test]
async fn get_unknown_cid_responds_with_have() {
let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new());
let mut client =
Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new()));
let mut client = Swarm::new_ephemeral(|_| {
Behaviour::<64, _>::new(Arc::new(InMemoryBlockstore::<64>::new()))
});

let (mut server_control, mut server_incoming_streams) =
connect_to_server(&mut client, server).await;
Expand Down Expand Up @@ -788,8 +789,9 @@ mod tests {
async fn get_unknown_cid_responds_with_dont_have() {
let server1 = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new());
let server2 = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new());
let mut client =
Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new()));
let mut client = Swarm::new_ephemeral(|_| {
Behaviour::<64, _>::new(Arc::new(InMemoryBlockstore::<64>::new()))
});

let (mut server1_control, mut server1_incoming_streams) =
connect_to_server(&mut client, server1).await;
Expand Down Expand Up @@ -903,8 +905,9 @@ mod tests {
#[tokio::test]
async fn get_unknown_cid_responds_with_block() {
let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new());
let mut client =
Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new()));
let mut client = Swarm::new_ephemeral(|_| {
Behaviour::<64, _>::new(Arc::new(InMemoryBlockstore::<64>::new()))
});

let (mut server_control, mut server_incoming_streams) =
connect_to_server(&mut client, server).await;
Expand Down Expand Up @@ -983,8 +986,9 @@ mod tests {
#[tokio::test]
async fn update_wantlist() {
let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new());
let mut client =
Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new()));
let mut client = Swarm::new_ephemeral(|_| {
Behaviour::<64, _>::new(Arc::new(InMemoryBlockstore::<64>::new()))
});

let (_server_control, mut server_incoming_streams) =
connect_to_server(&mut client, server).await;
Expand Down Expand Up @@ -1060,8 +1064,9 @@ mod tests {
#[tokio::test]
async fn request_then_cancel() {
let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new());
let mut client =
Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new()));
let mut client = Swarm::new_ephemeral(|_| {
Behaviour::<64, _>::new(Arc::new(InMemoryBlockstore::<64>::new()))
});

let (_server_control, mut server_incoming_streams) =
connect_to_server(&mut client, server).await;
Expand Down Expand Up @@ -1129,8 +1134,9 @@ mod tests {
#[tokio::test]
async fn request_before_connect() {
let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new());
let mut client =
Swarm::new_ephemeral(|_| Behaviour::<64, _>::new(InMemoryBlockstore::<64>::new()));
let mut client = Swarm::new_ephemeral(|_| {
Behaviour::<64, _>::new(Arc::new(InMemoryBlockstore::<64>::new()))
});

let cid1 = cid_of_data(b"x1");
let cid2 = cid_of_data(b"x2");
Expand Down Expand Up @@ -1181,7 +1187,7 @@ mod tests {
let cid1 = cid_of_data(data1);
let cid2 = cid_of_data(b"x2");

let blockstore = InMemoryBlockstore::<64>::new();
let blockstore = Arc::new(InMemoryBlockstore::<64>::new());
blockstore.put_keyed(&cid1, data1).await.unwrap();

let server = Swarm::new_ephemeral(|_| libp2p_stream::Behaviour::new());
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ where
B: Blockstore + 'static,
{
/// Creates a new [`Behaviour`] with the default configuration.
pub fn new(blockstore: B) -> Behaviour<MAX_MULTIHASH_SIZE, B> {
pub fn new(blockstore: Arc<B>) -> Behaviour<MAX_MULTIHASH_SIZE, B> {
BehaviourBuilder::new(blockstore).build()
}

/// Creates a new [`BehaviourBuilder`].
pub fn builder(blockstore: B) -> BehaviourBuilder<MAX_MULTIHASH_SIZE, B> {
pub fn builder(blockstore: Arc<B>) -> BehaviourBuilder<MAX_MULTIHASH_SIZE, B> {
BehaviourBuilder::new(blockstore)
}

Expand Down
5 changes: 3 additions & 2 deletions tests/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::future::Future;
use std::sync::Arc;

use beetswap::{Error, Event, QueryId};
use blockstore::InMemoryBlockstore;
Expand Down Expand Up @@ -139,7 +140,7 @@ impl TestBitswapWorker {
}

pub async fn spawn_node(store: Option<InMemoryBlockstore<CID_SIZE>>) -> TestBitswapNode {
let store = store.unwrap_or_default();
let blockstore = Arc::new(store.unwrap_or_default());

let mut swarm = SwarmBuilder::with_new_identity()
.with_tokio()
Expand All @@ -149,7 +150,7 @@ pub async fn spawn_node(store: Option<InMemoryBlockstore<CID_SIZE>>) -> TestBits
libp2p_yamux::Config::default,
)
.unwrap()
.with_behaviour(|_key| beetswap::Behaviour::<CID_SIZE, _>::new(store))
.with_behaviour(|_key| beetswap::Behaviour::<CID_SIZE, _>::new(blockstore))
.unwrap()
.build();

Expand Down

0 comments on commit a47eafb

Please sign in to comment.