Skip to content

Commit

Permalink
feat: add strict mode to validate protocol strings (#3638)
Browse files Browse the repository at this point in the history
* feat: add strict mode to validate protocol strings

* hotfix: fix test

* fix: fix return pair and test param

* test: add test for utf-8 validation

* fix: cargo check

* Update src/servers/src/prom_row_builder.rs

Co-authored-by: Eugene Tolbakov <[email protected]>

* fix: fix param of without_strict_mode

* fix: change field name in HttpOptions

* fix: replace if else with match

* fix: replace all strict_mode with is_stirct_mode

* fix: fix test_config_api

* fix: fix bench, add vm handshake, catch error

---------

Co-authored-by: Eugene Tolbakov <[email protected]>
Co-authored-by: tison <[email protected]>
  • Loading branch information
3 people authored Apr 15, 2024
1 parent 2f4726f commit 7a04bfe
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 33 deletions.
6 changes: 5 additions & 1 deletion src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ where

if opts.prom_store.enable {
builder = builder
.with_prom_handler(self.instance.clone(), opts.prom_store.with_metric_engine)
.with_prom_handler(
self.instance.clone(),
opts.prom_store.with_metric_engine,
opts.http.is_strict_mode,
)
.with_prometheus_handler(self.instance.clone());
}

Expand Down
40 changes: 37 additions & 3 deletions src/servers/benches/prom_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use prost::Message;
use servers::prom_store::to_grpc_row_insert_requests;
use servers::proto::PromWriteRequest;

fn bench_decode_prom_request(c: &mut Criterion) {
fn bench_decode_prom_request_without_strict_mode(c: &mut Criterion) {
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("benches");
d.push("write_request.pb.data");
Expand All @@ -30,6 +30,7 @@ fn bench_decode_prom_request(c: &mut Criterion) {

let mut request = WriteRequest::default();
let mut prom_request = PromWriteRequest::default();
let is_strict_mode = false;
c.benchmark_group("decode")
.measurement_time(Duration::from_secs(3))
.bench_function("write_request", |b| {
Expand All @@ -43,11 +44,44 @@ fn bench_decode_prom_request(c: &mut Criterion) {
.bench_function("prom_write_request", |b| {
b.iter(|| {
let data = data.clone();
prom_request.merge(data).unwrap();
prom_request.merge(data, is_strict_mode).unwrap();
prom_request.as_row_insert_requests();
});
});
}

criterion_group!(benches, bench_decode_prom_request);
fn bench_decode_prom_request_with_strict_mode(c: &mut Criterion) {
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("benches");
d.push("write_request.pb.data");

let data = Bytes::from(std::fs::read(d).unwrap());

let mut request = WriteRequest::default();
let mut prom_request = PromWriteRequest::default();
let is_strict_mode = true;
c.benchmark_group("decode")
.measurement_time(Duration::from_secs(3))
.bench_function("write_request", |b| {
b.iter(|| {
request.clear();
let data = data.clone();
request.merge(data).unwrap();
to_grpc_row_insert_requests(&request).unwrap();
});
})
.bench_function("prom_write_request", |b| {
b.iter(|| {
let data = data.clone();
prom_request.merge(data, is_strict_mode).unwrap();
prom_request.as_row_insert_requests();
});
});
}

criterion_group!(
benches,
bench_decode_prom_request_without_strict_mode,
bench_decode_prom_request_with_strict_mode
);
criterion_main!(benches);
36 changes: 28 additions & 8 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub struct HttpOptions {
pub disable_dashboard: bool,

pub body_limit: ReadableSize,

pub is_strict_mode: bool,
}

impl Default for HttpOptions {
Expand All @@ -139,6 +141,7 @@ impl Default for HttpOptions {
timeout: Duration::from_secs(30),
disable_dashboard: false,
body_limit: DEFAULT_BODY_LIMIT,
is_strict_mode: false,
}
}
}
Expand Down Expand Up @@ -505,11 +508,12 @@ impl HttpServerBuilder {
self,
handler: PromStoreProtocolHandlerRef,
prom_store_with_metric_engine: bool,
is_strict_mode: bool,
) -> Self {
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/prometheus"),
HttpServer::route_prom(handler, prom_store_with_metric_engine),
HttpServer::route_prom(handler, prom_store_with_metric_engine, is_strict_mode),
),
..self
}
Expand Down Expand Up @@ -701,15 +705,31 @@ impl HttpServer {
fn route_prom<S>(
prom_handler: PromStoreProtocolHandlerRef,
prom_store_with_metric_engine: bool,
is_strict_mode: bool,
) -> Router<S> {
let mut router = Router::new().route("/read", routing::post(prom_store::remote_read));
if prom_store_with_metric_engine {
router = router.route("/write", routing::post(prom_store::remote_write));
} else {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine),
);
match (prom_store_with_metric_engine, is_strict_mode) {
(true, true) => {
router = router.route("/write", routing::post(prom_store::remote_write))
}
(true, false) => {
router = router.route(
"/write",
routing::post(prom_store::remote_write_without_strict_mode),
)
}
(false, true) => {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine),
)
}
(false, false) => {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine_and_strict_mode),
)
}
}
router.with_state(prom_handler)
}
Expand Down
83 changes: 79 additions & 4 deletions src/servers/src/http/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,38 @@ pub async fn route_write_without_metric_engine(
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body).await?;
let (request, samples) = decode_remote_write_request(is_zstd, body, true).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
}

let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
)
.into_response())
}

/// Same with [remote_write] but won't store data to metric engine.
/// And without strict_mode on will not check invalid UTF-8.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine_and_strict_mode(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
) -> Result<impl IntoResponse> {
let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body, false).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
Expand Down Expand Up @@ -127,7 +158,8 @@ pub async fn remote_write(
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request_to_row_inserts(is_zstd, body).await?;
let (request, samples) =
decode_remote_write_request_to_row_inserts(is_zstd, body, true).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
Expand All @@ -144,6 +176,47 @@ pub async fn remote_write(
.into_response())
}

#[axum_macros::debug_handler]
#[tracing::instrument(
skip_all,
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write_without_strict_mode(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}

let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) =
decode_remote_write_request_to_row_inserts(is_zstd, body, false).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
query_ctx = Arc::new(new_query_ctx);
}

let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
)
.into_response())
}

impl IntoResponse for PromStoreResponse {
fn into_response(self) -> axum::response::Response {
let mut header_map = HeaderMap::new();
Expand Down Expand Up @@ -187,6 +260,7 @@ pub async fn remote_read(
async fn decode_remote_write_request_to_row_inserts(
is_zstd: bool,
body: Body,
is_strict_mode: bool,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
Expand All @@ -201,14 +275,15 @@ async fn decode_remote_write_request_to_row_inserts(

let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
.merge(buf)
.merge(buf, is_strict_mode)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(request.as_row_insert_requests())
}

async fn decode_remote_write_request(
is_zstd: bool,
body: Body,
is_strict_mode: bool,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
Expand All @@ -223,7 +298,7 @@ async fn decode_remote_write_request(

let mut request = PromWriteRequest::default();
request
.merge(buf)
.merge(buf, is_strict_mode)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(request.as_row_insert_requests())
}
Expand Down
Loading

0 comments on commit 7a04bfe

Please sign in to comment.