From 4ec0eb789caa4e3444d54176e8c5bb5fc36b6e7e Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Thu, 17 Oct 2024 16:42:04 +0200 Subject: [PATCH] allow specifying index in otel http protobuf api (#5421) * support ingesting to arbitrary index on otel http api * add tests --- .../src/otlp_api/rest_handler.rs | 128 ++++++++++++++++-- 1 file changed, 113 insertions(+), 15 deletions(-) diff --git a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs index 6e3bac742c8..313ebf71cd2 100644 --- a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs @@ -19,7 +19,8 @@ use quickwit_common::rate_limited_error; use quickwit_opentelemetry::otlp::{ - OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID, OTEL_TRACES_INDEX_ID, + OtelSignal, OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID, + OTEL_TRACES_INDEX_ID, }; use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsService; use quickwit_proto::opentelemetry::proto::collector::logs::v1::{ @@ -41,7 +42,12 @@ use crate::rest_api_response::into_rest_api_response; use crate::{require, with_arg, Body, BodyFormat}; #[derive(utoipa::OpenApi)] -#[openapi(paths(otlp_default_logs_handler, otlp_default_traces_handler))] +#[openapi(paths( + otlp_default_logs_handler, + otlp_logs_handler, + otlp_default_traces_handler, + otlp_ingest_traces_handler +))] pub struct OtlpApi; /// Setup OpenTelemetry API handlers. @@ -77,12 +83,22 @@ pub(crate) fn otlp_default_logs_handler( .and(warp::post()) .and(get_body_bytes()) .then(|otlp_logs_service, body| async move { + // TODO get index id from header if available otlp_ingest_logs(otlp_logs_service, OTEL_LOGS_INDEX_ID.to_string(), body).await }) .and(with_arg(BodyFormat::default())) .map(into_rest_api_response) } - +/// Open Telemetry REST/Protobuf logs ingest endpoint. +#[utoipa::path( + post, + tag = "Open Telemetry", + path = "/{index}/otlp/v1/logs", + request_body(content = String, description = "`ExportLogsServiceRequest` protobuf message", content_type = "application/x-protobuf"), + responses( + (status = 200, description = "Successfully exported logs.", body = ExportLogsServiceResponse) + ), +)] pub(crate) fn otlp_logs_handler( otlp_log_service: Option, ) -> impl Filter + Clone { @@ -121,12 +137,22 @@ pub(crate) fn otlp_default_traces_handler( .and(warp::post()) .and(get_body_bytes()) .then(|otlp_traces_service, body| async move { + // TODO get index id from header if available otlp_ingest_traces(otlp_traces_service, OTEL_TRACES_INDEX_ID.to_string(), body).await }) .and(with_arg(BodyFormat::default())) .map(into_rest_api_response) } - +/// Open Telemetry REST/Protobuf traces ingest endpoint. +#[utoipa::path( + post, + tag = "Open Telemetry", + path = "/{index}/otlp/v1/traces", + request_body(content = String, description = "`ExportTraceServiceRequest` protobuf message", content_type = "application/x-protobuf"), + responses( + (status = 200, description = "Successfully exported traces.", body = ExportTracesServiceResponse) + ), +)] pub(crate) fn otlp_ingest_traces_handler( otlp_traces_service: Option, ) -> impl Filter + Clone { @@ -165,15 +191,22 @@ impl ServiceError for OtlpApiError { async fn otlp_ingest_logs( otlp_logs_service: OtlpGrpcLogsService, - _index_id: IndexId, // <- TODO: use index ID when gRPC service supports it. + index_id: IndexId, body: Body, ) -> Result { // TODO: use index ID. let export_logs_request: ExportLogsServiceRequest = prost::Message::decode(&body.content[..]) .map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?; + let mut request = tonic::Request::new(export_logs_request); + let index = index_id + .try_into() + .map_err(|_| OtlpApiError::InvalidPayload("invalid index id".to_string()))?; + request + .metadata_mut() + .insert(OtelSignal::Logs.header_name(), index); let result = otlp_logs_service - .export(tonic::Request::new(export_logs_request)) + .export(request) .await .map_err(|err| OtlpApiError::Ingest(err.to_string()))?; Ok(result.into_inner()) @@ -181,14 +214,21 @@ async fn otlp_ingest_logs( async fn otlp_ingest_traces( otlp_traces_service: OtlpGrpcTracesService, - _index_id: IndexId, // <- TODO: use index ID when gRPC service supports it. + index_id: IndexId, body: Body, ) -> Result { let export_traces_request: ExportTraceServiceRequest = prost::Message::decode(&body.content[..]) .map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?; + let mut request = tonic::Request::new(export_traces_request); + let index = index_id + .try_into() + .map_err(|_| OtlpApiError::InvalidPayload("invalid index id".to_string()))?; + request + .metadata_mut() + .insert(OtelSignal::Traces.header_name(), index); let response = otlp_traces_service - .export(tonic::Request::new(export_traces_request)) + .export(request) .await .map_err(|err| OtlpApiError::Ingest(err.to_string()))?; Ok(response.into_inner()) @@ -232,11 +272,40 @@ mod tests { let mut mock_ingest_router = MockIngestRouterService::new(); mock_ingest_router .expect_ingest() + .times(2) .withf(|request| { - request.subrequests.len() == 1 - && request.subrequests[0].doc_batch.is_some() + if request.subrequests.len() == 1 { + let subrequest = &request.subrequests[0]; + subrequest.doc_batch.is_some() // && request.commit == CommitType::Auto as i32 - && request.subrequests[0].doc_batch.as_ref().unwrap().doc_lengths.len() == 1 + && subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 1 + && subrequest.index_id == quickwit_opentelemetry::otlp::OTEL_LOGS_INDEX_ID + } else { + false + } + }) + .returning(|_| { + Ok(IngestResponseV2 { + successes: vec![IngestSuccess { + num_ingested_docs: 1, + ..Default::default() + }], + failures: Vec::new(), + }) + }); + mock_ingest_router + .expect_ingest() + .times(1) + .withf(|request| { + if request.subrequests.len() == 1 { + let subrequest = &request.subrequests[0]; + subrequest.doc_batch.is_some() + // && request.commit == CommitType::Auto as i32 + && subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 1 + && subrequest.index_id == "otel-traces-v0_6" + } else { + false + } }) .returning(|_| { Ok(IngestResponseV2 { @@ -349,11 +418,40 @@ mod tests { let mut mock_ingest_router = MockIngestRouterService::new(); mock_ingest_router .expect_ingest() + .times(2) + .withf(|request| { + if request.subrequests.len() == 1 { + let subrequest = &request.subrequests[0]; + subrequest.doc_batch.is_some() + // && request.commit == CommitType::Auto as i32 + && subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 5 + && subrequest.index_id == quickwit_opentelemetry::otlp::OTEL_TRACES_INDEX_ID + } else { + false + } + }) + .returning(|_| { + Ok(IngestResponseV2 { + successes: vec![IngestSuccess { + num_ingested_docs: 1, + ..Default::default() + }], + failures: Vec::new(), + }) + }); + mock_ingest_router + .expect_ingest() + .times(1) .withf(|request| { - request.subrequests.len() == 1 - && request.subrequests[0].doc_batch.is_some() - // && request.commit == CommitType::Auto as i32 - && request.subrequests[0].doc_batch.as_ref().unwrap().doc_lengths.len() == 5 + if request.subrequests.len() == 1 { + let subrequest = &request.subrequests[0]; + subrequest.doc_batch.is_some() + // && request.commit == CommitType::Auto as i32 + && subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 5 + && subrequest.index_id == "otel-traces-v0_6" + } else { + false + } }) .returning(|_| { Ok(IngestResponseV2 {