From 1d979cf28dc83b0ea2cdec775977bf4945556049 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Tue, 10 Oct 2023 15:33:51 +0200 Subject: [PATCH] refactor: use eventuals for network data and signers --- Cargo.lock | 36 ++ common/Cargo.toml | 2 + common/src/allocations/mod.rs | 182 +--------- common/src/allocations/monitor.rs | 531 +++++++++-------------------- common/src/attestations/mod.rs | 171 ---------- common/src/attestations/signer.rs | 276 ++++++++++++--- common/src/attestations/signers.rs | 182 ++-------- common/src/lib.rs | 9 +- service/Cargo.toml | 1 + service/src/main.rs | 24 +- service/src/query_processor.rs | 28 +- service/src/server/mod.rs | 6 +- service/src/tap_manager.rs | 56 ++- 13 files changed, 554 insertions(+), 950 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 831c45095..a8fc84ed7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -800,6 +800,12 @@ version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +[[package]] +name = "by_address" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf8dba2868114ed769a1f2590fc9ae5eb331175b44313b6c9b922f8f7ca813d0" + [[package]] name = "byte-slice-cast" version = "1.2.2" @@ -1983,6 +1989,18 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "eventuals" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0450e5c57135f799007162ff8beba7b2809d4a018cf9cdcbca2c319a73d9d8ee" +dependencies = [ + "by_address", + "futures", + "never", + "tokio", +] + [[package]] name = "eyre" version = "0.6.8" @@ -2775,11 +2793,13 @@ dependencies = [ "env_logger", "ethers", "ethers-core", + "eventuals", "faux", "graphql", "keccak-hash", "lazy_static", "log", + "lru", "regex", "reqwest", "secp256k1", @@ -3091,6 +3111,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" +dependencies = [ + "hashbrown 0.14.0", +] + [[package]] name = "mach" version = "0.3.2" @@ -3267,6 +3296,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "never" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c96aba5aa877601bb3f6dd6a63a969e1f82e60646e81e71b14496995e9853c91" + [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -4857,6 +4892,7 @@ dependencies = [ "ethereum-types", "ethers", "ethers-core", + "eventuals", "faux", "hex", "hex-literal", diff --git a/common/Cargo.toml b/common/Cargo.toml index aa0459706..74748f50c 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -11,10 +11,12 @@ bs58 = "0.5.0" eip-712-derive = { git = "https://github.com/graphprotocol/eip-712-derive" } ethers = "2.0.10" ethers-core = "2.0.10" +eventuals = "0.6.7" faux = { version = "0.1.10", optional = true } keccak-hash = "0.10.0" lazy_static = "1.4.0" log = "0.4.20" +lru = "0.11.1" regex = "1.7.1" reqwest = "0.11.20" secp256k1 = { version = "0.27.0", features = ["recovery"] } diff --git a/common/src/allocations/mod.rs b/common/src/allocations/mod.rs index f25e4c751..3f2e64624 100644 --- a/common/src/allocations/mod.rs +++ b/common/src/allocations/mod.rs @@ -1,19 +1,14 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use alloy_primitives::{Address, B256}; -use anyhow::Result; -use ethers::signers::coins_bip39::English; -use ethers::signers::{MnemonicBuilder, Signer, Wallet}; -use ethers_core::k256::ecdsa::SigningKey; +use alloy_primitives::Address; use ethers_core::types::U256; -use serde::Deserialize; -use serde::Deserializer; +use serde::{Deserialize, Deserializer}; use toolshed::thegraph::DeploymentId; pub mod monitor; -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Allocation { pub id: Address, pub status: AllocationStatus, @@ -30,7 +25,7 @@ pub struct Allocation { pub query_fees_collected: Option, } -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum AllocationStatus { Null, Active, @@ -39,21 +34,8 @@ pub enum AllocationStatus { Claimed, } -// Custom deserializer for `DeploymentId` that accepts a `0x...` string -fn deserialize_deployment_id<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let bytes = B256::deserialize(deserializer)?; - Ok(DeploymentId(bytes)) -} - -#[derive(Debug, Eq, PartialEq, Deserialize)] +#[derive(Clone, Debug, Eq, PartialEq, Deserialize)] pub struct SubgraphDeployment { - // This neeeds a custom deserialize function because it's returned from the - // network subgraph as a hex string but `DeploymentId` assumes an IPFS hash - // in it's `Deserialize` implementation - #[serde(deserialize_with = "deserialize_deployment_id")] pub id: DeploymentId, #[serde(rename = "deniedAt")] pub denied_at: Option, @@ -106,157 +88,3 @@ impl<'d> Deserialize<'d> for Allocation { }) } } - -pub fn derive_key_pair( - indexer_mnemonic: &str, - epoch: u64, - deployment: &DeploymentId, - index: u64, -) -> Result> { - let mut derivation_path = format!("m/{}/", epoch); - derivation_path.push_str( - &deployment - .to_string() - .as_bytes() - .iter() - .map(|char| char.to_string()) - .collect::>() - .join("/"), - ); - derivation_path.push_str(format!("/{}", index).as_str()); - - Ok(MnemonicBuilder::::default() - .derivation_path(&derivation_path) - .expect("Valid derivation path") - .phrase(indexer_mnemonic) - .build()?) -} - -pub fn allocation_signer(indexer_mnemonic: &str, allocation: &Allocation) -> Result { - // Guess the allocation index by enumerating all indexes in the - // range [0, 100] and checking for a match - for i in 0..100 { - // The allocation was either created at the epoch it intended to or one - // epoch later. So try both both. - for created_at_epoch in [allocation.created_at_epoch, allocation.created_at_epoch - 1] { - let allocation_wallet = derive_key_pair( - indexer_mnemonic, - created_at_epoch, - &allocation.subgraph_deployment.id, - i, - )?; - if allocation_wallet.address().as_fixed_bytes() == allocation.id { - return Ok(allocation_wallet.signer().clone()); - } - } - } - Err(anyhow::anyhow!( - "Could not find allocation signer for allocation {}", - allocation.id - )) -} - -#[cfg(test)] -mod test { - use lazy_static::lazy_static; - use std::str::FromStr; - use toolshed::thegraph::DeploymentId; - - use super::*; - - const INDEXER_OPERATOR_MNEMONIC: &str = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"; - - lazy_static! { - static ref DEPLOYMENT_ID: DeploymentId = DeploymentId( - "0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a" - .parse() - .unwrap(), - ); - } - - #[test] - fn test_derive_key_pair() { - assert_eq!( - derive_key_pair(INDEXER_OPERATOR_MNEMONIC, 953, &DEPLOYMENT_ID, 0) - .unwrap() - .address() - .as_fixed_bytes(), - Address::from_str("0xfa44c72b753a66591f241c7dc04e8178c30e13af").unwrap() - ); - - assert_eq!( - derive_key_pair(INDEXER_OPERATOR_MNEMONIC, 940, &DEPLOYMENT_ID, 2) - .unwrap() - .address() - .as_fixed_bytes(), - Address::from_str("0xa171cd12c3dde7eb8fe7717a0bcd06f3ffa65658").unwrap() - ); - } - - #[test] - fn test_allocation_signer() { - // Note that we use `derive_key_pair` to derive the private key - - let allocation = Allocation { - id: Address::from_str("0xa171cd12c3dde7eb8fe7717a0bcd06f3ffa65658").unwrap(), - status: AllocationStatus::Null, - subgraph_deployment: SubgraphDeployment { - id: *DEPLOYMENT_ID, - denied_at: None, - staked_tokens: U256::zero(), - signalled_tokens: U256::zero(), - query_fees_amount: U256::zero(), - }, - indexer: Address::ZERO, - allocated_tokens: U256::zero(), - created_at_epoch: 940, - created_at_block_hash: "".to_string(), - closed_at_epoch: None, - closed_at_epoch_start_block_hash: None, - previous_epoch_start_block_hash: None, - poi: None, - query_fee_rebates: None, - query_fees_collected: None, - }; - assert_eq!( - allocation_signer(INDEXER_OPERATOR_MNEMONIC, &allocation).unwrap(), - *derive_key_pair( - INDEXER_OPERATOR_MNEMONIC, - 940, - &allocation.subgraph_deployment.id, - 2 - ) - .unwrap() - .signer() - ); - } - - #[test] - fn test_allocation_signer_error() { - // Note that because allocation will try 200 derivations paths, this is a slow test - - let allocation = Allocation { - // Purposefully wrong address - id: Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(), - status: AllocationStatus::Null, - subgraph_deployment: SubgraphDeployment { - id: *DEPLOYMENT_ID, - denied_at: None, - staked_tokens: U256::zero(), - signalled_tokens: U256::zero(), - query_fees_amount: U256::zero(), - }, - indexer: Address::ZERO, - allocated_tokens: U256::zero(), - created_at_epoch: 940, - created_at_block_hash: "".to_string(), - closed_at_epoch: None, - closed_at_epoch_start_block_hash: None, - previous_epoch_start_block_hash: None, - poi: None, - query_fee_rebates: None, - query_fees_collected: None, - }; - assert!(allocation_signer(INDEXER_OPERATOR_MNEMONIC, &allocation).is_err()); - } -} diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index 5c3426b97..903f4bdf8 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -1,401 +1,196 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, time::Duration}; use alloy_primitives::Address; -use anyhow::Result; -use log::{info, warn}; +use anyhow::anyhow; +use eventuals::{timer, Eventual, EventualExt}; +use log::warn; use serde::Deserialize; use serde_json::json; -use tokio::sync::watch::{Receiver, Sender}; -use tokio::sync::RwLock; +use tokio::time::sleep; -use crate::prelude::{Allocation, NetworkSubgraph}; +use crate::prelude::NetworkSubgraph; -#[derive(Debug)] -struct AllocationMonitorInner { - network_subgraph: NetworkSubgraph, - indexer_address: Address, - interval_ms: u64, - graph_network_id: u64, - eligible_allocations: Arc>>, - watch_sender: Sender<()>, - watch_receiver: Receiver<()>, -} - -#[cfg_attr(any(test, feature = "mock"), faux::create)] -#[derive(Debug, Clone)] -pub struct AllocationMonitor { - _monitor_handle: Arc>, - inner: Arc, -} - -#[cfg_attr(any(test, feature = "mock"), faux::methods)] -impl AllocationMonitor { - pub async fn new( - network_subgraph: NetworkSubgraph, - indexer_address: Address, - graph_network_id: u64, - interval_ms: u64, - ) -> Result { - // These are used to ping subscribers when the allocations are updated - let (watch_sender, watch_receiver) = tokio::sync::watch::channel(()); - - let inner = Arc::new(AllocationMonitorInner { - network_subgraph, - indexer_address, - interval_ms, - graph_network_id, - eligible_allocations: Arc::new(RwLock::new(HashMap::new())), - watch_sender, - watch_receiver, - }); - - let inner_clone = inner.clone(); - - let monitor = AllocationMonitor { - _monitor_handle: Arc::new(tokio::spawn(async move { - AllocationMonitor::monitor_loop(&inner_clone).await.unwrap(); - })), - inner, - }; +use super::Allocation; - Ok(monitor) +async fn current_epoch( + network_subgraph: &'static NetworkSubgraph, + graph_network_id: u64, +) -> Result { + // Types for deserializing the network subgraph response + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct GraphNetworkResponse { + graph_network: Option, } - - async fn current_epoch( - network_subgraph: &NetworkSubgraph, - graph_network_id: u64, - ) -> Result { - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct GraphNetwork { - current_epoch: u64, - } - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct QueryResult { - graph_network: GraphNetwork, - } - - let res = network_subgraph - .query::(&json!({ - "query": r#" - query epoch($id: ID!) { - graphNetwork(id: $id) { - currentEpoch - } - } - "#, - "variables": { "id": graph_network_id }, - })) - .await - .map_err(|e| { - anyhow::anyhow!( - "Failed to parse current epoch response from network subgraph: {}", - e - ) - })?; - - res.data - .map(|data| data.graph_network.current_epoch) - .ok_or(anyhow::anyhow!( - "Failed to get current epoch from network subgraph" - )) + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct GraphNetwork { + current_epoch: u64, } - async fn current_eligible_allocations( - network_subgraph: &NetworkSubgraph, - indexer_address: &Address, - closed_at_epoch_threshold: u64, - ) -> Result> { - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct Indexer { - active_allocations: Vec, - recently_closed_allocations: Vec, - } - - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct QueryResult { - indexer: Option, - } - - let result = network_subgraph - .query::(&json!({ - "query": r#" - query allocations($indexer: ID!, $closedAtEpochThreshold: Int!) { - indexer(id: $indexer) { - activeAllocations: totalAllocations( - where: { status: Active } - orderDirection: desc - first: 1000 - ) { - id - indexer { - id - } - allocatedTokens - createdAtBlockHash - createdAtEpoch - closedAtEpoch - subgraphDeployment { - id - deniedAt - stakedTokens - signalledTokens - queryFeesAmount - } - } - recentlyClosedAllocations: totalAllocations( - where: { status: Closed, closedAtEpoch_gte: $closedAtEpochThreshold } - orderDirection: desc - first: 1000 - ) { - id - indexer { - id - } - allocatedTokens - createdAtBlockHash - createdAtEpoch - closedAtEpoch - subgraphDeployment { - id - deniedAt - stakedTokens - signalledTokens - queryFeesAmount - } - } - } - } - "#, - "variables": { - "indexer": indexer_address, - "closedAtEpochThreshold": closed_at_epoch_threshold - } - })) - .await - .map_err(|e| { - anyhow::anyhow!( - "Failed to fetch current allocations for indexer {} from network subgraph: {}", - indexer_address, e - ) - })?; - - let indexer = result.data.and_then(|d| d.indexer).ok_or_else(|| { - anyhow::anyhow!("No data / indexer {} not found on chain", indexer_address) - })?; + // Query the current epoch + let query = r#"query epoch($id: ID!) { graphNetwork(id: $id) { currentEpoch } }"#; + let response = network_subgraph + .query::(&json!({ + "query": query, + "variables": { + "id": graph_network_id + } + })) + .await?; - Ok(HashMap::from_iter( - indexer - .active_allocations + if let Some(errors) = response.errors { + warn!( + "Errors encountered identifying current epoch for network {}: {}", + graph_network_id, + errors .into_iter() - .chain(indexer.recently_closed_allocations.into_iter()) - .map(|allocation| (allocation.id, allocation)), - )) + .map(|e| e.message) + .collect::>() + .join(", ") + ); } - async fn update_allocations(inner: &Arc) -> Result<(), anyhow::Error> { - let current_epoch = - Self::current_epoch(&inner.network_subgraph, inner.graph_network_id).await?; - *(inner.eligible_allocations.write().await) = Self::current_eligible_allocations( - &inner.network_subgraph, - &inner.indexer_address, - current_epoch - 1, - ) - .await?; - Ok(()) + response + .data + .and_then(|data| data.graph_network) + .ok_or_else(|| anyhow!("Network {} not found", graph_network_id)) + .map(|network| network.current_epoch) +} + +/// An always up-to-date list of an indexer's active and recently closed allocations. +pub fn indexer_allocations( + network_subgraph: &'static NetworkSubgraph, + indexer_address: Address, + graph_network_id: u64, + interval: Duration, +) -> Eventual> { + // Types for deserializing the network subgraph response + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct IndexerAllocationsResponse { + indexer: Option, + } + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct Indexer { + active_allocations: Vec, + recently_closed_allocations: Vec, } - async fn monitor_loop(inner: &Arc) -> Result<()> { - loop { - match Self::update_allocations(inner).await { - Ok(_) => { - if inner.watch_sender.send(()).is_err() { - warn!( - "Failed to notify subscribers that the allocations have been updated" - ); + let query = r#" + query allocations($indexer: ID!, $closedAtEpochThreshold: Int!) { + indexer(id: $indexer) { + activeAllocations: totalAllocations( + where: { status: Active } + orderDirection: desc + first: 1000 + ) { + id + indexer { + id + } + allocatedTokens + createdAtBlockHash + createdAtEpoch + closedAtEpoch + subgraphDeployment { + id + deniedAt + stakedTokens + signalledTokens + queryFeesAmount } } - Err(e) => { - warn!( - "Failed to query indexer allocations, keeping existing: {:?}. Error: {}", - inner - .eligible_allocations - .read() - .await - .keys() - .collect::>(), - e - ); + recentlyClosedAllocations: totalAllocations( + where: { status: Closed, closedAtEpoch_gte: $closedAtEpochThreshold } + orderDirection: desc + first: 1000 + ) { + id + indexer { + id + } + allocatedTokens + createdAtBlockHash + createdAtEpoch + closedAtEpoch + subgraphDeployment { + id + deniedAt + stakedTokens + signalledTokens + queryFeesAmount + } } } - - info!( - "Eligible allocations: {}", - inner - .eligible_allocations - .read() - .await - .values() - .map(|e| { - format!( - "{{allocation: {:?}, deployment: {}, closedAtEpoch: {:?})}}", - e.id, e.subgraph_deployment.id, e.closed_at_epoch - ) - }) - .collect::>() - .join(", ") - ); - - tokio::time::sleep(tokio::time::Duration::from_millis(inner.interval_ms)).await; } - } - - pub async fn get_eligible_allocations( - &self, - ) -> tokio::sync::RwLockReadGuard<'_, HashMap> { - self.inner.eligible_allocations.read().await - } - - pub async fn is_allocation_eligible(&self, allocation_id: &Address) -> bool { - self.inner - .eligible_allocations - .read() - .await - .contains_key(allocation_id) - } - - pub fn subscribe(&self) -> Receiver<()> { - self.inner.watch_receiver.clone() - } -} - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use test_log::test; - use wiremock::matchers::{method, path}; - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use crate::prelude::NetworkSubgraph; - use crate::test_vectors; - - use super::*; + "#; - #[test(tokio::test)] - async fn test_current_epoch() { - let mock_server = MockServer::start().await; + // let indexer_for_error_handler = indexer_address.clone(); - let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint( - &mock_server.uri(), - test_vectors::NETWORK_SUBGRAPH_ID, - ); - let network_subgraph = NetworkSubgraph::new( - Some(&mock_server.uri()), - Some(test_vectors::NETWORK_SUBGRAPH_ID), - network_subgraph_endpoint.as_ref(), - ); - - let mock = Mock::given(method("POST")) - .and(path( - "/subgraphs/id/".to_string() + test_vectors::NETWORK_SUBGRAPH_ID, - )) - .respond_with(ResponseTemplate::new(200).set_body_raw( - r#" - { - "data": { - "graphNetwork": { - "currentEpoch": 896419 - } - } - } - "#, - "application/json", - )); + // Refresh indexer allocations every now and then + timer(interval).map_with_retry( + move |_| async move { + let current_epoch = current_epoch(network_subgraph, graph_network_id) + .await + .map_err(|e| format!("Failed to fetch current epoch: {}", e))?; - mock_server.register(mock).await; + // Allocations can be closed one epoch into the past + let closed_at_epoch_threshold = current_epoch - 1; - let epoch = AllocationMonitor::current_epoch(&network_subgraph, 1) - .await - .unwrap(); - - assert_eq!(epoch, 896419); - } - - #[test(tokio::test)] - async fn test_current_eligible_allocations() { - let indexer_address = Address::from_str(test_vectors::INDEXER_ADDRESS).unwrap(); - - let mock_server = MockServer::start().await; - - let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint( - &mock_server.uri(), - test_vectors::NETWORK_SUBGRAPH_ID, - ); - let network_subgraph = NetworkSubgraph::new( - Some(&mock_server.uri()), - Some(test_vectors::NETWORK_SUBGRAPH_ID), - network_subgraph_endpoint.as_ref(), - ); + // Query active and recently closed allocations for the indexer, + // using the network subgraph + let response = network_subgraph + .query::(&json!({ + "query": query, + "variables": { + "indexer": indexer_address, + "closedAtEpochThreshold": closed_at_epoch_threshold, + }})) + .await + .map_err(|e| e.to_string())?; + + // If there are any GraphQL errors returned, we'll log them for debugging + if let Some(errors) = response.errors { + warn!( + "Errors encountered fetching active or recently closed allocations for indexer {}: {}", + indexer_address, + errors.into_iter().map(|e| e.message).collect::>().join(", ") + ); + } - let mock = Mock::given(method("POST")) - .and(path( - "/subgraphs/id/".to_string() + test_vectors::NETWORK_SUBGRAPH_ID, + // Verify that the indexer could be found at all + let indexer = response + .data + .and_then(|data| data.indexer) + .ok_or_else(|| format!("Indexer {} could not be found on the network", indexer_address))?; + + // Pull active and recently closed allocations out of the indexer + let Indexer { + active_allocations, + recently_closed_allocations + } = indexer; + + Ok(HashMap::from_iter( + active_allocations.into_iter().map(|a| (a.id, a)).chain( + recently_closed_allocations.into_iter().map(|a| (a.id, a))) )) - .respond_with( - ResponseTemplate::new(200) - .set_body_raw(test_vectors::ALLOCATIONS_QUERY_RESPONSE, "application/json"), + }, + + // Need to use string errors here because eventuals `map_with_retry` retries + // errors that can be cloned + move |err: String| { + warn!( + "Failed to fetch active or recently closed allocations for indexer {}: {}", + indexer_address, err ); - mock_server.register(mock).await; - - let allocations = AllocationMonitor::current_eligible_allocations( - &network_subgraph, - &indexer_address, - 940, - ) - .await - .unwrap(); - - assert_eq!(allocations, test_vectors::expected_eligible_allocations()) - } - - /// Run with RUST_LOG=info to see the logs from the allocation monitor - #[test(tokio::test)] - #[ignore] - async fn test_local() { - let graph_node_url = - std::env::var("GRAPH_NODE_ENDPOINT").expect("GRAPH_NODE_ENDPOINT not set"); - let network_subgraph_id = - std::env::var("NETWORK_SUBGRAPH_ID").expect("NETWORK_SUBGRAPH_ID not set"); - let indexer_address = std::env::var("INDEXER_ADDRESS").expect("INDEXER_ADDRESS not set"); - - let network_subgraph_endpoint = - NetworkSubgraph::local_deployment_endpoint(&graph_node_url, &network_subgraph_id); - let network_subgraph = NetworkSubgraph::new( - Some(&graph_node_url), - Some(&network_subgraph_id), - network_subgraph_endpoint.as_ref(), - ); - - // graph_network_id=1 and interval_ms=1000 - let _allocation_monitor = AllocationMonitor::new( - network_subgraph, - Address::from_str(&indexer_address).unwrap(), - 1, - 1000, - ) - .await - .unwrap(); - - // sleep for a bit to allow the monitor to fetch the allocations a few times - tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await; - } + // Sleep for a bit before we retry + sleep(interval.div_f32(2.0)) + }, + ) } diff --git a/common/src/attestations/mod.rs b/common/src/attestations/mod.rs index 0a5f0e748..3ab13f4dd 100644 --- a/common/src/attestations/mod.rs +++ b/common/src/attestations/mod.rs @@ -1,176 +1,5 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use anyhow::Result; -use ethers::signers::coins_bip39::English; -use ethers::signers::MnemonicBuilder; -use ethers::signers::Signer; -use ethers::signers::Wallet; -use ethers_core::k256::ecdsa::SigningKey; -use toolshed::thegraph::DeploymentId; - -use crate::prelude::Allocation; - pub mod signer; pub mod signers; - -pub fn derive_key_pair( - indexer_mnemonic: &str, - epoch: u64, - deployment: &DeploymentId, - index: u64, -) -> Result> { - let mut derivation_path = format!("m/{}/", epoch); - derivation_path.push_str( - &deployment - .to_string() - .as_bytes() - .iter() - .map(|char| char.to_string()) - .collect::>() - .join("/"), - ); - derivation_path.push_str(format!("/{}", index).as_str()); - - Ok(MnemonicBuilder::::default() - .derivation_path(&derivation_path) - .expect("Valid derivation path") - .phrase(indexer_mnemonic) - .build()?) -} - -pub fn attestation_signer_for_allocation( - indexer_mnemonic: &str, - allocation: &Allocation, -) -> Result { - // Guess the allocation index by enumerating all indexes in the - // range [0, 100] and checking for a match - for i in 0..100 { - // The allocation was either created at the epoch it intended to or one - // epoch later. So try both both. - for created_at_epoch in [allocation.created_at_epoch, allocation.created_at_epoch - 1] { - let allocation_wallet = derive_key_pair( - indexer_mnemonic, - created_at_epoch, - &allocation.subgraph_deployment.id, - i, - )?; - if allocation_wallet.address().as_fixed_bytes() == allocation.id { - return Ok(allocation_wallet.signer().clone()); - } - } - } - Err(anyhow::anyhow!( - "Could not find allocation signer for allocation {}", - allocation.id - )) -} - -#[cfg(test)] -mod tests { - use alloy_primitives::Address; - use ethers_core::types::U256; - use lazy_static::lazy_static; - use std::str::FromStr; - use test_log::test; - - use crate::prelude::{Allocation, AllocationStatus, SubgraphDeployment}; - - use super::*; - - const INDEXER_OPERATOR_MNEMONIC: &str = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"; - - lazy_static! { - static ref DEPLOYMENT_ID: DeploymentId = DeploymentId( - "0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a" - .parse() - .unwrap(), - ); - } - - #[test] - fn test_derive_key_pair() { - assert_eq!( - derive_key_pair(INDEXER_OPERATOR_MNEMONIC, 953, &DEPLOYMENT_ID, 0) - .unwrap() - .address() - .to_fixed_bytes(), - Address::from_str("0xfa44c72b753a66591f241c7dc04e8178c30e13af").unwrap() - ); - - assert_eq!( - derive_key_pair(INDEXER_OPERATOR_MNEMONIC, 940, &DEPLOYMENT_ID, 2) - .unwrap() - .address() - .as_fixed_bytes(), - Address::from_str("0xa171cd12c3dde7eb8fe7717a0bcd06f3ffa65658").unwrap() - ); - } - - #[test] - fn test_allocation_signer() { - // Note that we use `derive_key_pair` to derive the private key - - let allocation = Allocation { - id: Address::from_str("0xa171cd12c3dde7eb8fe7717a0bcd06f3ffa65658").unwrap(), - status: AllocationStatus::Null, - subgraph_deployment: SubgraphDeployment { - id: *DEPLOYMENT_ID, - denied_at: None, - staked_tokens: U256::zero(), - signalled_tokens: U256::zero(), - query_fees_amount: U256::zero(), - }, - indexer: Address::ZERO, - allocated_tokens: U256::zero(), - created_at_epoch: 940, - created_at_block_hash: "".to_string(), - closed_at_epoch: None, - closed_at_epoch_start_block_hash: None, - previous_epoch_start_block_hash: None, - poi: None, - query_fee_rebates: None, - query_fees_collected: None, - }; - assert_eq!( - attestation_signer_for_allocation(INDEXER_OPERATOR_MNEMONIC, &allocation).unwrap(), - *derive_key_pair( - INDEXER_OPERATOR_MNEMONIC, - 940, - &allocation.subgraph_deployment.id, - 2 - ) - .unwrap() - .signer() - ); - } - - #[test] - fn test_allocation_signer_error() { - // Note that because allocation will try 200 derivations paths, this is a slow test - - let allocation = Allocation { - // Purposefully wrong address - id: Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(), - status: AllocationStatus::Null, - subgraph_deployment: SubgraphDeployment { - id: *DEPLOYMENT_ID, - denied_at: None, - staked_tokens: U256::zero(), - signalled_tokens: U256::zero(), - query_fees_amount: U256::zero(), - }, - indexer: Address::ZERO, - allocated_tokens: U256::zero(), - created_at_epoch: 940, - created_at_block_hash: "".to_string(), - closed_at_epoch: None, - closed_at_epoch_start_block_hash: None, - previous_epoch_start_block_hash: None, - poi: None, - query_fee_rebates: None, - query_fees_collected: None, - }; - assert!(attestation_signer_for_allocation(INDEXER_OPERATOR_MNEMONIC, &allocation).is_err()); - } -} diff --git a/common/src/attestations/signer.rs b/common/src/attestations/signer.rs index 393da94e8..7cffb3fe1 100644 --- a/common/src/attestations/signer.rs +++ b/common/src/attestations/signer.rs @@ -5,7 +5,10 @@ use alloy_primitives::Address; use eip_712_derive::{ sign_typed, Bytes32, DomainSeparator, Eip712Domain, MemberVisitor, StructType, }; -use ethers::utils::hex; +use ethers::{ + signers::{coins_bip39::English, MnemonicBuilder, Signer, Wallet}, + utils::hex, +}; use ethers_core::k256::ecdsa::SigningKey; use ethers_core::types::U256; use keccak_hash::keccak; @@ -13,40 +16,99 @@ use secp256k1::SecretKey; use std::convert::TryInto; use toolshed::thegraph::DeploymentId; +use crate::prelude::Allocation; + +pub fn derive_key_pair( + indexer_mnemonic: &str, + epoch: u64, + deployment: &DeploymentId, + index: u64, +) -> Result, anyhow::Error> { + let mut derivation_path = format!("m/{}/", epoch); + derivation_path.push_str( + &deployment + .to_string() + .as_bytes() + .iter() + .map(|char| char.to_string()) + .collect::>() + .join("/"), + ); + derivation_path.push_str(format!("/{}", index).as_str()); + + Ok(MnemonicBuilder::::default() + .derivation_path(&derivation_path) + .expect("Valid derivation path") + .phrase(indexer_mnemonic) + .build()?) +} + +pub fn attestation_signer_for_allocation( + indexer_mnemonic: &str, + allocation: &Allocation, +) -> Result { + // Guess the allocation index by enumerating all indexes in the + // range [0, 100] and checking for a match + for i in 0..100 { + // The allocation was either created at the epoch it intended to or one + // epoch later. So try both both. + for created_at_epoch in [allocation.created_at_epoch, allocation.created_at_epoch - 1] { + let allocation_wallet = derive_key_pair( + indexer_mnemonic, + created_at_epoch, + &allocation.subgraph_deployment.id, + i, + )?; + if allocation_wallet.address().as_fixed_bytes() == allocation.id { + return Ok(allocation_wallet.signer().clone()); + } + } + } + Err(anyhow::anyhow!( + "Could not find allocation signer for allocation {}", + allocation.id + )) +} + /// An attestation signer tied to a specific allocation via its signer key -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct AttestationSigner { - subgraph_deployment_id: DeploymentId, + deployment: Bytes32, domain_separator: DomainSeparator, signer: SecretKey, } impl AttestationSigner { pub fn new( - chain_id: eip_712_derive::U256, + indexer_mnemonic: &str, + allocation: &Allocation, + chain_id: U256, dispute_manager: Address, - signer: SecretKey, - deployment_id: DeploymentId, - ) -> Self { + ) -> Result { + // Recreate a wallet that has the same address as the allocation + let wallet = wallet_for_allocation(indexer_mnemonic, allocation)?; + + // Convert chain ID into EIP-712 representation + let mut chain_id_bytes = [0u8; 32]; + chain_id.to_big_endian(&mut chain_id_bytes); + let chain_id = eip_712_derive::U256(chain_id_bytes); + let bytes = hex::decode("a070ffb1cd7409649bf77822cce74495468e06dbfaef09556838bf188679b9c2") .unwrap(); let salt: [u8; 32] = bytes.try_into().unwrap(); - let domain = Eip712Domain { - name: "Graph Protocol".to_owned(), - version: "0".to_owned(), - chain_id, - verifying_contract: eip_712_derive::Address(dispute_manager.into()), - salt, - }; - let domain_separator = DomainSeparator::new(&domain); - - Self { - domain_separator, - signer, - subgraph_deployment_id: deployment_id, - } + Ok(Self { + domain_separator: DomainSeparator::new(&Eip712Domain { + name: "Graph Protocol".to_owned(), + version: "0".to_owned(), + chain_id, + verifying_contract: eip_712_derive::Address(dispute_manager.into()), + salt, + }), + signer: SecretKey::from_slice(&wallet.signer().to_bytes())?, + deployment: allocation.subgraph_deployment.id.0.into(), + }) } pub fn create_attestation(&self, request: &str, response: &str) -> Attestation { @@ -56,7 +118,7 @@ impl AttestationSigner { let receipt = Receipt { request_cid, response_cid, - subgraph_deployment_id: *self.subgraph_deployment_id.0, + subgraph_deployment_id: self.deployment, }; // Unwrap: This can only fail if the SecretKey is invalid. @@ -70,7 +132,7 @@ impl AttestationSigner { v, r, s, - subgraph_deployment_id: *self.subgraph_deployment_id.0, + subgraph_deployment_id: self.deployment, request_cid, response_cid, } @@ -102,21 +164,155 @@ pub struct Attestation { pub s: Bytes32, } -/// Helper for creating an AttestationSigner -pub fn create_attestation_signer( - chain_id: U256, - dispute_manager_address: Address, - signer: SigningKey, - deployment_id: DeploymentId, -) -> anyhow::Result { - // Tedious conversions to the "indexer_native" types - let mut chain_id_bytes = [0u8; 32]; - chain_id.to_big_endian(&mut chain_id_bytes); - let signer = AttestationSigner::new( - eip_712_derive::U256(chain_id_bytes), - dispute_manager_address, - secp256k1::SecretKey::from_slice(&signer.to_bytes())?, - deployment_id, - ); - Ok(signer) +fn wallet_for_allocation( + indexer_mnemonic: &str, + allocation: &Allocation, +) -> Result, anyhow::Error> { + // Guess the allocation index by enumerating all indexes in the + // range [0, 100] and checking for a match + for i in 0..100 { + // The allocation was either created at the epoch it intended to or one + // epoch later. So try both both. + for created_at_epoch in [allocation.created_at_epoch, allocation.created_at_epoch - 1] { + // The allocation ID is the address of a unique key pair, we just + // need to find the right one by enumerating them all + let wallet = derive_key_pair( + indexer_mnemonic, + created_at_epoch, + &allocation.subgraph_deployment.id, + i, + )?; + + // See if we have a match, i.e. a wallet whose address is identical to the allocation ID + if wallet.address().as_fixed_bytes() == allocation.id { + return Ok(wallet); + } + } + } + Err(anyhow::anyhow!( + "Could not generate wallet matching allocation {}", + allocation.id + )) +} + +#[cfg(test)] +mod tests { + use alloy_primitives::Address; + use ethers_core::types::U256; + use std::str::FromStr; + use test_log::test; + + use crate::prelude::{Allocation, AllocationStatus, SubgraphDeployment}; + + use super::*; + + const INDEXER_OPERATOR_MNEMONIC: &str = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"; + + #[test] + fn test_derive_key_pair() { + assert_eq!( + derive_key_pair( + INDEXER_OPERATOR_MNEMONIC, + 953, + &DeploymentId::from_str( + "0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a" + ) + .unwrap(), + 0 + ) + .unwrap() + .address() + .as_fixed_bytes(), + Address::from_str("0xfa44c72b753a66591f241c7dc04e8178c30e13af").unwrap() + ); + + assert_eq!( + derive_key_pair( + INDEXER_OPERATOR_MNEMONIC, + 940, + &DeploymentId::from_str( + "0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a" + ) + .unwrap(), + 2 + ) + .unwrap() + .address() + .as_fixed_bytes(), + Address::from_str("0xa171cd12c3dde7eb8fe7717a0bcd06f3ffa65658").unwrap() + ); + } + + #[test] + fn test_allocation_signer() { + // Note that we use `derive_key_pair` to derive the private key + + let allocation = Allocation { + id: Address::from_str("0xa171cd12c3dde7eb8fe7717a0bcd06f3ffa65658").unwrap(), + status: AllocationStatus::Null, + subgraph_deployment: SubgraphDeployment { + id: DeploymentId::from_str( + "0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a", + ) + .unwrap(), + denied_at: None, + staked_tokens: U256::zero(), + signalled_tokens: U256::zero(), + query_fees_amount: U256::zero(), + }, + indexer: Address::ZERO, + allocated_tokens: U256::zero(), + created_at_epoch: 940, + created_at_block_hash: "".to_string(), + closed_at_epoch: None, + closed_at_epoch_start_block_hash: None, + previous_epoch_start_block_hash: None, + poi: None, + query_fee_rebates: None, + query_fees_collected: None, + }; + assert_eq!( + attestation_signer_for_allocation(INDEXER_OPERATOR_MNEMONIC, &allocation).unwrap(), + *derive_key_pair( + INDEXER_OPERATOR_MNEMONIC, + 940, + &allocation.subgraph_deployment.id, + 2 + ) + .unwrap() + .signer() + ); + } + + #[test] + fn test_allocation_signer_error() { + // Note that because allocation will try 200 derivations paths, this is a slow test + + let allocation = Allocation { + // Purposefully wrong address + id: Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(), + status: AllocationStatus::Null, + subgraph_deployment: SubgraphDeployment { + id: DeploymentId::from_str( + "0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a", + ) + .unwrap(), + denied_at: None, + staked_tokens: U256::zero(), + signalled_tokens: U256::zero(), + query_fees_amount: U256::zero(), + }, + indexer: Address::ZERO, + allocated_tokens: U256::zero(), + created_at_epoch: 940, + created_at_block_hash: "".to_string(), + closed_at_epoch: None, + closed_at_epoch_start_block_hash: None, + previous_epoch_start_block_hash: None, + poi: None, + query_fee_rebates: None, + query_fees_collected: None, + }; + assert!(attestation_signer_for_allocation(INDEXER_OPERATOR_MNEMONIC, &allocation).is_err()); + } } diff --git a/common/src/attestations/signers.rs b/common/src/attestations/signers.rs index 62c1a336d..b985d7cba 100644 --- a/common/src/attestations/signers.rs +++ b/common/src/attestations/signers.rs @@ -3,161 +3,53 @@ use alloy_primitives::Address; use ethers_core::types::U256; -use log::{error, info, warn}; -use std::collections::HashMap; +use eventuals::{Eventual, EventualExt}; +use log::warn; +use lru::LruCache; use std::sync::Arc; -use tokio::sync::RwLock; +use std::{collections::HashMap, num::NonZeroUsize}; +use tokio::sync::Mutex; -use crate::prelude::{AllocationMonitor, AttestationSigner}; +use crate::prelude::{Allocation, AttestationSigner}; -use super::{attestation_signer_for_allocation, signer::create_attestation_signer}; - -#[derive(Debug, Clone)] -pub struct AttestationSigners { - inner: Arc, - _update_loop_handle: Arc>, -} - -#[derive(Debug)] -pub struct AttestationSignersInner { - attestation_signers: Arc>>, - allocation_monitor: AllocationMonitor, +/// An always up-to-date list of attestation signers, one for each of the indexer's allocations. +pub fn attestation_signers( + indexer_allocations: Eventual>, indexer_mnemonic: String, chain_id: U256, dispute_manager: Address, -} - -impl AttestationSigners { - pub fn new( - allocation_monitor: AllocationMonitor, - indexer_mnemonic: String, - chain_id: U256, - dispute_manager: Address, - ) -> Self { - let inner = Arc::new(AttestationSignersInner { - attestation_signers: Arc::new(RwLock::new(HashMap::new())), - allocation_monitor, - indexer_mnemonic, - chain_id, - dispute_manager, - }); - - let _update_loop_handle = { - let inner = inner.clone(); - tokio::spawn(Self::update_loop(inner.clone())) - }; - - Self { - inner, - _update_loop_handle: Arc::new(_update_loop_handle), - } - } - - pub async fn update_attestation_signers(inner: Arc) { - let mut attestation_signers_write = inner.attestation_signers.write().await; - for allocation in inner - .allocation_monitor - .get_eligible_allocations() - .await - .values() - { - if let std::collections::hash_map::Entry::Vacant(e) = - attestation_signers_write.entry(allocation.id) - { - match attestation_signer_for_allocation(&inner.indexer_mnemonic, allocation) - .and_then(|signer| { - create_attestation_signer( - inner.chain_id, - inner.dispute_manager, - signer, - allocation.subgraph_deployment.id, - ) - }) { - Ok(signer) => { - e.insert(signer); - info!( - "Found attestation signer for {{allocation: {}, deployment: {}}}", - allocation.id, allocation.subgraph_deployment.id, - ); - } - Err(e) => { - warn!( - "Failed to find the attestation signer for {{allocation: {}, deployment: {}, createdAtEpoch: {}, err: {}}}", - allocation.id, allocation.subgraph_deployment.id, allocation.created_at_epoch, e - ) - } - } - } - } - } - - async fn update_loop(inner: Arc) { - let mut watch_receiver = inner.allocation_monitor.subscribe(); +) -> Eventual> { + // Keep a cache of the most recent 1000 signers around so we don't need to recreate them + // every time there is a small change in the allocations + let cache: &'static Mutex> = Box::leak(Box::new(Mutex::new(LruCache::new( + NonZeroUsize::new(1000).unwrap(), + )))); + + let indexer_mnemonic = Arc::new(indexer_mnemonic); + + // Whenever the indexer's active or recently closed allocations change, make sure + // we have attestation signers for all of them + indexer_allocations.map(move |allocations| { + let indexer_mnemonic = indexer_mnemonic.clone(); + + async move { + let mut cache = cache.lock().await; + + for (id, allocation) in allocations.iter() { + let result = cache.try_get_or_insert(*id, || { + AttestationSigner::new(&indexer_mnemonic, allocation, chain_id, dispute_manager) + }); - loop { - match watch_receiver.changed().await { - Ok(_) => { - Self::update_attestation_signers(inner.clone()).await; - } - Err(e) => { - error!( - "Error receiving allocation monitor subscription update: {}", - e + if let Err(e) = result { + warn!( + "Failed to establish signer for allocation {}, deployment {}, createdAtEpoch {}: {}", + allocation.id, allocation.subgraph_deployment.id, + allocation.created_at_epoch, e ); } } - } - } - - pub async fn read( - &self, - ) -> tokio::sync::RwLockReadGuard<'_, HashMap> { - self.inner.attestation_signers.read().await - } -} - -#[cfg(test)] -mod tests { - use alloy_primitives::Address; - use ethers_core::types::U256; - use std::str::FromStr; - use std::sync::Arc; - - use crate::prelude::AllocationMonitor; - use crate::test_vectors; - - use super::*; - - #[tokio::test] - async fn test_update_attestation_signers() { - unsafe { - let mut mock_allocation_monitor = AllocationMonitor::faux(); - - faux::when!(mock_allocation_monitor.get_eligible_allocations).then_unchecked(|_| { - // Spawn a thread to be able to call `blocking_read` on the RwLock, which actually spins its own async - // runtime. - // This is needed because `faux` will also use a runtime to mock the async function. - let t = std::thread::spawn(|| { - let eligible_allocations = Box::leak(Box::new(Arc::new(RwLock::new( - test_vectors::expected_eligible_allocations(), - )))); - eligible_allocations.blocking_read() - }); - t.join().unwrap() - }); - - let inner = Arc::new(AttestationSignersInner { - attestation_signers: Arc::new(RwLock::new(HashMap::new())), - allocation_monitor: mock_allocation_monitor, - indexer_mnemonic: test_vectors::INDEXER_OPERATOR_MNEMONIC.to_string(), - chain_id: U256::from(1), - dispute_manager: Address::from_str(test_vectors::DISPUTE_MANAGER_ADDRESS).unwrap(), - }); - - AttestationSigners::update_attestation_signers(inner.clone()).await; - // Check that the attestation signers were found for the allocations - assert_eq!(inner.attestation_signers.read().await.len(), 4); + HashMap::from_iter(cache.iter().map(|(k, v)| (*k, v.clone()))) } - } + }) } diff --git a/common/src/lib.rs b/common/src/lib.rs index b20ddcaad..1d84f956d 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -11,12 +11,9 @@ pub mod signature_verification; mod test_vectors; pub mod prelude { - pub use super::allocations::monitor::AllocationMonitor; - pub use super::allocations::{Allocation, AllocationStatus, SubgraphDeployment}; - pub use super::attestations::{ - attestation_signer_for_allocation, - signer::{create_attestation_signer, AttestationSigner}, - signers::AttestationSigners, + pub use super::allocations::{ + monitor::indexer_allocations, Allocation, AllocationStatus, SubgraphDeployment, }; + pub use super::attestations::{signer::AttestationSigner, signers::attestation_signers}; pub use super::network_subgraph::NetworkSubgraph; } diff --git a/service/Cargo.toml b/service/Cargo.toml index 200135f02..d558e8870 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -12,6 +12,7 @@ indexer-common = { path = "../common" } confy = "0.5.1" ethers-core = "2.0.10" ethers = "2.0.10" +eventuals = "0.6.7" dotenvy = "0.15" log = "0.4.17" anyhow = "1.0.57" diff --git a/service/src/main.rs b/service/src/main.rs index cdbcb980e..f90fce949 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -6,11 +6,11 @@ use alloy_sol_types::eip712_domain; use axum::Server; use dotenvy::dotenv; use ethereum_types::U256; -use std::{net::SocketAddr, str::FromStr}; +use std::{net::SocketAddr, str::FromStr, time::Duration}; use toolshed::thegraph::DeploymentId; use tracing::info; -use indexer_common::prelude::{AllocationMonitor, AttestationSigners, NetworkSubgraph}; +use indexer_common::prelude::{attestation_signers, indexer_allocations, NetworkSubgraph}; use util::{package_version, shutdown_signal}; @@ -62,26 +62,24 @@ async fn main() -> Result<(), std::io::Error> { // Make an instance of network subgraph at either // graph_node_query_endpoint/subgraphs/id/network_subgraph_deployment // or network_subgraph_endpoint - let network_subgraph = NetworkSubgraph::new( + let network_subgraph = Box::leak(Box::new(NetworkSubgraph::new( Some(&config.indexer_infrastructure.graph_node_query_endpoint), config .network_subgraph .network_subgraph_deployment .as_deref(), &config.network_subgraph.network_subgraph_endpoint, - ); + ))); - let allocation_monitor = AllocationMonitor::new( - network_subgraph.clone(), + let indexer_allocations = indexer_allocations( + network_subgraph, config.ethereum.indexer_address, 1, - config.network_subgraph.allocation_syncing_interval, - ) - .await - .expect("Initialize allocation monitor"); + Duration::from_secs(config.network_subgraph.allocation_syncing_interval), + ); - let attestation_signers = AttestationSigners::new( - allocation_monitor.clone(), + let attestation_signers = attestation_signers( + indexer_allocations.clone(), config.ethereum.mnemonic.clone(), // TODO: Chain ID should be a config U256::from(1), @@ -110,7 +108,7 @@ async fn main() -> Result<(), std::io::Error> { let tap_manager = tap_manager::TapManager::new( indexer_management_db.clone(), - allocation_monitor.clone(), + indexer_allocations, escrow_monitor, // TODO: arguments for eip712_domain should be a config eip712_domain! { diff --git a/service/src/query_processor.rs b/service/src/query_processor.rs index 17a1120f2..383b79d99 100644 --- a/service/src/query_processor.rs +++ b/service/src/query_processor.rs @@ -1,13 +1,17 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; + +use alloy_primitives::Address; use ethers_core::types::{Signature, U256}; +use eventuals::Eventual; use log::error; use serde::{Deserialize, Serialize}; use tap_core::tap_manager::SignedReceipt; use toolshed::thegraph::DeploymentId; -use indexer_common::prelude::{AttestationSigner, AttestationSigners}; +use indexer_common::prelude::AttestationSigner; use crate::graph_node::GraphNodeInstance; use crate::tap_manager::TapManager; @@ -60,17 +64,17 @@ pub enum QueryError { Other(anyhow::Error), } -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct QueryProcessor { graph_node: GraphNodeInstance, - attestation_signers: AttestationSigners, + attestation_signers: Eventual>, tap_manager: TapManager, } impl QueryProcessor { pub fn new( graph_node: GraphNodeInstance, - attestation_signers: AttestationSigners, + attestation_signers: Eventual>, tap_manager: TapManager, ) -> QueryProcessor { QueryProcessor { @@ -115,7 +119,10 @@ impl QueryProcessor { .verify_and_store_receipt(parsed_receipt) .await?; - let signers = self.attestation_signers.read().await; + let signers = self + .attestation_signers + .value_immediate() + .ok_or_else(|| QueryError::Other(anyhow::anyhow!("System is not ready yet")))?; let signer = signers.get(&allocation_id).ok_or_else(|| { QueryError::Other(anyhow::anyhow!( "No signer found for allocation id {}", @@ -163,8 +170,7 @@ mod tests { use hex_literal::hex; use indexer_common::prelude::{ - attestation_signer_for_allocation, create_attestation_signer, Allocation, AllocationStatus, - SubgraphDeployment, + Allocation, AllocationStatus, AttestationSigner, SubgraphDeployment, }; use lazy_static::lazy_static; @@ -207,13 +213,11 @@ mod tests { query_fees_collected: None, }; - let allocation_key = - attestation_signer_for_allocation(INDEXER_OPERATOR_MNEMONIC, allocation).unwrap(); - let attestation_signer = create_attestation_signer( + let attestation_signer = AttestationSigner::new( + INDEXER_OPERATOR_MNEMONIC, + allocation, U256::from(1), Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(), - allocation_key, - *DEPLOYMENT_ID, ) .unwrap(); diff --git a/service/src/server/mod.rs b/service/src/server/mod.rs index 075c993de..8d5cf9df3 100644 --- a/service/src/server/mod.rs +++ b/service/src/server/mod.rs @@ -29,7 +29,7 @@ use crate::{ pub mod routes; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ServerOptions { pub port: Option, pub release: PackageVersion, @@ -38,7 +38,7 @@ pub struct ServerOptions { pub graph_node_status_endpoint: String, pub indexer_management_db: PgPool, pub operator_public_key: String, - pub network_subgraph: NetworkSubgraph, + pub network_subgraph: &'static NetworkSubgraph, pub network_subgraph_auth_token: Option, pub serve_network_subgraph: bool, } @@ -53,7 +53,7 @@ impl ServerOptions { graph_node_status_endpoint: String, indexer_management_db: PgPool, operator_public_key: String, - network_subgraph: NetworkSubgraph, + network_subgraph: &'static NetworkSubgraph, network_subgraph_auth_token: Option, serve_network_subgraph: bool, ) -> Self { diff --git a/service/src/tap_manager.rs b/service/src/tap_manager.rs index 4a4795b97..4aac9e843 100644 --- a/service/src/tap_manager.rs +++ b/service/src/tap_manager.rs @@ -1,19 +1,20 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; +use eventuals::Eventual; +use indexer_common::prelude::Allocation; use log::error; use sqlx::{types::BigDecimal, PgPool}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use tap_core::tap_manager::SignedReceipt; -use indexer_common::prelude::AllocationMonitor; - use crate::{escrow_monitor, query_processor::QueryError}; -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct TapManager { - allocation_monitor: AllocationMonitor, + indexer_allocations: Eventual>, escrow_monitor: escrow_monitor::EscrowMonitor, pgpool: PgPool, domain_separator: Arc, @@ -22,12 +23,12 @@ pub struct TapManager { impl TapManager { pub fn new( pgpool: PgPool, - allocation_monitor: AllocationMonitor, + indexer_allocations: Eventual>, escrow_monitor: escrow_monitor::EscrowMonitor, domain_separator: Eip712Domain, ) -> Self { Self { - allocation_monitor, + indexer_allocations, escrow_monitor, pgpool, domain_separator: Arc::new(domain_separator), @@ -43,9 +44,11 @@ impl TapManager { pub async fn verify_and_store_receipt(&self, receipt: SignedReceipt) -> Result<(), QueryError> { let allocation_id = &receipt.message.allocation_id; if !self - .allocation_monitor - .is_allocation_eligible(allocation_id) + .indexer_allocations + .value() .await + .map(|allocations| allocations.contains_key(allocation_id)) + .unwrap_or(false) { return Err(QueryError::Other(anyhow::Error::msg(format!( "Receipt's allocation ID ({}) is not eligible for this indexer", @@ -100,13 +103,14 @@ mod test { use alloy_primitives::Address; use alloy_sol_types::{eip712_domain, Eip712Domain}; + use ethereum_types::{H256, U256}; use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer}; + use indexer_common::prelude::{AllocationStatus, SubgraphDeployment}; use sqlx::postgres::PgListener; use tap_core::tap_manager::SignedReceipt; use tap_core::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt}; - - use indexer_common::prelude::AllocationMonitor; + use toolshed::thegraph::DeploymentId; use super::*; @@ -170,9 +174,31 @@ mod test { let signed_receipt = create_signed_receipt(allocation_id, u64::MAX, u64::MAX, u128::MAX).await; - // Mock allocation monitor - let mut mock_allocation_monitor = AllocationMonitor::faux(); - faux::when!(mock_allocation_monitor.is_allocation_eligible).then_return(true); + // Mock allocation + let allocation = Allocation { + id: allocation_id, + subgraph_deployment: SubgraphDeployment { + id: DeploymentId::from_str("QmAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(), + denied_at: None, + query_fees_amount: U256::zero(), + signalled_tokens: U256::zero(), + staked_tokens: U256::zero(), + }, + status: AllocationStatus::Active, + allocated_tokens: U256::zero(), + closed_at_epoch: None, + closed_at_epoch_start_block_hash: None, + poi: None, + previous_epoch_start_block_hash: None, + created_at_block_hash: H256::zero().to_string(), + created_at_epoch: 0, + indexer: Address::ZERO, + query_fee_rebates: None, + query_fees_collected: None, + }; + let indexer_allocations = Eventual::from_value(HashMap::from_iter( + vec![(allocation_id, allocation)].into_iter(), + )); // Mock escrow monitor let mut mock_escrow_monitor = escrow_monitor::EscrowMonitor::faux(); @@ -180,7 +206,7 @@ mod test { let tap_manager = TapManager::new( pgpool.clone(), - mock_allocation_monitor, + indexer_allocations, mock_escrow_monitor, domain, );