diff --git a/graph/src/env/graphql.rs b/graph/src/env/graphql.rs index 05e4c6991fb..a153efa23b0 100644 --- a/graph/src/env/graphql.rs +++ b/graph/src/env/graphql.rs @@ -94,6 +94,9 @@ pub struct EnvVarsGraphQl { /// header `X-GraphTraceQuery` set to this value will include a trace of /// the SQL queries that were run. pub query_trace_token: String, + /// Set by the env var `GRAPH_PARALLEL_BLOCK_CONSTRAINTS` + /// Whether to run top-level queries with different block constraints in parallel + pub parallel_block_constraints: bool, } // This does not print any values avoid accidentally leaking any sensitive env vars @@ -138,6 +141,7 @@ impl From for EnvVarsGraphQl { disable_bool_filters: x.disable_bool_filters.0, disable_child_sorting: x.disable_child_sorting.0, query_trace_token: x.query_trace_token, + parallel_block_constraints: x.parallel_block_constraints.0, } } } @@ -187,4 +191,6 @@ pub struct InnerGraphQl { pub disable_child_sorting: EnvVarBoolean, #[envconfig(from = "GRAPH_GRAPHQL_TRACE_TOKEN", default = "")] query_trace_token: String, + #[envconfig(from = "GRAPH_PARALLEL_BLOCK_CONSTRAINTS", default = "false")] + pub parallel_block_constraints: EnvVarBoolean, } diff --git a/graphql/src/runner.rs b/graphql/src/runner.rs index 5bc1666ced1..7cf3ae9417b 100644 --- a/graphql/src/runner.rs +++ b/graphql/src/runner.rs @@ -5,7 +5,7 @@ use crate::metrics::GraphQLMetrics; use crate::prelude::{QueryExecutionOptions, StoreResolver, SubscriptionExecutionOptions}; use crate::query::execute_query; use crate::subscription::execute_prepared_subscription; -use graph::prelude::MetricsRegistry; +use graph::prelude::{futures03::future, MetricsRegistry}; use graph::{ components::store::SubscriptionManager, prelude::{ @@ -145,10 +145,11 @@ where let by_block_constraint = query.block_constraint()?; let mut max_block = 0; let mut result: QueryResults = QueryResults::empty(); + let mut query_res_futures: Vec<_> = vec![]; // Note: This will always iterate at least once. + let query_start = Instant::now(); for (bc, (selection_set, error_policy)) in by_block_constraint { - let query_start = Instant::now(); let resolver = StoreResolver::at_block( &self.logger, store.cheap_clone(), @@ -162,7 +163,7 @@ where ) .await?; max_block = max_block.max(resolver.block_number()); - let query_res = execute_query( + query_res_futures.push(execute_query( query.clone(), Some(selection_set), resolver.block_ptr.as_ref().map(Into::into).clone(), @@ -173,8 +174,20 @@ where max_skip: max_skip.unwrap_or(ENV_VARS.graphql.max_skip), trace, }, - ) - .await; + )); + } + + let results: Vec<_> = if ENV_VARS.graphql.parallel_block_constraints { + future::join_all(query_res_futures).await + } else { + let mut results = vec![]; + for query_res_future in query_res_futures { + results.push(query_res_future.await); + } + results + }; + + for query_res in results { query_res.trace.finish(query_start.elapsed()); result.append(query_res); }