From 1611cfb5af5ba6cb7cdf7cb66938952d6a6ceeae Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Wed, 20 Sep 2023 15:24:20 +0200 Subject: [PATCH] refactor: Use eventuals for monitoring network data Specifically, use one eventual for monitoring the indexer's active and recently closed allocations, and one for establishing attestation signers for these allocations. --- Cargo.lock | 88 +++++ common/Cargo.toml | 3 + common/src/allocations/mod.rs | 6 +- common/src/allocations/monitor.rs | 537 +++++++++------------------ common/src/attestations/signer.rs | 2 +- common/src/attestations/signers.rs | 167 ++------- common/src/lib.rs | 4 +- common/src/network_subgraph/mod.rs | 37 +- common/src/types.rs | 11 +- service/Cargo.toml | 1 + service/src/escrow_monitor.rs | 30 +- service/src/main.rs | 25 +- service/src/query_processor.rs | 17 +- service/src/server/mod.rs | 6 +- service/src/server/routes/network.rs | 39 +- service/src/tap_manager.rs | 22 +- 16 files changed, 375 insertions(+), 620 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 36c10a876..ea3d7bcc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -726,6 +726,12 @@ version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +[[package]] +name = "by_address" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf8dba2868114ed769a1f2590fc9ae5eb331175b44313b6c9b922f8f7ca813d0" + [[package]] name = "byte-slice-cast" version = "1.2.2" @@ -1912,6 +1918,18 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "eventuals" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0450e5c57135f799007162ff8beba7b2809d4a018cf9cdcbca2c319a73d9d8ee" +dependencies = [ + "by_address", + "futures", + "never", + "tokio", +] + [[package]] name = "eyre" version = "0.6.8" @@ -1985,6 +2003,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "firestorm" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c5f6c2c942da57e2aaaa84b8a521489486f14e75e7fa91dab70aba913975f98" + [[package]] name = "fixed-hash" version = "0.8.0" @@ -2676,16 +2700,19 @@ dependencies = [ "ethereum-types", "ethers", "ethers-core", + "eventuals", "faux", "keccak-hash", "lazy_static", "log", + "lru", "reqwest", "secp256k1 0.27.0", "serde", "serde_json", "test-log", "tokio", + "toolshed", "wiremock", ] @@ -2708,6 +2735,7 @@ checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ "equivalent", "hashbrown 0.14.0", + "serde", ] [[package]] @@ -2988,6 +3016,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" +dependencies = [ + "hashbrown 0.14.0", +] + [[package]] name = "mach" version = "0.3.2" @@ -3168,6 +3205,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "never" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c96aba5aa877601bb3f6dd6a63a969e1f82e60646e81e71b14496995e9853c91" + [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -4695,6 +4738,35 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237" +dependencies = [ + "base64 0.21.2", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.0.0", + "serde", + "serde_json", + "serde_with_macros", + "time 0.3.21", +] + +[[package]] +name = "serde_with_macros" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e6be15c453eb305019bfa438b1593c731f36a289a7853f7707ee29e870b3b3c" +dependencies = [ + "darling 0.20.3", + "proc-macro2", + "quote", + "syn 2.0.28", +] + [[package]] name = "service" version = "0.1.0" @@ -4720,6 +4792,7 @@ dependencies = [ "ethers", "ethers-contract", "ethers-core", + "eventuals", "faux", "graphql-parser", "hex", @@ -5630,6 +5703,21 @@ dependencies = [ "winnow", ] +[[package]] +name = "toolshed" +version = "0.2.2" +source = "git+https://github.com/edgeandnode/toolshed?tag=v0.2.2#5daca78935d9a9fc34216946d3f22c614d9dbeee" +dependencies = [ + "alloy-primitives", + "bs58 0.5.0", + "firestorm", + "graphql-parser", + "serde", + "serde_with", + "sha3", + "url", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/common/Cargo.toml b/common/Cargo.toml index aa9e5381d..eeecc002e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -12,15 +12,18 @@ eip-712-derive = { git = "https://github.com/graphprotocol/eip-712-derive" } ethereum-types = "0.14.1" ethers = "2.0.10" ethers-core = "2.0.10" +eventuals = "0.6.7" faux = { version = "0.1.10", optional = true } keccak-hash = "0.10.0" lazy_static = "1.4.0" log = "0.4.20" +lru = "0.11.1" reqwest = "0.11.20" secp256k1 = { version = "0.27.0", features = ["recovery"] } serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.107" tokio = { version = "1.32.0", features = ["full", "macros", "rt"] } +toolshed = { git = "https://github.com/edgeandnode/toolshed", tag = "v0.2.2", features = ["graphql"] } [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/allocations/mod.rs b/common/src/allocations/mod.rs index 4e050f2d3..e32a8b6c4 100644 --- a/common/src/allocations/mod.rs +++ b/common/src/allocations/mod.rs @@ -14,7 +14,7 @@ use crate::types::SubgraphDeploymentID; pub mod monitor; -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct Allocation { pub id: Address, pub status: AllocationStatus, @@ -31,7 +31,7 @@ pub struct Allocation { pub query_fees_collected: Option, } -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum AllocationStatus { Null, Active, @@ -40,7 +40,7 @@ pub enum AllocationStatus { Claimed, } -#[derive(Debug, Eq, PartialEq, Deserialize)] +#[derive(Clone, Debug, Eq, PartialEq, Deserialize)] pub struct SubgraphDeployment { pub id: SubgraphDeploymentID, #[serde(rename = "deniedAt")] diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index 1980ae4b4..83d398f5c 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -1,398 +1,195 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, time::Duration}; use alloy_primitives::Address; -use anyhow::Result; -use log::{info, warn}; -use tokio::sync::watch::{Receiver, Sender}; -use tokio::sync::RwLock; +use anyhow::anyhow; +use eventuals::{timer, Eventual, EventualExt}; +use log::warn; +use serde::Deserialize; +use serde_json::json; +use tokio::time::sleep; -use crate::prelude::{Allocation, NetworkSubgraph}; +use crate::prelude::NetworkSubgraph; -#[derive(Debug)] -struct AllocationMonitorInner { - network_subgraph: NetworkSubgraph, - indexer_address: Address, - interval_ms: u64, - graph_network_id: u64, - eligible_allocations: Arc>>, - watch_sender: Sender<()>, - watch_receiver: Receiver<()>, -} - -#[cfg_attr(any(test, feature = "mock"), faux::create)] -#[derive(Debug, Clone)] -pub struct AllocationMonitor { - _monitor_handle: Arc>, - inner: Arc, -} - -#[cfg_attr(any(test, feature = "mock"), faux::methods)] -impl AllocationMonitor { - pub async fn new( - network_subgraph: NetworkSubgraph, - indexer_address: Address, - graph_network_id: u64, - interval_ms: u64, - ) -> Result { - // These are used to ping subscribers when the allocations are updated - let (watch_sender, watch_receiver) = tokio::sync::watch::channel(()); - - let inner = Arc::new(AllocationMonitorInner { - network_subgraph, - indexer_address, - interval_ms, - graph_network_id, - eligible_allocations: Arc::new(RwLock::new(HashMap::new())), - watch_sender, - watch_receiver, - }); - - let inner_clone = inner.clone(); - - let monitor = AllocationMonitor { - _monitor_handle: Arc::new(tokio::spawn(async move { - AllocationMonitor::monitor_loop(&inner_clone).await.unwrap(); - })), - inner, - }; +use super::Allocation; - Ok(monitor) +async fn current_epoch( + network_subgraph: &'static NetworkSubgraph, + graph_network_id: u64, +) -> Result { + // Types for deserializing the network subgraph response + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct GraphNetworkResponse { + graph_network: Option, } - - async fn current_epoch( - network_subgraph: &NetworkSubgraph, - graph_network_id: u64, - ) -> Result { - let res = network_subgraph - .network_query( - r#" - query epoch($id: ID!) { - graphNetwork(id: $id) { - currentEpoch - } - } - "# - .to_string(), - Some(serde_json::json!({ "id": graph_network_id })), - ) - .await - .map_err(|e| { - anyhow::anyhow!( - "Failed to parse current epoch response from network subgraph: {}", - e - ) - })?; - - res.get("data") - .and_then(|d| d.get("graphNetwork")) - .and_then(|d| d.get("currentEpoch")) - .and_then(|d| d.as_u64()) - .ok_or(anyhow::anyhow!( - "Failed to get current epoch from network subgraph" - )) + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct GraphNetwork { + current_epoch: u64, } - async fn current_eligible_allocations( - network_subgraph: &NetworkSubgraph, - indexer_address: &Address, - closed_at_epoch_threshold: u64, - ) -> Result> { - let mut res = network_subgraph - .network_query( - r#" - query allocations($indexer: ID!, $closedAtEpochThreshold: Int!) { - indexer(id: $indexer) { - activeAllocations: totalAllocations( - where: { status: Active } - orderDirection: desc - first: 1000 - ) { - id - indexer { - id - } - allocatedTokens - createdAtBlockHash - createdAtEpoch - closedAtEpoch - subgraphDeployment { - id - deniedAt - stakedTokens - signalledTokens - queryFeesAmount - } - } - recentlyClosedAllocations: totalAllocations( - where: { status: Closed, closedAtEpoch_gte: $closedAtEpochThreshold } - orderDirection: desc - first: 1000 - ) { - id - indexer { - id - } - allocatedTokens - createdAtBlockHash - createdAtEpoch - closedAtEpoch - subgraphDeployment { - id - deniedAt - stakedTokens - signalledTokens - queryFeesAmount - } - } - } - } - "# - .to_string(), - Some(serde_json::json!({ "indexer": indexer_address, "closedAtEpochThreshold": closed_at_epoch_threshold })), - ) - .await - .map_err(|e| { - anyhow::anyhow!( - "Failed to fetch current allocations from network subgraph: {}", - e - ) - })?; - - let indexer_json = res - .get_mut("data") - .and_then(|d| d.get_mut("indexer")) - .ok_or_else(|| anyhow::anyhow!("No data / indexer not found on chain",))?; - - let active_allocations_json = - indexer_json.get_mut("activeAllocations").ok_or_else(|| { - anyhow::anyhow!("Failed to parse active allocations from network subgraph",) - })?; - let active_allocations: Vec = - serde_json::from_value(active_allocations_json.take())?; - let mut eligible_allocations: HashMap = - HashMap::from_iter(active_allocations.into_iter().map(|a| (a.id, a))); + // Query the current epoch + let query = r#"query epoch($id: ID!) { graphNetwork(id: $id) { currentEpoch } }"#; + let response = network_subgraph + .query::(&json!({ + "query": query, + "variables": { + "id": graph_network_id + } + })) + .await?; - let recently_closed_allocations_json = - indexer_json - .get_mut("recentlyClosedAllocations") - .ok_or_else(|| { - anyhow::anyhow!( - "Failed to parse recently closed allocations from network subgraph", - ) - })?; - let recently_closed_allocations: Vec = - serde_json::from_value(recently_closed_allocations_json.take())?; - eligible_allocations.extend( - recently_closed_allocations + if let Some(errors) = response.errors { + warn!( + "Errors encountered identifying current epoch for network {}: {}", + graph_network_id, + errors .into_iter() - .map(move |a| (a.id, a)), + .map(|e| e.message) + .collect::>() + .join(", ") ); - - Ok(eligible_allocations) } - async fn update_allocations(inner: &Arc) -> Result<(), anyhow::Error> { - let current_epoch = - Self::current_epoch(&inner.network_subgraph, inner.graph_network_id).await?; - *(inner.eligible_allocations.write().await) = Self::current_eligible_allocations( - &inner.network_subgraph, - &inner.indexer_address, - current_epoch - 1, - ) - .await?; - Ok(()) + response + .data + .and_then(|data| data.graph_network) + .ok_or_else(|| anyhow!("Network {} not found", graph_network_id)) + .map(|network| network.current_epoch) +} + +pub fn indexer_allocations( + network_subgraph: &'static NetworkSubgraph, + indexer_address: Address, + graph_network_id: u64, + interval: Duration, +) -> Eventual> { + // Types for deserializing the network subgraph response + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct IndexerAllocationsResponse { + indexer: Option, + } + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct Indexer { + active_allocations: Vec, + recently_closed_allocations: Vec, } - async fn monitor_loop(inner: &Arc) -> Result<()> { - loop { - match Self::update_allocations(inner).await { - Ok(_) => { - if inner.watch_sender.send(()).is_err() { - warn!( - "Failed to notify subscribers that the allocations have been updated" - ); + let query = r#" + query allocations($indexer: ID!, $closedAtEpochThreshold: Int!) { + indexer(id: $indexer) { + activeAllocations: totalAllocations( + where: { status: Active } + orderDirection: desc + first: 1000 + ) { + id + indexer { + id + } + allocatedTokens + createdAtBlockHash + createdAtEpoch + closedAtEpoch + subgraphDeployment { + id + deniedAt + stakedTokens + signalledTokens + queryFeesAmount } } - Err(e) => { - warn!( - "Failed to query indexer allocations, keeping existing: {:?}. Error: {}", - inner - .eligible_allocations - .read() - .await - .keys() - .collect::>(), - e - ); + recentlyClosedAllocations: totalAllocations( + where: { status: Closed, closedAtEpoch_gte: $closedAtEpochThreshold } + orderDirection: desc + first: 1000 + ) { + id + indexer { + id + } + allocatedTokens + createdAtBlockHash + createdAtEpoch + closedAtEpoch + subgraphDeployment { + id + deniedAt + stakedTokens + signalledTokens + queryFeesAmount + } } } - - info!( - "Eligible allocations: {}", - inner - .eligible_allocations - .read() - .await - .values() - .map(|e| { - format!( - "{{allocation: {:?}, deployment: {}, closedAtEpoch: {:?})}}", - e.id, - e.subgraph_deployment.id.ipfs_hash(), - e.closed_at_epoch - ) - }) - .collect::>() - .join(", ") - ); - - tokio::time::sleep(tokio::time::Duration::from_millis(inner.interval_ms)).await; } - } - - pub async fn get_eligible_allocations( - &self, - ) -> tokio::sync::RwLockReadGuard<'_, HashMap> { - self.inner.eligible_allocations.read().await - } - - pub async fn is_allocation_eligible(&self, allocation_id: &Address) -> bool { - self.inner - .eligible_allocations - .read() - .await - .contains_key(allocation_id) - } - - pub fn subscribe(&self) -> Receiver<()> { - self.inner.watch_receiver.clone() - } -} - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use test_log::test; - use wiremock::matchers::{method, path}; - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use crate::prelude::NetworkSubgraph; - use crate::test_vectors; - - use super::*; - - #[test(tokio::test)] - async fn test_current_epoch() { - let mock_server = MockServer::start().await; - - let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint( - &mock_server.uri(), - test_vectors::NETWORK_SUBGRAPH_ID, - ); - let network_subgraph = NetworkSubgraph::new( - Some(&mock_server.uri()), - Some(test_vectors::NETWORK_SUBGRAPH_ID), - network_subgraph_endpoint.as_ref(), - ); - - let mock = Mock::given(method("POST")) - .and(path( - "/subgraphs/id/".to_string() + test_vectors::NETWORK_SUBGRAPH_ID, - )) - .respond_with(ResponseTemplate::new(200).set_body_raw( - r#" - { - "data": { - "graphNetwork": { - "currentEpoch": 896419 - } - } - } - "#, - "application/json", - )); - - mock_server.register(mock).await; - - let epoch = AllocationMonitor::current_epoch(&network_subgraph, 1) - .await - .unwrap(); - - assert_eq!(epoch, 896419); - } - - #[test(tokio::test)] - async fn test_current_eligible_allocations() { - let indexer_address = Address::from_str(test_vectors::INDEXER_ADDRESS).unwrap(); - - let mock_server = MockServer::start().await; - - let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint( - &mock_server.uri(), - test_vectors::NETWORK_SUBGRAPH_ID, - ); - let network_subgraph = NetworkSubgraph::new( - Some(&mock_server.uri()), - Some(test_vectors::NETWORK_SUBGRAPH_ID), - network_subgraph_endpoint.as_ref(), - ); + "#; + + let indexer_for_error_handler = indexer_address.clone(); + + // Refresh indexer allocations every now and then + timer(interval).map_with_retry( + move |_| async move { + let current_epoch = current_epoch(&network_subgraph, graph_network_id) + .await + .map_err(|e| format!("Failed to fetch current epoch: {}", e))?; + + // Allocations can be closed one epoch into the past + let closed_at_epoch_threshold = current_epoch - 1; + + // Query active and recently closed allocations for the indexer, + // using the network subgraph + let response = network_subgraph + .query::(&json!({ + "query": query, + "variables": { + "indexer": indexer_address, + "closedAtEpochThreshold": closed_at_epoch_threshold, + }})) + .await + .map_err(|e| e.to_string())?; + + // If there are any GraphQL errors returned, we'll log them for debugging + if let Some(errors) = response.errors { + warn!( + "Errors encountered fetching active or recently closed allocations for indexer {}: {}", + indexer_address, + errors.into_iter().map(|e| e.message).collect::>().join(", ") + ); + } - let mock = Mock::given(method("POST")) - .and(path( - "/subgraphs/id/".to_string() + test_vectors::NETWORK_SUBGRAPH_ID, + // Verify that the indexer could be found at all + let indexer = response + .data + .and_then(|data| data.indexer) + .ok_or_else(|| format!("Indexer {} could not be found on the network", indexer_address))?; + + // Pull active and recently closed allocations out of the indexer + let Indexer { + active_allocations, + recently_closed_allocations + } = indexer; + + Ok(HashMap::from_iter( + active_allocations.into_iter().map(|a| (a.id, a)).chain( + recently_closed_allocations.into_iter().map(|a| (a.id, a))) )) - .respond_with( - ResponseTemplate::new(200) - .set_body_raw(test_vectors::ALLOCATIONS_QUERY_RESPONSE, "application/json"), + }, + + // Need to use string errors here because eventuals `map_with_retry` retries + // errors that can be cloned + move |err: String| { + warn!( + "Failed to fetch active or recently closed allocations for indexer {}: {}", + indexer_for_error_handler, err ); - mock_server.register(mock).await; - - let allocations = AllocationMonitor::current_eligible_allocations( - &network_subgraph, - &indexer_address, - 940, - ) - .await - .unwrap(); - - assert_eq!(allocations, test_vectors::expected_eligible_allocations()) - } - - /// Run with RUST_LOG=info to see the logs from the allocation monitor - #[test(tokio::test)] - #[ignore] - async fn test_local() { - let graph_node_url = - std::env::var("GRAPH_NODE_ENDPOINT").expect("GRAPH_NODE_ENDPOINT not set"); - let network_subgraph_id = - std::env::var("NETWORK_SUBGRAPH_ID").expect("NETWORK_SUBGRAPH_ID not set"); - let indexer_address = std::env::var("INDEXER_ADDRESS").expect("INDEXER_ADDRESS not set"); - - let network_subgraph_endpoint = - NetworkSubgraph::local_deployment_endpoint(&graph_node_url, &network_subgraph_id); - let network_subgraph = NetworkSubgraph::new( - Some(&graph_node_url), - Some(&network_subgraph_id), - network_subgraph_endpoint.as_ref(), - ); - - // graph_network_id=1 and interval_ms=1000 - let _allocation_monitor = AllocationMonitor::new( - network_subgraph, - Address::from_str(&indexer_address).unwrap(), - 1, - 1000, - ) - .await - .unwrap(); - - // sleep for a bit to allow the monitor to fetch the allocations a few times - tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await; - } + // Sleep for a bit before we retry + sleep(interval.div_f32(2.0)) + }, + ) } diff --git a/common/src/attestations/signer.rs b/common/src/attestations/signer.rs index a01191882..8b22237f6 100644 --- a/common/src/attestations/signer.rs +++ b/common/src/attestations/signer.rs @@ -13,7 +13,7 @@ use secp256k1::SecretKey; use std::convert::TryInto; /// An attestation signer tied to a specific allocation via its signer key -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct AttestationSigner { subgraph_deployment_id: Bytes32, domain_separator: DomainSeparator, diff --git a/common/src/attestations/signers.rs b/common/src/attestations/signers.rs index d362826b6..7e01de95a 100644 --- a/common/src/attestations/signers.rs +++ b/common/src/attestations/signers.rs @@ -3,162 +3,57 @@ use alloy_primitives::Address; use ethers_core::types::U256; -use log::{error, info, warn}; -use std::collections::HashMap; +use eventuals::{Eventual, EventualExt}; +use log::warn; +use lru::LruCache; use std::sync::Arc; -use tokio::sync::RwLock; +use std::{collections::HashMap, num::NonZeroUsize}; +use tokio::sync::Mutex; -use crate::prelude::{AllocationMonitor, AttestationSigner}; +use crate::prelude::{Allocation, AttestationSigner}; use super::{attestation_signer_for_allocation, signer::create_attestation_signer}; -#[derive(Debug, Clone)] -pub struct AttestationSigners { - inner: Arc, - _update_loop_handle: Arc>, -} - -#[derive(Debug)] -pub struct AttestationSignersInner { - attestation_signers: Arc>>, - allocation_monitor: AllocationMonitor, +pub fn attestation_signers( + indexer_allocations: Eventual>, indexer_mnemonic: String, chain_id: U256, dispute_manager: Address, -} +) -> Eventual> { + let cache: &'static Mutex> = Box::leak(Box::new(Mutex::new(LruCache::new( + NonZeroUsize::new(1000).unwrap(), + )))); -impl AttestationSigners { - pub fn new( - allocation_monitor: AllocationMonitor, - indexer_mnemonic: String, - chain_id: U256, - dispute_manager: Address, - ) -> Self { - let inner = Arc::new(AttestationSignersInner { - attestation_signers: Arc::new(RwLock::new(HashMap::new())), - allocation_monitor, - indexer_mnemonic, - chain_id, - dispute_manager, - }); + let indexer_mnemonic = Arc::new(indexer_mnemonic); - let _update_loop_handle = { - let inner = inner.clone(); - tokio::spawn(Self::update_loop(inner.clone())) - }; + indexer_allocations.map(move |allocations| { + let indexer_mnemonic = indexer_mnemonic.clone(); - Self { - inner, - _update_loop_handle: Arc::new(_update_loop_handle), - } - } + async move { + let mut cache = cache.lock().await; - pub async fn update_attestation_signers(inner: Arc) { - let mut attestation_signers_write = inner.attestation_signers.write().await; - for allocation in inner - .allocation_monitor - .get_eligible_allocations() - .await - .values() - { - if let std::collections::hash_map::Entry::Vacant(e) = - attestation_signers_write.entry(allocation.id) - { - match attestation_signer_for_allocation(&inner.indexer_mnemonic, allocation) - .and_then(|signer| { + for (id, allocation) in allocations.iter() { + let result = cache.try_get_or_insert(*id, || { + attestation_signer_for_allocation(&indexer_mnemonic, allocation).and_then(|signer| { create_attestation_signer( - inner.chain_id, - inner.dispute_manager, + chain_id, + dispute_manager, signer, allocation.subgraph_deployment.id.bytes32(), ) - }) { - Ok(signer) => { - e.insert(signer); - info!( - "Found attestation signer for {{allocation: {}, deployment: {}}}", - allocation.id, - allocation.subgraph_deployment.id.ipfs_hash() - ); - } - Err(e) => { - warn!( - "Failed to find the attestation signer for {{allocation: {}, deployment: {}, createdAtEpoch: {}, err: {}}}", - allocation.id, allocation.subgraph_deployment.id.ipfs_hash(), allocation.created_at_epoch, e - ) - } - } - } - } - } - - async fn update_loop(inner: Arc) { - let mut watch_receiver = inner.allocation_monitor.subscribe(); + }) + }); - loop { - match watch_receiver.changed().await { - Ok(_) => { - Self::update_attestation_signers(inner.clone()).await; - } - Err(e) => { - error!( - "Error receiving allocation monitor subscription update: {}", - e + if let Err(e) = result { + warn!( + "Failed to establish signer for allocation {}, deployment {}, createdAtEpoch {}: {}", + allocation.id, allocation.subgraph_deployment.id.ipfs_hash(), + allocation.created_at_epoch, e ); } } - } - } - - pub async fn read( - &self, - ) -> tokio::sync::RwLockReadGuard<'_, HashMap> { - self.inner.attestation_signers.read().await - } -} - -#[cfg(test)] -mod tests { - use alloy_primitives::Address; - use ethers_core::types::U256; - use std::str::FromStr; - use std::sync::Arc; - - use crate::prelude::AllocationMonitor; - use crate::test_vectors; - - use super::*; - - #[tokio::test] - async fn test_update_attestation_signers() { - unsafe { - let mut mock_allocation_monitor = AllocationMonitor::faux(); - - faux::when!(mock_allocation_monitor.get_eligible_allocations).then_unchecked(|_| { - // Spawn a thread to be able to call `blocking_read` on the RwLock, which actually spins its own async - // runtime. - // This is needed because `faux` will also use a runtime to mock the async function. - let t = std::thread::spawn(|| { - let eligible_allocations = Box::leak(Box::new(Arc::new(RwLock::new( - test_vectors::expected_eligible_allocations(), - )))); - eligible_allocations.blocking_read() - }); - t.join().unwrap() - }); - - let inner = Arc::new(AttestationSignersInner { - attestation_signers: Arc::new(RwLock::new(HashMap::new())), - allocation_monitor: mock_allocation_monitor, - indexer_mnemonic: test_vectors::INDEXER_OPERATOR_MNEMONIC.to_string(), - chain_id: U256::from(1), - dispute_manager: Address::from_str(test_vectors::DISPUTE_MANAGER_ADDRESS).unwrap(), - }); - - AttestationSigners::update_attestation_signers(inner.clone()).await; - // Check that the attestation signers were found for the allocations - assert_eq!(inner.attestation_signers.read().await.len(), 4); + HashMap::from_iter(cache.iter().map(|(k, v)| (k.clone(), v.clone()))) } - } + }) } diff --git a/common/src/lib.rs b/common/src/lib.rs index 339084436..c6471949c 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -11,12 +11,12 @@ pub mod types; mod test_vectors; pub mod prelude { - pub use super::allocations::monitor::AllocationMonitor; + pub use super::allocations::monitor::indexer_allocations; pub use super::allocations::{Allocation, AllocationStatus, SubgraphDeployment}; pub use super::attestations::{ attestation_signer_for_allocation, signer::{create_attestation_signer, AttestationSigner}, - signers::AttestationSigners, + signers::attestation_signers, }; pub use super::network_subgraph::NetworkSubgraph; pub use super::types::*; diff --git a/common/src/network_subgraph/mod.rs b/common/src/network_subgraph/mod.rs index d8fc317ee..6c79cbfb0 100644 --- a/common/src/network_subgraph/mod.rs +++ b/common/src/network_subgraph/mod.rs @@ -4,16 +4,9 @@ use std::sync::Arc; use reqwest::{header, Client, Url}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use serde_json::Value; - -use crate::types::GraphQLQuery; - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -pub struct Response { - pub result: T, - pub status: i64, -} +use toolshed::graphql::http::Response; /// Network subgraph query wrapper /// @@ -60,31 +53,19 @@ impl NetworkSubgraph { .expect("Could not parse graph node query endpoint for the network subgraph deployment") } - pub async fn network_query_raw( + pub async fn query Deserialize<'de>>( &self, - body: String, - ) -> Result { + body: &Value, + ) -> Result, reqwest::Error> { self.client .post(Url::clone(&self.network_subgraph_url)) - .body(body.clone()) + .json(body) .header(header::CONTENT_TYPE, "application/json") .send() .await - } - - pub async fn network_query( - &self, - query: String, - variables: Option, - ) -> Result { - let body = GraphQLQuery { query, variables }; - - self.network_query_raw( - serde_json::to_string(&body).expect("serialize network GraphQL query"), - ) - .await? - .json::() - .await + .and_then(|response| response.error_for_status())? + .json::>() + .await } } diff --git a/common/src/types.rs b/common/src/types.rs index cf701719a..cf30b419c 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -2,16 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use ethers::utils::hex; -use serde::{Deserialize, Serialize}; -use serde_json::Value; - -/// A serializable GraphQL request -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct GraphQLQuery { - pub query: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub variables: Option, -} +use serde::Deserialize; /// Subgraph identifier type: SubgraphDeploymentID with field 'value' #[derive(Debug, Clone, PartialEq, Eq, Hash)] diff --git a/service/Cargo.toml b/service/Cargo.toml index 15608bc13..afda8c896 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -60,6 +60,7 @@ ethereum-types = "0.14.1" sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio", "bigdecimal", "rust_decimal", "time"] } alloy-primitives = { version = "0.3.3", features = ["serde"] } alloy-sol-types = "0.3.2" +eventuals = "0.6.7" [dev-dependencies] faux = "0.1.10" diff --git a/service/src/escrow_monitor.rs b/service/src/escrow_monitor.rs index 474538070..4bb2c6647 100644 --- a/service/src/escrow_monitor.rs +++ b/service/src/escrow_monitor.rs @@ -6,12 +6,11 @@ use anyhow::Result; use ethereum_types::U256; use log::{error, info}; use serde::Deserialize; +use serde_json::json; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use indexer_common::prelude::GraphQLQuery; - use crate::graph_node::GraphNodeInstance; #[derive(Debug)] @@ -80,26 +79,25 @@ impl EscrowMonitor { sender: _Sender, } - let query = GraphQLQuery { - query: r#" - query ($indexer: ID!) { - escrowAccounts(where: {receiver_: {id: $indexer}}) { - balance - totalAmountThawing - sender { - id + let request = json!({ + "query": r#" + query ($indexer: ID!) { + escrowAccounts(where: {receiver_: {id: $indexer}}) { + balance + totalAmountThawing + sender { + id + } } } - } - "# - .to_string(), - variables: Some(serde_json::json!({ "indexer": indexer_address})), - }; + "#, + "variables": { "indexer": indexer_address }, + }); let res = graph_node .subgraph_query_raw( escrow_subgraph_deployment, - serde_json::to_string(&query).expect("serialize escrow GraphQL query"), + serde_json::to_string(&request).expect("serialize escrow GraphQL query"), ) .await?; diff --git a/service/src/main.rs b/service/src/main.rs index 5763a35e1..6c0dfd8ce 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -6,10 +6,11 @@ use alloy_sol_types::eip712_domain; use axum::Server; use dotenvy::dotenv; use ethereum_types::U256; +use std::time::Duration; use std::{net::SocketAddr, str::FromStr}; use tracing::info; -use indexer_common::prelude::{AllocationMonitor, AttestationSigners, NetworkSubgraph}; +use indexer_common::prelude::{attestation_signers, indexer_allocations, NetworkSubgraph}; use util::{package_version, shutdown_signal}; @@ -61,26 +62,24 @@ async fn main() -> Result<(), std::io::Error> { // Make an instance of network subgraph at either // graph_node_query_endpoint/subgraphs/id/network_subgraph_deployment // or network_subgraph_endpoint - let network_subgraph = NetworkSubgraph::new( + let network_subgraph: &'static NetworkSubgraph = Box::leak(Box::new(NetworkSubgraph::new( Some(&config.indexer_infrastructure.graph_node_query_endpoint), config .network_subgraph .network_subgraph_deployment .as_deref(), &config.network_subgraph.network_subgraph_endpoint, - ); + ))); - let allocation_monitor = AllocationMonitor::new( - network_subgraph.clone(), + let indexer_allocations = indexer_allocations( + &network_subgraph, config.ethereum.indexer_address, 1, - config.network_subgraph.allocation_syncing_interval, - ) - .await - .expect("Initialize allocation monitor"); + Duration::from_secs(config.network_subgraph.allocation_syncing_interval), + ); - let attestation_signers = AttestationSigners::new( - allocation_monitor.clone(), + let attestation_signers = attestation_signers( + indexer_allocations.clone(), config.ethereum.mnemonic.clone(), // TODO: Chain ID should be a config U256::from(1), @@ -108,7 +107,7 @@ async fn main() -> Result<(), std::io::Error> { let tap_manager = tap_manager::TapManager::new( indexer_management_db.clone(), - allocation_monitor.clone(), + indexer_allocations.clone(), escrow_monitor, // TODO: arguments for eip712_domain should be a config eip712_domain! { @@ -137,7 +136,7 @@ async fn main() -> Result<(), std::io::Error> { config.indexer_infrastructure.graph_node_status_endpoint, indexer_management_db, public_key(&config.ethereum.mnemonic).expect("Failed to initiate with operator wallet"), - network_subgraph, + &network_subgraph, config.network_subgraph.network_subgraph_auth_token, config.network_subgraph.serve_network_subgraph, ); diff --git a/service/src/query_processor.rs b/service/src/query_processor.rs index 19f661418..b7aeca8a6 100644 --- a/service/src/query_processor.rs +++ b/service/src/query_processor.rs @@ -1,12 +1,16 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; + +use alloy_primitives::Address; use ethers_core::types::{Signature, U256}; +use eventuals::Eventual; use log::error; use serde::{Deserialize, Serialize}; use tap_core::tap_manager::SignedReceipt; -use indexer_common::prelude::{AttestationSigner, AttestationSigners, SubgraphDeploymentID}; +use indexer_common::prelude::{AttestationSigner, SubgraphDeploymentID}; use crate::graph_node::GraphNodeInstance; use crate::tap_manager::TapManager; @@ -59,17 +63,17 @@ pub enum QueryError { Other(anyhow::Error), } -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct QueryProcessor { graph_node: GraphNodeInstance, - attestation_signers: AttestationSigners, + attestation_signers: Eventual>, tap_manager: TapManager, } impl QueryProcessor { pub fn new( graph_node: GraphNodeInstance, - attestation_signers: AttestationSigners, + attestation_signers: Eventual>, tap_manager: TapManager, ) -> QueryProcessor { QueryProcessor { @@ -114,7 +118,10 @@ impl QueryProcessor { .verify_and_store_receipt(parsed_receipt) .await?; - let signers = self.attestation_signers.read().await; + let signers = self + .attestation_signers + .value_immediate() + .ok_or_else(|| QueryError::Other(anyhow::anyhow!("System is not ready yet")))?; let signer = signers.get(&allocation_id).ok_or_else(|| { QueryError::Other(anyhow::anyhow!( "No signer found for allocation id {}", diff --git a/service/src/server/mod.rs b/service/src/server/mod.rs index 075c993de..8d5cf9df3 100644 --- a/service/src/server/mod.rs +++ b/service/src/server/mod.rs @@ -29,7 +29,7 @@ use crate::{ pub mod routes; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ServerOptions { pub port: Option, pub release: PackageVersion, @@ -38,7 +38,7 @@ pub struct ServerOptions { pub graph_node_status_endpoint: String, pub indexer_management_db: PgPool, pub operator_public_key: String, - pub network_subgraph: NetworkSubgraph, + pub network_subgraph: &'static NetworkSubgraph, pub network_subgraph_auth_token: Option, pub serve_network_subgraph: bool, } @@ -53,7 +53,7 @@ impl ServerOptions { graph_node_status_endpoint: String, indexer_management_db: PgPool, operator_public_key: String, - network_subgraph: NetworkSubgraph, + network_subgraph: &'static NetworkSubgraph, network_subgraph_auth_token: Option, serve_network_subgraph: bool, ) -> Self { diff --git a/service/src/server/routes/network.rs b/service/src/server/routes/network.rs index 1a4313334..ce330c932 100644 --- a/service/src/server/routes/network.rs +++ b/service/src/server/routes/network.rs @@ -3,19 +3,20 @@ use axum::{ extract::Extension, - http::{self, Request, StatusCode}, + http::{self, Request}, response::IntoResponse, Json, }; -use serde_json::Value; +use serde_json::{json, Value}; use crate::server::ServerOptions; -use super::{bad_request_response, response_body_to_query_string}; +use super::bad_request_response; pub async fn network_queries( Extension(server): Extension, req: Request, + axum::extract::Json(body): axum::extract::Json, ) -> impl IntoResponse { // Extract free query auth token let auth_token = req @@ -32,25 +33,17 @@ pub async fn network_queries( return bad_request_response("Not enabled or authorized query"); } - // Serve query using query processor - let req_body = req.into_body(); - let query_string = match response_body_to_query_string(req_body).await { - Ok(q) => q, - Err(e) => return bad_request_response(&e.to_string()), - }; - - let response = server - .network_subgraph - .network_query_raw(query_string) - .await - .expect("Failed to execute free network subgraph query"); - - if response.status().is_success() { - match response.json::().await { - Ok(value) => (StatusCode::OK, Json(value)).into_response(), - Err(e) => bad_request_response(&e.to_string()), - } - } else { - bad_request_response("Bad response from Graph node") + match server.network_subgraph.query::(&body).await { + Ok(result) => Json(json!({ + "data": result.data, + "errors": result.errors.map(|errors| { + errors + .into_iter() + .map(|e| json!({ "message": e.message })) + .collect::>() + }), + })) + .into_response(), + Err(e) => bad_request_response(&e.to_string()), } } diff --git a/service/src/tap_manager.rs b/service/src/tap_manager.rs index 4a4795b97..2a63eb303 100644 --- a/service/src/tap_manager.rs +++ b/service/src/tap_manager.rs @@ -1,19 +1,20 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; +use eventuals::Eventual; +use indexer_common::prelude::Allocation; use log::error; use sqlx::{types::BigDecimal, PgPool}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use tap_core::tap_manager::SignedReceipt; -use indexer_common::prelude::AllocationMonitor; - use crate::{escrow_monitor, query_processor::QueryError}; -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct TapManager { - allocation_monitor: AllocationMonitor, + indexer_allocations: Eventual>, escrow_monitor: escrow_monitor::EscrowMonitor, pgpool: PgPool, domain_separator: Arc, @@ -22,12 +23,12 @@ pub struct TapManager { impl TapManager { pub fn new( pgpool: PgPool, - allocation_monitor: AllocationMonitor, + indexer_allocations: Eventual>, escrow_monitor: escrow_monitor::EscrowMonitor, domain_separator: Eip712Domain, ) -> Self { Self { - allocation_monitor, + indexer_allocations, escrow_monitor, pgpool, domain_separator: Arc::new(domain_separator), @@ -43,9 +44,10 @@ impl TapManager { pub async fn verify_and_store_receipt(&self, receipt: SignedReceipt) -> Result<(), QueryError> { let allocation_id = &receipt.message.allocation_id; if !self - .allocation_monitor - .is_allocation_eligible(allocation_id) - .await + .indexer_allocations + .value_immediate() + .map(|allocations| allocations.contains_key(allocation_id)) + .unwrap_or(false) { return Err(QueryError::Other(anyhow::Error::msg(format!( "Receipt's allocation ID ({}) is not eligible for this indexer",