diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index c6e7218a389e..e9345dc4eb0c 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -321,6 +321,12 @@ pub enum Error {
         location: Location,
         source: BoxedError,
     },
+
+    #[snafu(display("In-flight write bytes exceeded"))]
+    InFlightWriteBytesExceeded {
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -392,6 +398,8 @@ impl ErrorExt for Error {
             Error::StartScriptManager { source, .. } => source.status_code(),
 
             Error::TableOperation { source, .. } => source.status_code(),
+
+            Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
         }
     }
 
diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs
index 55f2dae3c386..b2e2802f99a3 100644
--- a/src/frontend/src/frontend.rs
+++ b/src/frontend/src/frontend.rs
@@ -46,6 +46,7 @@ pub struct FrontendOptions {
     pub user_provider: Option<String>,
     pub export_metrics: ExportMetricsOption,
     pub tracing: TracingOptions,
+    pub max_in_flight_write_bytes: u64,
 }
 
 impl Default for FrontendOptions {
@@ -68,6 +69,7 @@ impl Default for FrontendOptions {
             user_provider: None,
             export_metrics: ExportMetricsOption::default(),
             tracing: TracingOptions::default(),
+            max_in_flight_write_bytes: u64::MAX,
         }
     }
 }
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index b22bde96e0ff..8ede08a4c5ba 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -86,6 +86,7 @@ use crate::error::{
 };
 use crate::frontend::FrontendOptions;
 use crate::heartbeat::HeartbeatTask;
+use crate::limiter::LimiterRef;
 use crate::script::ScriptExecutor;
 
 #[async_trait]
