diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index c82b0f5fe14f..53883cc5e705 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -96,7 +96,7 @@ pub async fn traces( } pub struct PipelineInfo { - pub pipeline_name: String, + pub pipeline_name: Option, pub pipeline_version: Option, } @@ -124,15 +124,15 @@ where let pipeline_version = parts.headers.get("X-Pipeline-Version"); match (pipeline_name, pipeline_version) { (Some(name), Some(version)) => Ok(PipelineInfo { - pipeline_name: pipeline_header_error(name)?, + pipeline_name: Some(pipeline_header_error(name)?), pipeline_version: Some(pipeline_header_error(version)?), }), - (None, _) => Err(( - StatusCode::BAD_REQUEST, - "`X-Pipeline-Name` header is missing", - )), + (None, _) => Ok(PipelineInfo { + pipeline_name: None, + pipeline_version: None, + }), (Some(name), None) => Ok(PipelineInfo { - pipeline_name: pipeline_header_error(name)?, + pipeline_name: Some(pipeline_header_error(name)?), pipeline_version: None, }), } @@ -175,15 +175,13 @@ pub async fn logs( let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); - let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED + let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; let pipeline_way; - if pipeline_info.pipeline_name == "identity" { - pipeline_way = PipelineWay::Identity; - } else { + if let Some(pipeline_name) = &pipeline_info.pipeline_name { let pipeline_version = to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| { error::InvalidParameterSnafu { @@ -192,11 +190,7 @@ pub async fn logs( .build() })?; let pipeline = match handler - .get_pipeline( - &pipeline_info.pipeline_name, - pipeline_version, - query_ctx.clone(), - ) + .get_pipeline(pipeline_name, pipeline_version, query_ctx.clone()) .await { Ok(p) => p, @@ -205,7 +199,10 @@ pub async fn logs( } }; pipeline_way = PipelineWay::Custom(pipeline); + } else { + pipeline_way = PipelineWay::Identity; } + handler .logs(request, pipeline_way, table_info.table_name, query_ctx) .await diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index cdf927536f04..ead86f3ad88b 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -141,6 +141,13 @@ lazy_static! { &[METRIC_DB_LABEL] ) .unwrap(); + pub static ref METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED: HistogramVec = + register_histogram_vec!( + "greptime_servers_http_otlp_logs_elapsed", + "servers http otlp logs elapsed", + &[METRIC_DB_LABEL] + ) + .unwrap(); pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!( "greptime_servers_http_logs_ingestion_counter", "servers http logs ingestion counter", diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e11060fbbda5..cb34e8582e4c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -17,10 +17,11 @@ use std::io::Write; use api::prom_store::remote::WriteRequest; use auth::user_provider_from_option; -use axum::http::{HeaderName, StatusCode}; +use axum::http::{HeaderName, HeaderValue, StatusCode}; use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics; @@ -89,6 +90,7 @@ macro_rules! http_tests { test_otlp_metrics, test_otlp_traces, + test_otlp_logs, ); )* }; @@ -1429,7 +1431,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) { let client = TestClient::new(app); // write metrics data - let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), false).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), false).await; assert_eq!(StatusCode::OK, res.status()); // select metrics data @@ -1441,7 +1443,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // write metrics data with gzip - let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), true).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), true).await; assert_eq!(StatusCode::OK, res.status()); // select metrics data again @@ -1466,7 +1468,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { let client = TestClient::new(app); // write traces data - let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), false).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), false).await; assert_eq!(StatusCode::OK, res.status()); // select traces data @@ -1481,7 +1483,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // write metrics data with gzip - let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), true).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), true).await; assert_eq!(StatusCode::OK, res.status()); // select metrics data again @@ -1490,6 +1492,41 @@ pub async fn test_otlp_traces(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_otlp_logs(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await; + + let content = r#" +{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/resourceLogs","scopeLogs":[{"scope":{},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/scopeLogs","logRecords":[{"flags":1,"timeUnixNano":1581452773000009875,"observedTimeUnixNano":1581452773000009875,"severityNumber":9,"severityText":"Info","body":{"stringValue":"This is a log message"},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":1}}],"droppedAttributesCount":1,"traceId":[48,56,48,52,48,50,48,49,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48],"spanId":[48,49,48,50,48,52,48,56,48,48,48,48,48,48,48,48]},{"flags":1,"timeUnixNano":1581452773000000789,"observedTimeUnixNano":1581452773000000789,"severityNumber":9,"severityText":"Info","body":{"stringValue":"something happened"},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":[48],"spanId":[48]}]}]}]} +"#; + + let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap(); + let body = req.encode_to_vec(); + + // handshake + let client = TestClient::new(app); + + // write traces data + let res = send_req( + &client, + vec![( + HeaderName::from_static("x-table-name"), + HeaderValue::from_static("logs"), + )], + "/v1/otlp/v1/logs?db=public", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // TODO(qtang): we show convert jsonb to json string in http sql API + let expected = r#"[["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,8,16,0,0,3,16,0,0,4,16,0,0,3,99,117,115,116,111,109,101,114,101,110,118,97,99,109,101,100,101,118],1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"null"],["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,3,16,0,0,12,16,0,0,6,32,0,0,2,97,112,112,105,110,115,116,97,110,99,101,95,110,117,109,115,101,114,118,101,114,64,1],1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"null"]]"#; + validate_data(&client, "select * from logs;", expected).await; + + guard.remove_all().await; +} + async fn validate_data(client: &TestClient, sql: &str, expected: &str) { let res = client .get(format!("/v1/sql?sql={sql}").as_str()) @@ -1502,11 +1539,21 @@ async fn validate_data(client: &TestClient, sql: &str, expected: &str) { assert_eq!(v, expected); } -async fn send_req(client: &TestClient, path: &str, body: Vec, with_gzip: bool) -> TestResponse { +async fn send_req( + client: &TestClient, + headers: Vec<(HeaderName, HeaderValue)>, + path: &str, + body: Vec, + with_gzip: bool, +) -> TestResponse { let mut req = client .post(path) .header("content-type", "application/x-protobuf"); + for (k, v) in headers { + req = req.header(k, v); + } + let mut len = body.len(); if with_gzip {