Skip to content

Commit

Permalink
chore: add auto-decompression layer for otlp http request (#4723)
Browse files Browse the repository at this point in the history
* chore: add auto-decompression for http request

* test: otlp
  • Loading branch information
shuiyisong authored Sep 18, 2024
1 parent 3b5b906 commit c014e87
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 39 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ opentelemetry-proto = { version = "0.5", features = [
"gen-tonic",
"metrics",
"trace",
"with-serde",
] }
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl OpenTelemetryProtocolHandler for Instance {

OTLP_TRACES_ROWS.inc_by(rows as u64);

self.handle_row_inserts(requests, ctx)
self.handle_log_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
Expand Down
5 changes: 5 additions & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,11 @@ impl HttpServer {
Router::new()
.route("/v1/metrics", routing::post(otlp::metrics))
.route("/v1/traces", routing::post(otlp::traces))
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(RequestDecompressionLayer::new()),
)
.with_state(otlp_handler)
}

Expand Down
32 changes: 8 additions & 24 deletions src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

use std::sync::Arc;

use axum::extract::{RawBody, State};
use axum::extract::State;
use axum::http::header;
use axum::response::IntoResponse;
use axum::Extension;
use bytes::Bytes;
use common_telemetry::tracing;
use hyper::Body;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
Expand All @@ -39,15 +39,16 @@ use crate::query_handler::OpenTelemetryProtocolHandlerRef;
pub async fn metrics(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(mut query_ctx): Extension<QueryContext>,
RawBody(body): RawBody,
bytes: Bytes,
) -> Result<OtlpMetricsResponse> {
let db = query_ctx.get_db_string();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let request = parse_metrics_body(body).await?;
let request =
ExportMetricsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;

handler
.metrics(request, query_ctx)
Expand All @@ -60,15 +61,6 @@ pub async fn metrics(
})
}

async fn parse_metrics_body(body: Body) -> Result<ExportMetricsServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
.and_then(|buf| {
ExportMetricsServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu)
})
}

pub struct OtlpMetricsResponse {
resp_body: ExportMetricsServiceResponse,
write_cost: usize,
Expand All @@ -88,15 +80,16 @@ impl IntoResponse for OtlpMetricsResponse {
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(mut query_ctx): Extension<QueryContext>,
RawBody(body): RawBody,
bytes: Bytes,
) -> Result<OtlpTracesResponse> {
let db = query_ctx.get_db_string();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let request = parse_traces_body(body).await?;
let request =
ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
handler
.traces(request, query_ctx)
.await
Expand All @@ -108,15 +101,6 @@ pub async fn traces(
})
}

async fn parse_traces_body(body: Body) -> Result<ExportTraceServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
.and_then(|buf| {
ExportTraceServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu)
})
}

pub struct OtlpTracesResponse {
resp_body: ExportTraceServiceResponse,
write_cost: usize,
Expand Down
1 change: 1 addition & 0 deletions tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ common-wal.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv.workspace = true
flate2 = "1.0"
flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
Expand Down
1 change: 1 addition & 0 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
Some(instance.instance.clone()),
)
.with_log_ingest_handler(instance.instance.clone(), None)
.with_otlp_handler(instance.instance.clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());

if let Some(user_provider) = user_provider {
Expand Down
152 changes: 138 additions & 14 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::io::Write;

use api::prom_store::remote::WriteRequest;
use auth::user_provider_from_option;
use axum::http::{HeaderName, StatusCode};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics;
use prost::Message;
use serde_json::{json, Value};
use servers::http::error_result::ErrorResponse;
Expand All @@ -26,7 +32,7 @@ use servers::http::handler::HealthResponse;
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::test_helpers::TestClient;
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
use tests_integration::test_util::{
Expand Down Expand Up @@ -80,6 +86,9 @@ macro_rules! http_tests {
test_pipeline_api,
test_test_pipeline_api,
test_plain_text_ingestion,

test_otlp_metrics,
test_otlp_traces,
);
)*
};
Expand Down Expand Up @@ -1391,19 +1400,7 @@ transform:
assert_eq!(res.status(), StatusCode::OK);
let resp = res.text().await;

let resp: Value = serde_json::from_str(&resp).unwrap();
let v = resp
.get("output")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.get("records")
.unwrap()
.get("rows")
.unwrap()
.to_string();
let v = get_rows_from_output(&resp);

assert_eq!(
v,
Expand All @@ -1412,3 +1409,130 @@ transform:

guard.remove_all().await;
}

/// Integration test for the OTLP metrics HTTP endpoint (`/v1/otlp/v1/metrics`).
///
/// The same protobuf-encoded `ExportMetricsServiceRequest` is posted twice:
/// first as-is, then gzip-compressed. After each write the `gen` table is
/// queried through `/v1/sql` and compared against the expected rows.
pub async fn test_otlp_metrics(store_type: StorageType) {
// init
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_metrics").await;

// JSON form of a single `ResourceMetrics`: one gauge metric named "gen"
// carrying one integer data point.
let content = r#"
{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeMetrics":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"metrics":[{"name":"gen","description":"","unit":"","data":{"gauge":{"dataPoints":[{"attributes":[],"startTimeUnixNano":0,"timeUnixNano":1726053452870391000,"exemplars":[],"flags":0,"value":{"asInt":9471}}]}}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.13.0"}
"#;

// Deserialize the JSON fixture, wrap it in an export request and encode it
// to protobuf bytes; these bytes are the HTTP request body.
let metrics: ResourceMetrics = serde_json::from_str(content).unwrap();
let req = ExportMetricsServiceRequest {
resource_metrics: vec![metrics],
};
let body = req.encode_to_vec();

// build the test client around the app
let client = TestClient::new(app);

// write metrics data (uncompressed)
let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), false).await;
assert_eq!(StatusCode::OK, res.status());

