Skip to content

Commit

Permalink
WIP: rewrite subgraph service using new framework
Browse files Browse the repository at this point in the history
  • Loading branch information
Jannis committed Nov 27, 2023
1 parent 6f7316a commit f9f4728
Show file tree
Hide file tree
Showing 33 changed files with 1,359 additions and 2,669 deletions.
1,315 changes: 632 additions & 683 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ sqlx = { version = "0.7.1", features = [
"time",
] }
tokio = { version = "1.32.0", features = ["full", "macros", "rt"] }
toolshed = { git = "https://github.com/edgeandnode/toolshed", branch = "main", features = [
"graphql",
toolshed = { git = "https://github.com/edgeandnode/toolshed", branch = "main" }
graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main", features = [
"http-reqwest",
] }
graphql = { git = "https://github.com/edgeandnode/toolshed", branch = "main" }
tap_core = "0.6.0"
axum = { version = "0.6.20", default_features = true, features = ["headers"] }
thiserror = "1.0.49"
Expand Down
75 changes: 26 additions & 49 deletions common/src/allocations/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use alloy_primitives::Address;
use anyhow::anyhow;
use eventuals::{timer, Eventual, EventualExt};
use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;
use tracing::warn;

use crate::prelude::SubgraphClient;
use crate::prelude::{Query, SubgraphClient};

use super::Allocation;

