From 4c1e228b959bd9de43f83d4e0bbc7f0c4b8b239f Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Mon, 14 Oct 2024 16:54:09 -0400 Subject: [PATCH] Handle nested OTLP values in attributes and log bodies (#5485) * Handle nested OTLP values in attributes and log bodies * Fix Python install --- .github/workflows/ci.yml | 9 +- quickwit/quickwit-opentelemetry/Cargo.toml | 1 + .../quickwit-opentelemetry/src/otlp/mod.rs | 133 ++++++++++++------ 3 files changed, 95 insertions(+), 48 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 20843598963..a2554033c4c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,7 +47,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Install Ubuntu packages - run: sudo apt-get -y install protobuf-compiler python3 python3-pip + run: sudo apt-get -y install protobuf-compiler python3 - uses: dorny/paths-filter@v3 id: modified with: @@ -86,11 +86,12 @@ jobs: run: cargo build --features=postgres --bin quickwit working-directory: ./quickwit - name: Install python packages - run: sudo pip3 install pyaml requests if: always() && steps.modified.outputs.rust_src == 'true' - - name: run REST API tests + run: python3 -m venv venv && source venv/bin/activate && pip install pyaml requests + working-directory: ./quickwit/rest-api-tests + - name: Run REST API tests if: always() && steps.modified.outputs.rust_src == 'true' - run: python3 ./run_tests.py --binary ../target/debug/quickwit + run: source venv/bin/activate && python3 ./run_tests.py --binary ../target/debug/quickwit working-directory: ./quickwit/rest-api-tests lints: diff --git a/quickwit/quickwit-opentelemetry/Cargo.toml b/quickwit/quickwit-opentelemetry/Cargo.toml index ea1c2b8aecb..3534f8ab88e 100644 --- a/quickwit/quickwit-opentelemetry/Cargo.toml +++ b/quickwit/quickwit-opentelemetry/Cargo.toml @@ -32,6 +32,7 @@ quickwit-proto = { workspace = true } [dev-dependencies] quickwit-common = { workspace = true, features = ["testsuite"] } quickwit-metastore = { workspace = true, features = ["testsuite"] } +quickwit-proto = { workspace = true, features = ["testsuite"] } [features] testsuite = [] diff --git a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs index d985c712f07..9d03726f582 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; +use quickwit_common::rate_limited_warn; use quickwit_config::{validate_identifier, validate_index_id_pattern, INGEST_V2_SOURCE_ID}; use quickwit_ingest::{CommitType, IngestServiceError}; use quickwit_proto::ingest::router::{ @@ -119,7 +120,7 @@ pub(crate) fn extract_attributes(attributes: Vec) -> HashMap) -> HashMap Option { +fn oltp_value_to_json_value(value: OtlpValue) -> Option { match value { OtlpValue::ArrayValue(OtlpArrayValue { values }) => Some( values .into_iter() - .flat_map(to_json_value_from_primitive_any_value) + .filter_map(|value| match value.value { + Some(value) => oltp_value_to_json_value(value), + None => None, + }) .collect(), ), - OtlpValue::BoolValue(value) => Some(JsonValue::Bool(value)), - OtlpValue::DoubleValue(value) => JsonNumber::from_f64(value).map(JsonValue::Number), - OtlpValue::IntValue(value) => Some(JsonValue::Number(JsonNumber::from(value))), - OtlpValue::StringValue(value) => Some(JsonValue::String(value)), - OtlpValue::BytesValue(_) | OtlpValue::KvlistValue(_) => { - // These attribute types are not supported for attributes according to the OpenTelemetry - // specification. + OtlpValue::BoolValue(bool_value) => Some(JsonValue::Bool(bool_value)), + OtlpValue::DoubleValue(double_value) => { + JsonNumber::from_f64(double_value).map(JsonValue::Number) + } + OtlpValue::IntValue(int_value) => Some(JsonValue::Number(JsonNumber::from(int_value))), + OtlpValue::KvlistValue(key_values) => { + let mut map = serde_json::Map::with_capacity(key_values.values.len()); + + for key_value in key_values.values { + if let Some(value) = key_value + .value + .and_then(|any_value| any_value.value) + .and_then(oltp_value_to_json_value) + { + map.insert(key_value.key, value); + } + } + Some(JsonValue::Object(map)) + } + OtlpValue::StringValue(string_value) => Some(JsonValue::String(string_value)), + OtlpValue::BytesValue(_) => { + rate_limited_warn!(limit_per_min = 10, "ignoring unsupported OTLP bytes value"); None } } } -fn to_json_value_from_primitive_any_value(any_value: OtlpAnyValue) -> Option { - match any_value.value { - Some(OtlpValue::BoolValue(value)) => Some(JsonValue::Bool(value)), - Some(OtlpValue::DoubleValue(value)) => JsonNumber::from_f64(value).map(JsonValue::Number), - Some(OtlpValue::IntValue(value)) => Some(JsonValue::Number(JsonNumber::from(value))), - Some(OtlpValue::StringValue(value)) => Some(JsonValue::String(value)), - _ => None, - } -} - pub(crate) fn parse_log_record_body(body: OtlpAnyValue) -> Option { - body.value.and_then(to_json_value).map(|value| { + body.value.and_then(oltp_value_to_json_value).map(|value| { if value.is_string() { let mut map = serde_json::Map::with_capacity(1); map.insert("message".to_string(), value); @@ -211,15 +220,13 @@ pub(crate) fn extract_otel_index_id_from_metadata( .transpose() .map_err(|error| { Status::internal(format!( - "failed to extract index ID from request metadata: {}", - error + "failed to extract index ID from request metadata: {error}", )) })? .unwrap_or_else(|| otel_signal.default_index_id()); validate_identifier("index_id", index_id).map_err(|error| { Status::internal(format!( - "invalid index ID pattern in request metadata: {}", - error + "invalid index ID pattern in request metadata: {error}", )) })?; Ok(index_id.to_string()) @@ -241,14 +248,11 @@ async fn ingest_doc_batch_v2( commit_type: commit_type.into(), subrequests: vec![subrequest], }; - let mut response = ingest_router - .ingest(request) - .await - .map_err(IngestServiceError::from)?; + let mut response = ingest_router.ingest(request).await?; let num_responses = response.successes.len() + response.failures.len(); if num_responses != 1 { return Err(IngestServiceError::Internal(format!( - "Expected a single failure/success, got {}.", + "expected a single failure or success, got {}", num_responses ))); } @@ -264,34 +268,75 @@ mod tests { use quickwit_proto::opentelemetry::proto::common::v1::any_value::{ Value as OtlpValue, Value as OtlpAnyValueValue, }; - use quickwit_proto::opentelemetry::proto::common::v1::ArrayValue as OtlpArrayValue; + use quickwit_proto::opentelemetry::proto::common::v1::{ + ArrayValue as OtlpArrayValue, KeyValueList as OtlpKeyValueList, + }; use serde_json::{json, Value as JsonValue}; use super::*; - use crate::otlp::{extract_attributes, parse_log_record_body, to_json_value}; + use crate::otlp::{extract_attributes, oltp_value_to_json_value, parse_log_record_body}; #[test] - fn test_to_json_value() { + fn test_oltp_value_to_json_value() { assert_eq!( - to_json_value(OtlpValue::ArrayValue(OtlpArrayValue { values: Vec::new() })), + oltp_value_to_json_value(OtlpValue::ArrayValue(OtlpArrayValue { values: Vec::new() })), Some(json!([])) ); assert_eq!( - to_json_value(OtlpValue::ArrayValue(OtlpArrayValue { - values: vec![OtlpAnyValue { - value: Some(OtlpAnyValueValue::IntValue(1337)) - }] + oltp_value_to_json_value(OtlpValue::ArrayValue(OtlpArrayValue { + values: vec![ + OtlpAnyValue { + value: Some(OtlpAnyValueValue::IntValue(1337)) + }, + OtlpAnyValue { + value: Some(OtlpAnyValueValue::StringValue("1337".to_string())) + } + ] })), - Some(json!([1337])) + Some(json!([1337, "1337"])) + ); + assert_eq!( + oltp_value_to_json_value(OtlpValue::BoolValue(true)), + Some(json!(true)) ); - assert_eq!(to_json_value(OtlpValue::BoolValue(true)), Some(json!(true))); assert_eq!( - to_json_value(OtlpValue::DoubleValue(12.0)), + oltp_value_to_json_value(OtlpValue::DoubleValue(12.0)), Some(json!(12.0)) ); - assert_eq!(to_json_value(OtlpValue::IntValue(42)), Some(json!(42))); assert_eq!( - to_json_value(OtlpValue::StringValue("foo".to_string())), + oltp_value_to_json_value(OtlpValue::IntValue(42)), + Some(json!(42)) + ); + assert_eq!( + oltp_value_to_json_value(OtlpValue::KvlistValue(OtlpKeyValueList { + values: Vec::new() + })), + Some(json!({})) + ); + assert_eq!( + oltp_value_to_json_value(OtlpValue::KvlistValue(OtlpKeyValueList { + values: vec![ + OtlpKeyValue { + key: "foo".to_string(), + value: Some(OtlpAnyValue { + value: Some(OtlpAnyValueValue::IntValue(1337)) + }) + }, + OtlpKeyValue { + key: "bar".to_string(), + value: Some(OtlpAnyValue { + value: Some(OtlpAnyValueValue::StringValue("1337".to_string())) + }) + } + ] + })), + Some(json!({ + "foo": 1337, + "bar": "1337" + })) + ); + assert_eq!( + oltp_value_to_json_value(OtlpValue::StringValue("foo".to_string())), Some(json!("foo")) ); } @@ -358,7 +403,7 @@ mod tests { }), }, ]; - let expected_attributes = std::collections::HashMap::from_iter([ + let expected_attributes = HashMap::from_iter([ ("array_key".to_string(), json!([1337])), ("bool_key".to_string(), json!(true)), ("double_key".to_string(), json!(12.0)),