From df1255eeca95b3d5732da957c0ac339ec87775fc Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Wed, 20 Sep 2023 17:29:01 +0200 Subject: [PATCH] WIP: Clean up --- common/src/allocations/monitor.rs | 189 ++++++++++++++++-------------- 1 file changed, 100 insertions(+), 89 deletions(-) diff --git a/common/src/allocations/monitor.rs b/common/src/allocations/monitor.rs index 118e0518..83d398f5 100644 --- a/common/src/allocations/monitor.rs +++ b/common/src/allocations/monitor.rs @@ -4,16 +4,63 @@ use std::{collections::HashMap, time::Duration}; use alloy_primitives::Address; -use eventuals::{timer, Eventual, EventualExt, EventualWriter}; +use anyhow::anyhow; +use eventuals::{timer, Eventual, EventualExt}; use log::warn; use serde::Deserialize; use serde_json::json; -use tokio::sync::Mutex; +use tokio::time::sleep; use crate::prelude::NetworkSubgraph; use super::Allocation; +async fn current_epoch( + network_subgraph: &'static NetworkSubgraph, + graph_network_id: u64, +) -> Result { + // Types for deserializing the network subgraph response + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct GraphNetworkResponse { + graph_network: Option, + } + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct GraphNetwork { + current_epoch: u64, + } + + // Query the current epoch + let query = r#"query epoch($id: ID!) { graphNetwork(id: $id) { currentEpoch } }"#; + let response = network_subgraph + .query::(&json!({ + "query": query, + "variables": { + "id": graph_network_id + } + })) + .await?; + + if let Some(errors) = response.errors { + warn!( + "Errors encountered identifying current epoch for network {}: {}", + graph_network_id, + errors + .into_iter() + .map(|e| e.message) + .collect::>() + .join(", ") + ); + } + + response + .data + .and_then(|data| data.graph_network) + .ok_or_else(|| anyhow!("Network {} not found", graph_network_id)) + .map(|network| network.current_epoch) +} + pub fn indexer_allocations( network_subgraph: &'static NetworkSubgraph, indexer_address: Address, @@ -82,103 +129,67 @@ pub fn indexer_allocations( } "#; - let (writer, reader) = Eventual::new(); - let writer: &'static Mutex> = Box::leak(Box::new(Mutex::new(writer))); + let indexer_for_error_handler = indexer_address.clone(); + + // Refresh indexer allocations every now and then + timer(interval).map_with_retry( + move |_| async move { + let current_epoch = current_epoch(&network_subgraph, graph_network_id) + .await + .map_err(|e| format!("Failed to fetch current epoch: {}", e))?; - current_epoch(network_subgraph, graph_network_id, interval) - .pipe_async(move |current_epoch| async move { // Allocations can be closed one epoch into the past let closed_at_epoch_threshold = current_epoch - 1; - let result = network_subgraph + // Query active and recently closed allocations for the indexer, + // using the network subgraph + let response = network_subgraph .query::(&json!({ "query": query, "variables": { "indexer": indexer_address, "closedAtEpochThreshold": closed_at_epoch_threshold, }})) - .await; - - match result { - Ok(response) => { - let allocations = response.data.and_then(|data| data.indexer).map(|indexer| { - let Indexer { - active_allocations, - recently_closed_allocations, - } = indexer; - - let mut eligible_allocations = - HashMap::from_iter(active_allocations.into_iter().map(|a| (a.id, a))); - - eligible_allocations - .extend(recently_closed_allocations.into_iter().map(|a| (a.id, a))); - - eligible_allocations - }); - - match allocations { - Some(allocations) => writer.lock().await.write(allocations), - None => warn!( - "No active or recently closed allocations found for indexer {}", - indexer_address - ), - } - } - Err(err) => warn!("Failed to fetch active allocations: {}", err), - } - }) - .forever(); - - reader -} - -fn current_epoch( - network_subgraph: &'static NetworkSubgraph, - graph_network_id: u64, - interval: Duration, -) -> Eventual { - // Types for deserializing the network subgraph response - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct GraphNetworkResponse { - graph_network: Option, - } - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct GraphNetwork { - current_epoch: u64, - } - - let (writer, reader) = Eventual::new(); - let writer: &'static Mutex> = Box::leak(Box::new(Mutex::new(writer))); - - timer(interval) - .pipe_async(move |_| async move { - let query = r#"query epoch($id: ID!) { graphNetwork(id: $id) { currentEpoch } }"#; - let result = network_subgraph - .query::(&json!({ - "query": query, - "variables": { - "id": graph_network_id - } - })) - .await; - - match result { - Ok(response) => { - match response - .data - .and_then(|data| data.graph_network) - .map(|network| network.current_epoch) - { - Some(epoch) => writer.lock().await.write(epoch), - None => warn!("No epoch found for network with ID {}", graph_network_id), - } - } - Err(err) => warn!("Failed to fetch current epoch: {}", err), + .await + .map_err(|e| e.to_string())?; + + // If there are any GraphQL errors returned, we'll log them for debugging + if let Some(errors) = response.errors { + warn!( + "Errors encountered fetching active or recently closed allocations for indexer {}: {}", + indexer_address, + errors.into_iter().map(|e| e.message).collect::>().join(", ") + ); } - }) - .forever(); - reader + // Verify that the indexer could be found at all + let indexer = response + .data + .and_then(|data| data.indexer) + .ok_or_else(|| format!("Indexer {} could not be found on the network", indexer_address))?; + + // Pull active and recently closed allocations out of the indexer + let Indexer { + active_allocations, + recently_closed_allocations + } = indexer; + + Ok(HashMap::from_iter( + active_allocations.into_iter().map(|a| (a.id, a)).chain( + recently_closed_allocations.into_iter().map(|a| (a.id, a))) + )) + }, + + // Need to use string errors here because eventuals `map_with_retry` retries + // errors that can be cloned + move |err: String| { + warn!( + "Failed to fetch active or recently closed allocations for indexer {}: {}", + indexer_for_error_handler, err + ); + + // Sleep for a bit before we retry + sleep(interval.div_f32(2.0)) + }, + ) }