Skip to content

Commit

Permalink
Add support for OTLP HTTP API. (#4335)
Browse files Browse the repository at this point in the history
* Add support for OTLP HTTP API.

* Add tests.

* Fix otlp http paths and lint.

* Add proto content type header to otlp rest endpoint.

* Fix fmt.

* Move opentelemetry tests.

* Add default otlp endpoints.

* Fix fmt.
  • Loading branch information
fmassot authored Jan 2, 2024
1 parent 30ce9e7 commit 12042de
Show file tree
Hide file tree
Showing 12 changed files with 629 additions and 218 deletions.
3 changes: 3 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ quickwit-cluster = { workspace = true }
quickwit-indexing = { workspace = true, features = ["testsuite"] }
quickwit-ingest = { workspace = true }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-opentelemetry = { workspace = true, features = ["testsuite"] }
quickwit-search = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true, features = ["testsuite"] }

Expand Down
185 changes: 2 additions & 183 deletions quickwit/quickwit-jaeger/src/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use quickwit_ingest::{
IngesterPool, QUEUES_DIR_NAME,
};
use quickwit_metastore::{AddSourceRequestExt, CreateIndexRequestExt, FileBackedMetastore};
use quickwit_opentelemetry::otlp::OtlpGrpcTracesService;
use quickwit_opentelemetry::otlp::{make_resource_spans_for_test, OtlpGrpcTracesService};
use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPlugin;
use quickwit_proto::jaeger::storage::v1::{
FindTraceIDsRequest, GetOperationsRequest, GetServicesRequest, GetTraceRequest, Operation,
Expand All @@ -45,23 +45,13 @@ use quickwit_proto::metastore::{
};
use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceService;
use quickwit_proto::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtlpAnyValueValue;
use quickwit_proto::opentelemetry::proto::common::v1::{
AnyValue as OtlpAnyValue, ArrayValue, InstrumentationScope, KeyValue as OtlpKeyValue,
};
use quickwit_proto::opentelemetry::proto::resource::v1::Resource;
use quickwit_proto::opentelemetry::proto::trace::v1::span::{Event as OtlpEvent, Link as OtlpLink};
use quickwit_proto::opentelemetry::proto::trace::v1::{
ResourceSpans, ScopeSpans, Span as OtlpSpan, Status as OtlpStatus,
};
use quickwit_proto::types::{IndexUid, PipelineUid};
use quickwit_search::{
start_searcher_service, SearchJobPlacer, SearchService, SearchServiceClient, SearcherContext,
SearcherPool,
};
use quickwit_storage::StorageResolver;
use tempfile::TempDir;
use time::OffsetDateTime;
use tokio_stream::StreamExt;

use crate::JaegerService;
Expand Down Expand Up @@ -109,7 +99,7 @@ async fn test_otel_jaeger_integration() {
{
// Export traces.
let export_trace_request = ExportTraceServiceRequest {
resource_spans: make_resource_spans(),
resource_spans: make_resource_spans_for_test(),
};
traces_service
.export(tonic::Request::new(export_trace_request))
Expand Down Expand Up @@ -434,174 +424,3 @@ async fn setup_traces_index(
.await
.unwrap();
}

fn now_minus_x_secs(now: &OffsetDateTime, secs: u64) -> u64 {
(*now - Duration::from_secs(secs)).unix_timestamp_nanos() as u64
}

fn make_resource_spans() -> Vec<ResourceSpans> {
let now = OffsetDateTime::now_utc();

let attributes = vec![OtlpKeyValue {
key: "span_key".to_string(),
value: Some(OtlpAnyValue {
value: Some(OtlpAnyValueValue::StringValue("span_value".to_string())),
}),
}];
let events = vec![OtlpEvent {
name: "event_name".to_string(),
time_unix_nano: 1_000_500_003,
attributes: vec![OtlpKeyValue {
key: "event_key".to_string(),
value: Some(OtlpAnyValue {
value: Some(OtlpAnyValueValue::StringValue("event_value".to_string())),
}),
}],
dropped_attributes_count: 6,
}];
let links = vec![OtlpLink {
trace_id: vec![4; 16],
span_id: vec![5; 8],
trace_state: "link_key1=link_value1,link_key2=link_value2".to_string(),
attributes: vec![OtlpKeyValue {
key: "link_key".to_string(),
value: Some(OtlpAnyValue {
value: Some(OtlpAnyValueValue::StringValue("link_value".to_string())),
}),
}],
dropped_attributes_count: 7,
}];
let spans = vec![
OtlpSpan {
trace_id: vec![1; 16],
span_id: vec![1; 8],
parent_span_id: Vec::new(),
trace_state: "key1=value1,key2=value2".to_string(),
name: "stage_splits".to_string(),
kind: 1, // Internal
start_time_unix_nano: now_minus_x_secs(&now, 6),
end_time_unix_nano: now_minus_x_secs(&now, 5),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: Vec::new(),
dropped_events_count: 0,
links: Vec::new(),
dropped_links_count: 0,
status: None,
},
OtlpSpan {
trace_id: vec![2; 16],
span_id: vec![2; 8],
parent_span_id: Vec::new(),
trace_state: "key1=value1,key2=value2".to_string(),
name: "publish_splits".to_string(),
kind: 2, // Server
start_time_unix_nano: now_minus_x_secs(&now, 4),
end_time_unix_nano: now_minus_x_secs(&now, 3),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: Vec::new(),
dropped_events_count: 0,
links: Vec::new(),
dropped_links_count: 0,
status: None,
},
OtlpSpan {
trace_id: vec![3; 16],
span_id: vec![3; 8],
parent_span_id: Vec::new(),
trace_state: "key1=value1,key2=value2".to_string(),
name: "list_splits".to_string(),
kind: 3, // Client
start_time_unix_nano: now_minus_x_secs(&now, 2),
end_time_unix_nano: now_minus_x_secs(&now, 1),
attributes,
dropped_attributes_count: 0,
events: Vec::new(),
dropped_events_count: 0,
links: Vec::new(),
dropped_links_count: 0,
status: Some(OtlpStatus {
code: 1,
message: "".to_string(),
}),
},
OtlpSpan {
trace_id: vec![4; 16],
span_id: vec![4; 8],
parent_span_id: Vec::new(),
trace_state: "key1=value1,key2=value2".to_string(),
name: "list_splits".to_string(),
kind: 3, // Client
start_time_unix_nano: now_minus_x_secs(&now, 2),
end_time_unix_nano: now_minus_x_secs(&now, 1),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: Vec::new(),
dropped_events_count: 0,
links: Vec::new(),
dropped_links_count: 0,
status: Some(OtlpStatus {
code: 2,
message: "An error occurred.".to_string(),
}),
},
OtlpSpan {
trace_id: vec![5; 16],
span_id: vec![5; 8],
parent_span_id: Vec::new(),
trace_state: "key1=value1,key2=value2".to_string(),
name: "delete_splits".to_string(),
kind: 3, // Client
start_time_unix_nano: now_minus_x_secs(&now, 2),
end_time_unix_nano: now_minus_x_secs(&now, 1),
attributes: Vec::new(),
dropped_attributes_count: 0,
events,
dropped_events_count: 0,
links,
dropped_links_count: 0,
status: Some(OtlpStatus {
code: 2,
message: "Storage error.".to_string(),
}),
},
];
let scope_spans = vec![ScopeSpans {
scope: Some(InstrumentationScope {
name: "opentelemetry-otlp".to_string(),
version: "0.11.0".to_string(),
attributes: vec![],
dropped_attributes_count: 0,
}),
spans,
schema_url: "".to_string(),
}];
let resource_attributes = vec![
OtlpKeyValue {
key: "service.name".to_string(),
value: Some(OtlpAnyValue {
value: Some(OtlpAnyValueValue::StringValue("quickwit".to_string())),
}),
},
OtlpKeyValue {
key: "tags".to_string(),
value: Some(OtlpAnyValue {
value: Some(OtlpAnyValueValue::ArrayValue(ArrayValue {
values: vec![OtlpAnyValue {
value: Some(OtlpAnyValueValue::StringValue("foo".to_string())),
}],
})),
}),
},
];
let resource_spans = ResourceSpans {
resource: Some(Resource {
attributes: resource_attributes,
dropped_attributes_count: 0,
}),
scope_spans,
schema_url: "".to_string(),
};
vec![resource_spans]
}
6 changes: 6 additions & 0 deletions quickwit/quickwit-opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true, optional = true }
tokio = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
Expand All @@ -29,6 +30,11 @@ quickwit-ingest = { workspace = true }
quickwit-proto = { workspace = true }

[dev-dependencies]
time = { workspace = true }

quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }

[features]
testsuite = ["time"]
13 changes: 10 additions & 3 deletions quickwit/quickwit-opentelemetry/src/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ use serde_json::{Number as JsonNumber, Value as JsonValue};
mod logs;
mod metrics;
mod span_id;
#[cfg(any(test, feature = "testsuite"))]
mod test_utils;
mod trace_id;
mod traces;

pub use logs::{OtlpGrpcLogsService, OTEL_LOGS_INDEX_ID};
pub use span_id::{SpanId, TryFromSpanIdError};
#[cfg(any(test, feature = "testsuite"))]
pub use test_utils::make_resource_spans_for_test;
pub use trace_id::{TraceId, TryFromTraceIdError};
pub use traces::{
parse_otlp_spans_json, parse_otlp_spans_protobuf, Event, JsonSpanIterator, Link,
Expand Down Expand Up @@ -133,11 +137,14 @@ fn is_zero(count: &u32) -> bool {

#[cfg(test)]
mod tests {
use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtlpAnyValueValue;
use quickwit_proto::opentelemetry::proto::common::v1::any_value::{
Value as OtlpValue, Value as OtlpAnyValueValue,
};
use quickwit_proto::opentelemetry::proto::common::v1::ArrayValue as OtlpArrayValue;
use serde_json::json;
use serde_json::{json, Value as JsonValue};

use super::*;
use crate::otlp::{extract_attributes, parse_log_record_body, to_json_value};

#[test]
fn test_to_json_value() {
Expand Down Expand Up @@ -227,7 +234,7 @@ mod tests {
}),
},
];
let expected_attributes = HashMap::from_iter([
let expected_attributes = std::collections::HashMap::from_iter([
("array_key".to_string(), json!([1337])),
("bool_key".to_string(), json!(true)),
("double_key".to_string(), json!(12.0)),
Expand Down
Loading

0 comments on commit 12042de

Please sign in to comment.