Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce the Limiter in frontend to limit the requests by in-flight write bytes size. #5231

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
Expand Down Expand Up @@ -195,6 +196,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
Expand Down
4 changes: 4 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
## @toml2docs:none-default
default_timezone = "UTC"

## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"

## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.
Expand Down
4 changes: 4 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
#+ enable_telemetry = true

## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"

## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.
Expand Down
4 changes: 4 additions & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use catalog::information_schema::InformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use client::api::v1::meta::RegionRole;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
Expand Down Expand Up @@ -152,6 +153,7 @@ pub struct StandaloneOptions {
pub tracing: TracingOptions,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<ReadableSize>,
}

impl Default for StandaloneOptions {
Expand Down Expand Up @@ -181,6 +183,7 @@ impl Default for StandaloneOptions {
tracing: TracingOptions::default(),
init_regions_in_background: false,
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
}
}
}
Expand Down Expand Up @@ -218,6 +221,7 @@ impl StandaloneOptions {
user_provider: cloned_opts.user_provider,
// Handle the export metrics task run by standalone to frontend for execution
export_metrics: cloned_opts.export_metrics,
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
..Default::default()
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ pub enum Error {
location: Location,
source: BoxedError,
},

#[snafu(display("In-flight write bytes exceeded the maximum limit"))]
InFlightWriteBytesExceeded {
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -392,6 +398,8 @@ impl ErrorExt for Error {
Error::StartScriptManager { source, .. } => source.status_code(),

Error::TableOperation { source, .. } => source.status_code(),

Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_base::readable_size::ReadableSize;
use common_config::config::Configurable;
use common_options::datanode::DatanodeClientOptions;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
Expand Down Expand Up @@ -46,6 +47,7 @@ pub struct FrontendOptions {
pub user_provider: Option<String>,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
pub max_in_flight_write_bytes: Option<ReadableSize>,
}

impl Default for FrontendOptions {
Expand All @@ -68,6 +70,7 @@ impl Default for FrontendOptions {
user_provider: None,
export_metrics: ExportMetricsOption::default(),
tracing: TracingOptions::default(),
max_in_flight_write_bytes: None,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ use crate::error::{
};
use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::limiter::LimiterRef;
use crate::script::ScriptExecutor;

#[async_trait]
Expand Down Expand Up @@ -126,6 +127,7 @@ pub struct Instance {
export_metrics_task: Option<ExportMetricsTask>,
table_metadata_manager: TableMetadataManagerRef,
stats: StatementStatistics,
limiter: Option<LimiterRef>,
}

impl Instance {
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::Instance;
use crate::limiter::Limiter;
use crate::script::ScriptExecutor;

/// The frontend [`Instance`] builder.
Expand Down Expand Up @@ -196,6 +197,14 @@ impl FrontendBuilder {

plugins.insert::<StatementExecutorRef>(statement_executor.clone());

// Create the limiter if the max_in_flight_write_bytes is set.
let limiter = self
.options
.max_in_flight_write_bytes
.map(|max_in_flight_write_bytes| {
Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
});

Ok(Instance {
options: self.options,
catalog_manager: self.catalog_manager,
Expand All @@ -211,6 +220,7 @@ impl FrontendBuilder {
export_metrics_task: None,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
stats: self.stats,
limiter,
})
}
}
14 changes: 12 additions & 2 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::table_name::TableName;

use crate::error::{
Error, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, Result,
TableOperationSnafu,
Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu,
PermissionSnafu, Result, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED};
Expand All @@ -50,6 +50,16 @@ impl GrpcQueryHandler for Instance {
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
.context(PermissionSnafu)?;

let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_request(&request);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
Expand Down
12 changes: 11 additions & 1 deletion src/frontend/src/instance/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use servers::error::{AuthSnafu, Error};
use servers::error::{AuthSnafu, Error, InFlightWriteBytesExceededSnafu};
use servers::influxdb::InfluxdbRequest;
use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
use servers::query_handler::InfluxdbLineProtocolHandler;
Expand Down Expand Up @@ -46,6 +46,16 @@ impl InfluxdbLineProtocolHandler for Instance {
.post_lines_conversion(requests, ctx.clone())
.await?;

let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.handle_influx_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use common_error::ext::BoxedError;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, InFlightWriteBytesExceededSnafu,
PipelineSnafu, Result as ServerResult,
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
Expand Down Expand Up @@ -110,6 +111,16 @@ impl Instance {
log: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&log);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.inserter
.handle_log_inserts(log, ctx, self.statement_executor.as_ref())
.await
Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/instance/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use servers::error as server_error;
use servers::error::AuthSnafu;
use servers::error::{AuthSnafu, InFlightWriteBytesExceededSnafu};
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
Expand All @@ -41,6 +41,17 @@ impl OpentsdbProtocolHandler for Instance {
.context(AuthSnafu)?;

let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;

let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

let output = self
.handle_row_inserts(requests, ctx)
.await
Expand Down
33 changes: 32 additions & 1 deletion src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::PipelineWay;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::query_handler::OpenTelemetryProtocolHandler;
Expand Down Expand Up @@ -53,6 +53,16 @@ impl OpenTelemetryProtocolHandler for Instance {
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
OTLP_METRICS_ROWS.inc_by(rows as u64);

let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand Down Expand Up @@ -83,6 +93,16 @@ impl OpenTelemetryProtocolHandler for Instance {

OTLP_TRACES_ROWS.inc_by(rows as u64);

let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.handle_log_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand All @@ -109,6 +129,17 @@ impl OpenTelemetryProtocolHandler for Instance {
interceptor_ref.pre_execute(ctx.clone())?;

let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;

let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.handle_log_inserts(requests, ctx)
.await
.inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
Expand Down
12 changes: 11 additions & 1 deletion src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
Expand Down Expand Up @@ -175,6 +175,16 @@ impl PromStoreProtocolHandler for Instance {
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;

let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&request);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

let output = if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod error;
pub mod frontend;
pub mod heartbeat;
pub mod instance;
pub(crate) mod limiter;
pub(crate) mod metrics;
mod script;
pub mod server;
Expand Down
Loading
Loading