diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 7cb18131ed3e..a1e0045f4f72 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -305,6 +305,8 @@ pub struct DatanodeOptions { pub meta_client: Option, pub wal: DatanodeWalConfig, pub storage: StorageConfig, + pub parallelism: Option, + pub semaphore_timeout_ms: u64, /// Options for different store engines. pub region_engine: Vec, pub logging: LoggingOptions, @@ -339,6 +341,8 @@ impl Default for DatanodeOptions { meta_client: None, wal: DatanodeWalConfig::default(), storage: StorageConfig::default(), + parallelism: None, + semaphore_timeout_ms: 5, region_engine: vec![ RegionEngineConfig::Mito(MitoConfig::default()), RegionEngineConfig::File(FileEngineConfig::default()), diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 149aa44ebe34..2f73bf14a4e0 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -314,7 +314,7 @@ impl DatanodeBuilder { &self, event_listener: RegionServerEventListenerRef, ) -> Result { - let opts = &self.opts; + let opts: &DatanodeOptions = &self.opts; let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in datanode only executes plan with resolved table source. @@ -334,6 +334,8 @@ impl DatanodeBuilder { common_runtime::global_runtime(), event_listener, table_provider_factory, + opts.parallelism, + opts.semaphore_timeout_ms, ); let object_store_manager = Self::build_object_store_manager(&opts.storage).await?; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 5717d0a40389..b33c1d80aea8 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -22,6 +22,7 @@ use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; use store_api::storage::RegionId; use table::error::Error as TableError; +use tokio::time::error::Elapsed; /// Business error of datanode. 
#[derive(Snafu)] @@ -347,6 +348,22 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to acquire semaphore, semaphore closed"))] + SemaphoreClosed { + #[snafu(source)] + error: tokio::sync::AcquireError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to acquire semaphore under timeouts"))] + SemaphoreTimeout { + #[snafu(source)] + error: Elapsed, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -411,6 +428,7 @@ impl ErrorExt for Error { FindLogicalRegions { source, .. } => source.status_code(), BuildMitoEngine { source, .. } => source.status_code(), + SemaphoreClosed { .. } | SemaphoreTimeout { .. } => StatusCode::RegionBusy, } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 56068a38c3aa..354b49bc6e5a 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::ops::Deref; use std::sync::{Arc, RwLock}; +use std::time::Duration; use api::region::RegionResponse; use api::v1::region::{region_request, RegionResponse as RegionResponseV1}; @@ -58,13 +59,16 @@ use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest, }; use store_api::storage::RegionId; +use tokio::sync::{Semaphore, SemaphorePermit}; +use tokio::time::timeout; use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, - RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, + RegionNotReadySnafu, Result, SemaphoreClosedSnafu, SemaphoreTimeoutSnafu, + StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, 
}; use crate::event_listener::RegionServerEventListenerRef; @@ -90,6 +94,8 @@ impl RegionServer { runtime, event_listener, Arc::new(DummyTableProviderFactory), + None, + 0, ) } @@ -98,6 +104,8 @@ impl RegionServer { runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, + parallelism: Option, + timeout: u64, ) -> Self { Self { inner: Arc::new(RegionServerInner::new( @@ -105,6 +113,7 @@ impl RegionServer { runtime, event_listener, table_provider_factory, + RegionServerParallelism::from_opts(parallelism, timeout), )), } } @@ -140,6 +149,12 @@ impl RegionServer { region_id: RegionId, request: RegionRequest, ) -> Result { + if let Some(p) = &self.inner.parallelism { + let permit = p.acquire().await?; + let res = self.inner.handle_request(region_id, request).await; + drop(permit); + return res; + } self.inner.handle_request(region_id, request).await } @@ -450,6 +465,30 @@ struct RegionServerInner { runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, + // The number of queries allowed to be executed at the same time. + // Acts as the last line of defense on the datanode to prevent query overloading. + parallelism: Option, +} + +struct RegionServerParallelism { + semaphore: Semaphore, + timeout: Duration, +} + +impl<'a> RegionServerParallelism { + pub fn from_opts(parallelism: Option, timeout: u64) -> Option { + parallelism.map(|n| RegionServerParallelism { + semaphore: Semaphore::new(n), + timeout: Duration::from_millis(timeout), + }) + } + + pub async fn acquire(&'a self) -> Result> { + timeout(self.timeout, self.semaphore.acquire()) + .await + .context(SemaphoreTimeoutSnafu)? 
+ .context(SemaphoreClosedSnafu) 
+ } } enum CurrentEngine { @@ -478,6 +517,7 @@ impl RegionServerInner { runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, + parallelism: Option, ) -> Self { Self { engines: RwLock::new(HashMap::new()), @@ -486,6 +526,7 @@ impl RegionServerInner { runtime, event_listener, table_provider_factory, + parallelism, } } @@ -1284,4 +1325,23 @@ mod tests { assert(result); } } + + #[tokio::test] + async fn test_region_server_parallelism() { + let p = RegionServerParallelism::from_opts(Some(2), 1).unwrap(); + let first_query = p.acquire().await; + assert!(first_query.is_ok()); + let second_query = p.acquire().await; + assert!(second_query.is_ok()); + let third_query = p.acquire().await; + assert!(third_query.is_err()); + let err = third_query.unwrap_err(); + assert_eq!( + err.output_msg(), + "Failed to acquire semaphore under timeouts: deadline has elapsed".to_string() + ); + drop(first_query); + let fourth_query = p.acquire().await; + assert!(fourth_query.is_ok()); + } }