From 39902a2d9498003480436dc25140bcb3ae654fe0 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Tue, 17 Oct 2023 17:15:05 -0500 Subject: [PATCH] refactor: generalize network subgraph into a subgraph client This way we can also use it for the escrow subgraph and others that we may need in the future. --- common/src/allocations/monitor.rs | 20 +++---- common/src/attestations/dispute_manager.rs | 18 ++++--- common/src/lib.rs | 4 +- .../mod.rs => subgraph_client.rs} | 54 ++++++++++--------- service/src/main.rs | 27 +++++----- service/src/server/mod.rs | 6 +-- 6 files changed, 71 insertions(+), 58 deletions(-) rename common/src/{network_subgraph/mod.rs => subgraph_client.rs} (70%) diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index c01b4ef8..caee3fb3 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -11,12 +11,12 @@ use serde::Deserialize; use serde_json::json; use tokio::time::sleep; -use crate::prelude::NetworkSubgraph; +use crate::prelude::SubgraphClient; use super::Allocation; async fn current_epoch( - network_subgraph: &'static NetworkSubgraph, + network_subgraph: &'static SubgraphClient, graph_network_id: u64, ) -> Result { // Types for deserializing the network subgraph response @@ -63,7 +63,7 @@ async fn current_epoch( /// An always up-to-date list of an indexer's active and recently closed allocations. pub fn indexer_allocations( - network_subgraph: &'static NetworkSubgraph, + network_subgraph: &'static SubgraphClient, indexer_address: Address, graph_network_id: u64, interval: Duration, @@ -195,22 +195,24 @@ mod test { Mock, MockServer, ResponseTemplate, }; - use crate::{prelude::NetworkSubgraph, test_vectors}; + use crate::{prelude::SubgraphClient, test_vectors}; use super::*; - async fn setup_mock_network_subgraph() -> (&'static NetworkSubgraph, MockServer) { + async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) { // Set up a mock network subgraph let mock_server = MockServer::start().await; - let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint( + let network_subgraph_endpoint = SubgraphClient::local_deployment_endpoint( &mock_server.uri(), &test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT, - ); - let network_subgraph = NetworkSubgraph::new( + ) + .unwrap(); + let network_subgraph = SubgraphClient::new( Some(&mock_server.uri()), Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT), network_subgraph_endpoint.as_ref(), - ); + ) + .unwrap(); // Mock result for current epoch requests mock_server diff --git a/common/src/attestations/dispute_manager.rs b/common/src/attestations/dispute_manager.rs index 796e76fc..5a15c10d 100644 --- a/common/src/attestations/dispute_manager.rs +++ b/common/src/attestations/dispute_manager.rs @@ -10,10 +10,10 @@ use serde::Deserialize; use serde_json::json; use tokio::time::sleep; -use crate::network_subgraph::NetworkSubgraph; +use crate::subgraph_client::SubgraphClient; pub fn dispute_manager( - network_subgraph: &'static NetworkSubgraph, + network_subgraph: &'static SubgraphClient, graph_network_id: u64, interval: Duration, ) -> Eventual
{ @@ -88,24 +88,26 @@ mod test { }; use crate::{ - prelude::NetworkSubgraph, + prelude::SubgraphClient, test_vectors::{self, DISPUTE_MANAGER_ADDRESS}, }; use super::*; - async fn setup_mock_network_subgraph() -> (&'static NetworkSubgraph, MockServer) { + async fn setup_mock_network_subgraph() -> (&'static SubgraphClient, MockServer) { // Set up a mock network subgraph let mock_server = MockServer::start().await; - let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint( + let network_subgraph_endpoint = SubgraphClient::local_deployment_endpoint( &mock_server.uri(), &test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT, - ); - let network_subgraph = NetworkSubgraph::new( + ) + .unwrap(); + let network_subgraph = SubgraphClient::new( Some(&mock_server.uri()), Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT), network_subgraph_endpoint.as_ref(), - ); + ) + .unwrap(); // Mock result for current epoch requests mock_server diff --git a/common/src/lib.rs b/common/src/lib.rs index 980e182f..eeae801c 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -4,8 +4,8 @@ pub mod allocations; pub mod attestations; pub mod graphql; -pub mod network_subgraph; pub mod signature_verification; +pub mod subgraph_client; #[cfg(test)] mod test_vectors; @@ -17,5 +17,5 @@ pub mod prelude { pub use super::attestations::{ dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers, }; - pub use super::network_subgraph::NetworkSubgraph; + pub use super::subgraph_client::SubgraphClient; } diff --git a/common/src/network_subgraph/mod.rs b/common/src/subgraph_client.rs similarity index 70% rename from common/src/network_subgraph/mod.rs rename to common/src/subgraph_client.rs index 196ce1ce..23fce374 100644 --- a/common/src/network_subgraph/mod.rs +++ b/common/src/subgraph_client.rs @@ -3,6 +3,7 @@ use std::sync::Arc; +use anyhow::anyhow; use graphql::http::Response; use reqwest::{header, Client, Url}; use serde::de::Deserialize; @@ -13,48 +14,52 @@ use toolshed::thegraph::DeploymentId; /// /// This is Arc internally, so it can be cloned and shared between threads. #[derive(Debug, Clone)] -pub struct NetworkSubgraph { +pub struct SubgraphClient { client: Client, // it is Arc - network_subgraph_url: Arc, + subgraph_url: Arc, } -impl NetworkSubgraph { +impl SubgraphClient { pub fn new( graph_node_query_endpoint: Option<&str>, deployment: Option<&DeploymentId>, - network_subgraph_url: &str, - ) -> NetworkSubgraph { - //TODO: Check indexing status of the local network subgraph deployment - //if the deployment is healthy and synced, use local_network_subgraph_endpoint - let _local_network_subgraph_endpoint = match (graph_node_query_endpoint, deployment) { - (Some(endpoint), Some(id)) => { - Some(NetworkSubgraph::local_deployment_endpoint(endpoint, id)) - } + subgraph_url: &str, + ) -> Result { + // TODO: Check indexing status of the local subgraph deployment + // if the deployment is healthy and synced, use local_subgraoh_endpoint + let _local_subgraph_endpoint = match (graph_node_query_endpoint, deployment) { + (Some(endpoint), Some(id)) => Some(Self::local_deployment_endpoint(endpoint, id)?), _ => None, }; - let network_subgraph_url = - Url::parse(network_subgraph_url).expect("Could not parse network subgraph url"); + let subgraph_url = Url::parse(subgraph_url) + .map_err(|e| anyhow!("Could not parse subgraph url `{}`: {}", subgraph_url, e))?; let client = reqwest::Client::builder() - .user_agent("indexer-service") + .user_agent("indexer-common") .build() - .expect("Could not build a client to graph node query endpoint"); + .expect("Could not build a client for the Graph Node query endpoint"); - NetworkSubgraph { + Ok(Self { client, - network_subgraph_url: Arc::new(network_subgraph_url), - } + subgraph_url: Arc::new(subgraph_url), + }) } pub fn local_deployment_endpoint( graph_node_query_endpoint: &str, deployment: &DeploymentId, - ) -> Url { + ) -> Result { Url::parse(graph_node_query_endpoint) .and_then(|u| u.join("/subgraphs/id/")) .and_then(|u| u.join(&deployment.to_string())) - .expect("Could not parse graph node query endpoint for the network subgraph deployment") + .map_err(|e| { + anyhow!( + "Could not parse Graph Node query endpoint for subgraph deployment `{}`: {}", + deployment, + e + ) + }) } pub async fn query Deserialize<'de>>( @@ -62,7 +67,7 @@ impl NetworkSubgraph { body: &Value, ) -> Result, reqwest::Error> { self.client - .post(Url::clone(&self.network_subgraph_url)) + .post(Url::clone(&self.subgraph_url)) .json(body) .header(header::CONTENT_TYPE, "application/json") .send() @@ -111,12 +116,13 @@ mod test { mock_server } - fn network_subgraph() -> NetworkSubgraph { - NetworkSubgraph::new( + fn network_subgraph_client() -> SubgraphClient { + SubgraphClient::new( Some(GRAPH_NODE_STATUS_ENDPOINT), Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT), NETWORK_SUBGRAPH_URL, ) + .unwrap() } #[tokio::test] @@ -124,7 +130,7 @@ mod test { let _mock_server = mock_graph_node_server().await; // Check that the response is valid JSON - let result = network_subgraph() + let result = network_subgraph_client() .query::(&json!({ "query": r#" query { diff --git a/service/src/main.rs b/service/src/main.rs index ff3df1ce..9e165b6b 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -10,7 +10,7 @@ use toolshed::thegraph::DeploymentId; use tracing::info; use indexer_common::prelude::{ - attestation_signers, dispute_manager, indexer_allocations, NetworkSubgraph, + attestation_signers, dispute_manager, indexer_allocations, SubgraphClient, }; use util::{package_version, shutdown_signal}; @@ -68,17 +68,20 @@ async fn main() -> Result<(), std::io::Error> { // a static lifetime, which avoids having to pass around and clone `Arc` // objects everywhere. Since the network subgraph is read-only, this is // no problem. - let network_subgraph = Box::leak(Box::new(NetworkSubgraph::new( - Some(&config.indexer_infrastructure.graph_node_query_endpoint), - config - .network_subgraph - .network_subgraph_deployment - .map(|s| DeploymentId::from_str(&s)) - .transpose() - .expect("Failed to parse invalid network subgraph deployment") - .as_ref(), - &config.network_subgraph.network_subgraph_endpoint, - ))); + let network_subgraph = Box::leak(Box::new( + SubgraphClient::new( + Some(&config.indexer_infrastructure.graph_node_query_endpoint), + config + .network_subgraph + .network_subgraph_deployment + .map(|s| DeploymentId::from_str(&s)) + .transpose() + .expect("Failed to parse invalid network subgraph deployment") + .as_ref(), + &config.network_subgraph.network_subgraph_endpoint, + ) + .expect("Failed to set up network subgraph client"), + )); let indexer_allocations = indexer_allocations( network_subgraph, diff --git a/service/src/server/mod.rs b/service/src/server/mod.rs index 8d5cf9df..daebfb14 100644 --- a/service/src/server/mod.rs +++ b/service/src/server/mod.rs @@ -19,7 +19,7 @@ use tower_http::{ }; use tracing::Level; -use indexer_common::prelude::NetworkSubgraph; +use indexer_common::prelude::SubgraphClient; use crate::{ query_processor::QueryProcessor, @@ -38,7 +38,7 @@ pub struct ServerOptions { pub graph_node_status_endpoint: String, pub indexer_management_db: PgPool, pub operator_public_key: String, - pub network_subgraph: &'static NetworkSubgraph, + pub network_subgraph: &'static SubgraphClient, pub network_subgraph_auth_token: Option, pub serve_network_subgraph: bool, } @@ -53,7 +53,7 @@ impl ServerOptions { graph_node_status_endpoint: String, indexer_management_db: PgPool, operator_public_key: String, - network_subgraph: &'static NetworkSubgraph, + network_subgraph: &'static SubgraphClient, network_subgraph_auth_token: Option, serve_network_subgraph: bool, ) -> Self {