Skip to content

Commit

Permalink
Support compression on OTLP HTTP endpoints (#5260)
Browse files Browse the repository at this point in the history
* Support gzip on OTLP HTTP endpoints

* Remove outdated comment

Co-authored-by: trinity-1686a <[email protected]>

---------

Co-authored-by: trinity-1686a <[email protected]>
  • Loading branch information
rdettai and trinity-1686a authored Jul 29, 2024
1 parent 36dea46 commit 5b5978e
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use quickwit_common::uri::Uri as QuickwitUri;
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_metastore::{MetastoreResolver, SplitState};
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient;
use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_client::TraceServiceClient;
use quickwit_proto::types::NodeId;
use quickwit_rest_client::models::IngestSource;
Expand Down Expand Up @@ -93,6 +94,7 @@ pub struct ClusterSandbox {
pub searcher_rest_client: QuickwitClient,
pub indexer_rest_client: QuickwitClient,
pub trace_client: TraceServiceClient<tonic::transport::Channel>,
pub logs_client: LogsServiceClient<tonic::transport::Channel>,
_temp_dir: TempDir,
join_handles: Vec<JoinHandle<Result<HashMap<String, ActorExitStatus>, anyhow::Error>>>,
shutdown_trigger: ClusterShutdownTrigger,
Expand Down Expand Up @@ -206,7 +208,8 @@ impl ClusterSandbox {
indexer_config.node_config.rest_config.listen_addr,
))
.build(),
trace_client: TraceServiceClient::new(channel),
trace_client: TraceServiceClient::new(channel.clone()),
logs_client: LogsServiceClient::new(channel),
_temp_dir: temp_dir,
join_handles,
shutdown_trigger,
Expand Down
82 changes: 68 additions & 14 deletions quickwit/quickwit-integration-tests/src/tests/index_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ use hyper::StatusCode;
use quickwit_config::service::QuickwitService;
use quickwit_config::ConfigFormat;
use quickwit_metastore::SplitState;
use quickwit_proto::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
use quickwit_proto::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value;
use quickwit_proto::opentelemetry::proto::common::v1::AnyValue;
use quickwit_proto::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span};
use quickwit_rest_client::error::{ApiError, Error};
use quickwit_rest_client::rest_client::CommitType;
use quickwit_serve::SearchRequestQueryString;
use serde_json::json;
use tonic::codec::CompressionEncoding;

