From b171261e58a6c800d937088fda2890d7257e04fe Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Fri, 29 Sep 2023 14:28:32 +0200 Subject: [PATCH] refactor: improve network subgraph querying Leverage serde and serde_json more for serializing queries and deserializing responses in particular. There are tools that go even further with typing GraphQL query and response types (see https://github.com/graphql-rust/graphql-client#getting-started) but my feeling is that would be overkill here. --- Cargo.lock | 140 +++++++++++++++++++++ common/Cargo.toml | 1 + common/src/allocations/monitor.rs | 182 ++++++++++++++------------- common/src/network_subgraph/mod.rs | 76 ++++------- common/src/types.rs | 11 +- service/src/escrow_monitor.rs | 38 +++--- service/src/server/routes/network.rs | 39 +++--- 7 files changed, 293 insertions(+), 194 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b6a26e3c..038b33d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -139,6 +139,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.3.2" @@ -221,6 +230,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "ascii" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" + [[package]] name = "ascii-canvas" version = "3.0.0" @@ -835,7 +850,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" dependencies = [ "android-tzdata", + "iana-time-zone", "num-traits", + "serde", + "winapi", ] [[package]] @@ -961,6 +979,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da3da6baa321ec19e1cc41d31bf599f00c783d0517095cdaf0332e3fe8d20680" +dependencies = [ + "ascii", + "byteorder", + "either", + "memchr", + "unreachable", +] + [[package]] name = "concurrent-queue" version = "2.2.0" @@ -1951,6 +1982,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "firestorm" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c5f6c2c942da57e2aaaa84b8a521489486f14e75e7fa91dab70aba913975f98" + [[package]] name = "fixed-hash" version = "0.8.0" @@ -2240,6 +2277,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "graphql-parser" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ebc8013b4426d5b81a4364c419a95ed0b404af2b82e2457de52d9348f0e474" +dependencies = [ + "combine", + "thiserror", +] + [[package]] name = "group" version = "0.13.0" @@ -2536,6 +2583,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -2619,6 +2689,7 @@ dependencies = [ "serde_json", "test-log", "tokio", + "toolshed", "wiremock", ] @@ -2641,6 +2712,7 @@ checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ "equivalent", "hashbrown 0.14.0", + "serde", ] [[package]] @@ -4606,6 +4678,35 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237" +dependencies = [ + "base64 0.21.2", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.0.0", + "serde", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e6be15c453eb305019bfa438b1593c731f36a289a7853f7707ee29e870b3b3c" +dependencies = [ + "darling 0.20.3", + "proc-macro2", + "quote", + "syn 2.0.28", +] + [[package]] name = "service" version = "0.1.0" @@ -5518,6 +5619,21 @@ dependencies = [ "winnow", ] +[[package]] +name = "toolshed" +version = "0.2.2" +source = "git+https://github.com/edgeandnode/toolshed?tag=v0.2.2#5daca78935d9a9fc34216946d3f22c614d9dbeee" +dependencies = [ + "alloy-primitives", + "bs58 0.5.0", + "firestorm", + "graphql-parser", + "serde", + "serde_with", + "sha3", + "url", +] + [[package]] name = "tower" version = "0.4.13" @@ -5785,6 +5901,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "unreachable" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" +dependencies = [ + "void", +] + [[package]] name = "untrusted" version = "0.7.1" @@ -5852,6 +5977,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "wait-timeout" version = "0.2.0" @@ -6033,6 +6164,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.0", +] + [[package]] name = "windows-sys" version = "0.42.0" diff --git a/common/Cargo.toml b/common/Cargo.toml index 354ac151..d38613ff 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -21,6 +21,7 @@ secp256k1 = { version = "0.27.0", features = ["recovery"] } serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.107" tokio = { version = "1.32.0", features = ["full", "macros", "rt"] } +toolshed = { git = "https://github.com/edgeandnode/toolshed", tag = "v0.2.2", features = ["graphql"] } [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index 1980ae4b..6954f6cf 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -7,6 +7,8 @@ use std::sync::Arc; use alloy_primitives::Address; use anyhow::Result; use log::{info, warn}; +use serde::Deserialize; +use serde_json::json; use tokio::sync::watch::{Receiver, Sender}; use tokio::sync::RwLock; @@ -67,18 +69,29 @@ impl AllocationMonitor { network_subgraph: &NetworkSubgraph, graph_network_id: u64, ) -> Result { + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct GraphNetwork { + current_epoch: u64, + } + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct QueryResult { + graph_network: GraphNetwork, + } + let res = network_subgraph - .network_query( - r#" + .query::(&json!({ + "query": r#" query epoch($id: ID!) { graphNetwork(id: $id) { currentEpoch } } - "# - .to_string(), - Some(serde_json::json!({ "id": graph_network_id })), - ) + "#, + "variables": + { "id": graph_network_id }, + })) .await .map_err(|e| { anyhow::anyhow!( @@ -87,10 +100,8 @@ impl AllocationMonitor { ) })?; - res.get("data") - .and_then(|d| d.get("graphNetwork")) - .and_then(|d| d.get("currentEpoch")) - .and_then(|d| d.as_u64()) + res.data + .map(|data| data.graph_network.current_epoch) .ok_or(anyhow::anyhow!( "Failed to get current epoch from network subgraph" )) @@ -101,98 +112,93 @@ impl AllocationMonitor { indexer_address: &Address, closed_at_epoch_threshold: u64, ) -> Result> { - let mut res = network_subgraph - .network_query( - r#" - query allocations($indexer: ID!, $closedAtEpochThreshold: Int!) { - indexer(id: $indexer) { - activeAllocations: totalAllocations( - where: { status: Active } - orderDirection: desc - first: 1000 - ) { - id - indexer { - id - } - allocatedTokens - createdAtBlockHash - createdAtEpoch - closedAtEpoch - subgraphDeployment { - id - deniedAt - stakedTokens - signalledTokens - queryFeesAmount - } - } - recentlyClosedAllocations: totalAllocations( - where: { status: Closed, closedAtEpoch_gte: $closedAtEpochThreshold } - orderDirection: desc - first: 1000 - ) { - id - indexer { + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct Indexer { + active_allocations: Vec, + recently_closed_allocations: Vec, + } + + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct QueryResult { + indexer: Option, + } + + let result = network_subgraph + .query::(&json!({ + "query": r#" + query allocations($indexer: ID!, $closedAtEpochThreshold: Int!) { + indexer(id: $indexer) { + activeAllocations: totalAllocations( + where: { status: Active } + orderDirection: desc + first: 1000 + ) { id + indexer { + id + } + allocatedTokens + createdAtBlockHash + createdAtEpoch + closedAtEpoch + subgraphDeployment { + id + deniedAt + stakedTokens + signalledTokens + queryFeesAmount + } } - allocatedTokens - createdAtBlockHash - createdAtEpoch - closedAtEpoch - subgraphDeployment { + recentlyClosedAllocations: totalAllocations( + where: { status: Closed, closedAtEpoch_gte: $closedAtEpochThreshold } + orderDirection: desc + first: 1000 + ) { id - deniedAt - stakedTokens - signalledTokens - queryFeesAmount + indexer { + id + } + allocatedTokens + createdAtBlockHash + createdAtEpoch + closedAtEpoch + subgraphDeployment { + id + deniedAt + stakedTokens + signalledTokens + queryFeesAmount + } } } } + "#, + "variables": { + "indexer": indexer_address, + "closedAtEpochThreshold": closed_at_epoch_threshold } - "# - .to_string(), - Some(serde_json::json!({ "indexer": indexer_address, "closedAtEpochThreshold": closed_at_epoch_threshold })), - ) - .await + })) + .await .map_err(|e| { anyhow::anyhow!( - "Failed to fetch current allocations from network subgraph: {}", - e + "Failed to fetch current allocations for indexer {} from network subgraph: {}", + indexer_address, e ) })?; - let indexer_json = res - .get_mut("data") - .and_then(|d| d.get_mut("indexer")) - .ok_or_else(|| anyhow::anyhow!("No data / indexer not found on chain",))?; + let indexer = result.data.and_then(|d| d.indexer).ok_or_else(|| { + anyhow::anyhow!("No data / indexer {} not found on chain", indexer_address) + })?; - let active_allocations_json = - indexer_json.get_mut("activeAllocations").ok_or_else(|| { - anyhow::anyhow!("Failed to parse active allocations from network subgraph",) - })?; - let active_allocations: Vec = - serde_json::from_value(active_allocations_json.take())?; - let mut eligible_allocations: HashMap = - HashMap::from_iter(active_allocations.into_iter().map(|a| (a.id, a))); - - let recently_closed_allocations_json = - indexer_json - .get_mut("recentlyClosedAllocations") - .ok_or_else(|| { - anyhow::anyhow!( - "Failed to parse recently closed allocations from network subgraph", - ) - })?; - let recently_closed_allocations: Vec = - serde_json::from_value(recently_closed_allocations_json.take())?; - eligible_allocations.extend( - recently_closed_allocations + Ok(HashMap::from_iter( + indexer + .active_allocations .into_iter() - .map(move |a| (a.id, a)), - ); - - Ok(eligible_allocations) + .chain(indexer.recently_closed_allocations.into_iter()) + .map(|allocation| (allocation.id, allocation)), + )) } async fn update_allocations(inner: &Arc) -> Result<(), anyhow::Error> { diff --git a/common/src/network_subgraph/mod.rs b/common/src/network_subgraph/mod.rs index d8fc317e..894a8bb9 100644 --- a/common/src/network_subgraph/mod.rs +++ b/common/src/network_subgraph/mod.rs @@ -4,16 +4,9 @@ use std::sync::Arc; use reqwest::{header, Client, Url}; -use serde::{Deserialize, Serialize}; +use serde::de::Deserialize; use serde_json::Value; - -use crate::types::GraphQLQuery; - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -pub struct Response { - pub result: T, - pub status: i64, -} +use toolshed::graphql::http::Response; /// Network subgraph query wrapper /// @@ -60,36 +53,25 @@ impl NetworkSubgraph { .expect("Could not parse graph node query endpoint for the network subgraph deployment") } - pub async fn network_query_raw( + pub async fn query Deserialize<'de>>( &self, - body: String, - ) -> Result { + body: &Value, + ) -> Result, reqwest::Error> { self.client .post(Url::clone(&self.network_subgraph_url)) - .body(body.clone()) + .json(body) .header(header::CONTENT_TYPE, "application/json") .send() .await - } - - pub async fn network_query( - &self, - query: String, - variables: Option, - ) -> Result { - let body = GraphQLQuery { query, variables }; - - self.network_query_raw( - serde_json::to_string(&body).expect("serialize network GraphQL query"), - ) - .await? - .json::() - .await + .and_then(|response| response.error_for_status())? + .json::>() + .await } } #[cfg(test)] mod test { + use serde_json::json; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -130,37 +112,23 @@ mod test { } #[tokio::test] - #[ignore] // Run only if explicitly specified async fn test_network_query() { - let network_subgraph = network_subgraph(); - - let query = r#""{\"data\":{\"graphNetwork\":{\"currentEpoch\":960}}}""#; - - // Check that the response is valid JSON - network_subgraph - .network_query(query.to_string(), None) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_network_query_mock() { let _mock_server = mock_graph_node_server().await; - let network_subgraph = network_subgraph(); - - let query = r#" - query { - graphNetwork(id: 1) { - currentEpoch - } - } - "#; - // Check that the response is valid JSON - network_subgraph - .network_query(query.to_string(), None) + let result = network_subgraph() + .query::(&json!({ + "query": r#" + query { + graphNetwork(id: 1) { + currentEpoch + } + } + "#, + })) .await .unwrap(); + + assert!(result.data.is_some()); } } diff --git a/common/src/types.rs b/common/src/types.rs index cf701719..cf30b419 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -2,16 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use ethers::utils::hex; -use serde::{Deserialize, Serialize}; -use serde_json::Value; - -/// A serializable GraphQL request -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct GraphQLQuery { - pub query: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub variables: Option, -} +use serde::Deserialize; /// Subgraph identifier type: SubgraphDeploymentID with field 'value' #[derive(Debug, Clone, PartialEq, Eq, Hash)] diff --git a/service/src/escrow_monitor.rs b/service/src/escrow_monitor.rs index 47453807..44f03274 100644 --- a/service/src/escrow_monitor.rs +++ b/service/src/escrow_monitor.rs @@ -6,12 +6,11 @@ use anyhow::Result; use ethereum_types::U256; use log::{error, info}; use serde::Deserialize; +use serde_json::json; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use indexer_common::prelude::GraphQLQuery; - use crate::graph_node::GraphNodeInstance; #[derive(Debug)] @@ -80,26 +79,27 @@ impl EscrowMonitor { sender: _Sender, } - let query = GraphQLQuery { - query: r#" - query ($indexer: ID!) { - escrowAccounts(where: {receiver_: {id: $indexer}}) { - balance - totalAmountThawing - sender { - id - } - } - } - "# - .to_string(), - variables: Some(serde_json::json!({ "indexer": indexer_address})), - }; - let res = graph_node .subgraph_query_raw( escrow_subgraph_deployment, - serde_json::to_string(&query).expect("serialize escrow GraphQL query"), + serde_json::to_string(&json!({ + "query": r#" + query ($indexer: ID!) { + escrowAccounts(where: {receiver_: {id: $indexer}}) { + balance + totalAmountThawing + sender { + id + } + } + } + "#, + "variables": { + "indexer": indexer_address, + } + } + )) + .expect("serialize escrow GraphQL query"), ) .await?; diff --git a/service/src/server/routes/network.rs b/service/src/server/routes/network.rs index 1a431333..ce330c93 100644 --- a/service/src/server/routes/network.rs +++ b/service/src/server/routes/network.rs @@ -3,19 +3,20 @@ use axum::{ extract::Extension, - http::{self, Request, StatusCode}, + http::{self, Request}, response::IntoResponse, Json, }; -use serde_json::Value; +use serde_json::{json, Value}; use crate::server::ServerOptions; -use super::{bad_request_response, response_body_to_query_string}; +use super::bad_request_response; pub async fn network_queries( Extension(server): Extension, req: Request, + axum::extract::Json(body): axum::extract::Json, ) -> impl IntoResponse { // Extract free query auth token let auth_token = req @@ -32,25 +33,17 @@ pub async fn network_queries( return bad_request_response("Not enabled or authorized query"); } - // Serve query using query processor - let req_body = req.into_body(); - let query_string = match response_body_to_query_string(req_body).await { - Ok(q) => q, - Err(e) => return bad_request_response(&e.to_string()), - }; - - let response = server - .network_subgraph - .network_query_raw(query_string) - .await - .expect("Failed to execute free network subgraph query"); - - if response.status().is_success() { - match response.json::().await { - Ok(value) => (StatusCode::OK, Json(value)).into_response(), - Err(e) => bad_request_response(&e.to_string()), - } - } else { - bad_request_response("Bad response from Graph node") + match server.network_subgraph.query::(&body).await { + Ok(result) => Json(json!({ + "data": result.data, + "errors": result.errors.map(|errors| { + errors + .into_iter() + .map(|e| json!({ "message": e.message })) + .collect::>() + }), + })) + .into_response(), + Err(e) => bad_request_response(&e.to_string()), } }