From 0b101a3326a17adf2346cbc0fdd586155186648c Mon Sep 17 00:00:00 2001 From: lyang24 Date: Sun, 8 Sep 2024 22:08:00 -0700 Subject: [PATCH] Adding parallelism in region server to protect datanode from query overload. --- config/config.md | 2 + config/datanode.example.toml | 6 +++ src/datanode/src/config.rs | 7 ++++ src/datanode/src/datanode.rs | 4 +- src/datanode/src/error.rs | 20 ++++++++++ src/datanode/src/region_server.rs | 65 ++++++++++++++++++++++++++++++- 6 files changed, 102 insertions(+), 2 deletions(-) diff --git a/config/config.md b/config/config.md index 235b1f86fd61..68551fe1669e 100644 --- a/config/config.md +++ b/config/config.md @@ -335,6 +335,8 @@ | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. | | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. | +| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. | +| `concurrent_query_limiter_timeout` | String | `25ms` | The timeout of acquiring concurrency permits. | | `rpc_addr` | String | Unset | Deprecated, use `grpc.addr` instead. | | `rpc_hostname` | String | Unset | Deprecated, use `grpc.hostname` instead. | | `rpc_runtime_size` | Integer | Unset | Deprecated, use `grpc.runtime_size` instead. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 4388c4420f8b..e9a042165185 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -19,6 +19,12 @@ enable_telemetry = true ## Parallelism of initializing regions. init_regions_parallelism = 16 +## The maximum current queries allowed to be executed. Zero means unlimited. +max_concurrent_queries = 0 + +## The timeout of acquiring concurrency permits. +concurrent_query_limiter_timeout = "25ms" + ## Deprecated, use `grpc.addr` instead. ## @toml2docs:none-default rpc_addr = "127.0.0.1:3001" diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 7cb18131ed3e..4f8e80cf82fe 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -14,6 +14,8 @@ //! Datanode configurations +use std::time::Duration; + use common_base::readable_size::ReadableSize; use common_base::secrets::{ExposeSecret, SecretString}; use common_config::Configurable; @@ -305,6 +307,9 @@ pub struct DatanodeOptions { pub meta_client: Option, pub wal: DatanodeWalConfig, pub storage: StorageConfig, + pub max_concurrent_queries: usize, + #[serde(with = "humantime_serde")] + pub concurrent_query_limiter_timeout: Duration, /// Options for different store engines. pub region_engine: Vec, pub logging: LoggingOptions, @@ -339,6 +344,8 @@ impl Default for DatanodeOptions { meta_client: None, wal: DatanodeWalConfig::default(), storage: StorageConfig::default(), + max_concurrent_queries: 0, + concurrent_query_limiter_timeout: Duration::from_millis(25), region_engine: vec![ RegionEngineConfig::Mito(MitoConfig::default()), RegionEngineConfig::File(FileEngineConfig::default()), diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 149aa44ebe34..c679f2573d48 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -314,7 +314,7 @@ impl DatanodeBuilder { &self, event_listener: RegionServerEventListenerRef, ) -> Result { - let opts = &self.opts; + let opts: &DatanodeOptions = &self.opts; let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in datanode only executes plan with resolved table source. @@ -334,6 +334,8 @@ impl DatanodeBuilder { common_runtime::global_runtime(), event_listener, table_provider_factory, + opts.max_concurrent_queries, + opts.concurrent_query_limiter_timeout, ); let object_store_manager = Self::build_object_store_manager(&opts.storage).await?; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 5717d0a40389..0b36245924b4 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -22,6 +22,7 @@ use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; use store_api::storage::RegionId; use table::error::Error as TableError; +use tokio::time::error::Elapsed; /// Business error of datanode. #[derive(Snafu)] @@ -347,6 +348,22 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to acquire permit, source closed"))] + ConcurrentQueryLimiterClosed { + #[snafu(source)] + error: tokio::sync::AcquireError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to acquire permit under timeouts"))] + ConcurrentQueryLimiterTimeout { + #[snafu(source)] + error: Elapsed, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -411,6 +428,9 @@ impl ErrorExt for Error { FindLogicalRegions { source, .. } => source.status_code(), BuildMitoEngine { source, .. } => source.status_code(), + ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => { + StatusCode::RegionBusy + } } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 56068a38c3aa..67d2dd7d3339 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::ops::Deref; use std::sync::{Arc, RwLock}; +use std::time::Duration; use api::region::RegionResponse; use api::v1::region::{region_request, RegionResponse as RegionResponseV1}; @@ -58,10 +59,13 @@ use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest, }; use store_api::storage::RegionId; +use tokio::sync::{Semaphore, SemaphorePermit}; +use tokio::time::timeout; use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ - self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, + self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu, + ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, @@ -90,6 +94,8 @@ impl RegionServer { runtime, event_listener, Arc::new(DummyTableProviderFactory), + 0, + Duration::from_millis(0), ) } @@ -98,6 +104,8 @@ impl RegionServer { runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, + max_concurrent_queries: usize, + timeout: Duration, ) -> Self { Self { inner: Arc::new(RegionServerInner::new( @@ -105,6 +113,7 @@ impl RegionServer { runtime, event_listener, table_provider_factory, + RegionServerParallelism::from_opts(max_concurrent_queries, timeout), )), } } @@ -140,6 +149,12 @@ impl RegionServer { region_id: RegionId, request: RegionRequest, ) -> Result { + if let Some(p) = &self.inner.parallelism { + let permit = p.acquire().await?; + let res = self.inner.handle_request(region_id, request).await; + drop(permit); + return res; + } self.inner.handle_request(region_id, request).await } @@ -450,6 +465,33 @@ struct RegionServerInner { runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, + // The number of queries allowed to be executed at the same time. + // Act as last line of defense on datanode to prevent query overloading. + parallelism: Option, +} + +struct RegionServerParallelism { + semaphore: Semaphore, + timeout: Duration, +} + +impl<'a> RegionServerParallelism { + pub fn from_opts(max_concurrent_queries: usize, timeout: Duration) -> Option { + if max_concurrent_queries == 0 { + return None; + } + Some(RegionServerParallelism { + semaphore: Semaphore::new(max_concurrent_queries), + timeout, + }) + } + + pub async fn acquire(&'a self) -> Result> { + timeout(self.timeout, self.semaphore.acquire()) + .await + .context(ConcurrentQueryLimiterTimeoutSnafu)? + .context(ConcurrentQueryLimiterClosedSnafu) + } } enum CurrentEngine { @@ -478,6 +520,7 @@ impl RegionServerInner { runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, + parallelism: Option, ) -> Self { Self { engines: RwLock::new(HashMap::new()), @@ -486,6 +529,7 @@ impl RegionServerInner { runtime, event_listener, table_provider_factory, + parallelism, } } @@ -1284,4 +1328,23 @@ mod tests { assert(result); } } + + #[tokio::test] + async fn test_region_server_parallism() { + let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap(); + let first_query = p.acquire().await; + assert!(first_query.is_ok()); + let second_query = p.acquire().await; + assert!(second_query.is_ok()); + let third_query = p.acquire().await; + assert!(third_query.is_err()); + let err = third_query.unwrap_err(); + assert_eq!( + err.output_msg(), + "Failed to acquire permit under timeouts: deadline has elapsed".to_string() + ); + drop(first_query); + let forth_query = p.acquire().await; + assert!(forth_query.is_ok()); + } }