Expand All @@ -22,7 +21,7 @@ async fn current_epoch(
// Types for deserializing the network subgraph response
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GraphNetworkResponse {
struct GraphNetworkData {
graph_network: Option<GraphNetwork>,
}
#[derive(Deserialize)]
Expand All @@ -33,30 +32,15 @@ async fn current_epoch(

// Query the current epoch
let query = r#"query epoch($id: ID!) { graphNetwork(id: $id) { currentEpoch } }"#;
let response = network_subgraph
.query::<GraphNetworkResponse>(&json!({
"query": query,
"variables": {
"id": graph_network_id
}
}))
let result = network_subgraph
.query::<GraphNetworkData>(Query::new_with_variables(
query,
[("id", graph_network_id.into())],
))
.await?;

if let Some(errors) = response.errors {
warn!(
"Errors encountered identifying current epoch for network {}: {}",
graph_network_id,
errors
.into_iter()
.map(|e| e.message)
.collect::<Vec<_>>()
.join(", ")
);
}

response
.data
.and_then(|data| data.graph_network)
result?
.graph_network
.ok_or_else(|| anyhow!("Network {} not found", graph_network_id))
.map(|network| network.current_epoch)
}
Expand Down Expand Up @@ -137,42 +121,35 @@ pub fn indexer_allocations(
// Query active and recently closed allocations for the indexer,
// using the network subgraph
let response = network_subgraph
.query::<IndexerAllocationsResponse>(&json!({
"query": query,
"variables": {
"indexer": format!("{indexer_address:?}"),
"closedAtEpochThreshold": closed_at_epoch_threshold,
}}))
.query::<IndexerAllocationsResponse>(Query::new_with_variables(
query,
[
("indexer", format!("{indexer_address:?}").into()),
("closedAtEpochThreshold", closed_at_epoch_threshold.into()),
],
))
.await
.map_err(|e| e.to_string())?;

// If there are any GraphQL errors returned, we'll log them for debugging
if let Some(errors) = response.errors {
warn!(
"Errors encountered fetching active or recently closed allocations for indexer {:?}: {}",
indexer_address,
errors.into_iter().map(|e| e.message).collect::<Vec<_>>().join(", ")
);
}

// Verify that the indexer could be found at all
let indexer = response
.data
.and_then(|data| data.indexer)
.ok_or_else(|| format!("Indexer {:?} could not be found on the network", indexer_address))?;
let indexer = response.map_err(|e| e.to_string()).and_then(|data| {
// Verify that the indexer could be found at all
data.indexer
.ok_or_else(|| format!("Indexer `{indexer_address}` not found on the network"))
})?;

// Pull active and recently closed allocations out of the indexer
let Indexer {
active_allocations,
recently_closed_allocations
recently_closed_allocations,
} = indexer;

Ok(HashMap::from_iter(
active_allocations.into_iter().map(|a| (a.id, a)).chain(
recently_closed_allocations.into_iter().map(|a| (a.id, a)))
active_allocations
.into_iter()
.map(|a| (a.id, a))
.chain(recently_closed_allocations.into_iter().map(|a| (a.id, a))),
))
},

// Need to use string errors here because eventuals `map_with_retry` retries
// errors that can be cloned
move |err: String| {
Expand Down
39 changes: 12 additions & 27 deletions common/src/attestations/dispute_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use std::time::Duration;
use alloy_primitives::Address;
use eventuals::{timer, Eventual, EventualExt};
use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;
use tracing::warn;

use crate::subgraph_client::SubgraphClient;
use crate::subgraph_client::{Query, SubgraphClient};

pub fn dispute_manager(
network_subgraph: &'static SubgraphClient,
Expand All @@ -32,40 +31,26 @@ pub fn dispute_manager(
timer(interval).map_with_retry(
move |_| async move {
let response = network_subgraph
.query::<DisputeManagerResponse>(&json!({
"query": r#"
.query::<DisputeManagerResponse>(Query::new_with_variables(
r#"
query network($id: ID!) {
graphNetwork(id: $id) {
disputeManager
}
}
"#,
"variables": {
"id": graph_network_id
}
}))
[("id", graph_network_id.into())],
))
.await
.map_err(|e| e.to_string())?;

if let Some(errors) = response.errors {
warn!(
"Errors encountered querying the dispute manager for network {}: {}",
graph_network_id,
errors
.into_iter()
.map(|e| e.message)
.collect::<Vec<_>>()
.join(", ")
);
}

response
.data
.and_then(|data| data.graph_network)
.map(|network| network.dispute_manager)
.ok_or_else(|| {
format!("Network {} not found in network subgraph", graph_network_id)
})
response.map_err(|e| e.to_string()).and_then(|data| {
data.graph_network
.map(|network| network.dispute_manager)
.ok_or_else(|| {
format!("Network {} not found in network subgraph", graph_network_id)
})
})
},
move |err: String| {
warn!(
Expand Down
71 changes: 26 additions & 45 deletions common/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ use anyhow::Result;
use ethers_core::types::U256;
use eventuals::{timer, Eventual, EventualExt};
use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;
use tracing::{error, warn};

use crate::prelude::SubgraphClient;
use crate::prelude::{Query, SubgraphClient};

pub fn escrow_accounts(
escrow_subgraph: &'static SubgraphClient,
Expand Down Expand Up @@ -44,8 +43,8 @@ pub fn escrow_accounts(
timer(interval).map_with_retry(
move |_| async move {
let response = escrow_subgraph
.query::<EscrowAccountsResponse>(&json!({
"query": r#"
.query::<EscrowAccountsResponse>(Query::new_with_variables(
r#"
query ($indexer: ID!) {
escrowAccounts(where: {receiver_: {id: $indexer}}) {
balance
Expand All @@ -56,51 +55,33 @@ pub fn escrow_accounts(
}
}
"#,
"variables": {
"indexer": indexer_address,
}
}
[("indexer", indexer_address.to_string().into())],
))
.await
.map_err(|e| e.to_string())?;

// If there are any GraphQL errors returned, we'll log them for debugging
if let Some(errors) = response.errors {
error!(
"Errors encountered fetching escrow accounts for indexer {:?}: {}",
indexer_address,
errors
.into_iter()
.map(|e| e.message)
.collect::<Vec<_>>()
.join(", ")
);
}

let sender_accounts = response
.data
.map_or(vec![], |data| data.escrow_accounts)
.iter()
.map(|account| {
let balance = U256::checked_sub(
U256::from_dec_str(&account.balance)?,
U256::from_dec_str(&account.total_amount_thawing)?,
)
.unwrap_or_else(|| {
warn!(
"Balance minus total amount thawing underflowed for account {}. \
Setting balance to 0, no queries will be served for this sender.",
account.sender.id
);
U256::from(0)
});

Ok((account.sender.id, balance))
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()
.map_err(|e| format!("{}", e))?;

Ok(sender_accounts)
response.map_err(|e| e.to_string()).and_then(|data| {
data.escrow_accounts
.iter()
.map(|account| {
let balance = U256::checked_sub(
U256::from_dec_str(&account.balance)?,
U256::from_dec_str(&account.total_amount_thawing)?,
)
.unwrap_or_else(|| {
warn!(
"Balance minus total amount thawing underflowed for account {}. \
Setting balance to 0, no queries will be served for this sender.",
account.sender.id
);
U256::from(0)
});

Ok((account.sender.id, balance))
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()
.map_err(|e| format!("{}", e))
})
},
move |err: String| {
error!(
Expand Down
10 changes: 8 additions & 2 deletions common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,14 @@ impl IndexerService {
{
let metrics = IndexerServiceMetrics::new(options.metrics_prefix);

let http_client = reqwest::Client::builder()
.tcp_nodelay(true)
.timeout(Duration::from_secs(30))
.build()
.expect("Failed to init HTTP client");

let network_subgraph = Box::leak(Box::new(SubgraphClient::new(
reqwest::Client::new(),
http_client.clone(),
options
.config
.graph_node
Expand Down Expand Up @@ -223,7 +229,7 @@ impl IndexerService {
);

let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new(
reqwest::Client::new(),
http_client,
options
.config
.graph_node
Expand Down
2 changes: 1 addition & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ pub mod prelude {
};
pub use super::escrow_accounts::escrow_accounts;
pub use super::indexer_errors;
pub use super::subgraph_client::{DeploymentDetails, SubgraphClient};
pub use super::subgraph_client::{DeploymentDetails, Query, QueryVariables, SubgraphClient};
pub use super::tap_manager::TapManager;
}
Loading

0 comments on commit f9f4728

Please sign in to comment.