From 138a5e6776af53e2cdf5ea1911ae72b5e7c7c266 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 11 Dec 2023 18:08:20 +0100 Subject: [PATCH] feat: add serving /network and /escrow subgraphs back in --- common/src/indexer_service/http/config.rs | 4 + .../indexer_service/http/indexer_service.rs | 78 ++++++++++++++++--- common/src/indexer_service/http/mod.rs | 1 + .../indexer_service/http/static_subgraph.rs | 52 +++++++++++++ service/src/config.rs | 33 +------- 5 files changed, 126 insertions(+), 42 deletions(-) create mode 100644 common/src/indexer_service/http/static_subgraph.rs diff --git a/common/src/indexer_service/http/config.rs b/common/src/indexer_service/http/config.rs index a15a2535..57cdbfe4 100644 --- a/common/src/indexer_service/http/config.rs +++ b/common/src/indexer_service/http/config.rs @@ -14,6 +14,10 @@ pub struct DatabaseConfig { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct SubgraphConfig { + #[serde(default)] + pub serve_subgraph: bool, + pub serve_auth_token: Option, + pub deployment: Option, pub query_url: String, pub syncing_interval: u64, diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 4db0960d..37b00183 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -16,7 +16,7 @@ use axum::{ error_handling::HandleErrorLayer, response::{IntoResponse, Response}, routing::{get, post}, - BoxError, Json, Router, Server, + BoxError, Extension, Json, Router, Server, }; use build_info::BuildInfo; use eventuals::Eventual; @@ -31,7 +31,9 @@ use tower_governor::{errors::display_error, governor::GovernorConfigBuilder, Gov use tracing::info; use crate::{ - indexer_service::http::metrics::IndexerServiceMetrics, + indexer_service::http::{ + metrics::IndexerServiceMetrics, static_subgraph::static_subgraph_request_handler, + }, prelude::{ attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, AttestationSigner, DeploymentDetails, SubgraphClient, @@ -83,7 +85,7 @@ where InvalidRequest(anyhow::Error), #[error("Error while processing the request: {0}")] ProcessingError(E), - #[error("No receipt or free query auth token provided")] + #[error("No valid receipt or free query auth token provided")] Unauthorized, #[error("Invalid free query auth token")] InvalidFreeQueryAuthToken, @@ -93,6 +95,8 @@ where FailedToProvideAttestation, #[error("Failed to provide response")] FailedToProvideResponse, + #[error("Failed to query subgraph: {0}")] + FailedToQueryStaticSubgraph(anyhow::Error), } impl From<&IndexerServiceError> for StatusCode @@ -119,6 +123,8 @@ where InvalidRequest(_) => StatusCode::BAD_REQUEST, InvalidFreeQueryAuthToken => StatusCode::BAD_REQUEST, ProcessingError(_) => StatusCode::BAD_REQUEST, + + FailedToQueryStaticSubgraph(_) => StatusCode::INTERNAL_SERVER_ERROR, } } } @@ -192,7 +198,7 @@ impl IndexerService { .build() .expect("Failed to init HTTP client"); - let network_subgraph = Box::leak(Box::new(SubgraphClient::new( + let network_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new( http_client.clone(), options .config @@ -234,7 +240,7 @@ impl IndexerService { dispute_manager, ); - let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new( http_client, options .config @@ -294,7 +300,7 @@ impl IndexerService { // Rate limits by allowing bursts of 10 requests and requiring 100ms of // time between consecutive requests after that, effectively rate // limiting to 10 req/s. - let rate_limiter = GovernorLayer { + let misc_rate_limiter = GovernorLayer { config: Box::leak(Box::new( GovernorConfigBuilder::default() .per_millisecond(100) @@ -304,7 +310,7 @@ impl IndexerService { )), }; - let misc_routes = Router::new() + let mut misc_routes = Router::new() .route("/", get("Service is up and running")) .route("/version", get(Json(options.release))) .layer( @@ -312,9 +318,61 @@ impl IndexerService { .layer(HandleErrorLayer::new(|e: BoxError| async move { display_error(e) })) - .layer(rate_limiter), - ) - .with_state(state.clone()); + .layer(misc_rate_limiter), + ); + + // Rate limits by allowing bursts of 50 requests and requiring 20ms of + // time between consecutive requests after that, effectively rate + // limiting to 50 req/s. + let static_subgraph_rate_limiter = GovernorLayer { + config: Box::leak(Box::new( + GovernorConfigBuilder::default() + .per_millisecond(20) + .burst_size(50) + .finish() + .expect("Failed to set up rate limiting"), + )), + }; + + if options.config.network_subgraph.serve_subgraph { + info!("Serving network subgraph at /network"); + + misc_routes = misc_routes.route( + "/network", + post(static_subgraph_request_handler::) + .route_layer(Extension(network_subgraph)) + .route_layer(Extension( + options.config.network_subgraph.serve_auth_token.clone(), + )) + .route_layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(|e: BoxError| async move { + display_error(e) + })) + .layer(static_subgraph_rate_limiter.clone()), + ), + ); + } + + if options.config.escrow_subgraph.serve_subgraph { + info!("Serving escrow subgraph at /escrow"); + + misc_routes = misc_routes + .route("/escrow", post(static_subgraph_request_handler::)) + .route_layer(Extension(escrow_subgraph)) + .route_layer(Extension( + options.config.escrow_subgraph.serve_auth_token.clone(), + )) + .route_layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(|e: BoxError| async move { + display_error(e) + })) + .layer(static_subgraph_rate_limiter), + ); + } + + misc_routes = misc_routes.with_state(state.clone()); let data_routes = Router::new() .route( diff --git a/common/src/indexer_service/http/mod.rs b/common/src/indexer_service/http/mod.rs index 3d22b4a4..0f6b7b25 100644 --- a/common/src/indexer_service/http/mod.rs +++ b/common/src/indexer_service/http/mod.rs @@ -6,6 +6,7 @@ mod indexer_service; mod metrics; mod request_handler; mod scalar_receipt_header; +mod static_subgraph; pub use config::{ DatabaseConfig, GraphNetworkConfig, IndexerConfig, IndexerServiceConfig, ServerConfig, diff --git a/common/src/indexer_service/http/static_subgraph.rs b/common/src/indexer_service/http/static_subgraph.rs new file mode 100644 index 00000000..5842f7f1 --- /dev/null +++ b/common/src/indexer_service/http/static_subgraph.rs @@ -0,0 +1,52 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use axum::{body::Bytes, http::HeaderMap, response::IntoResponse, Extension}; +use tracing::warn; + +use crate::subgraph_client::SubgraphClient; + +use super::{indexer_service::IndexerServiceError, IndexerServiceImpl}; + +#[autometrics::autometrics] +pub async fn static_subgraph_request_handler( + Extension(subgraph_client): Extension<&'static SubgraphClient>, + Extension(required_auth_token): Extension>, + headers: HeaderMap, + body: Bytes, +) -> Result> +where + I: IndexerServiceImpl + Sync + Send + 'static, +{ + if let Some(required_auth_token) = required_auth_token { + let authorization = headers + .get("authorization") + .map(|value| value.to_str()) + .transpose() + .map_err(|_| IndexerServiceError::Unauthorized)? + .ok_or_else(|| IndexerServiceError::Unauthorized)? + .trim_start_matches("Bearer "); + + if authorization != required_auth_token { + return Err(IndexerServiceError::Unauthorized); + } + } + + let response = subgraph_client + .query_raw(body) + .await + .map_err(IndexerServiceError::FailedToQueryStaticSubgraph)?; + + Ok(( + response.status(), + response.headers().to_owned(), + response + .text() + .await + .map_err(|e| { + warn!("Failed to read response body: {}", e); + e + }) + .map_err(|e| IndexerServiceError::FailedToQueryStaticSubgraph(e.into()))?, + )) +} diff --git a/service/src/config.rs b/service/src/config.rs index 71688b6d..7af03aba 100644 --- a/service/src/config.rs +++ b/service/src/config.rs @@ -3,49 +3,18 @@ use std::path::PathBuf; -use alloy_primitives::Address; use figment::{ providers::{Format, Toml}, Figment, }; use indexer_common::indexer_service::http::IndexerServiceConfig; -use serde::{Deserialize, Serialize}; -use thegraph::types::DeploymentId; +use serde::Deserialize; #[derive(Clone, Debug, Deserialize)] pub struct Config { - // pub receipts: Receipts, - // pub indexer_infrastructure: IndexerInfrastructure, - // pub postgres: Postgres, - // pub network_subgraph: NetworkSubgraph, - // pub escrow_subgraph: EscrowSubgraph, pub common: IndexerServiceConfig, } -#[derive(Clone, Debug, Serialize, Deserialize, Default)] -pub struct NetworkSubgraph { - // #[clap( - // long, - // value_name = "serve-network-subgraph", - // env = "SERVE_NETWORK_SUBGRAPH", - // default_value_t = false, - // help = "Whether to serve the network subgraph at /network" - // )] - pub serve_network_subgraph: bool, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Default)] -pub struct EscrowSubgraph { - // #[clap( - // long, - // value_name = "serve-escrow-subgraph", - // env = "SERVE_ESCROW_SUBGRAPH", - // default_value_t = false, - // help = "Whether to serve the escrow subgraph at /escrow" - // )] - // pub serve_escrow_subgraph: bool, -} - impl Config { pub fn load(filename: &PathBuf) -> Result { Figment::new().merge(Toml::file(filename)).extract()