Skip to content

Commit

Permalink
feat: adding victoriametrics remote write (#3641)
Browse files Browse the repository at this point in the history
* feat: adding victoria metrics remote write

* test: add e2e tests for prom and vm remote writes
  • Loading branch information
sunng87 authored Apr 7, 2024
1 parent fe1a010 commit b32e0bb
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 20 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.10", features = ["tls"] }
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
zstd = "0.13"

## workspaces members
api = { path = "src/api" }
Expand Down
1 change: 1 addition & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ tonic-reflection = "0.10"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
urlencoding = "2.1"
zstd.workspace = true

[target.'cfg(not(windows))'.dependencies]
tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
Expand Down
17 changes: 13 additions & 4 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,20 @@ pub enum Error {
error: prost::DecodeError,
},

#[snafu(display("Failed to decompress prometheus remote request"))]
DecompressPromRemoteRequest {
#[snafu(display("Failed to decompress snappy prometheus remote request"))]
DecompressSnappyPromRemoteRequest {
location: Location,
#[snafu(source)]
error: snap::Error,
},

#[snafu(display("Failed to decompress zstd prometheus remote request"))]
DecompressZstdPromRemoteRequest {
location: Location,
#[snafu(source)]
error: std::io::Error,
},

#[snafu(display("Failed to send prometheus remote request"))]
SendPromRemoteRequest {
location: Location,
Expand Down Expand Up @@ -504,7 +511,8 @@ impl ErrorExt for Error {
| DecodePromRemoteRequest { .. }
| DecodeOtlpRequest { .. }
| CompressPromRemoteRequest { .. }
| DecompressPromRemoteRequest { .. }
| DecompressSnappyPromRemoteRequest { .. }
| DecompressZstdPromRemoteRequest { .. }
| InvalidPromRemoteRequest { .. }
| InvalidExportMetricsConfig { .. }
| InvalidFlightTicket { .. }
Expand Down Expand Up @@ -657,7 +665,8 @@ impl IntoResponse for Error {
| Error::InvalidOpentsdbJsonRequest { .. }
| Error::DecodePromRemoteRequest { .. }
| Error::DecodeOtlpRequest { .. }
| Error::DecompressPromRemoteRequest { .. }
| Error::DecompressSnappyPromRemoteRequest { .. }
| Error::DecompressZstdPromRemoteRequest { .. }
| Error::InvalidPromRemoteRequest { .. }
| Error::InvalidQuery { .. }
| Error::TimePrecision { .. } => HttpStatusCode::BAD_REQUEST,
Expand Down
65 changes: 50 additions & 15 deletions src/servers/src/http/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::RowInsertRequests;
use axum::extract::{Query, RawBody, State};
use axum::http::{header, HeaderValue, StatusCode};
use axum::response::IntoResponse;
use axum::Extension;
use axum::{Extension, TypedHeader};
use bytes::Bytes;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
Expand All @@ -35,7 +35,7 @@ use snafu::prelude::*;

use super::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::error::{self, Result, UnexpectedPhysicalTableSnafu};
use crate::prom_store::snappy_decompress;
use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::proto::PromWriteRequest;
use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};

Expand All @@ -45,19 +45,26 @@ lazy_static! {
Pool::new(256, PromWriteRequest::default);
}

pub const DEFAULT_ENCODING: &str = "snappy";
pub const VM_ENCODING: &str = "zstd";
pub const VM_PROTO_VERSION: &str = "1";

#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct DatabaseQuery {
pub struct RemoteWriteQuery {
pub db: Option<String>,
/// Specify which physical table to use for storing metrics.
/// This only works on remote write requests.
pub physical_table: Option<String>,
/// For VictoriaMetrics modified remote write protocol
pub get_vm_proto_version: Option<String>,
}

impl Default for DatabaseQuery {
fn default() -> DatabaseQuery {
impl Default for RemoteWriteQuery {
fn default() -> RemoteWriteQuery {
Self {
db: Some(DEFAULT_SCHEMA_NAME.to_string()),
physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
get_vm_proto_version: None,
}
}
}
Expand All @@ -66,16 +73,23 @@ impl Default for DatabaseQuery {
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}

let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let (request, samples) = decode_remote_write_request(body).await?;
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
Expand All @@ -86,7 +100,8 @@ pub async fn route_write_without_metric_engine(
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
))
)
.into_response())
}

#[axum_macros::debug_handler]
Expand All @@ -96,16 +111,23 @@ pub async fn route_write_without_metric_engine(
)]
pub async fn remote_write(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}

let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let (request, samples) = decode_remote_write_request_to_row_inserts(body).await?;
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request_to_row_inserts(is_zstd, body).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
Expand All @@ -118,7 +140,8 @@ pub async fn remote_write(
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
))
)
.into_response())
}