use crate::ingest_json;
use crate::test_utils::{ingest_with_retry, ClusterSandbox};
Expand Down Expand Up @@ -660,7 +665,9 @@ async fn test_ingest_traces_with_otlp_grpc_api() {
.unwrap();
// Wait for the pipelines to start (one for logs and one for traces)
sandbox.wait_for_indexing_pipelines(2).await.unwrap();
let client = sandbox.trace_client.clone();

// build test OTEL span
let scope_spans = vec![ScopeSpans {
spans: vec![
Span {
Expand All @@ -686,14 +693,12 @@ async fn test_ingest_traces_with_otlp_grpc_api() {
}];
let request = ExportTraceServiceRequest { resource_spans };

// Send the spans on the default index.
{
let response = sandbox
.trace_client
.clone()
.export(request.clone())
.await
.unwrap();
// Send the spans on the default index, uncompressed and compressed
for mut tested_client in vec![
client.clone(),
client.clone().send_compressed(CompressionEncoding::Gzip),
] {
let response = tested_client.export(request.clone()).await.unwrap();
assert_eq!(
response
.into_inner()
Expand All @@ -711,14 +716,63 @@ async fn test_ingest_traces_with_otlp_grpc_api() {
"qw-otel-traces-index",
tonic::metadata::MetadataValue::try_from("non-existing-index").unwrap(),
);
let status = sandbox
.trace_client
.clone()
.export(tonic_request)
.await
.unwrap_err();
let status = client.clone().export(tonic_request).await.unwrap_err();
assert_eq!(status.code(), tonic::Code::NotFound);
}

sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_ingest_logs_with_otlp_grpc_api() {
quickwit_common::setup_logging_for_tests();
let nodes_services = vec![
HashSet::from_iter([QuickwitService::Searcher]),
HashSet::from_iter([QuickwitService::Metastore]),
HashSet::from_iter([QuickwitService::Indexer]),
HashSet::from_iter([QuickwitService::ControlPlane]),
HashSet::from_iter([QuickwitService::Janitor]),
];
let sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services)
.await
.unwrap();
// Wait fo the pipelines to start (one for logs and one for traces)
sandbox.wait_for_indexing_pipelines(2).await.unwrap();
let client = sandbox.logs_client.clone();

// build test OTEL log
let log_record = LogRecord {
time_unix_nano: 1_000_000_001,
body: Some(AnyValue {
value: Some(Value::StringValue("hello".to_string())),
}),
..Default::default()
};
let scope_logs = ScopeLogs {
log_records: vec![log_record],
..Default::default()
};
let resource_logs = vec![ResourceLogs {
scope_logs: vec![scope_logs],
..Default::default()
}];
let request = ExportLogsServiceRequest { resource_logs };

// Send the logs on the default index, uncompressed and compressed
for mut tested_client in vec![
client.clone(),
client.clone().send_compressed(CompressionEncoding::Gzip),
] {
let response = tested_client.export(request.clone()).await.unwrap();
assert_eq!(
response
.into_inner()
.partial_success
.unwrap()
.rejected_log_records,
0
);
}

sandbox.shutdown().await.unwrap();
}
74 changes: 62 additions & 12 deletions quickwit/quickwit-serve/src/otlp_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use bytes::Bytes;
use quickwit_common::rate_limited_error;
use quickwit_opentelemetry::otlp::{
OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID, OTEL_TRACES_INDEX_ID,
Expand All @@ -36,9 +35,10 @@ use serde::{self, Serialize};
use tracing::error;
use warp::{Filter, Rejection};

use crate::decompression::get_body_bytes;
use crate::rest::recover_fn;
use crate::rest_api_response::into_rest_api_response;
use crate::{require, with_arg, BodyFormat};
use crate::{require, with_arg, Body, BodyFormat};

#[derive(utoipa::OpenApi)]
#[openapi(paths(otlp_default_logs_handler, otlp_default_traces_handler))]
Expand Down Expand Up @@ -75,7 +75,7 @@ pub(crate) fn otlp_default_logs_handler(
"application/x-protobuf",
))
.and(warp::post())
.and(warp::body::bytes())
.and(get_body_bytes())
.then(|otlp_logs_service, body| async move {
otlp_ingest_logs(otlp_logs_service, OTEL_LOGS_INDEX_ID.to_string(), body).await
})
Expand All @@ -93,7 +93,7 @@ pub(crate) fn otlp_logs_handler(
"application/x-protobuf",
))
.and(warp::post())
.and(warp::body::bytes())
.and(get_body_bytes())
.then(otlp_ingest_logs)
.and(with_arg(BodyFormat::default()))
.map(into_rest_api_response)
Expand All @@ -119,7 +119,7 @@ pub(crate) fn otlp_default_traces_handler(
"application/x-protobuf",
))
.and(warp::post())
.and(warp::body::bytes())
.and(get_body_bytes())
.then(|otlp_traces_service, body| async move {
otlp_ingest_traces(otlp_traces_service, OTEL_TRACES_INDEX_ID.to_string(), body).await
})
Expand All @@ -137,7 +137,7 @@ pub(crate) fn otlp_ingest_traces_handler(
"application/x-protobuf",
))
.and(warp::post())
.and(warp::body::bytes())
.and(get_body_bytes())
.then(otlp_ingest_traces)
.and(with_arg(BodyFormat::default()))
.map(into_rest_api_response)
Expand Down Expand Up @@ -166,11 +166,12 @@ impl ServiceError for OtlpApiError {
async fn otlp_ingest_logs(
otlp_logs_service: OtlpGrpcLogsService,
_index_id: IndexId, // <- TODO: use index ID when gRPC service supports it.
body: Bytes,
body: Body,
) -> Result<ExportLogsServiceResponse, OtlpApiError> {
// TODO: use index ID.
let export_logs_request: ExportLogsServiceRequest = prost::Message::decode(&body[..])
.map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?;
let export_logs_request: ExportLogsServiceRequest =
prost::Message::decode(&body.content[..])
.map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?;
let result = otlp_logs_service
.export(tonic::Request::new(export_logs_request))
.await
Expand All @@ -181,10 +182,11 @@ async fn otlp_ingest_logs(
async fn otlp_ingest_traces(
otlp_traces_service: OtlpGrpcTracesService,
_index_id: IndexId, // <- TODO: use index ID when gRPC service supports it.
body: Bytes,
body: Body,
) -> Result<ExportTraceServiceResponse, OtlpApiError> {
let export_traces_request: ExportTraceServiceRequest = prost::Message::decode(&body[..])
.map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?;
let export_traces_request: ExportTraceServiceRequest =
prost::Message::decode(&body.content[..])
.map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?;
let response = otlp_traces_service
.export(tonic::Request::new(export_traces_request))
.await
Expand All @@ -194,6 +196,10 @@ async fn otlp_ingest_traces(

#[cfg(test)]
mod tests {
use std::io::Write;

use flate2::write::GzEncoder;
use flate2::Compression;
use prost::Message;
use quickwit_ingest::{CommitType, IngestResponse, IngestServiceClient, MockIngestService};
use quickwit_opentelemetry::otlp::{
Expand All @@ -212,6 +218,12 @@ mod tests {
use super::otlp_ingest_api_handlers;
use crate::rest::recover_fn;

fn compress(body: &[u8]) -> Vec<u8> {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(body).expect("Failed to write to encoder");
encoder.finish().expect("Failed to finish compression")
}

#[tokio::test]
async fn test_otlp_ingest_logs_handler() {
let mut mock_ingest_service = MockIngestService::new();
Expand Down Expand Up @@ -280,6 +292,28 @@ mod tests {
0
);
}
{
// Test default otlp endpoint with compression
let resp = warp::test::request()
.path("/otlp/v1/logs")
.method("POST")
.header("content-type", "application/x-protobuf")
.header("content-encoding", "gzip")
.body(compress(&body))
.reply(&otlp_traces_api_handler)
.await;
assert_eq!(resp.status(), 200);
let actual_response: ExportLogsServiceResponse =
serde_json::from_slice(resp.body()).unwrap();
assert!(actual_response.partial_success.is_some());
assert_eq!(
actual_response
.partial_success
.unwrap()
.rejected_log_records,
0
);
}
{
// Test endpoint with given index ID.
let resp = warp::test::request()
Expand Down Expand Up @@ -343,6 +377,22 @@ mod tests {
assert!(actual_response.partial_success.is_some());
assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0);
}
{
// Test default otlp endpoint with compression
let resp = warp::test::request()
.path("/otlp/v1/traces")
.method("POST")
.header("content-type", "application/x-protobuf")
.header("content-encoding", "gzip")
.body(compress(&body))
.reply(&otlp_traces_api_handler)
.await;
assert_eq!(resp.status(), 200);
let actual_response: ExportTraceServiceResponse =
serde_json::from_slice(resp.body()).unwrap();
assert!(actual_response.partial_success.is_some());
assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0);
}
{
// Test endpoint with given index ID.
let resp = warp::test::request()
Expand Down

0 comments on commit 5b5978e

Please sign in to comment.