Skip to content

Commit

Permalink
IPFS: use IPFS Gateway API (#5600)
Browse files Browse the repository at this point in the history
* ipfs: create new gateway and rpc clients

Introduces a new IPFS gateway client, refactoring the existing
RPC API client to reuse the new code. Also introduces some new
types and concepts to make working with IPFS easier.

* ipfs: use new ipfs types instead of old ones

Integrates the new IPFS types and clients into the existing codebase,
replacing the old types and client.
  • Loading branch information
isum authored Oct 1, 2024
1 parent 2509212 commit 90e949d
Show file tree
Hide file tree
Showing 28 changed files with 2,758 additions and 1,111 deletions.
410 changes: 77 additions & 333 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async-graphql-axum = "7.0.6"
axum = "0.7.5"
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["derive", "env"] }
derivative = "2.2.0"
diesel = { version = "2.1.3", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid"] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
diesel-dynamic-schema = "0.2.1"
Expand Down
4 changes: 0 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,4 @@ anyhow = "1.0"

[dev-dependencies]
tower-test = { git = "https://github.com/tower-rs/tower.git" }
ipfs-api-backend-hyper = "0.6"
ipfs-api = { version = "0.17.0", features = [
"with-hyper-rustls",
], default-features = false }
uuid = { version = "1.9.1", features = ["v4"] }
105 changes: 45 additions & 60 deletions core/src/polling_monitor/ipfs_service.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Error};
use bytes::Bytes;
use graph::futures03::future::BoxFuture;
use graph::{
derive::CheapClone,
ipfs_client::{CidFile, IpfsClient},
prelude::CheapClone,
};
use std::time::Duration;
use graph::ipfs::ContentPath;
use graph::ipfs::IpfsClient;
use graph::ipfs::IpfsError;
use graph::{derive::CheapClone, prelude::CheapClone};
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};

// HTTP status codes that the status-code match in `call_inner` treats as a timeout,
// i.e. as "content not available" (`Ok(None)`) rather than as an error.
const CLOUDFLARE_TIMEOUT: u16 = 524;
const GATEWAY_TIMEOUT: u16 = 504;

// Buffered service handle: takes a content request and resolves to `Ok(Some(bytes))`,
// `Ok(None)`, or an error.
// NOTE(review): this listing is an unmarked diff, so both the old alias (keyed by
// `CidFile`) and the new alias (keyed by `ContentPath`) appear below.
pub type IpfsService = Buffer<CidFile, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
pub type IpfsService = Buffer<ContentPath, BoxFuture<'static, Result<Option<Bytes>, Error>>>;

