Skip to content

Commit

Permalink
Adding parallelism in region server to protect datanode from query ov…
Browse files Browse the repository at this point in the history
…erload.
  • Loading branch information
lyang24 committed Sep 12, 2024
1 parent 67fb3d0 commit 0a53595
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 3 deletions.
3 changes: 2 additions & 1 deletion config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,8 @@
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |

| `max_concurrent_queries` | Integer | 0 | Limit on total concurrent queries. Zero means unlimited. |
| `concurrent_query_limiter_timeout` | String | `25ms` | The timeout of acquiring concurrency permits. |

### Flownode

Expand Down
6 changes: 6 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,12 @@ url = ""
## HTTP headers of Prometheus remote-write carry.
headers = { }

## The maximum current queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0

## The timeout of acquiring concurrency permits.
concurrent_query_limiter_timeout = "25ms"

## The tracing options. Only effect when compiled with `tokio-console` feature.
[tracing]
## The tokio console address.
Expand Down
7 changes: 7 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Datanode configurations
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::Configurable;
Expand Down Expand Up @@ -305,6 +307,9 @@ pub struct DatanodeOptions {
pub meta_client: Option<MetaClientOptions>,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub max_concurrent_queries: usize,
#[serde(with = "humantime_serde")]
pub concurrent_query_limiter_timeout: Duration,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub logging: LoggingOptions,
Expand Down Expand Up @@ -339,6 +344,8 @@ impl Default for DatanodeOptions {
meta_client: None,
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
max_concurrent_queries: 0,
concurrent_query_limiter_timeout: Duration::from_millis(25),
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl DatanodeBuilder {
&self,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
let opts = &self.opts;
let opts: &DatanodeOptions = &self.opts;

let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
Expand All @@ -334,6 +334,8 @@ impl DatanodeBuilder {
common_runtime::global_runtime(),
event_listener,
table_provider_factory,
opts.max_concurrent_queries,
opts.concurrent_query_limiter_timeout,
);

let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
Expand Down
20 changes: 20 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
use tokio::time::error::Elapsed;

/// Business error of datanode.
#[derive(Snafu)]
Expand Down Expand Up @@ -347,6 +348,22 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to acquire permit, source closed"))]
ConcurrentQueryLimiterClosed {
#[snafu(source)]
error: tokio::sync::AcquireError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to acquire permit under timeouts"))]
ConcurrentQueryLimiterTimeout {
#[snafu(source)]
error: Elapsed,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -411,6 +428,9 @@ impl ErrorExt for Error {

FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
StatusCode::RegionBusy
}
}
}

Expand Down
65 changes: 64 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use api::region::RegionResponse;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1};
Expand Down Expand Up @@ -58,10 +59,13 @@ use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::time::timeout;
use tonic::{Request, Response, Result as TonicResult};

use crate::error::{
self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
Expand Down Expand Up @@ -90,6 +94,8 @@ impl RegionServer {
runtime,
event_listener,
Arc::new(DummyTableProviderFactory),
0,
Duration::from_millis(0),
)
}

Expand All @@ -98,13 +104,16 @@ impl RegionServer {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
max_concurrent_queries: usize,
timeout: Duration,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
query_engine,
runtime,
event_listener,
table_provider_factory,
RegionServerParallelism::from_opts(max_concurrent_queries, timeout),
)),
}
}
Expand Down Expand Up @@ -140,6 +149,12 @@ impl RegionServer {
region_id: RegionId,
request: RegionRequest,
) -> Result<RegionResponse> {
if let Some(p) = &self.inner.parallelism {
let permit = p.acquire().await?;
let res = self.inner.handle_request(region_id, request).await;
drop(permit);
return res;
}
self.inner.handle_request(region_id, request).await
}

Expand Down Expand Up @@ -450,6 +465,33 @@ struct RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
// The number of queries allowed to be executed at the same time.
// Act as last line of defense on datanode to prevent query overloading.
parallelism: Option<RegionServerParallelism>,
}

struct RegionServerParallelism {
semaphore: Semaphore,
timeout: Duration,
}

impl<'a> RegionServerParallelism {
pub fn from_opts(max_concurrent_queries: usize, timeout: Duration) -> Option<Self> {
if max_concurrent_queries == 0 {
return None;
}
Some(RegionServerParallelism {
semaphore: Semaphore::new(max_concurrent_queries),
timeout,
})
}

pub async fn acquire(&'a self) -> Result<SemaphorePermit<'a>> {
timeout(self.timeout, self.semaphore.acquire())
.await
.context(ConcurrentQueryLimiterTimeoutSnafu)?
.context(ConcurrentQueryLimiterClosedSnafu)
}
}

enum CurrentEngine {
Expand Down Expand Up @@ -478,6 +520,7 @@ impl RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
parallelism: Option<RegionServerParallelism>,
) -> Self {
Self {
engines: RwLock::new(HashMap::new()),
Expand All @@ -486,6 +529,7 @@ impl RegionServerInner {
runtime,
event_listener,
table_provider_factory,
parallelism,
}
}

Expand Down Expand Up @@ -1284,4 +1328,23 @@ mod tests {
assert(result);
}
}

#[tokio::test]
async fn test_region_server_parallism() {
let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
let first_query = p.acquire().await;
assert!(first_query.is_ok());
let second_query = p.acquire().await;
assert!(second_query.is_ok());
let third_query = p.acquire().await;
assert!(third_query.is_err());
let err = third_query.unwrap_err();
assert_eq!(
err.output_msg(),
"Failed to acquire semaphore under timeouts: deadline has elapsed".to_string()
);
drop(first_query);
let forth_query = p.acquire().await;
assert!(forth_query.is_ok());
}
}

0 comments on commit 0a53595

Please sign in to comment.