diff --git a/Cargo.lock b/Cargo.lock index f8878b559..84a722d4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -466,7 +466,7 @@ checksum = "c91ac174c05670edffb720bc376b9d4c274c3d127ac08ed3d38144c9415502cd" dependencies = [ "async-graphql 4.0.16", "async-trait", - "axum", + "axum 0.5.17", "bytes", "futures-util", "http-body", @@ -580,9 +580,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", @@ -673,7 +673,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.2.9", "base64 0.13.1", "bitflags 1.3.2", "bytes", @@ -683,7 +683,7 @@ dependencies = [ "http-body", "hyper", "itoa", - "matchit", + "matchit 0.5.0", "memchr", "mime", "percent-encoding", @@ -701,6 +701,39 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core 0.3.4", + "bitflags 1.3.2", + "bytes", + "futures-util", + "headers", + "http", + "http-body", + "hyper", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.2.9" @@ -717,6 +750,23 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -746,9 +796,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.2" +version = "0.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" +checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" [[package]] name = "base64ct" @@ -1151,7 +1201,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b949a1c63fb7eb591eb7ba438746326aedf0ae843e51ec92ba6bec5bb382c4f" dependencies = [ - "base64 0.21.2", + "base64 0.21.4", "bech32", "bs58 0.4.0", "digest 0.10.7", @@ -1704,7 +1754,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0be7b2ac146c1f99fe245c02d16af0696450d8e06c135db75e10eeb9e642c20d" dependencies = [ - "base64 0.21.2", + "base64 0.21.4", "bytes", "hex", "k256", @@ -2005,7 +2055,7 @@ checksum = "6838fa110e57d572336178b7c79e94ff88ef976306852d8cb87d9e5b1fc7c0b5" dependencies = [ "async-trait", "auto_impl", - "base64 0.21.2", + "base64 0.21.4", "bytes", "const-hex", "enr", @@ -2584,12 +2634,11 @@ dependencies = [ [[package]] name = "headers" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" dependencies = [ - "base64 0.13.1", - "bitflags 1.3.2", + "base64 0.21.4", "bytes", "headers-core", "http", @@ -2607,6 +2656,17 @@ dependencies = [ "http", ] +[[package]] +name = "headers-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f33cf300c485e3cbcba0235013fcc768723451c9b84d1b31aa7fec0491ac9a11" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "heck" version = "0.4.1" @@ -2881,12 +2941,16 @@ dependencies = [ "alloy-sol-types", "anyhow", "arc-swap", + "async-trait", + "axum 0.6.20", "env_logger", "ethers", "ethers-core", "eventuals", "faux", "graphql", + "headers", + "headers-derive", "keccak-hash", "lazy_static", "log", @@ -2899,6 +2963,7 @@ dependencies = [ "sqlx", "tap_core", "test-log", + "thiserror", "tokio", "toolshed", "wiremock", @@ -3027,7 +3092,7 @@ version = "8.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" dependencies = [ - "base64 0.21.2", + "base64 0.21.4", "pem", "ring", "serde", @@ -3189,6 +3254,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -4332,7 +4403,7 @@ version = "0.11.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ - "base64 0.21.2", + "base64 0.21.4", "bytes", "encoding_rs", "futures-core", @@ -4626,7 +4697,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" dependencies = [ - "base64 0.21.2", + "base64 0.21.4", ] [[package]] @@ -4894,6 +4965,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_qs" version = "0.8.5" @@ -4932,7 +5013,7 @@ version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237" dependencies = [ - "base64 0.21.2", + "base64 0.21.4", "chrono", "hex", "indexmap 1.9.3", @@ -4965,7 +5046,7 @@ dependencies = [ "async-graphql 4.0.16", "async-graphql-axum", "autometrics", - "axum", + "axum 0.5.17", "cargo-husky", "clap", "confy", @@ -5320,7 +5401,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db" dependencies = [ "atoi", - "base64 0.21.2", + "base64 0.21.4", "bigdecimal", "bitflags 2.4.0", "byteorder", @@ -5365,7 +5446,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624" dependencies = [ "atoi", - "base64 0.21.2", + "base64 0.21.4", "bigdecimal", "bitflags 2.4.0", "byteorder", @@ -6601,7 +6682,7 @@ checksum = "c6f71803d3a1c80377a06221e0530be02035d5b3e854af56c6ece7ac20ac441d" dependencies = [ "assert-json-diff", "async-trait", - "base64 0.21.2", + "base64 0.21.4", "deadpool", "futures", "futures-timer", diff --git a/common/Cargo.toml b/common/Cargo.toml index 0afa7b84d..a62a0b356 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -34,6 +34,11 @@ toolshed = { git = "https://github.com/edgeandnode/toolshed", branch = "main", f ] } graphql = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } tap_core = "0.6.0" +axum = { version = "0.6.20", default_features = true, features = ["headers"] } +thiserror = "1.0.49" +async-trait = "0.1.74" +headers-derive = "0.1.1" +headers = "0.3.9" [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/indexer_service/http/config.rs b/common/src/indexer_service/http/config.rs new file mode 100644 index 000000000..a408c4bb9 --- /dev/null +++ b/common/src/indexer_service/http/config.rs @@ -0,0 +1,49 @@ +use std::net::SocketAddr; + +use alloy_primitives::Address; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct DatabaseConfig { + pub postgres_url: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct NetworkSubgraphConfig { + pub query_url: String, + pub syncing_interval: u64, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct EscrowSubgraphConfig { + pub query_url: String, + pub syncing_interval: u64, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ServerConfig { + pub host_and_port: SocketAddr, + pub url_prefix: String, + pub free_query_auth_token: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct IndexerServiceConfig { + pub indexer: IndexerConfig, + pub server: ServerConfig, + pub database: DatabaseConfig, + pub network_subgraph: NetworkSubgraphConfig, + pub escrow_subgraph: EscrowSubgraphConfig, + pub graph_network: GraphNetworkConfig, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct GraphNetworkConfig { + pub id: u64, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct IndexerConfig { + pub indexer_address: Address, + pub operator_mnemonic: String, +} diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs new file mode 100644 index 000000000..fb3935b5e --- /dev/null +++ b/common/src/indexer_service/http/indexer_service.rs @@ -0,0 +1,231 @@ +use std::{collections::HashMap, fmt::Debug, path::PathBuf, sync::Arc, time::Duration}; + +use alloy_primitives::Address; +use alloy_sol_types::eip712_domain; +use anyhow; +use axum::{ + async_trait, + body::Body, + response::{IntoResponse, Response}, + routing::{get, post}, + Router, Server, +}; +use eventuals::Eventual; +use reqwest::StatusCode; +use serde::{de::DeserializeOwned, Serialize}; +use sqlx::postgres::PgPoolOptions; +use thiserror::Error; +use toolshed::thegraph::DeploymentId; + +use crate::{ + prelude::{ + attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, + AttestationSigner, SubgraphClient, + }, + tap_manager::TapManager, +}; + +use super::{request_handler::request_handler, IndexerServiceConfig}; + +pub trait IsAttestable { + fn is_attestable(&self) -> bool; +} + +#[async_trait] +pub trait IndexerServiceImpl { + type Error: std::error::Error; + type Request: DeserializeOwned + Send + Debug + Serialize; + type Response: IntoResponse + Serialize + IsAttestable; + type State: Send + Sync; + + async fn process_request( + &self, + manifest_id: DeploymentId, + request: Self::Request, + ) -> Result<(Self::Request, Self::Response), Self::Error>; +} + +#[derive(Debug, Error)] +pub enum IndexerServiceError +where + E: std::error::Error, +{ + #[error("No receipt provided with the request")] + NoReceipt, + #[error("Issues with provided receipt: {0}")] + ReceiptError(anyhow::Error), + #[error("Service is not ready yet, try again in a moment")] + ServiceNotReady, + #[error("No attestation signer found for allocation `{0}`")] + NoSignerForAllocation(Address), + #[error("No attestation signer found for manifest `{0}`")] + NoSignerForManifest(DeploymentId), + #[error("Invalid request body: {0}")] + InvalidRequest(anyhow::Error), + #[error("Error while processing the request: {0}")] + ProcessingError(E), + #[error("No receipt or free query auth token provided")] + Unauthorized, + #[error("Invalid free query auth token: {0}")] + InvalidFreeQueryAuthToken(String), + #[error("Failed to sign attestation")] + FailedToSignAttestation, + #[error("Failed to provide attestation")] + FailedToProvideAttestation, + #[error("Failed to provide response")] + FailedToProvideResponse, +} + +impl From<&IndexerServiceError> for StatusCode +where + E: std::error::Error, +{ + fn from(err: &IndexerServiceError) -> Self { + use IndexerServiceError::*; + + match err { + ServiceNotReady => StatusCode::SERVICE_UNAVAILABLE, + + NoReceipt => StatusCode::PAYMENT_REQUIRED, + + Unauthorized => StatusCode::UNAUTHORIZED, + + NoSignerForAllocation(_) => StatusCode::INTERNAL_SERVER_ERROR, + NoSignerForManifest(_) => StatusCode::INTERNAL_SERVER_ERROR, + FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR, + FailedToProvideAttestation => StatusCode::INTERNAL_SERVER_ERROR, + FailedToProvideResponse => StatusCode::INTERNAL_SERVER_ERROR, + + ReceiptError(_) => StatusCode::BAD_REQUEST, + InvalidRequest(_) => StatusCode::BAD_REQUEST, + InvalidFreeQueryAuthToken(_) => StatusCode::BAD_REQUEST, + ProcessingError(_) => StatusCode::BAD_REQUEST, + } + } +} + +// Tell axum how to convert `RpcError` into a response. +impl IntoResponse for IndexerServiceError +where + E: std::error::Error, +{ + fn into_response(self) -> Response { + (StatusCode::from(&self), self.to_string()).into_response() + } +} + +pub struct IndexerServiceOptions +where + I: IndexerServiceImpl + Sync + Send + 'static, +{ + pub service_impl: I, + pub config: IndexerServiceConfig, + pub extra_routes: Router>, Body>, +} + +pub struct IndexerServiceState +where + I: IndexerServiceImpl + Sync + Send + 'static, +{ + pub config: IndexerServiceConfig, + pub attestation_signers: Eventual>, + pub tap_manager: TapManager, + pub service_impl: Arc, +} + +pub struct IndexerService {} + +impl IndexerService { + pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> + where + I: IndexerServiceImpl + Sync + Send + 'static, + { + let network_subgraph = Box::leak(Box::new(SubgraphClient::new( + "network-subgraph", + &options.config.network_subgraph.query_url, + )?)); + + // Identify the dispute manager for the configured network + let dispute_manager = dispute_manager( + network_subgraph, + options.config.graph_network.id, + Duration::from_secs(3600), + ); + + // Monitor the indexer's own allocations + let allocations = indexer_allocations( + network_subgraph, + options.config.indexer.indexer_address, + options.config.graph_network.id, + Duration::from_secs(options.config.network_subgraph.syncing_interval), + ); + + // Maintain an up-to-date set of attestation signers, one for each + // allocation + let attestation_signers = attestation_signers( + allocations.clone(), + options.config.indexer.operator_mnemonic.clone(), + options.config.graph_network.id.into(), + dispute_manager, + ); + + let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + "escrow-subgraph", + &options.config.escrow_subgraph.query_url, + )?)); + + let escrow_accounts = escrow_accounts( + escrow_subgraph, + options.config.indexer.indexer_address, + Duration::from_secs(options.config.escrow_subgraph.syncing_interval), + ); + + // Establish Database connection necessary for serving indexer management + // requests with defined schema + // Note: Typically, you'd call `sqlx::migrate!();` here to sync the models + // which defaults to files in "./migrations" to sync the database; + // however, this can cause conflicts with the migrations run by indexer + // agent. Hence we leave syncing and migrating entirely to the agent and + // assume the models are up to date in the service. + let database = PgPoolOptions::new() + .max_connections(50) + .acquire_timeout(Duration::from_secs(30)) + .connect(&options.config.database.postgres_url) + .await?; + + let tap_manager = TapManager::new( + database, + allocations, + escrow_accounts, + // TODO: arguments for eip712_domain should be a config + eip712_domain! { + name: "TapManager", + version: "1", + verifying_contract: options.config.indexer.indexer_address, + }, + ); + + let state = Arc::new(IndexerServiceState { + config: options.config.clone(), + attestation_signers, + tap_manager, + service_impl: Arc::new(options.service_impl), + }); + + let router = Router::new() + .route("/", get("Service is up and running")) + .route( + PathBuf::from(options.config.server.url_prefix) + .join("manifests/:id") + .to_str() + .expect("Failed to set up `/manifest/:id` route"), + post(request_handler::), + ) + .merge(options.extra_routes) + .with_state(state); + + Ok(Server::bind(&options.config.server.host_and_port) + .serve(router.into_make_service()) + .await?) + } +} diff --git a/common/src/indexer_service/http/mod.rs b/common/src/indexer_service/http/mod.rs new file mode 100644 index 000000000..26ab0f838 --- /dev/null +++ b/common/src/indexer_service/http/mod.rs @@ -0,0 +1,12 @@ +mod config; +mod indexer_service; +mod request_handler; +mod scalar_receipt_header; + +pub use config::{ + DatabaseConfig, EscrowSubgraphConfig, GraphNetworkConfig, IndexerConfig, IndexerServiceConfig, + NetworkSubgraphConfig, ServerConfig, +}; +pub use indexer_service::{ + IndexerService, IndexerServiceImpl, IndexerServiceOptions, IsAttestable, +}; diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs new file mode 100644 index 000000000..80a3e3d59 --- /dev/null +++ b/common/src/indexer_service/http/request_handler.rs @@ -0,0 +1,99 @@ +use std::sync::Arc; + +use axum::{ + body::Bytes, + extract::{Path, State}, + http::{HeaderMap, HeaderValue}, + response::IntoResponse, + TypedHeader, +}; +use log::info; +use reqwest::StatusCode; +use toolshed::thegraph::DeploymentId; + +use crate::{indexer_service::http::IsAttestable, prelude::AttestationSigner}; + +use super::{ + indexer_service::{IndexerServiceError, IndexerServiceState}, + scalar_receipt_header::ScalarReceipt, + IndexerServiceImpl, +}; + +pub async fn request_handler( + Path(manifest_id): Path, + TypedHeader(receipt): TypedHeader, + State(state): State>>, + headers: HeaderMap, + body: Bytes, +) -> Result> +where + I: IndexerServiceImpl + Sync + Send + 'static, +{ + info!("Handling request for deployment `{manifest_id}`"); + + let request = + serde_json::from_slice(&body).map_err(|e| IndexerServiceError::InvalidRequest(e.into()))?; + + let mut attestation_signer: Option = None; + + if let Some(receipt) = receipt.into_signed_receipt() { + let allocation_id = receipt.message.allocation_id; + + // Verify the receipt and store it in the database + state + .tap_manager + .verify_and_store_receipt(receipt) + .await + .map_err(IndexerServiceError::ReceiptError)?; + + // Check if we have an attestation signer for the allocation the receipt was created for + let signers = state + .attestation_signers + .value_immediate() + .ok_or_else(|| IndexerServiceError::ServiceNotReady)?; + + attestation_signer = Some( + signers + .get(&allocation_id) + .map(|signer| signer.clone()) + .ok_or_else(|| (IndexerServiceError::NoSignerForAllocation(allocation_id)))?, + ); + } else if state.config.server.free_query_auth_token.is_some() + && state.config.server.free_query_auth_token + != headers + .get("free-query-auth-token") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()) + { + return Err(IndexerServiceError::Unauthorized); + } + + let (request, response) = state + .service_impl + .process_request(manifest_id, request) + .await + .map_err(IndexerServiceError::ProcessingError)?; + + let attestation = match (response.is_attestable(), attestation_signer) { + (false, _) => None, + (true, None) => return Err(IndexerServiceError::NoSignerForManifest(manifest_id)), + (true, Some(signer)) => { + let req = serde_json::to_string(&request) + .map_err(|_| IndexerServiceError::FailedToSignAttestation)?; + let res = serde_json::to_string(&response) + .map_err(|_| IndexerServiceError::FailedToSignAttestation)?; + Some(signer.create_attestation(&req, &res)) + } + }; + + let mut headers = HeaderMap::new(); + if let Some(attestation) = attestation { + let raw_attestation = serde_json::to_string(&attestation) + .map_err(|_| IndexerServiceError::FailedToProvideAttestation)?; + let header_value = HeaderValue::from_str(&raw_attestation) + .map_err(|_| IndexerServiceError::FailedToProvideAttestation)?; + headers.insert("graph-attestation", header_value); + } + + Ok((StatusCode::OK, HeaderMap::from(headers), response)) +} diff --git a/common/src/indexer_service/http/scalar_receipt_header.rs b/common/src/indexer_service/http/scalar_receipt_header.rs new file mode 100644 index 000000000..c8398eef0 --- /dev/null +++ b/common/src/indexer_service/http/scalar_receipt_header.rs @@ -0,0 +1,54 @@ +use std::ops::Deref; + +use headers::{Header, HeaderName, HeaderValue}; +use lazy_static::lazy_static; +use tap_core::tap_manager::SignedReceipt; + +pub struct ScalarReceipt(Option); + +impl ScalarReceipt { + pub fn into_signed_receipt(self) -> Option { + self.0 + } +} + +impl Deref for ScalarReceipt { + type Target = Option; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +lazy_static! { + static ref SCALAR_RECEIPT: HeaderName = HeaderName::from_static("scalar-receipt"); +} + +impl Header for ScalarReceipt { + fn name() -> &'static HeaderName { + &SCALAR_RECEIPT + } + + fn decode<'i, I>(values: &mut I) -> Result + where + I: Iterator, + { + let value = values.next(); + let raw_receipt = value + .map(|value| value.to_str()) + .transpose() + .map_err(|_| headers::Error::invalid())?; + let parsed_receipt = raw_receipt + .map(serde_json::from_str) + .transpose() + .map_err(|_| headers::Error::invalid())?; + Ok(ScalarReceipt(parsed_receipt)) + } + + fn encode(&self, _values: &mut E) + where + E: Extend, + { + unimplemented!() + } +} diff --git a/common/src/indexer_service/mod.rs b/common/src/indexer_service/mod.rs new file mode 100644 index 000000000..3883215fc --- /dev/null +++ b/common/src/indexer_service/mod.rs @@ -0,0 +1 @@ +pub mod http; diff --git a/common/src/lib.rs b/common/src/lib.rs index 7bbc5e08f..854a54263 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -5,6 +5,7 @@ pub mod allocations; pub mod attestations; pub mod escrow_accounts; pub mod graphql; +pub mod indexer_service; pub mod signature_verification; pub mod subgraph_client; pub mod tap_manager; diff --git a/common/src/subgraph_client.rs b/common/src/subgraph_client.rs index 23fce374a..0299c959f 100644 --- a/common/src/subgraph_client.rs +++ b/common/src/subgraph_client.rs @@ -8,7 +8,6 @@ use graphql::http::Response; use reqwest::{header, Client, Url}; use serde::de::Deserialize; use serde_json::Value; -use toolshed::thegraph::DeploymentId; /// Network subgraph query wrapper /// @@ -20,48 +19,29 @@ pub struct SubgraphClient { } impl SubgraphClient { - pub fn new( - graph_node_query_endpoint: Option<&str>, - deployment: Option<&DeploymentId>, - subgraph_url: &str, - ) -> Result { - // TODO: Check indexing status of the local subgraph deployment - // if the deployment is healthy and synced, use local_subgraoh_endpoint - let _local_subgraph_endpoint = match (graph_node_query_endpoint, deployment) { - (Some(endpoint), Some(id)) => Some(Self::local_deployment_endpoint(endpoint, id)?), - _ => None, - }; - - let subgraph_url = Url::parse(subgraph_url) - .map_err(|e| anyhow!("Could not parse subgraph url `{}`: {}", subgraph_url, e))?; + pub fn new(name: &str, query_url: &str) -> Result { + let query_url = Url::parse(query_url).map_err(|e| { + anyhow!( + "Could not parse `{}` subgraph query URL `{}`: {}", + name, + query_url, + e + ) + })?; let client = reqwest::Client::builder() .user_agent("indexer-common") .build() - .expect("Could not build a client for the Graph Node query endpoint"); + .expect(&format!( + "Could not build a client for `{name}` subgraph query query URL `{query_url}" + )); Ok(Self { client, - subgraph_url: Arc::new(subgraph_url), + subgraph_url: Arc::new(query_url), }) } - pub fn local_deployment_endpoint( - graph_node_query_endpoint: &str, - deployment: &DeploymentId, - ) -> Result { - Url::parse(graph_node_query_endpoint) - .and_then(|u| u.join("/subgraphs/id/")) - .and_then(|u| u.join(&deployment.to_string())) - .map_err(|e| { - anyhow!( - "Could not parse Graph Node query endpoint for subgraph deployment `{}`: {}", - deployment, - e - ) - }) - } - pub async fn query Deserialize<'de>>( &self, body: &Value, @@ -117,12 +97,7 @@ mod test { } fn network_subgraph_client() -> SubgraphClient { - SubgraphClient::new( - Some(GRAPH_NODE_STATUS_ENDPOINT), - Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT), - NETWORK_SUBGRAPH_URL, - ) - .unwrap() + SubgraphClient::new("network-subgraph", NETWORK_SUBGRAPH_URL).unwrap() } #[tokio::test]