Skip to content

Commit

Permalink
allow specifying index in otel http protobuf api (#5421)
Browse files Browse the repository at this point in the history
* support ingesting to arbitrary index on otel http api

* add tests
  • Loading branch information
trinity-1686a authored Oct 17, 2024
1 parent a3ce1b8 commit 4ec0eb7
Showing 1 changed file with 113 additions and 15 deletions.
128 changes: 113 additions & 15 deletions quickwit/quickwit-serve/src/otlp_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

use quickwit_common::rate_limited_error;
use quickwit_opentelemetry::otlp::{
OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID, OTEL_TRACES_INDEX_ID,
OtelSignal, OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID,
OTEL_TRACES_INDEX_ID,
};
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsService;
use quickwit_proto::opentelemetry::proto::collector::logs::v1::{
Expand All @@ -41,7 +42,12 @@ use crate::rest_api_response::into_rest_api_response;
use crate::{require, with_arg, Body, BodyFormat};

#[derive(utoipa::OpenApi)]
#[openapi(paths(otlp_default_logs_handler, otlp_default_traces_handler))]
#[openapi(paths(
otlp_default_logs_handler,
otlp_logs_handler,
otlp_default_traces_handler,
otlp_ingest_traces_handler
))]
pub struct OtlpApi;

/// Setup OpenTelemetry API handlers.
Expand Down Expand Up @@ -77,12 +83,22 @@ pub(crate) fn otlp_default_logs_handler(
.and(warp::post())
.and(get_body_bytes())
.then(|otlp_logs_service, body| async move {
// TODO get index id from header if available
otlp_ingest_logs(otlp_logs_service, OTEL_LOGS_INDEX_ID.to_string(), body).await
})
.and(with_arg(BodyFormat::default()))
.map(into_rest_api_response)
}

/// Open Telemetry REST/Protobuf logs ingest endpoint.
#[utoipa::path(
post,
tag = "Open Telemetry",
path = "/{index}/otlp/v1/logs",
request_body(content = String, description = "`ExportLogsServiceRequest` protobuf message", content_type = "application/x-protobuf"),
responses(
(status = 200, description = "Successfully exported logs.", body = ExportLogsServiceResponse)
),
)]
pub(crate) fn otlp_logs_handler(
otlp_log_service: Option<OtlpGrpcLogsService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
Expand Down Expand Up @@ -121,12 +137,22 @@ pub(crate) fn otlp_default_traces_handler(
.and(warp::post())
.and(get_body_bytes())
.then(|otlp_traces_service, body| async move {
// TODO get index id from header if available
otlp_ingest_traces(otlp_traces_service, OTEL_TRACES_INDEX_ID.to_string(), body).await
})
.and(with_arg(BodyFormat::default()))
.map(into_rest_api_response)
}

/// Open Telemetry REST/Protobuf traces ingest endpoint.
#[utoipa::path(
post,
tag = "Open Telemetry",
path = "/{index}/otlp/v1/traces",
request_body(content = String, description = "`ExportTraceServiceRequest` protobuf message", content_type = "application/x-protobuf"),
responses(
(status = 200, description = "Successfully exported traces.", body = ExportTracesServiceResponse)
),
)]
pub(crate) fn otlp_ingest_traces_handler(
otlp_traces_service: Option<OtlpGrpcTracesService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
Expand Down Expand Up @@ -165,30 +191,44 @@ impl ServiceError for OtlpApiError {

async fn otlp_ingest_logs(
otlp_logs_service: OtlpGrpcLogsService,
_index_id: IndexId, // <- TODO: use index ID when gRPC service supports it.
index_id: IndexId,
body: Body,
) -> Result<ExportLogsServiceResponse, OtlpApiError> {
// TODO: use index ID.
let export_logs_request: ExportLogsServiceRequest =
prost::Message::decode(&body.content[..])
.map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?;
let mut request = tonic::Request::new(export_logs_request);
let index = index_id
.try_into()
.map_err(|_| OtlpApiError::InvalidPayload("invalid index id".to_string()))?;
request
.metadata_mut()
.insert(OtelSignal::Logs.header_name(), index);
let result = otlp_logs_service
.export(tonic::Request::new(export_logs_request))
.export(request)
.await
.map_err(|err| OtlpApiError::Ingest(err.to_string()))?;
Ok(result.into_inner())
}

async fn otlp_ingest_traces(
otlp_traces_service: OtlpGrpcTracesService,
_index_id: IndexId, // <- TODO: use index ID when gRPC service supports it.
index_id: IndexId,
body: Body,
) -> Result<ExportTraceServiceResponse, OtlpApiError> {
let export_traces_request: ExportTraceServiceRequest =
prost::Message::decode(&body.content[..])
.map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?;
let mut request = tonic::Request::new(export_traces_request);
let index = index_id
.try_into()
.map_err(|_| OtlpApiError::InvalidPayload("invalid index id".to_string()))?;
request
.metadata_mut()
.insert(OtelSignal::Traces.header_name(), index);
let response = otlp_traces_service
.export(tonic::Request::new(export_traces_request))
.export(request)
.await
.map_err(|err| OtlpApiError::Ingest(err.to_string()))?;
Ok(response.into_inner())
Expand Down Expand Up @@ -232,11 +272,40 @@ mod tests {
let mut mock_ingest_router = MockIngestRouterService::new();
mock_ingest_router
.expect_ingest()
.times(2)
.withf(|request| {
request.subrequests.len() == 1
&& request.subrequests[0].doc_batch.is_some()
if request.subrequests.len() == 1 {
let subrequest = &request.subrequests[0];
subrequest.doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& request.subrequests[0].doc_batch.as_ref().unwrap().doc_lengths.len() == 1
&& subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 1
&& subrequest.index_id == quickwit_opentelemetry::otlp::OTEL_LOGS_INDEX_ID
} else {
false
}
})
.returning(|_| {
Ok(IngestResponseV2 {
successes: vec![IngestSuccess {
num_ingested_docs: 1,
..Default::default()
}],
failures: Vec::new(),
})
});
mock_ingest_router
.expect_ingest()
.times(1)
.withf(|request| {
if request.subrequests.len() == 1 {
let subrequest = &request.subrequests[0];
subrequest.doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 1
&& subrequest.index_id == "otel-traces-v0_6"
} else {
false
}
})
.returning(|_| {
Ok(IngestResponseV2 {
Expand Down Expand Up @@ -349,11 +418,40 @@ mod tests {
let mut mock_ingest_router = MockIngestRouterService::new();
mock_ingest_router
.expect_ingest()
.times(2)
.withf(|request| {
if request.subrequests.len() == 1 {
let subrequest = &request.subrequests[0];
subrequest.doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 5
&& subrequest.index_id == quickwit_opentelemetry::otlp::OTEL_TRACES_INDEX_ID
} else {
false
}
})
.returning(|_| {
Ok(IngestResponseV2 {
successes: vec![IngestSuccess {
num_ingested_docs: 1,
..Default::default()
}],
failures: Vec::new(),
})
});
mock_ingest_router
.expect_ingest()
.times(1)
.withf(|request| {
request.subrequests.len() == 1
&& request.subrequests[0].doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& request.subrequests[0].doc_batch.as_ref().unwrap().doc_lengths.len() == 5
if request.subrequests.len() == 1 {
let subrequest = &request.subrequests[0];
subrequest.doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 5
&& subrequest.index_id == "otel-traces-v0_6"
} else {
false
}
})
.returning(|_| {
Ok(IngestResponseV2 {
Expand Down

0 comments on commit 4ec0eb7

Please sign in to comment.