@@ -124,6 +125,7 @@ pub struct Instance {
     export_metrics_task: Option<ExportMetricsTask>,
     table_metadata_manager: TableMetadataManagerRef,
     stats: StatementStatistics,
+    limiter: LimiterRef,
 }
 
 impl Instance {
diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs
index f24141d8ba2b..0b25f9402f73 100644
--- a/src/frontend/src/instance/builder.rs
+++ b/src/frontend/src/instance/builder.rs
@@ -43,6 +43,7 @@ use crate::frontend::FrontendOptions;
 use crate::heartbeat::HeartbeatTask;
 use crate::instance::region_query::FrontendRegionQueryHandler;
 use crate::instance::Instance;
+use crate::limiter::Limiter;
 use crate::script::ScriptExecutor;
 
 /// The frontend [`Instance`] builder.
@@ -196,6 +197,7 @@ impl FrontendBuilder {
 
         plugins.insert::<StatementExecutorRef>(statement_executor.clone());
 
+        let limiter = Arc::new(Limiter::new(self.options.max_in_flight_write_bytes));
         Ok(Instance {
             options: self.options,
             catalog_manager: self.catalog_manager,
@@ -211,6 +213,7 @@ impl FrontendBuilder {
             export_metrics_task: None,
             table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
             stats: self.stats,
+            limiter,
         })
     }
 }
diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs
index ad225bf30b4e..39eb2695a54a 100644
--- a/src/frontend/src/instance/grpc.rs
+++ b/src/frontend/src/instance/grpc.rs
@@ -29,8 +29,8 @@ use snafu::{ensure, OptionExt, ResultExt};
 use table::table_name::TableName;
 
 use crate::error::{
-    Error, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, Result,
-    TableOperationSnafu,
+    Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu,
+    PermissionSnafu, Result, TableOperationSnafu,
 };
 use crate::instance::{attach_timer, Instance};
 use crate::metrics::{GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED};
@@ -50,6 +50,14 @@ impl GrpcQueryHandler for Instance {
             .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
             .context(PermissionSnafu)?;
 
+        if self.limiter.should_be_limited() {
+            return InFlightWriteBytesExceededSnafu.fail();
+        }
+
+        self.limiter
+            .in_flight_write_bytes_counter()
+            .process_with_request(&request);
+
         let output = match request {
             Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
             Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs
index c337e4174615..435571d8fbea 100644
--- a/src/frontend/src/instance/influxdb.rs
+++ b/src/frontend/src/instance/influxdb.rs
@@ -16,7 +16,7 @@ use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use client::Output;
 use common_error::ext::BoxedError;
-use servers::error::{AuthSnafu, Error};
+use servers::error::{AuthSnafu, Error, InFlightWriteBytesExceededSnafu};
 use servers::influxdb::InfluxdbRequest;
 use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
 use servers::query_handler::InfluxdbLineProtocolHandler;
@@ -46,6 +46,14 @@ impl InfluxdbLineProtocolHandler for Instance {
             .post_lines_conversion(requests, ctx.clone())
             .await?;
 
+        if self.limiter.should_be_limited() {
+            return InFlightWriteBytesExceededSnafu.fail();
+        }
+
+        self.limiter
+            .in_flight_write_bytes_counter()
+            .process_with_row_inserts(&requests);
+
         self.handle_influx_row_inserts(requests, ctx)
             .await
             .map_err(BoxedError::new)
diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs
index 2da2d6717d3b..89f4cd1979a2 100644
--- a/src/frontend/src/instance/log_handler.rs
+++ b/src/frontend/src/instance/log_handler.rs
@@ -22,7 +22,8 @@ use common_error::ext::BoxedError;
 use pipeline::pipeline_operator::PipelineOperator;
 use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
 use servers::error::{
-    AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
+    AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, InFlightWriteBytesExceededSnafu,
+    PipelineSnafu, Result as ServerResult,
 };
 use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
 use servers::query_handler::PipelineHandler;
@@ -110,6 +111,14 @@ impl Instance {
         log: RowInsertRequests,
         ctx: QueryContextRef,
     ) -> ServerResult<Output> {
+        if self.limiter.should_be_limited() {
+            return InFlightWriteBytesExceededSnafu.fail();
+        }
+
+        self.limiter
+            .in_flight_write_bytes_counter()
+            .process_with_row_inserts(&log);
+
         self.inserter
             .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
             .await
diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs
index 946c3b9ff7f5..c8dad6b985cd 100644
--- a/src/frontend/src/instance/opentsdb.rs
+++ b/src/frontend/src/instance/opentsdb.rs
@@ -17,7 +17,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use common_error::ext::BoxedError;
 use common_telemetry::tracing;
 use servers::error as server_error;
-use servers::error::AuthSnafu;
+use servers::error::{AuthSnafu, InFlightWriteBytesExceededSnafu};
 use servers::opentsdb::codec::DataPoint;
 use servers::opentsdb::data_point_to_grpc_row_insert_requests;
 use servers::query_handler::OpentsdbProtocolHandler;
@@ -41,6 +41,13 @@ impl OpentsdbProtocolHandler for Instance {
             .context(AuthSnafu)?;
 
         let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
+        if self.limiter.should_be_limited() {
+            return InFlightWriteBytesExceededSnafu.fail();
+        }
+
+        self.limiter
+            .in_flight_write_bytes_counter()
+            .process_with_row_inserts(&requests);
         let output = self
             .handle_row_inserts(requests, ctx)
             .await
diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index f28179d40d59..1f753a985e1a 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -21,7 +21,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
 use pipeline::PipelineWay;
-use servers::error::{self, AuthSnafu, Result as ServerResult};
+use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
 use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
 use servers::otlp;
 use servers::query_handler::OpenTelemetryProtocolHandler;
@@ -53,6 +53,14 @@ impl OpenTelemetryProtocolHandler for Instance {
         let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
         OTLP_METRICS_ROWS.inc_by(rows as u64);
 
+        if self.limiter.should_be_limited() {
+            return InFlightWriteBytesExceededSnafu.fail();
+        }
+
+        self.limiter
+            .in_flight_write_bytes_counter()
+            .process_with_row_inserts(&requests);
+
         self.handle_row_inserts(requests, ctx)
             .await
             .map_err(BoxedError::new)
@@ -83,6 +91,14 @@ impl OpenTelemetryProtocolHandler for Instance {
 
         OTLP_TRACES_ROWS.inc_by(rows as u64);
 
+        if self.limiter.should_be_limited() {
+            return InFlightWriteBytesExceededSnafu.fail();
+        }
+
+        self.limiter
+            .in_flight_write_bytes_counter()
+            .process_with_row_inserts(&requests);
+
         self.handle_log_inserts(requests, ctx)
             .await
             .map_err(BoxedError::new)
@@ -109,6 +125,15 @@ impl OpenTelemetryProtocolHandler for Instance {
         interceptor_ref.pre_execute(ctx.clone())?;
         let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
+
+        if self.limiter.should_be_limited() {
+            return InFlightWriteBytesExceededSnafu.fail();
+        }
+
+        self.limiter
+            .in_flight_write_bytes_counter()
+            .process_with_row_inserts(&requests);
+
         self.handle_log_inserts(requests, ctx)
             .await
             .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs
index 8f1098b058f1..1f8221de0fdd 100644
--- a/src/frontend/src/instance/prom_store.rs
+++ b/src/frontend/src/instance/prom_store.rs
@@ -30,7 +30,7 @@ use common_telemetry::{debug, tracing};
 use operator::insert::InserterRef;
 use operator::statement::StatementExecutor;
 use prost::Message;
-use servers::error::{self, AuthSnafu, Result as ServerResult};
+use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
 use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
 use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
 use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
@@ -175,6 +175,14 @@ impl PromStoreProtocolHandler for Instance {
             .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
         interceptor_ref.pre_write(&request, ctx.clone())?;
 
+        if self.limiter.should_be_limited() {
+            return InFlightWriteBytesExceededSnafu.fail();
+        }
+
+        self.limiter
+            .in_flight_write_bytes_counter()
+            .process_with_row_inserts(&request);
+
         let output = if with_metric_engine {
             let physical_table = ctx
                 .extension(PHYSICAL_TABLE_PARAM)
diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs
index de800b0b41c6..e887172797bd 100644
--- a/src/frontend/src/lib.rs
+++ b/src/frontend/src/lib.rs
@@ -18,6 +18,7 @@ pub mod error;
 pub mod frontend;
 pub mod heartbeat;
 pub mod instance;
+pub(crate) mod limiter;
 pub(crate) mod metrics;
 mod script;
 pub mod server;
diff --git a/src/frontend/src/limiter.rs b/src/frontend/src/limiter.rs
new file mode 100644
index 000000000000..c07872d742cf
--- /dev/null
+++ b/src/frontend/src/limiter.rs
@@ -0,0 +1,259 @@
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+use api::v1::column::Values;
+use api::v1::greptime_request::Request;
+use api::v1::value::ValueData;
+use api::v1::{Decimal128, InsertRequests, IntervalMonthDayNano, RowInsertRequests};
+
+pub(crate) type LimiterRef = Arc<Limiter>;
+
+/// A frontend request limiter that controls the total size of in-flight write requests.
+pub(crate) struct Limiter {
+    // The maximum number of bytes that can be in flight.
+    max_in_flight_write_bytes: u64,
+
+    // The current in-flight write bytes.
+    in_flight_write_bytes: Arc<AtomicU64>,
+}
+
+/// A counter for the in-flight write bytes.
+pub(crate) struct InFlightWriteBytesCounter {
+    // The current in-flight write bytes.
+    in_flight_write_bytes: Arc<AtomicU64>,
+
+    // The write bytes that are being processed.
+    processing_write_bytes: u64,
+}
+
+impl InFlightWriteBytesCounter {
+    pub fn new(in_flight_write_bytes: Arc<AtomicU64>) -> Self {
+        Self {
+            in_flight_write_bytes,
+            processing_write_bytes: 0,
+        }
+    }
+
+    pub fn process_with_request(&mut self, request: &Request) {
+        let size = match request {
+            Request::Inserts(requests) => self.insert_requests_data_size(requests),
+            Request::RowInserts(requests) => self.rows_insert_requests_data_size(requests),
+            _ => 0,
+        };
+        self.add(size as u64);
+    }
+
+    pub fn process_with_row_inserts(&mut self, requests: &RowInsertRequests) {
+        let size = self.rows_insert_requests_data_size(requests);
+        self.add(size as u64);
+    }
+
+    fn add(&mut self, bytes: u64) {
+        self.in_flight_write_bytes
+            .fetch_add(bytes, Ordering::Relaxed);
+        self.processing_write_bytes = bytes;
+    }
+
+    fn insert_requests_data_size(&self, request: &InsertRequests) -> usize {
+        let mut size: usize = 0;
+        for insert in &request.inserts {
+            for column in &insert.columns {
+                if let Some(values) = &column.values {
+                    size += self.size_of_column_values(values);
+                }
+            }
+        }
+        size
+    }
+
+    fn rows_insert_requests_data_size(&self, request: &RowInsertRequests) -> usize {
+        let mut size: usize = 0;
+        for insert in &request.inserts {
+            if let Some(rows) = &insert.rows {
+                for row in &rows.rows {
+                    for value in &row.values {
+                        if let Some(value) = &value.value_data {
+                            size += self.size_of_value_data(value);
+                        }
+                    }
+                }
+            }
+        }
+        size
+    }
+
+    fn size_of_column_values(&self, values: &Values) -> usize {
+        let mut size: usize = 0;
+        size += values.i8_values.len() * size_of::<i8>();
+        size += values.i16_values.len() * size_of::<i16>();
+        size += values.i32_values.len() * size_of::<i32>();
+        size += values.i64_values.len() * size_of::<i64>();
+        size += values.u8_values.len() * size_of::<u8>();
+        size += values.u16_values.len() * size_of::<u16>();
+        size += values.u32_values.len() * size_of::<u32>();
+        size += values.u64_values.len() * size_of::<u64>();
+        size += values.f32_values.len() * size_of::<f32>();
+        size += values.f64_values.len() * size_of::<f64>();
+        size += values.bool_values.len() * size_of::<bool>();
+        size += values
+            .binary_values
+            .iter()
+            .map(|v| v.len() * size_of::<u8>())
+            .sum::<usize>();
+        size += values.string_values.iter().map(|v| v.len()).sum::<usize>();
+        size += values.date_values.len() * size_of::<i32>();
+        size += values.datetime_values.len() * size_of::<i64>();
+        size += values.timestamp_second_values.len() * size_of::<i64>();
+        size += values.timestamp_millisecond_values.len() * size_of::<i64>();
+        size += values.timestamp_microsecond_values.len() * size_of::<i64>();
+        size += values.timestamp_nanosecond_values.len() * size_of::<i64>();
+        size += values.time_second_values.len() * size_of::<i64>();
+        size += values.time_millisecond_values.len() * size_of::<i64>();
+        size += values.time_microsecond_values.len() * size_of::<i64>();
+        size += values.time_nanosecond_values.len() * size_of::<i64>();
+        size += values.interval_year_month_values.len() * size_of::<i32>();
+        size += values.interval_day_time_values.len() * size_of::<i64>();
+        size += values.interval_month_day_nano_values.len() * size_of::<IntervalMonthDayNano>();
+        size += values.decimal128_values.len() * size_of::<Decimal128>();
+        size
+    }
+
+    fn size_of_value_data(&self, value: &ValueData) -> usize {
+        match value {
+            ValueData::I8Value(_) => size_of::<i8>(),
+            ValueData::I16Value(_) => size_of::<i16>(),
+            ValueData::I32Value(_) => size_of::<i32>(),
+            ValueData::I64Value(_) => size_of::<i64>(),
+            ValueData::U8Value(_) => size_of::<u8>(),
+            ValueData::U16Value(_) => size_of::<u16>(),
+            ValueData::U32Value(_) => size_of::<u32>(),
+            ValueData::U64Value(_) => size_of::<u64>(),
+            ValueData::F32Value(_) => size_of::<f32>(),
+            ValueData::F64Value(_) => size_of::<f64>(),
+            ValueData::BoolValue(_) => size_of::<bool>(),
+            ValueData::BinaryValue(v) => v.len() * size_of::<u8>(),
+            ValueData::StringValue(v) => v.len(),
+            ValueData::DateValue(_) => size_of::<i32>(),
+            ValueData::DatetimeValue(_) => size_of::<i64>(),
+            ValueData::TimestampSecondValue(_) => size_of::<i64>(),
+            ValueData::TimestampMillisecondValue(_) => size_of::<i64>(),
+            ValueData::TimestampMicrosecondValue(_) => size_of::<i64>(),
+            ValueData::TimestampNanosecondValue(_) => size_of::<i64>(),
+            ValueData::TimeSecondValue(_) => size_of::<i64>(),
+            ValueData::TimeMillisecondValue(_) => size_of::<i64>(),
+            ValueData::TimeMicrosecondValue(_) => size_of::<i64>(),
+            ValueData::TimeNanosecondValue(_) => size_of::<i64>(),
+            ValueData::IntervalYearMonthValue(_) => size_of::<i32>(),
+            ValueData::IntervalDayTimeValue(_) => size_of::<i64>(),
+            ValueData::IntervalMonthDayNanoValue(_) => size_of::<IntervalMonthDayNano>(),
+            ValueData::Decimal128Value(_) => size_of::<Decimal128>(),
+        }
+    }
+}
+
+impl Drop for InFlightWriteBytesCounter {
+    fn drop(&mut self) {
+        self.in_flight_write_bytes
+            .fetch_sub(self.processing_write_bytes, Ordering::Relaxed);
+    }
+}
+
+impl Limiter {
+    pub fn new(max_in_flight_write_bytes: u64) -> Self {
+        Self {
+            max_in_flight_write_bytes,
+            in_flight_write_bytes: Arc::new(AtomicU64::new(0)),
+        }
+    }
+
+    /// Returns a counter for the in-flight write bytes.
+    pub fn in_flight_write_bytes_counter(&self) -> InFlightWriteBytesCounter {
+        InFlightWriteBytesCounter::new(Arc::clone(&self.in_flight_write_bytes))
+    }
+
+    /// Returns true if the in-flight write bytes exceed the maximum limit.
+    pub fn should_be_limited(&self) -> bool {
+        self.in_flight_write_bytes.load(Ordering::Relaxed) >= self.max_in_flight_write_bytes
+    }
+
+    /// Returns the current in-flight write bytes.
+    #[allow(dead_code)]
+    pub fn in_flight_write_bytes(&self) -> u64 {
+        self.in_flight_write_bytes.load(Ordering::Relaxed)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use api::v1::column::Values;
+    use api::v1::greptime_request::Request;
+    use api::v1::{Column, InsertRequest};
+
+    use super::*;
+
+    fn generate_request(size: usize) -> Request {
+        let i8_values = vec![0; size];
+        Request::Inserts(InsertRequests {
+            inserts: vec![InsertRequest {
+                columns: vec![Column {
+                    values: Some(Values {
+                        i8_values,
+                        ..Default::default()
+                    }),
+                    ..Default::default()
+                }],
+                ..Default::default()
+            }],
+        })
+    }
+
+    #[tokio::test]
+    async fn test_limiter() {
+        let limiter_ref: LimiterRef = Arc::new(Limiter::new(1024));
+        let tasks_count = 10;
+        let request_data_size = 100;
+        let mut handles = vec![];
+
+        // Generate multiple requests to test the limiter.
+        for _ in 0..tasks_count {
+            let limiter = limiter_ref.clone();
+            let handle = tokio::spawn(async move {
+                assert!(!limiter.should_be_limited());
+                let request = generate_request(request_data_size);
+                limiter
+                    .in_flight_write_bytes_counter()
+                    .process_with_request(&request);
+            });
+            handles.push(handle);
+        }
+
+        // Wait for all threads to complete.
+        for handle in handles {
+            handle.await.unwrap();
+        }
+    }
+
+    #[test]
+    fn test_in_flight_write_bytes() {
+        let limiter_ref: LimiterRef = Arc::new(Limiter::new(1024));
+        let req1 = generate_request(100);
+        let mut c1 = limiter_ref.in_flight_write_bytes_counter();
+        c1.process_with_request(&req1);
+        assert_eq!(limiter_ref.in_flight_write_bytes(), 100);
+        assert!(!limiter_ref.should_be_limited());
+
+        let req2 = generate_request(200);
+        let mut c2 = limiter_ref.in_flight_write_bytes_counter();
+        c2.process_with_request(&req2);
+        assert_eq!(limiter_ref.in_flight_write_bytes(), 300);
+        assert!(limiter_ref.should_be_limited());
+
+        drop(c1);
+        assert_eq!(limiter_ref.in_flight_write_bytes(), 200);
+        assert!(!limiter_ref.should_be_limited());
+
+        drop(c2);
+        assert_eq!(limiter_ref.in_flight_write_bytes(), 0);
+        assert!(!limiter_ref.should_be_limited());
+    }
+}
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 071de93683cc..d59c65875ebc 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -589,6 +589,12 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("In-flight write bytes exceeded"))]
+    InFlightWriteBytesExceeded {
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -706,6 +712,8 @@ impl ErrorExt for Error {
             ToJson { .. } => StatusCode::Internal,
 
             ConvertSqlValue { source, .. } => source.status_code(),
+
+            InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
         }
    }