pub fn ipfs_service(
client: IpfsClient,
client: Arc<dyn IpfsClient>,
max_file_size: usize,
timeout: Duration,
rate_limit: u16,
) -> IpfsService {
let ipfs = IpfsServiceInner {
client,
max_file_size,
timeout,
max_file_size,
};

let svc = ServiceBuilder::new()
Expand All @@ -38,37 +36,30 @@ pub fn ipfs_service(

/// State carried by the IPFS service: the client used to fetch content, plus the
/// timeout and maximum file size that `call_inner` passes along with each request.
// NOTE(review): this listing is an unmarked diff, so the pre-change and post-change
// field declarations both appear below.
#[derive(Clone, CheapClone)]
struct IpfsServiceInner {
client: IpfsClient,
max_file_size: usize,
client: Arc<dyn IpfsClient>,
timeout: Duration,
max_file_size: usize,
}

impl IpfsServiceInner {
// NOTE(review): this listing is an unmarked diff, so lines from the old version
// (`CidFile`, `cat_all`, HTTP-status match) and the new version (`ContentPath`,
// `cat`, `IpfsError` match) of this method are interleaved below.
/// Fetches the requested IPFS content through the client.
///
/// Fails up front when the CID's multihash code is not listed in `SAFE_MULTIHASHES`.
/// Otherwise returns `Ok(Some(bytes))` on success, `Ok(None)` when the failure is
/// classified as a timeout, and the converted client error in every other case.
async fn call_inner(self, req: CidFile) -> Result<Option<Bytes>, Error> {
let CidFile { cid, path } = req;
let multihash = cid.hash().code();
async fn call_inner(self, path: ContentPath) -> Result<Option<Bytes>, Error> {
let multihash = path.cid().hash().code();
// Only the fixed allow-list of multihash codes is accepted.
if !SAFE_MULTIHASHES.contains(&multihash) {
return Err(anyhow!("CID multihash {} is not allowed", multihash));
}

// Old version: join the CID and the optional sub-path into one request string.
let cid_str = match path {
Some(path) => format!("{}/{}", cid, path),
None => cid.to_string(),
};

// The request is bounded by the configured timeout and maximum file size.
let res = self
.client
.cat_all(&cid_str, Some(self.timeout), self.max_file_size)
.cat(&path, self.max_file_size, Some(self.timeout))
.await;

match res {
Ok(file_bytes) => Ok(Some(file_bytes)),
Err(e) => match e.status().map(|e| e.as_u16()) {
// Timeouts in IPFS mean the file is not available, so we return `None`
Some(GATEWAY_TIMEOUT) | Some(CLOUDFLARE_TIMEOUT) => return Ok(None),
_ if e.is_timeout() => return Ok(None),
_ => return Err(e.into()),
},
Err(IpfsError::RequestFailed(err)) if err.is_timeout() => {
// Timeouts in IPFS mean that the content is not available, so we return `None`.
Ok(None)
}
Err(err) => Err(err.into()),
}
}
}
Expand Down Expand Up @@ -96,48 +87,42 @@ const SAFE_MULTIHASHES: [u64; 15] = [

#[cfg(test)]
mod test {
use ipfs::IpfsApi;
use ipfs_api as ipfs;
use std::{fs, str::FromStr, time::Duration};
use std::time::Duration;

use graph::components::link_resolver::ArweaveClient;
use graph::components::link_resolver::ArweaveResolver;
use graph::data::value::Word;
use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing;
use graph::ipfs::IpfsRpcClient;
use graph::ipfs::ServerAddress;
use graph::tokio;
use tower::ServiceExt;

use cid::Cid;
use graph::{
components::link_resolver::{ArweaveClient, ArweaveResolver},
data::value::Word,
ipfs_client::IpfsClient,
tokio,
};

use uuid::Uuid;

use super::*;

// Round-trip test: add a file with random content inside a directory, then fetch it
// through the service by `<directory CID>/<file name>` and compare the bytes.
// NOTE(review): this listing is an unmarked diff, so the old body (fixture folder,
// `ipfs_api` client, `CidFile` request) and the new body (in-memory file,
// `IpfsRpcClient`, `ContentPath` request) are interleaved below.
// NOTE(review): presumably needs an IPFS node reachable at the local RPC address —
// confirm against the test helpers.
#[tokio::test]
async fn cat_file_in_folder() {
let path = "./tests/fixtures/ipfs_folder";
let uid = Uuid::new_v4().to_string();
fs::write(format!("{}/random.txt", path), &uid).unwrap();
// Random content, so the assertion cannot pass on stale data.
let random_bytes = Uuid::new_v4().as_bytes().to_vec();
let ipfs_file = ("dir/file.txt", random_bytes.clone());

let cl: ipfs::IpfsClient = ipfs::IpfsClient::default();
let add_resp = add_files_to_local_ipfs_node_for_testing([ipfs_file])
.await
.unwrap();

let rsp = cl.add_path(path).await.unwrap();
// Pick the entry for the enclosing directory; its hash is used as the root of the request path.
let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash;

let ipfs_folder = rsp.iter().find(|rsp| rsp.name == "ipfs_folder").unwrap();
let client =
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
.unwrap()
.into_boxed();

let local = IpfsClient::localhost();
let cid = Cid::from_str(&ipfs_folder.hash).unwrap();
let file = "random.txt".to_string();
let svc = ipfs_service(client.into(), 100000, Duration::from_secs(30), 10);

let svc = super::ipfs_service(local, 100000, Duration::from_secs(5), 10);
let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
let content = svc.oneshot(path).await.unwrap().unwrap();

let content = svc
.oneshot(super::CidFile {
cid,
path: Some(file),
})
.await
.unwrap()
.unwrap();
assert_eq!(content.to_vec(), uid.as_bytes().to_vec());
assert_eq!(content.to_vec(), random_bytes);
}

#[tokio::test]
Expand Down
6 changes: 3 additions & 3 deletions core/src/subgraph/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use graph::{
CausalityRegion, DataSource, DataSourceTemplate,
},
derive::CheapClone,
ipfs_client::CidFile,
ipfs::ContentPath,
prelude::{
BlockNumber, BlockPtr, BlockState, CancelGuard, CheapClone, DeploymentHash,
MetricsRegistry, RuntimeHostBuilder, SubgraphCountMetric, SubgraphInstanceMetrics,
Expand Down Expand Up @@ -228,8 +228,8 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
}

/// Holds, for each of IPFS and Arweave, a polling monitor together with an unbounded
/// receiver of `(key, Bytes)` pairs.
// NOTE(review): this listing is an unmarked diff, so the `CidFile` lines are the old
// IPFS fields and the `ContentPath` lines are their replacements.
pub struct OffchainMonitor {
ipfs_monitor: PollingMonitor<CidFile>,
ipfs_monitor_rx: mpsc::UnboundedReceiver<(CidFile, Bytes)>,
ipfs_monitor: PollingMonitor<ContentPath>,
ipfs_monitor_rx: mpsc::UnboundedReceiver<(ContentPath, Bytes)>,
arweave_monitor: PollingMonitor<Base64>,
arweave_monitor_rx: mpsc::UnboundedReceiver<(Base64, Bytes)>,
}
Expand Down
4 changes: 3 additions & 1 deletion graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ atomic_refcell = "0.1.13"
old_bigdecimal = { version = "=0.1.2", features = ["serde"], package = "bigdecimal" }
bytes = "1.0.1"
cid = "0.11.1"
derivative = { workspace = true }
graph_derive = { path = "./derive" }
diesel = { workspace = true }
diesel_derives = { workspace = true }
Expand Down Expand Up @@ -90,7 +91,7 @@ defer = "0.2"
# Our fork contains patches to make some fields optional for Celo and Fantom compatibility.
# Without the "arbitrary_precision" feature, we get the error `data did not match any variant of untagged enum Response`.
web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-patches-onto-0.18", features = [
"arbitrary_precision","test"
"arbitrary_precision", "test"
] }
serde_plain = "1.0.2"
csv = "1.3.0"
Expand All @@ -100,6 +101,7 @@ object_store = { version = "0.10.1", features = ["gcp"] }
clap.workspace = true
maplit = "1.0.2"
hex-literal = "0.4"
wiremock = "0.6.1"

[build-dependencies]
tonic-build = { workspace = true }
Loading

0 comments on commit 90e949d

Please sign in to comment.