Skip to content

Commit

Permalink
Adding parallelism in region server to protect datanode from query ov…
Browse files Browse the repository at this point in the history
…erload.
  • Loading branch information
lyang24 committed Sep 11, 2024
1 parent 208afe4 commit ed16bbb
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ pub struct DatanodeOptions {
pub meta_client: Option<MetaClientOptions>,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub parallelism: Option<usize>,
pub semaphore_timeout_ms: u64,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub logging: LoggingOptions,
Expand Down Expand Up @@ -339,6 +341,8 @@ impl Default for DatanodeOptions {
meta_client: None,
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
parallelism: None,
semaphore_timeout_ms: 25,
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl DatanodeBuilder {
&self,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
let opts = &self.opts;
let opts: &DatanodeOptions = &self.opts;

let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
Expand All @@ -334,6 +334,8 @@ impl DatanodeBuilder {
common_runtime::global_runtime(),
event_listener,
table_provider_factory,
opts.parallelism,
opts.semaphore_timeout_ms,
);

let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
Expand Down
18 changes: 18 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
use tokio::time::error::Elapsed;

/// Business error of datanode.
#[derive(Snafu)]
Expand Down Expand Up @@ -347,6 +348,22 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to acquire semaphore, semaphore closed"))]
SemaphoreClosed {
#[snafu(source)]
error: tokio::sync::AcquireError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to acquire semaphore under timeouts"))]
SemaphoreTimeout {
#[snafu(source)]
error: Elapsed,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -411,6 +428,7 @@ impl ErrorExt for Error {

FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
SemaphoreClosed { .. } | SemaphoreTimeout { .. } => StatusCode::RegionBusy,
}
}

Expand Down
62 changes: 61 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use api::region::RegionResponse;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1};
Expand Down Expand Up @@ -58,13 +59,16 @@ use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::time::timeout;
use tonic::{Request, Response, Result as TonicResult};

use crate::error::{
self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
RegionNotReadySnafu, Result, SemaphoreClosedSnafu, SemaphoreTimeoutSnafu,
StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;

Expand All @@ -90,6 +94,8 @@ impl RegionServer {
runtime,
event_listener,
Arc::new(DummyTableProviderFactory),
None,
0,
)
}

Expand All @@ -98,13 +104,16 @@ impl RegionServer {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
parallelism: Option<usize>,
timeout: u64,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
query_engine,
runtime,
event_listener,
table_provider_factory,
RegionServerParallelism::from_opts(parallelism, timeout),
)),
}
}
Expand Down Expand Up @@ -140,6 +149,12 @@ impl RegionServer {
region_id: RegionId,
request: RegionRequest,
) -> Result<RegionResponse> {
if let Some(p) = &self.inner.parallelism {
let permit = p.acquire().await?;
let res = self.inner.handle_request(region_id, request).await;
drop(permit);
return res;
}
self.inner.handle_request(region_id, request).await
}

Expand Down Expand Up @@ -450,6 +465,30 @@ struct RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
// The number of queries allowed to be executed at the same time.
// Act as last line of defense on datanode to prevent query overloading.
parallelism: Option<RegionServerParallelism>,
}

struct RegionServerParallelism {
semaphore: Semaphore,
timeout: Duration,
}

impl<'a> RegionServerParallelism {
pub fn from_opts(parallism: Option<usize>, timeout: u64) -> Option<Self> {
parallism.map(|n| RegionServerParallelism {
semaphore: Semaphore::new(n),
timeout: Duration::from_millis(timeout),
})
}

pub async fn acquire(&'a self) -> Result<SemaphorePermit<'a>> {
Ok(timeout(self.timeout, self.semaphore.acquire())

Check failure on line 487 in src/datanode/src/region_server.rs

View workflow job for this annotation

GitHub Actions / Clippy

question mark operator is useless here
.await
.context(SemaphoreTimeoutSnafu)?
.context(SemaphoreClosedSnafu)?)
}
}

enum CurrentEngine {
Expand Down Expand Up @@ -478,6 +517,7 @@ impl RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
parallelism: Option<RegionServerParallelism>,
) -> Self {
Self {
engines: RwLock::new(HashMap::new()),
Expand All @@ -486,6 +526,7 @@ impl RegionServerInner {
runtime,
event_listener,
table_provider_factory,
parallelism,
}
}

Expand Down Expand Up @@ -1284,4 +1325,23 @@ mod tests {
assert(result);
}
}

#[tokio::test]
async fn test_region_server_parallism() {
let p = RegionServerParallelism::from_opts(Some(2), 1).unwrap();
let first_query = p.acquire().await;
assert!(first_query.is_ok());
let second_query = p.acquire().await;
assert!(second_query.is_ok());
let third_query = p.acquire().await;
assert!(third_query.is_err());
let err = third_query.unwrap_err();
assert_eq!(
err.output_msg(),
"Failed to acquire semaphore under timeouts: deadline has elapsed".to_string()
);
drop(first_query);
let forth_query = p.acquire().await;
assert!(forth_query.is_ok());
}
}

0 comments on commit ed16bbb

Please sign in to comment.