Skip to content

Commit

Permalink
feat: introduct Limiter to limit in-flight write bytes size in frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed Dec 24, 2024
1 parent d51b65a commit cda9642
Show file tree
Hide file tree
Showing 14 changed files with 407 additions and 7 deletions.
3 changes: 3 additions & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub struct StandaloneOptions {
pub tracing: TracingOptions,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<u64>,
}

impl Default for StandaloneOptions {
Expand Down Expand Up @@ -181,6 +182,7 @@ impl Default for StandaloneOptions {
tracing: TracingOptions::default(),
init_regions_in_background: false,
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
}
}
}
Expand Down Expand Up @@ -218,6 +220,7 @@ impl StandaloneOptions {
user_provider: cloned_opts.user_provider,
// Handle the export metrics task run by standalone to frontend for execution
export_metrics: cloned_opts.export_metrics,
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
..Default::default()
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ pub enum Error {
location: Location,
source: BoxedError,
},

#[snafu(display("In-flight write bytes exceeded"))]
InFlightWriteBytesExceeded {
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -392,6 +398,8 @@ impl ErrorExt for Error {
Error::StartScriptManager { source, .. } => source.status_code(),

Error::TableOperation { source, .. } => source.status_code(),

Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct FrontendOptions {
pub user_provider: Option<String>,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
pub max_in_flight_write_bytes: Option<u64>,
}

impl Default for FrontendOptions {
Expand All @@ -68,6 +69,7 @@ impl Default for FrontendOptions {
user_provider: None,
export_metrics: ExportMetricsOption::default(),
tracing: TracingOptions::default(),
max_in_flight_write_bytes: None,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use crate::error::{
};
use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::limiter::LimiterRef;
use crate::script::ScriptExecutor;

#[async_trait]
Expand Down Expand Up @@ -124,6 +125,7 @@ pub struct Instance {
export_metrics_task: Option<ExportMetricsTask>,
table_metadata_manager: TableMetadataManagerRef,
stats: StatementStatistics,
limiter: Option<LimiterRef>,
}

impl Instance {
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::Instance;
use crate::limiter::Limiter;
use crate::script::ScriptExecutor;

/// The frontend [`Instance`] builder.
Expand Down Expand Up @@ -196,6 +197,13 @@ impl FrontendBuilder {

plugins.insert::<StatementExecutorRef>(statement_executor.clone());

let limiter =
if let Some(max_in_flight_write_bytes) = self.options.max_in_flight_write_bytes {

Check failure on line 201 in src/frontend/src/instance/builder.rs

View workflow job for this annotation

GitHub Actions / Clippy

manual implementation of `Option::map`
Some(Arc::new(Limiter::new(max_in_flight_write_bytes)))
} else {
None
};

Ok(Instance {
options: self.options,
catalog_manager: self.catalog_manager,
Expand All @@ -211,6 +219,7 @@ impl FrontendBuilder {
export_metrics_task: None,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
stats: self.stats,
limiter,
})
}
}
14 changes: 12 additions & 2 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::table_name::TableName;

use crate::error::{
Error, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, Result,
TableOperationSnafu,
Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu,
PermissionSnafu, Result, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED};
Expand All @@ -50,6 +50,16 @@ impl GrpcQueryHandler for Instance {
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
.context(PermissionSnafu)?;

_ = if let Some(limiter) = &self.limiter {
let result = limiter.limit_request(&request);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
Expand Down
12 changes: 11 additions & 1 deletion src/frontend/src/instance/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use servers::error::{AuthSnafu, Error};
use servers::error::{AuthSnafu, Error, InFlightWriteBytesExceededSnafu};
use servers::influxdb::InfluxdbRequest;
use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
use servers::query_handler::InfluxdbLineProtocolHandler;
Expand Down Expand Up @@ -46,6 +46,16 @@ impl InfluxdbLineProtocolHandler for Instance {
.post_lines_conversion(requests, ctx.clone())
.await?;

if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.handle_influx_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use common_error::ext::BoxedError;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, InFlightWriteBytesExceededSnafu,
PipelineSnafu, Result as ServerResult,
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
Expand Down Expand Up @@ -110,6 +111,16 @@ impl Instance {
log: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
_ = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&log);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.inserter
.handle_log_inserts(log, ctx, self.statement_executor.as_ref())
.await
Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/instance/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use servers::error as server_error;
use servers::error::AuthSnafu;
use servers::error::{AuthSnafu, InFlightWriteBytesExceededSnafu};
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
Expand All @@ -41,6 +41,17 @@ impl OpentsdbProtocolHandler for Instance {
.context(AuthSnafu)?;

let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;

_ = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

let output = self
.handle_row_inserts(requests, ctx)
.await
Expand Down
33 changes: 32 additions & 1 deletion src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::PipelineWay;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::query_handler::OpenTelemetryProtocolHandler;
Expand Down Expand Up @@ -53,6 +53,16 @@ impl OpenTelemetryProtocolHandler for Instance {
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
OTLP_METRICS_ROWS.inc_by(rows as u64);

_ = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand Down Expand Up @@ -83,6 +93,16 @@ impl OpenTelemetryProtocolHandler for Instance {

OTLP_TRACES_ROWS.inc_by(rows as u64);

_ = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.handle_log_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
Expand All @@ -109,6 +129,17 @@ impl OpenTelemetryProtocolHandler for Instance {
interceptor_ref.pre_execute(ctx.clone())?;

let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;

_ = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

self.handle_log_inserts(requests, ctx)
.await
.inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
Expand Down
12 changes: 11 additions & 1 deletion src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
Expand Down Expand Up @@ -175,6 +175,16 @@ impl PromStoreProtocolHandler for Instance {
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;

_ = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&request);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};

let output = if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod error;
pub mod frontend;
pub mod heartbeat;
pub mod instance;
pub(crate) mod limiter;
pub(crate) mod metrics;
mod script;
pub mod server;
Expand Down
Loading

0 comments on commit cda9642

Please sign in to comment.