// select metrics data
let expected = r#"[[1726053452870391000,9471.0]]"#;
validate_data(&client, "select * from gen;", expected).await;

// drop table so the gzip write below is checked on its own rather than
// against rows left over from the first write
let res = client.get("/v1/sql?sql=drop table gen;").send().await;
assert_eq!(res.status(), StatusCode::OK);

// write metrics data with gzip
let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), true).await;
assert_eq!(StatusCode::OK, res.status());

// select metrics data again
validate_data(&client, "select * from gen;", expected).await;

guard.remove_all().await;
}

/// Integration test for the OTLP traces HTTP endpoint (`/v1/otlp/v1/traces`).
///
/// The same protobuf-encoded `ExportTraceServiceRequest` is posted twice:
/// first as-is, then gzip-compressed. After each write the
/// `traces_preview_v01` table is queried through `/v1/sql` and compared
/// against the expected rows.
pub async fn test_otlp_traces(store_type: StorageType) {
// init
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;

// JSON form of an export request holding two spans ("okey-dokey-0" and
// "lets-go") that share one trace id.
let content = r#"
{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"b5e5fb572cf0a3335dd194a14145fef5","spanId":"74c82efa6f628e80","traceState":"","parentSpanId":"3364d2da58c9fd2b","flags":0,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":1726631197820927000,"endTimeUnixNano":1726631197821050000,"attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"b5e5fb572cf0a3335dd194a14145fef5","spanId":"3364d2da58c9fd2b","traceState":"","parentSpanId":"","flags":0,"name":"lets-go","kind":3,"startTimeUnixNano":1726631197820927000,"endTimeUnixNano":1726631197821050000,"attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
"#;

// Deserialize the JSON fixture and encode it to protobuf bytes; these
// bytes are the HTTP request body.
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();

// build the test client around the app
let client = TestClient::new(app);

// write traces data (uncompressed)
let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), false).await;
assert_eq!(StatusCode::OK, res.status());

// select traces data
let expected = r#"[["b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","lets-go","SPAN_KIND_CLIENT","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-server\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000],["b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","okey-dokey-0","SPAN_KIND_SERVER","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-client\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000]]"#;
validate_data(&client, "select * from traces_preview_v01;", expected).await;

// drop table so the gzip write below is checked on its own rather than
// against rows left over from the first write
let res = client
.get("/v1/sql?sql=drop table traces_preview_v01;")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);

// write traces data with gzip
let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), true).await;
assert_eq!(StatusCode::OK, res.status());

// select traces data again
validate_data(&client, "select * from traces_preview_v01;", expected).await;

guard.remove_all().await;
}

/// Runs `sql` through the `/v1/sql` endpoint and asserts that the request
/// succeeds and that the returned rows, rendered as a JSON string, equal
/// `expected`.
async fn validate_data(client: &TestClient, sql: &str, expected: &str) {
    let uri = format!("/v1/sql?sql={sql}");
    let response = client.get(uri.as_str()).send().await;
    assert_eq!(response.status(), StatusCode::OK);

    // Only the `rows` part of the first output entry is compared.
    let payload = response.text().await;
    let rows = get_rows_from_output(&payload);
    assert_eq!(rows, expected);
}

/// Posts `body` to `path` with content type `application/x-protobuf`.
///
/// When `with_gzip` is true the body is gzip-compressed first and a
/// `content-encoding: gzip` header is added. In both cases `content-length`
/// is set to the length of the bytes that are actually sent.
async fn send_req(client: &TestClient, path: &str, body: Vec<u8>, with_gzip: bool) -> TestResponse {
    let builder = client
        .post(path)
        .header("content-type", "application/x-protobuf");

    // Attach the payload and remember how many bytes go on the wire.
    let (builder, payload_len) = if with_gzip {
        let compressed = compress_vec_with_gzip(body);
        let compressed_len = compressed.len();
        (
            builder.header("content-encoding", "gzip").body(compressed),
            compressed_len,
        )
    } else {
        let plain_len = body.len();
        (builder.body(body), plain_len)
    };

    builder.header("content-length", payload_len).send().await
}

/// Extracts `output[0].records.rows` from a JSON response body and returns it
/// serialized back to a string.
///
/// # Panics
///
/// Panics if `output` is not valid JSON or if any step of that path is
/// missing (including `output` not being an array or being empty).
fn get_rows_from_output(output: &str) -> String {
    let parsed: Value = serde_json::from_str(output).unwrap();

    // Walk the path one step at a time; any missing step yields `None`.
    let first_entry = parsed
        .get("output")
        .and_then(Value::as_array)
        .and_then(|entries| entries.first());
    let rows = first_entry
        .and_then(|entry| entry.get("records"))
        .and_then(|records| records.get("rows"));

    rows.unwrap().to_string()
}

/// Gzip-compresses `data` at the default compression level and returns the
/// compressed bytes.
///
/// # Panics
///
/// Panics if writing to or finishing the encoder fails.
fn compress_vec_with_gzip(data: Vec<u8>) -> Vec<u8> {
    let sink = Vec::new();
    let mut gz = GzEncoder::new(sink, Compression::default());
    gz.write_all(data.as_slice()).unwrap();
    // `finish` completes the gzip stream and hands back the underlying buffer.
    gz.finish().unwrap()
}

0 comments on commit c014e87

Please sign in to comment.