From 8a1131ae922c9809fafc4f903bce9193dfa56725 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Wed, 8 Nov 2023 09:01:25 -0500 Subject: [PATCH] fix: share HTTP client between `DeploymentClient`s Ideally, a single `reqwest::Client` is reused so that they use the same connection pool. This also provides a clearer point to set timeout policies, etc. and removes an unused error case. --- common/src/allocations/monitor.rs | 4 +- common/src/attestations/dispute_manager.rs | 4 +- common/src/escrow_accounts.rs | 20 +++--- common/src/subgraph_client/client.rs | 35 +++++----- service/src/main.rs | 76 +++++++++++----------- 5 files changed, 69 insertions(+), 70 deletions(-) diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index c26d08a1..d1042990 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -203,6 +203,7 @@ mod test { // Set up a mock network subgraph let mock_server = MockServer::start().await; let network_subgraph = SubgraphClient::new( + reqwest::Client::new(), None, DeploymentDetails::for_query_url(&format!( "{}/subgraphs/id/{}", @@ -210,8 +211,7 @@ mod test { *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT )) .unwrap(), - ) - .unwrap(); + ); // Mock result for current epoch requests mock_server diff --git a/common/src/attestations/dispute_manager.rs b/common/src/attestations/dispute_manager.rs index 5a071b6d..bb86e8ff 100644 --- a/common/src/attestations/dispute_manager.rs +++ b/common/src/attestations/dispute_manager.rs @@ -99,6 +99,7 @@ mod test { // Set up a mock network subgraph let mock_server = MockServer::start().await; let network_subgraph = SubgraphClient::new( + reqwest::Client::new(), None, DeploymentDetails::for_query_url(&format!( "{}/subgraphs/id/{}", @@ -106,8 +107,7 @@ mod test { *test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT )) .unwrap(), - ) - .unwrap(); + ); // Mock result for current epoch requests mock_server diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 3d91cd4d..09f81d7d 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -127,18 +127,16 @@ mod tests { async fn test_current_accounts() { // Set up a mock escrow subgraph let mock_server = MockServer::start().await; - let escrow_subgraph = Box::leak(Box::new( - SubgraphClient::new( - None, - DeploymentDetails::for_query_url(&format!( - "{}/subgraphs/id/{}", - &mock_server.uri(), - *test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT - )) - .unwrap(), - ) + let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + reqwest::Client::new(), + None, + DeploymentDetails::for_query_url(&format!( + "{}/subgraphs/id/{}", + &mock_server.uri(), + *test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT + )) .unwrap(), - )); + ))); let mock = Mock::given(method("POST")) .and(path(format!( diff --git a/common/src/subgraph_client/client.rs b/common/src/subgraph_client/client.rs index d893a6c2..c14dd347 100644 --- a/common/src/subgraph_client/client.rs +++ b/common/src/subgraph_client/client.rs @@ -41,13 +41,15 @@ impl DeploymentDetails { } struct DeploymentClient { + pub http_client: reqwest::Client, pub status: Option>, pub query_url: Url, } impl DeploymentClient { - pub fn new(details: DeploymentDetails) -> Self { + pub fn new(http_client: reqwest::Client, details: DeploymentDetails) -> Self { Self { + http_client, status: details .deployment .zip(details.status_url) @@ -71,7 +73,8 @@ impl DeploymentClient { } } - Ok(reqwest::Client::new() + Ok(self + .http_client .post(self.query_url.as_ref()) .json(body) .header(header::USER_AGENT, "indexer-common") @@ -92,16 +95,14 @@ pub struct SubgraphClient { impl SubgraphClient { pub fn new( + http_client: reqwest::Client, local_deployment: Option, remote_deployment: DeploymentDetails, - ) -> Result { - let local_client = local_deployment.map(DeploymentClient::new); - let remote_client = DeploymentClient::new(remote_deployment); - - Ok(Self { - local_client, - remote_client, - }) + ) -> Self { + Self { + local_client: local_deployment.map(|d| DeploymentClient::new(http_client.clone(), d)), + remote_client: DeploymentClient::new(http_client, remote_deployment), + } } pub async fn query Deserialize<'de>>( @@ -173,10 +174,10 @@ mod test { fn network_subgraph_client() -> SubgraphClient { SubgraphClient::new( + reqwest::Client::new(), None, DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(), ) - .unwrap() } #[tokio::test] @@ -253,6 +254,7 @@ mod test { // Create the subgraph client let client = SubgraphClient::new( + reqwest::Client::new(), Some(DeploymentDetails::for_graph_node(&mock_server_local.uri(), deployment).unwrap()), DeploymentDetails::for_query_url(&format!( "{}/subgraphs/id/{}", @@ -260,8 +262,7 @@ mod test { deployment )) .unwrap(), - ) - .unwrap(); + ); // Query the subgraph let response: Response = client @@ -325,6 +326,7 @@ mod test { // Create the subgraph client let client = SubgraphClient::new( + reqwest::Client::new(), Some(DeploymentDetails::for_graph_node(&mock_server_local.uri(), deployment).unwrap()), DeploymentDetails::for_query_url(&format!( "{}/subgraphs/id/{}", @@ -332,8 +334,7 @@ mod test { deployment )) .unwrap(), - ) - .unwrap(); + ); // Query the subgraph let response: Response = client @@ -397,6 +398,7 @@ mod test { // Create the subgraph client let client = SubgraphClient::new( + reqwest::Client::new(), Some(DeploymentDetails::for_graph_node(&mock_server_local.uri(), deployment).unwrap()), DeploymentDetails::for_query_url(&format!( "{}/subgraphs/id/{}", @@ -404,8 +406,7 @@ mod test { deployment )) .unwrap(), - ) - .unwrap(); + ); // Query the subgraph let response: Response = client diff --git a/service/src/main.rs b/service/src/main.rs index 31fbcd6b..4ba70d5f 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -61,6 +61,12 @@ async fn main() -> Result<(), std::io::Error> { &config.indexer_infrastructure.graph_node_query_endpoint, ); + let http_client = reqwest::Client::builder() + .tcp_nodelay(true) + .timeout(Duration::from_secs(30)) + .build() + .expect("Failed to init HTTP client"); + // Make an instance of network subgraph at either // graph_node_query_endpoint/subgraphs/id/network_subgraph_deployment // or network_subgraph_endpoint @@ -69,26 +75,22 @@ async fn main() -> Result<(), std::io::Error> { // a static lifetime, which avoids having to pass around and clone `Arc` // objects everywhere. Since the network subgraph is read-only, this is // no problem. - let network_subgraph = Box::leak(Box::new( - SubgraphClient::new( - config - .network_subgraph - .network_subgraph_deployment - .map(|deployment| { - DeploymentDetails::for_graph_node( - &config.indexer_infrastructure.graph_node_query_endpoint, - deployment, - ) - }) - .transpose() - .expect( - "Failed to parse graph node query endpoint and network subgraph deployment", - ), - DeploymentDetails::for_query_url(&config.network_subgraph.network_subgraph_endpoint) - .expect("Failed to parse network subgraph endpoint"), - ) - .expect("Failed to set up network subgraph client"), - )); + let network_subgraph = Box::leak(Box::new(SubgraphClient::new( + http_client.clone(), + config + .network_subgraph + .network_subgraph_deployment + .map(|deployment| { + DeploymentDetails::for_graph_node( + &config.indexer_infrastructure.graph_node_query_endpoint, + deployment, + ) + }) + .transpose() + .expect("Failed to parse graph node query endpoint and network subgraph deployment"), + DeploymentDetails::for_query_url(&config.network_subgraph.network_subgraph_endpoint) + .expect("Failed to parse network subgraph endpoint"), + ))); let indexer_allocations = indexer_allocations( network_subgraph, @@ -119,24 +121,22 @@ async fn main() -> Result<(), std::io::Error> { // assume the models are up to date in the service. let indexer_management_db = database::connect(&config.postgres).await; - let escrow_subgraph = Box::leak(Box::new( - SubgraphClient::new( - config - .escrow_subgraph - .escrow_subgraph_deployment - .map(|deployment| { - DeploymentDetails::for_graph_node( - &config.indexer_infrastructure.graph_node_query_endpoint, - deployment, - ) - }) - .transpose() - .expect("Failed to parse graph node query endpoint and escrow subgraph deployment"), - DeploymentDetails::for_query_url(&config.escrow_subgraph.escrow_subgraph_endpoint) - .expect("Failed to parse escrow subgraph endpoint"), - ) - .expect("Failed to set up escrow subgraph client"), - )); + let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + http_client, + config + .escrow_subgraph + .escrow_subgraph_deployment + .map(|deployment| { + DeploymentDetails::for_graph_node( + &config.indexer_infrastructure.graph_node_query_endpoint, + deployment, + ) + }) + .transpose() + .expect("Failed to parse graph node query endpoint and escrow subgraph deployment"), + DeploymentDetails::for_query_url(&config.escrow_subgraph.escrow_subgraph_endpoint) + .expect("Failed to parse escrow subgraph endpoint"), + ))); let escrow_accounts = escrow_accounts( escrow_subgraph,