impl IntoResponse for PromStoreResponse {
Expand Down Expand Up @@ -147,7 +170,7 @@ impl IntoResponse for PromStoreResponse {
)]
pub async fn remote_read(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(body): RawBody,
) -> Result<PromStoreResponse> {
Expand All @@ -162,14 +185,19 @@ pub async fn remote_read(
}

async fn decode_remote_write_request_to_row_inserts(
is_zstd: bool,
body: Body,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;

let buf = Bytes::from(snappy_decompress(&body[..])?);
let buf = Bytes::from(if is_zstd {
zstd_decompress(&body[..])?
} else {
snappy_decompress(&body[..])?
});

let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
Expand All @@ -178,13 +206,20 @@ async fn decode_remote_write_request_to_row_inserts(
Ok(request.as_row_insert_requests())
}

async fn decode_remote_write_request(body: Body) -> Result<(RowInsertRequests, usize)> {
async fn decode_remote_write_request(
is_zstd: bool,
body: Body,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;

let buf = Bytes::from(snappy_decompress(&body[..])?);
let buf = Bytes::from(if is_zstd {
zstd_decompress(&body[..])?
} else {
snappy_decompress(&body[..])?
});

let mut request = PromWriteRequest::default();
request
Expand Down
7 changes: 6 additions & 1 deletion src/servers/src/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
let mut decoder = Decoder::new();
decoder
.decompress_vec(buf)
.context(error::DecompressPromRemoteRequestSnafu)
.context(error::DecompressSnappyPromRemoteRequestSnafu)
}

#[inline]
Expand All @@ -400,6 +400,11 @@ pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
.context(error::CompressPromRemoteRequestSnafu)
}

#[inline]
pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
}

/// Mock timeseries for test, it is both used in servers and frontend crate
/// So we present it here
pub fn mock_timeseries() -> Vec<TimeSeries> {
Expand Down
2 changes: 2 additions & 0 deletions tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ mysql_async = { version = "0.33", default-features = false, features = [
] }
object-store.workspace = true
operator.workspace = true
prost.workspace = true
query.workspace = true
rstest = "0.17"
rstest_reuse = "0.5"
Expand All @@ -68,6 +69,7 @@ tokio.workspace = true
tonic.workspace = true
tower = "0.4"
uuid.workspace = true
zstd.workspace = true

[dev-dependencies]
datafusion.workspace = true
Expand Down
65 changes: 65 additions & 0 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

use std::collections::BTreeMap;

use api::prom_store::remote::WriteRequest;
use auth::user_provider_from_option;
use axum::http::{HeaderName, StatusCode};
use axum_test_helper::TestClient;
use common_error::status_code::StatusCode as ErrorCode;
use prost::Message;
use serde_json::json;
use servers::http::error_result::ErrorResponse;
use servers::http::greptime_result_v1::GreptimedbV1Response;
Expand All @@ -26,6 +28,7 @@ use servers::http::header::GREPTIME_TIMEZONE_HEADER_NAME;
use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend,
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
Expand Down Expand Up @@ -71,6 +74,8 @@ macro_rules! http_tests {
test_status_api,
test_config_api,
test_dashboard_path,
test_prometheus_remote_write,
test_vm_proto_remote_write,
);
)*
};
Expand Down Expand Up @@ -896,3 +901,63 @@ pub async fn test_dashboard_path(store_type: StorageType) {

#[cfg(not(feature = "dashboard"))]
pub async fn test_dashboard_path(_: StorageType) {}

pub async fn test_prometheus_remote_write(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_prom_app_with_frontend(store_type, "prometheus_remote_write").await;
let client = TestClient::new(app);

// write snappy encoded data
let write_request = WriteRequest {
timeseries: prom_store::mock_timeseries(),
..Default::default()
};
let serialized_request = write_request.encode_to_vec();
let compressed_request =
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");

let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "snappy")
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);

guard.remove_all().await;
}

pub async fn test_vm_proto_remote_write(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_prom_app_with_frontend(store_type, "vm_proto_remote_write").await;

// handshake
let client = TestClient::new(app);
let res = client
.post("/v1/prometheus/write?get_vm_proto_version=1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(res.text().await, "1");

// write zstd encoded data
let write_request = WriteRequest {
timeseries: prom_store::mock_timeseries(),
..Default::default()
};
let serialized_request = write_request.encode_to_vec();
let compressed_request =
zstd::stream::encode_all(&serialized_request[..], 1).expect("Failed to encode zstd");

let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "zstd")
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);

guard.remove_all().await;
}

0 comments on commit b32e0bb

Please sign in to comment.