From d43f013e7bc184e0eda86c2cf8a665cbc8723da1 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Fri, 23 Aug 2024 10:09:51 +0200 Subject: [PATCH] Switch OTLP endpoints to ingest v2 (#5283) * Use ingest V2 for OTLP * Integration test cleanup * Assert OTLP searchable * Disable Jaeger tests for now * Fix clippy * Fix integration tests with orderly shutdown * Migrate Jaeger integ tests to integ tests * Use correct nano timestamps in tests * Simplify JsonDocBatchV2Builder * Refactor some common code * Rename ingest helper method --- quickwit/Cargo.lock | 1 + quickwit/quickwit-ingest/src/error.rs | 42 ++ quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 39 ++ .../quickwit-integration-tests/Cargo.toml | 1 + .../src/test_utils/cluster_sandbox.rs | 162 ++++-- .../tests/{index_tests.rs => ingest_tests.rs} | 470 +++++++----------- .../src/tests/mod.rs | 3 +- .../src/tests/otlp_tests.rs} | 439 ++++++++-------- .../src/tests/sqs_tests.rs | 2 +- .../tests/update_tests/doc_mapping_tests.rs | 14 +- quickwit/quickwit-jaeger/src/lib.rs | 2 - .../quickwit-opentelemetry/src/otlp/logs.rs | 51 +- .../quickwit-opentelemetry/src/otlp/mod.rs | 41 +- .../quickwit-opentelemetry/src/otlp/traces.rs | 52 +- .../src/ingest_api/rest_handler.rs | 39 +- quickwit/quickwit-serve/src/lib.rs | 7 +- .../src/otlp_api/rest_handler.rs | 55 +- 17 files changed, 765 insertions(+), 655 deletions(-) rename quickwit/quickwit-integration-tests/src/tests/{index_tests.rs => ingest_tests.rs} (60%) rename quickwit/{quickwit-jaeger/src/integration_tests.rs => quickwit-integration-tests/src/tests/otlp_tests.rs} (51%) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 87bd1065a81..8d32736877d 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6074,6 +6074,7 @@ dependencies = [ "quickwit-config", "quickwit-indexing", "quickwit-metastore", + "quickwit-opentelemetry", "quickwit-proto", "quickwit-rest-client", "quickwit-serve", diff --git a/quickwit/quickwit-ingest/src/error.rs b/quickwit/quickwit-ingest/src/error.rs index cf926223c4b..ab2c282db36 100644 --- a/quickwit/quickwit-ingest/src/error.rs +++ b/quickwit/quickwit-ingest/src/error.rs @@ -24,6 +24,7 @@ use quickwit_actors::AskError; use quickwit_common::rate_limited_error; use quickwit_common::tower::BufferError; pub(crate) use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error}; +use quickwit_proto::ingest::router::{IngestFailure, IngestFailureReason}; use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause}; use quickwit_proto::types::IndexId; use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode}; @@ -96,6 +97,47 @@ impl From for IngestServiceError { } } +impl From for IngestServiceError { + fn from(ingest_failure: IngestFailure) -> Self { + match ingest_failure.reason() { + IngestFailureReason::Unspecified => { + IngestServiceError::Internal("unknown error".to_string()) + } + IngestFailureReason::IndexNotFound => IngestServiceError::IndexNotFound { + index_id: ingest_failure.index_id, + }, + IngestFailureReason::SourceNotFound => IngestServiceError::Internal(format!( + "Ingest v2 source not found for index {}", + ingest_failure.index_id + )), + IngestFailureReason::Internal => { + IngestServiceError::Internal("internal error".to_string()) + } + IngestFailureReason::NoShardsAvailable => { + IngestServiceError::Unavailable("no shards available".to_string()) + } + IngestFailureReason::ShardRateLimited => { + IngestServiceError::RateLimited(RateLimitingCause::ShardRateLimiting) + } + IngestFailureReason::WalFull => { 
+ IngestServiceError::RateLimited(RateLimitingCause::WalFull) + } + IngestFailureReason::Timeout => { + IngestServiceError::Internal("request timed out".to_string()) + } + IngestFailureReason::RouterLoadShedding => { + IngestServiceError::RateLimited(RateLimitingCause::RouterLoadShedding) + } + IngestFailureReason::LoadShedding => { + IngestServiceError::RateLimited(RateLimitingCause::LoadShedding) + } + IngestFailureReason::CircuitBreaker => { + IngestServiceError::RateLimited(RateLimitingCause::CircuitBreaker) + } + } + } +} + impl ServiceError for IngestServiceError { fn error_code(&self) -> ServiceErrorCode { match self { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index d9bc7b1be75..9ff314919e8 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -41,6 +41,7 @@ use std::time::Duration; use std::{env, fmt}; pub use broadcast::{setup_local_shards_update_listener, LocalShardsUpdate, ShardInfo, ShardInfos}; +use bytes::buf::Writer; use bytes::{BufMut, BytesMut}; use bytesize::ByteSize; use quickwit_common::tower::Pool; @@ -48,6 +49,7 @@ use quickwit_proto::ingest::ingester::IngesterServiceClient; use quickwit_proto::ingest::router::{IngestRequestV2, IngestSubrequest}; use quickwit_proto::ingest::{CommitTypeV2, DocBatchV2}; use quickwit_proto::types::{DocUid, DocUidGenerator, IndexId, NodeId, SubrequestId}; +use serde::Serialize; use tracing::{error, info}; use workbench::pending_subrequests; @@ -141,6 +143,43 @@ impl DocBatchV2Builder { } } +/// Batch builder that can append [`Serialize`] structs without an extra copy +pub struct JsonDocBatchV2Builder { + doc_uids: Vec, + doc_buffer: Writer, + doc_lengths: Vec, +} + +impl Default for JsonDocBatchV2Builder { + fn default() -> Self { + Self { + doc_uids: Vec::new(), + doc_buffer: BytesMut::new().writer(), + doc_lengths: Vec::new(), + } + } +} + +impl JsonDocBatchV2Builder { + pub fn add_doc(&mut self, doc_uid: DocUid, payload: impl Serialize) -> serde_json::Result<()> { + let old_len = self.doc_buffer.get_ref().len(); + serde_json::to_writer(&mut self.doc_buffer, &payload)?; + let new_len = self.doc_buffer.get_ref().len(); + let written_len = new_len - old_len; + self.doc_uids.push(doc_uid); + self.doc_lengths.push(written_len as u32); + Ok(()) + } + + pub fn build(self) -> DocBatchV2 { + DocBatchV2 { + doc_uids: self.doc_uids, + doc_buffer: self.doc_buffer.into_inner().freeze(), + doc_lengths: self.doc_lengths, + } + } +} + /// Helper struct to build an [`IngestRequestV2`]. 
#[derive(Debug, Default)] pub struct IngestRequestV2Builder { diff --git a/quickwit/quickwit-integration-tests/Cargo.toml b/quickwit/quickwit-integration-tests/Cargo.toml index aa6a692c236..ecfafd9b5a8 100644 --- a/quickwit/quickwit-integration-tests/Cargo.toml +++ b/quickwit/quickwit-integration-tests/Cargo.toml @@ -38,6 +38,7 @@ quickwit-common = { workspace = true, features = ["testsuite"] } quickwit-config = { workspace = true, features = ["testsuite"] } quickwit-indexing = { workspace = true, features = ["testsuite"] } quickwit-metastore = { workspace = true, features = ["testsuite"] } +quickwit-opentelemetry = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true, features = ["testsuite"] } quickwit-rest-client = { workspace = true } quickwit-serve = { workspace = true } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 17eed1116fb..2ccf38cc1a6 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -22,7 +22,7 @@ use std::io::Write; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; -use std::time::{Duration, Instant}; +use std::time::Duration; use anyhow::Context; use futures_util::future; @@ -37,6 +37,7 @@ use quickwit_common::uri::Uri as QuickwitUri; use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; use quickwit_metastore::{MetastoreResolver, SplitState}; +use quickwit_proto::jaeger::storage::v1::span_reader_plugin_client::SpanReaderPluginClient; use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient; use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_client::TraceServiceClient; use quickwit_proto::types::NodeId; @@ -44,7 +45,7 @@ use quickwit_rest_client::models::IngestSource; use quickwit_rest_client::rest_client::{ CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL, }; -use quickwit_serve::{serve_quickwit, ListSplitsQueryParams}; +use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString}; use quickwit_storage::StorageResolver; use reqwest::Url; use serde_json::Value; @@ -62,15 +63,26 @@ pub struct TestNodeConfig { pub services: HashSet, } -struct ClusterShutdownTrigger { +type NodeJoinHandle = JoinHandle, anyhow::Error>>; + +struct NodeShutdownHandle { sender: Sender<()>, receiver: Receiver<()>, + node_services: HashSet, + node_id: NodeId, + join_handle_opt: Option, } -impl ClusterShutdownTrigger { - fn new() -> Self { +impl NodeShutdownHandle { + fn new(node_id: NodeId, node_services: HashSet) -> Self { let (sender, receiver) = watch::channel(()); - Self { sender, receiver } + Self { + sender, + receiver, + node_id, + node_services, + join_handle_opt: None, + } } fn shutdown_signal(&self) -> BoxFutureInfaillible<()> { @@ -80,8 +92,17 @@ impl ClusterShutdownTrigger { }) } - fn shutdown(self) { + fn set_node_join_handle(&mut self, join_handle: NodeJoinHandle) { + self.join_handle_opt = Some(join_handle); + } + + /// Initiate node shutdown and wait for it to complete + async fn shutdown(self) -> anyhow::Result> { self.sender.send(()).unwrap(); + self.join_handle_opt + .expect("node join handle was not set before shutdown") + .await + .unwrap() } } @@ -99,9 +120,9 @@ pub struct ClusterSandbox { pub indexer_rest_client: QuickwitClient, pub trace_client: 
TraceServiceClient, pub logs_client: LogsServiceClient, + pub jaeger_client: SpanReaderPluginClient, _temp_dir: TempDir, - join_handles: Vec, anyhow::Error>>>, - shutdown_trigger: ClusterShutdownTrigger, + node_shutdown_handle: Vec, } fn transport_url(addr: SocketAddr) -> Url { @@ -156,14 +177,20 @@ impl ClusterSandbox { let runtimes_config = RuntimesConfig::light_for_tests(); let storage_resolver = StorageResolver::unconfigured(); let metastore_resolver = MetastoreResolver::unconfigured(); - let mut join_handles = Vec::new(); - let shutdown_trigger = ClusterShutdownTrigger::new(); + let mut node_shutdown_handlers = Vec::new(); for node_config in node_configs.iter() { - join_handles.push(tokio::spawn({ + let mut shutdown_handler = NodeShutdownHandle::new( + node_config.node_config.node_id.clone(), + node_config.services.clone(), + ); + let shutdown_signal = shutdown_handler.shutdown_signal(); + let join_handle = tokio::spawn({ let node_config = node_config.node_config.clone(); + let node_id = node_config.node_id.clone(); + let services = node_config.enabled_services.clone(); let metastore_resolver = metastore_resolver.clone(); let storage_resolver = storage_resolver.clone(); - let shutdown_signal = shutdown_trigger.shutdown_signal(); + async move { let result = serve_quickwit( node_config, @@ -174,9 +201,12 @@ impl ClusterSandbox { quickwit_serve::do_nothing_env_filter_reload_fn(), ) .await?; + debug!("{} stopped successfully ({:?})", node_id, services); Result::<_, anyhow::Error>::Ok(result) } - })); + }); + shutdown_handler.set_node_join_handle(join_handle); + node_shutdown_handlers.push(shutdown_handler); } let searcher_config = node_configs .iter() @@ -196,12 +226,18 @@ impl ClusterSandbox { // is formed. tokio::time::sleep(Duration::from_millis(100)).await; } - let channel = channel::Endpoint::from_str(&format!( + let indexer_channel = channel::Endpoint::from_str(&format!( "http://{}", indexer_config.node_config.grpc_listen_addr )) .unwrap() .connect_lazy(); + let searcher_channel = channel::Endpoint::from_str(&format!( + "http://{}", + searcher_config.node_config.grpc_listen_addr + )) + .unwrap() + .connect_lazy(); Ok(Self { node_configs, searcher_rest_client: QuickwitClientBuilder::new(transport_url( @@ -212,11 +248,11 @@ impl ClusterSandbox { indexer_config.node_config.rest_config.listen_addr, )) .build(), - trace_client: TraceServiceClient::new(channel.clone()), - logs_client: LogsServiceClient::new(channel), + trace_client: TraceServiceClient::new(indexer_channel.clone()), + logs_client: LogsServiceClient::new(indexer_channel), + jaeger_client: SpanReaderPluginClient::new(searcher_channel), _temp_dir: temp_dir, - join_handles, - shutdown_trigger, + node_shutdown_handle: node_shutdown_handlers, }) } @@ -225,32 +261,10 @@ impl ClusterSandbox { self.searcher_rest_client.enable_ingest_v2(); } - // Starts one node that runs all the services. 
+ // Starts one node that runs all the services and wait for it to be ready pub async fn start_standalone_node() -> anyhow::Result { - let temp_dir = tempfile::tempdir()?; - let services = QuickwitService::supported_services(); - let node_configs = build_node_configs(temp_dir.path().to_path_buf(), &[services]); - let sandbox = Self::start_cluster_with_configs(temp_dir, node_configs).await?; - - let now = Instant::now(); - let mut tick = tokio::time::interval(Duration::from_millis(100)); - - loop { - tick.tick().await; - - if now.elapsed() > Duration::from_secs(5) { - panic!("standalone node timed out"); - } - if sandbox - .indexer_rest_client - .node_health() - .is_ready() - .await - .unwrap_or(false) - { - break; - } - } + let sandbox = Self::start_cluster_nodes(&[QuickwitService::supported_services()]).await?; + sandbox.wait_for_cluster_num_ready_nodes(1).await?; Ok(sandbox) } @@ -431,17 +445,68 @@ impl ClusterSandbox { Ok(()) } - pub async fn shutdown(self) -> Result>, anyhow::Error> { + pub async fn assert_hit_count(&self, index_id: &str, query: &str, expected_num_hits: u64) { + let search_response = self + .searcher_rest_client + .search( + index_id, + SearchRequestQueryString { + query: query.to_string(), + max_hits: 10, + ..Default::default() + }, + ) + .await + .unwrap(); + debug!( + "search response for query {} on index {index_id}: {:?}", + query, search_response + ); + assert_eq!( + search_response.num_hits, expected_num_hits, + "unexpected num_hits for query {}", + query + ); + } + + /// Shutdown nodes that only provide the specified services + pub async fn shutdown_services( + &mut self, + shutdown_services: &HashSet, + ) -> Result>, anyhow::Error> { // We need to drop rest clients first because reqwest can hold connections open // preventing rest server's graceful shutdown. - self.shutdown_trigger.shutdown(); - let result = future::join_all(self.join_handles).await; + let mut shutdown_futures = Vec::new(); + let mut shutdown_nodes = HashMap::new(); + let mut i = 0; + while i < self.node_shutdown_handle.len() { + let handler_services = &self.node_shutdown_handle[i].node_services; + if handler_services.is_subset(shutdown_services) { + let handler_to_shutdown = self.node_shutdown_handle.remove(i); + shutdown_nodes.insert( + handler_to_shutdown.node_id.clone(), + handler_to_shutdown.node_services.clone(), + ); + shutdown_futures.push(handler_to_shutdown.shutdown()); + } else { + i += 1; + } + } + debug!("shutting down {:?}", shutdown_nodes); + let result = future::join_all(shutdown_futures).await; let mut statuses = Vec::new(); for node in result { - statuses.push(node??); + statuses.push(node?); } Ok(statuses) } + + pub async fn shutdown( + mut self, + ) -> Result>, anyhow::Error> { + self.shutdown_services(&QuickwitService::supported_services()) + .await + } } /// Builds a list of [`NodeConfig`] given a list of Quickwit services. 
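The sandbox changes above introduce two test helpers: `assert_hit_count`, which runs a search through the searcher REST client and checks `num_hits`, and `shutdown_services`, which shuts down only the nodes whose services are a subset of the given set and awaits their join handles. A minimal usage sketch (not part of the patch), assuming a `ClusterSandbox` obtained from the test utilities and a placeholder index id `my-index`:

use std::collections::HashSet;

use quickwit_config::service::QuickwitService;

use crate::test_utils::ClusterSandbox;

async fn example(mut sandbox: ClusterSandbox) -> anyhow::Result<()> {
    // Check that exactly one document matches the query on the (placeholder) index.
    sandbox.assert_hit_count("my-index", "body:example", 1).await;
    // Shut down the indexer-only nodes first (as the OTLP tests below do)...
    sandbox
        .shutdown_services(&HashSet::from_iter([QuickwitService::Indexer]))
        .await?;
    // ...then shut down the remaining nodes.
    sandbox.shutdown().await?;
    Ok(())
}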
@@ -465,6 +530,7 @@ pub fn build_node_configs( for (node_idx, node_services) in nodes_services.iter().enumerate() { let mut config = NodeConfig::for_test(); config.enabled_services.clone_from(node_services); + config.jaeger_config.enable_endpoint = true; config.cluster_id.clone_from(&cluster_id); config.node_id = NodeId::new(format!("test-node-{node_idx}")); config.data_dir_path = root_data_dir.join(config.node_id.as_str()); diff --git a/quickwit/quickwit-integration-tests/src/tests/index_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs similarity index 60% rename from quickwit/quickwit-integration-tests/src/tests/index_tests.rs rename to quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs index ed9cd538a48..b4c3eab3325 100644 --- a/quickwit/quickwit-integration-tests/src/tests/index_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs @@ -24,24 +24,21 @@ use hyper::StatusCode; use quickwit_config::service::QuickwitService; use quickwit_config::ConfigFormat; use quickwit_metastore::SplitState; -use quickwit_proto::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest; -use quickwit_proto::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest; -use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value; -use quickwit_proto::opentelemetry::proto::common::v1::AnyValue; -use quickwit_proto::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; -use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span}; use quickwit_rest_client::error::{ApiError, Error}; use quickwit_rest_client::rest_client::CommitType; -use quickwit_serve::SearchRequestQueryString; use serde_json::json; -use tonic::codec::CompressionEncoding; use crate::ingest_json; use crate::test_utils::{ingest_with_retry, ClusterSandbox}; +fn initialize_tests() { + quickwit_common::setup_logging_for_tests(); + std::env::set_var("QW_ENABLE_INGEST_V2", "true"); +} + #[tokio::test] async fn test_single_node_cluster() { - quickwit_common::setup_logging_for_tests(); + initialize_tests(); let mut sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); let index_id = "test-single-node-cluster"; let index_config = format!( @@ -72,14 +69,6 @@ async fn test_single_node_cluster() { .await .unwrap(); - // Enable ingest v2 source. - sandbox - .indexer_rest_client - .sources(index_id) - .toggle("_ingest-source", true) - .await - .unwrap(); - // Index one record. ingest_with_retry( &sandbox.indexer_rest_client, @@ -163,19 +152,7 @@ async fn test_single_node_cluster() { // .await // .unwrap(); - // let search_response_empty = sandbox - // .searcher_rest_client - // .search( - // index_id, - // SearchRequestQueryString { - // query: "body:record".to_string(), - // ..Default::default() - // }, - // ) - // .await - // .unwrap(); - - // assert_eq!(search_response_empty.num_hits, 3); + // assert_hit_count(&sandbox, index_id, "body:record", 3).await; // // Wait for splits to merge, since we created 3 splits and merge factor is 3, // // we should get 1 published split with no staged splits eventually. 
@@ -222,25 +199,10 @@ async fn test_single_node_cluster() { sandbox.shutdown().await.unwrap(); } -const TEST_INDEX_CONFIG: &str = r#" - version: 0.8 - index_id: test_index - doc_mapping: - field_mappings: - - name: body - type: text - indexing_settings: - commit_timeout_secs: 1 - merge_policy: - type: stable_log - merge_factor: 4 - max_merge_factor: 4 -"#; - +/// This tests checks what happens when we try to ingest into a non-existing index. #[tokio::test] async fn test_ingest_v2_index_not_found() { - // This tests checks what happens when we try to ingest into a non-existing index. - quickwit_common::setup_logging_for_tests(); + initialize_tests(); let nodes_services = &[ HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), @@ -275,10 +237,11 @@ async fn test_ingest_v2_index_not_found() { sandbox.shutdown().await.unwrap(); } +/// This tests checks our happy path for ingesting one doc. #[tokio::test] async fn test_ingest_v2_happy_path() { - // This tests checks our happy path for ingesting one doc. - quickwit_common::setup_logging_for_tests(); + initialize_tests(); + let nodes_services = &[ HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), @@ -293,16 +256,23 @@ async fn test_ingest_v2_happy_path() { .unwrap(); sandbox.enable_ingest_v2(); sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap(); + let index_id = "test_happy_path"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + "# + ); sandbox .indexer_rest_client .indexes() - .create(TEST_INDEX_CONFIG, ConfigFormat::Yaml, false) - .await - .unwrap(); - sandbox - .indexer_rest_client - .sources("test_index") - .toggle("_ingest-source", true) + .create(index_config, ConfigFormat::Yaml, false) .await .unwrap(); @@ -316,7 +286,7 @@ async fn test_ingest_v2_happy_path() { let ingest_res = sandbox .indexer_rest_client .ingest( - "test_index", + index_id, ingest_json!({"body": "doc1"}), None, None, @@ -339,25 +309,16 @@ async fn test_ingest_v2_happy_path() { } sandbox - .wait_for_splits("test_index", Some(vec![SplitState::Published]), 1) + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 1) .await .unwrap(); - let search_req = SearchRequestQueryString { - query: "*".to_string(), - ..Default::default() - }; - let search_result = sandbox - .indexer_rest_client - .search("test_index", search_req) - .await - .unwrap(); - assert_eq!(search_result.num_hits, 1); + sandbox.assert_hit_count(index_id, "*", 1).await; sandbox .indexer_rest_client .indexes() - .delete("test_index", false) + .delete(index_id, false) .await .unwrap(); @@ -366,18 +327,33 @@ async fn test_ingest_v2_happy_path() { #[tokio::test] async fn test_commit_modes() { - quickwit_common::setup_logging_for_tests(); + initialize_tests(); let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); - let index_id = "test_index"; + let index_id = "test_commit_modes"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 2 + "# + ); // Create index sandbox .indexer_rest_client .indexes() - .create(TEST_INDEX_CONFIG, ConfigFormat::Yaml, false) + .create(index_config, ConfigFormat::Yaml, false) .await .unwrap(); + // TODO: make 
this test work with ingest v2 (#4438) + // sandbox.enable_ingest_v2(); + // Test force commit ingest_with_retry( &sandbox.indexer_rest_client, @@ -388,21 +364,7 @@ async fn test_commit_modes() { .await .unwrap(); - assert_eq!( - sandbox - .searcher_rest_client - .search( - index_id, - SearchRequestQueryString { - query: "body:force".to_string(), - ..Default::default() - }, - ) - .await - .unwrap() - .num_hits, - 1 - ); + sandbox.assert_hit_count(index_id, "body:force", 1).await; // Test wait_for commit sandbox @@ -417,21 +379,7 @@ async fn test_commit_modes() { .await .unwrap(); - assert_eq!( - sandbox - .searcher_rest_client - .search( - index_id, - SearchRequestQueryString { - query: "body:wait".to_string(), - ..Default::default() - }, - ) - .await - .unwrap() - .num_hits, - 1 - ); + sandbox.assert_hit_count(index_id, "body:wait", 1).await; // Test auto commit sandbox @@ -446,39 +394,11 @@ async fn test_commit_modes() { .await .unwrap(); - assert_eq!( - sandbox - .searcher_rest_client - .search( - index_id, - SearchRequestQueryString { - query: "body:auto".to_string(), - ..Default::default() - }, - ) - .await - .unwrap() - .num_hits, - 0 - ); + sandbox.assert_hit_count(index_id, "body:auto", 0).await; tokio::time::sleep(Duration::from_secs(3)).await; - assert_eq!( - sandbox - .searcher_rest_client - .search( - index_id, - SearchRequestQueryString { - query: "body:auto".to_string(), - ..Default::default() - }, - ) - .await - .unwrap() - .num_hits, - 1 - ); + sandbox.assert_hit_count(index_id, "body:auto", 1).await; // Clean up sandbox.shutdown().await.unwrap(); @@ -486,7 +406,7 @@ async fn test_commit_modes() { #[tokio::test] async fn test_very_large_index_name() { - quickwit_common::setup_logging_for_tests(); + initialize_tests(); let nodes_services = vec![ HashSet::from_iter([QuickwitService::Searcher]), HashSet::from_iter([QuickwitService::Metastore]), @@ -494,9 +414,10 @@ async fn test_very_large_index_name() { HashSet::from_iter([QuickwitService::ControlPlane]), HashSet::from_iter([QuickwitService::Janitor]), ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) + let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) .await .unwrap(); + sandbox.enable_ingest_v2(); sandbox.wait_for_cluster_num_ready_nodes(5).await.unwrap(); let index_id = "its_very_very_very_very_very_very_very_very_very_very_very_\ @@ -518,6 +439,8 @@ async fn test_very_large_index_name() { field_mappings: - name: body type: text + indexing_settings: + commit_timeout_secs: 1 "#, ), ConfigFormat::Yaml, @@ -530,27 +453,18 @@ async fn test_very_large_index_name() { ingest_with_retry( &sandbox.indexer_rest_client, index_id, - ingest_json!({"body": "force"}), - CommitType::Force, + ingest_json!({"body": "not too long"}), + CommitType::WaitFor, ) .await .unwrap(); - assert_eq!( - sandbox - .searcher_rest_client - .search( - index_id, - SearchRequestQueryString { - query: "body:force".to_string(), - ..Default::default() - }, - ) - .await - .unwrap() - .num_hits, - 1 - ); + sandbox + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 1) + .await + .unwrap(); + + sandbox.assert_hit_count(index_id, "body:long", 1).await; // Delete the index sandbox @@ -567,13 +481,15 @@ async fn test_very_large_index_name() { .create( format!( r#" - version: 0.8 - index_id: {oversized_index_id} - doc_mapping: - field_mappings: - - name: body - type: text - "#, + version: 0.8 + index_id: {oversized_index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + 
commit_timeout_secs: 1 + "#, ), ConfigFormat::Yaml, false, @@ -591,26 +507,30 @@ async fn test_very_large_index_name() { } #[tokio::test] -async fn test_shutdown() { - quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); - let index_id = "test_commit_modes_index"; +async fn test_shutdown_single_node() { + initialize_tests(); + let mut sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let index_id = "test_shutdown_single_node"; + + sandbox.enable_ingest_v2(); // Create index sandbox .indexer_rest_client .indexes() .create( - r#" + format!( + r#" version: 0.8 - index_id: test_commit_modes_index + index_id: {index_id} doc_mapping: field_mappings: - name: body type: text indexing_settings: commit_timeout_secs: 1 - "#, + "# + ), ConfigFormat::Yaml, false, ) @@ -640,139 +560,135 @@ async fn test_shutdown() { .await .unwrap(); - // The error we are trying to catch here is that the sandbox is getting stuck in - // shutdown for 180 seconds if not all services exit cleanly, which in turn manifests - // itself with a very slow test that passes. This timeout ensures that the test fails - // if the sandbox gets stuck in shutdown. tokio::time::timeout(std::time::Duration::from_secs(10), sandbox.shutdown()) .await .unwrap() .unwrap(); } +/// When the control plane is on a different node, it might be shutdown +/// before the ingest pipeline is scheduled on the indexer. #[tokio::test] -async fn test_ingest_traces_with_otlp_grpc_api() { - quickwit_common::setup_logging_for_tests(); +async fn test_shutdown_control_plane_early_shutdown() { + initialize_tests(); let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), + HashSet::from_iter([ + QuickwitService::ControlPlane, + QuickwitService::Searcher, + QuickwitService::Metastore, + QuickwitService::Janitor, + ]), ]; - let sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) + let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) .await .unwrap(); - // Wait for the pipelines to start (one for logs and one for traces) - sandbox.wait_for_indexing_pipelines(2).await.unwrap(); - let client = sandbox.trace_client.clone(); - - // build test OTEL span - let scope_spans = vec![ScopeSpans { - spans: vec![ - Span { - trace_id: vec![1; 16], - span_id: vec![2; 8], - start_time_unix_nano: 1_000_000_001, - end_time_unix_nano: 1_000_000_002, - ..Default::default() - }, - Span { - trace_id: vec![3; 16], - span_id: vec![4; 8], - start_time_unix_nano: 2_000_000_001, - end_time_unix_nano: 2_000_000_002, - ..Default::default() - }, - ], - ..Default::default() - }]; - let resource_spans = vec![ResourceSpans { - scope_spans, - ..Default::default() - }]; - let request = ExportTraceServiceRequest { resource_spans }; - - // Send the spans on the default index, uncompressed and compressed - for mut tested_client in vec![ - client.clone(), - client.clone().send_compressed(CompressionEncoding::Gzip), - ] { - let response = tested_client.export(request.clone()).await.unwrap(); - assert_eq!( - response - .into_inner() - .partial_success - .unwrap() - .rejected_spans, - 0 - ); - } + let index_id = "test_shutdown_separate_indexer"; - // Send the spans on a non existing index, should return an error. 
- { - let mut tonic_request = tonic::Request::new(request); - tonic_request.metadata_mut().insert( - "qw-otel-traces-index", - tonic::metadata::MetadataValue::try_from("non-existing-index").unwrap(), - ); - let status = client.clone().export(tonic_request).await.unwrap_err(); - assert_eq!(status.code(), tonic::Code::NotFound); - } + // TODO: make this test work with ingest v2 (#5068) + // sandbox.enable_ingest_v2(); - sandbox.shutdown().await.unwrap(); + // Create index + sandbox + .indexer_rest_client + .indexes() + .create( + format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + "# + ), + ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + + // Ensure that the index is ready to accept records. + ingest_with_retry( + &sandbox.indexer_rest_client, + index_id, + ingest_json!({"body": "one"}), + CommitType::WaitFor, + ) + .await + .unwrap(); + + tokio::time::timeout(std::time::Duration::from_secs(10), sandbox.shutdown()) + .await + .unwrap() + .unwrap(); } +/// When the control plane/metastore are shutdown before the indexer, the +/// indexer shutdown should not hang indefinitely #[tokio::test] -async fn test_ingest_logs_with_otlp_grpc_api() { - quickwit_common::setup_logging_for_tests(); +async fn test_shutdown_separate_indexer() { + initialize_tests(); let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), + HashSet::from_iter([ + QuickwitService::ControlPlane, + QuickwitService::Searcher, + QuickwitService::Metastore, + QuickwitService::Janitor, + ]), ]; - let sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) + let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) .await .unwrap(); - // Wait fo the pipelines to start (one for logs and one for traces) - sandbox.wait_for_indexing_pipelines(2).await.unwrap(); - let client = sandbox.logs_client.clone(); - - // build test OTEL log - let log_record = LogRecord { - time_unix_nano: 1_000_000_001, - body: Some(AnyValue { - value: Some(Value::StringValue("hello".to_string())), - }), - ..Default::default() - }; - let scope_logs = ScopeLogs { - log_records: vec![log_record], - ..Default::default() - }; - let resource_logs = vec![ResourceLogs { - scope_logs: vec![scope_logs], - ..Default::default() - }]; - let request = ExportLogsServiceRequest { resource_logs }; - - // Send the logs on the default index, uncompressed and compressed - for mut tested_client in vec![ - client.clone(), - client.clone().send_compressed(CompressionEncoding::Gzip), - ] { - let response = tested_client.export(request.clone()).await.unwrap(); - assert_eq!( - response - .into_inner() - .partial_success - .unwrap() - .rejected_log_records, - 0 - ); - } + let index_id = "test_shutdown_separate_indexer"; - sandbox.shutdown().await.unwrap(); + // TODO: make this test work with ingest v2 (#5068) + // sandbox.enable_ingest_v2(); + + // Create index + sandbox + .indexer_rest_client + .indexes() + .create( + format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + "# + ), + ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + + // Ensure that the index is ready to accept records. 
+ ingest_with_retry( + &sandbox.indexer_rest_client, + index_id, + ingest_json!({"body": "one"}), + CommitType::WaitFor, + ) + .await + .unwrap(); + + sandbox + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 1) + .await + .unwrap(); + + tokio::time::timeout(std::time::Duration::from_secs(10), sandbox.shutdown()) + .await + .unwrap() + .unwrap(); } diff --git a/quickwit/quickwit-integration-tests/src/tests/mod.rs b/quickwit/quickwit-integration-tests/src/tests/mod.rs index 103fd9a9f1f..d5bd2d5ed2e 100644 --- a/quickwit/quickwit-integration-tests/src/tests/mod.rs +++ b/quickwit/quickwit-integration-tests/src/tests/mod.rs @@ -18,7 +18,8 @@ // along with this program. If not, see . mod basic_tests; -mod index_tests; +mod ingest_tests; +mod otlp_tests; #[cfg(feature = "sqs-localstack-tests")] mod sqs_tests; mod update_tests; diff --git a/quickwit/quickwit-jaeger/src/integration_tests.rs b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs similarity index 51% rename from quickwit/quickwit-jaeger/src/integration_tests.rs rename to quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs index 380de46db9c..752fe45acf3 100644 --- a/quickwit/quickwit-jaeger/src/integration_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs @@ -17,100 +17,244 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashMap; -use std::path::Path; -use std::sync::Arc; -use std::time::Duration; +use std::collections::{HashMap, HashSet}; -use quickwit_actors::{ActorHandle, Mailbox, Universe}; -use quickwit_cluster::{create_cluster_for_test, ChannelTransport, Cluster}; -use quickwit_common::pubsub::EventBroker; -use quickwit_common::uri::Uri; -use quickwit_config::{IndexerConfig, IngestApiConfig, JaegerConfig, SearcherConfig, SourceConfig}; -use quickwit_indexing::actors::MergeSchedulerService; -use quickwit_indexing::models::SpawnPipeline; -use quickwit_indexing::IndexingService; -use quickwit_ingest::{ - init_ingest_api, CommitType, CreateQueueRequest, IngestApiService, IngestServiceClient, - IngesterPool, QUEUES_DIR_NAME, +use futures_util::StreamExt; +use quickwit_config::service::QuickwitService; +use quickwit_metastore::SplitState; +use quickwit_opentelemetry::otlp::{ + make_resource_spans_for_test, OTEL_LOGS_INDEX_ID, OTEL_TRACES_INDEX_ID, }; -use quickwit_metastore::{AddSourceRequestExt, CreateIndexRequestExt, FileBackedMetastore}; -use quickwit_opentelemetry::otlp::{make_resource_spans_for_test, OtlpGrpcTracesService}; -use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPlugin; use quickwit_proto::jaeger::storage::v1::{ FindTraceIDsRequest, GetOperationsRequest, GetServicesRequest, GetTraceRequest, Operation, SpansResponseChunk, TraceQueryParameters, }; -use quickwit_proto::metastore::{ - AddSourceRequest, CreateIndexRequest, MetastoreService, MetastoreServiceClient, -}; -use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceService; +use quickwit_proto::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest; use quickwit_proto::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest; -use quickwit_proto::types::{IndexUid, NodeId, PipelineUid}; -use quickwit_search::{ - start_searcher_service, SearchJobPlacer, SearchService, SearchServiceClient, SearcherContext, - SearcherPool, -}; -use quickwit_storage::StorageResolver; -use tempfile::TempDir; -use tokio_stream::StreamExt; +use 
quickwit_proto::opentelemetry::proto::common::v1::any_value::Value; +use quickwit_proto::opentelemetry::proto::common::v1::AnyValue; +use quickwit_proto::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; +use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span}; +use tonic::codec::CompressionEncoding; -use crate::JaegerService; +use crate::test_utils::ClusterSandbox; + +fn initialize_tests() { + quickwit_common::setup_logging_for_tests(); + std::env::set_var("QW_ENABLE_INGEST_V2", "true"); +} #[tokio::test] -async fn test_otel_jaeger_integration() { - let cluster = cluster_for_test().await; - let universe = Universe::with_accelerated_time(); - let temp_dir = tempfile::tempdir().unwrap(); +async fn test_ingest_traces_with_otlp_grpc_api() { + initialize_tests(); + let nodes_services = vec![ + HashSet::from_iter([QuickwitService::Searcher]), + HashSet::from_iter([QuickwitService::Metastore]), + HashSet::from_iter([QuickwitService::Indexer]), + HashSet::from_iter([QuickwitService::ControlPlane]), + HashSet::from_iter([QuickwitService::Janitor]), + ]; + let mut sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) + .await + .unwrap(); + // Wait for the pipelines to start (one for logs and one for traces) + sandbox.wait_for_indexing_pipelines(2).await.unwrap(); - let (ingester_service, ingester_client) = ingester_for_test(&universe, temp_dir.path()).await; - let ingester_pool = IngesterPool::default(); - let traces_service = OtlpGrpcTracesService::new(ingester_client, Some(CommitType::Force)); + fn build_span(span_name: String) -> Vec { + let scope_spans = vec![ScopeSpans { + spans: vec![Span { + name: span_name, + trace_id: vec![1; 16], + span_id: vec![2; 8], + start_time_unix_nano: 1724060143000000001, + end_time_unix_nano: 1724060144000000000, + ..Default::default() + }], + ..Default::default() + }]; + vec![ResourceSpans { + scope_spans, + ..Default::default() + }] + } - let storage_resolver = StorageResolver::unconfigured(); - let metastore = metastore_for_test(&storage_resolver).await; - let (indexer_service, _indexer_handle) = indexer_for_test( - &universe, - temp_dir.path(), - cluster.clone(), - metastore.clone(), - storage_resolver.clone(), - ingester_service.clone(), - ingester_pool.clone(), - ) - .await; + // Send the spans on the default index + let tested_clients = vec![ + sandbox.trace_client.clone(), + sandbox + .trace_client + .clone() + .send_compressed(CompressionEncoding::Gzip), + ]; + for (idx, mut tested_client) in tested_clients.into_iter().enumerate() { + let body = format!("hello{}", idx); + let request = ExportTraceServiceRequest { + resource_spans: build_span(body.clone()), + }; + let response = tested_client.export(request).await.unwrap(); + assert_eq!( + response + .into_inner() + .partial_success + .unwrap() + .rejected_spans, + 0 + ); + sandbox + .wait_for_splits( + OTEL_TRACES_INDEX_ID, + Some(vec![SplitState::Published]), + idx + 1, + ) + .await + .unwrap(); + sandbox + .assert_hit_count(OTEL_TRACES_INDEX_ID, &format!("span_name:{}", body), 1) + .await; + } - setup_traces_index( - &temp_dir, - metastore.clone(), - &ingester_service, - &indexer_service, - ) - .await; + // Send the spans on a non existing index, should return an error. 
+ { + let request = ExportTraceServiceRequest { + resource_spans: build_span("hello".to_string()), + }; + let mut tonic_request = tonic::Request::new(request); + tonic_request.metadata_mut().insert( + "qw-otel-traces-index", + tonic::metadata::MetadataValue::try_from("non-existing-index").unwrap(), + ); + let status = sandbox + .trace_client + .clone() + .export(tonic_request) + .await + .unwrap_err(); + assert_eq!(status.code(), tonic::Code::NotFound); + } - let search_service = - searcher_for_test(&cluster, metastore.clone(), storage_resolver.clone()).await; - let jaeger_service = JaegerService::new(JaegerConfig::default(), search_service); + sandbox + .shutdown_services(&HashSet::from_iter([QuickwitService::Indexer])) + .await + .unwrap(); + sandbox.shutdown().await.unwrap(); +} - cluster - .wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5)) +#[tokio::test] +async fn test_ingest_logs_with_otlp_grpc_api() { + initialize_tests(); + let nodes_services = vec![ + HashSet::from_iter([QuickwitService::Searcher]), + HashSet::from_iter([QuickwitService::Metastore]), + HashSet::from_iter([QuickwitService::Indexer]), + HashSet::from_iter([QuickwitService::ControlPlane]), + HashSet::from_iter([QuickwitService::Janitor]), + ]; + let mut sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) .await .unwrap(); + // Wait fo the pipelines to start (one for logs and one for traces) + sandbox.wait_for_indexing_pipelines(2).await.unwrap(); - { - // Export traces. - let export_trace_request = ExportTraceServiceRequest { - resource_spans: make_resource_spans_for_test(), + fn build_log(body: String) -> Vec { + let log_record = LogRecord { + time_unix_nano: 1724060143000000001, + body: Some(AnyValue { + value: Some(Value::StringValue(body)), + }), + ..Default::default() + }; + let scope_logs = ScopeLogs { + log_records: vec![log_record], + ..Default::default() }; - traces_service - .export(tonic::Request::new(export_trace_request)) + vec![ResourceLogs { + scope_logs: vec![scope_logs], + ..Default::default() + }] + } + + // Send the logs on the default index + let tested_clients = vec![ + sandbox.logs_client.clone(), + sandbox + .logs_client + .clone() + .send_compressed(CompressionEncoding::Gzip), + ]; + for (idx, mut tested_client) in tested_clients.into_iter().enumerate() { + let body: String = format!("hello{}", idx); + let request = ExportLogsServiceRequest { + resource_logs: build_log(body.clone()), + }; + let response = tested_client.export(request).await.unwrap(); + assert_eq!( + response + .into_inner() + .partial_success + .unwrap() + .rejected_log_records, + 0 + ); + sandbox + .wait_for_splits( + OTEL_LOGS_INDEX_ID, + Some(vec![SplitState::Published]), + idx + 1, + ) .await .unwrap(); + sandbox + .assert_hit_count(OTEL_LOGS_INDEX_ID, &format!("body.message:{}", body), 1) + .await; } + + sandbox + .shutdown_services(&HashSet::from_iter([QuickwitService::Indexer])) + .await + .unwrap(); + sandbox.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_jaeger_api() { + initialize_tests(); + let nodes_services = vec![ + HashSet::from_iter([QuickwitService::Searcher]), + HashSet::from_iter([QuickwitService::Metastore]), + HashSet::from_iter([QuickwitService::Indexer]), + HashSet::from_iter([QuickwitService::ControlPlane]), + HashSet::from_iter([QuickwitService::Janitor]), + ]; + let mut sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) + .await + .unwrap(); + // Wait fo the pipelines to start (one for logs and one for traces) + 
sandbox.wait_for_indexing_pipelines(2).await.unwrap(); + + let export_trace_request = ExportTraceServiceRequest { + resource_spans: make_resource_spans_for_test(), + }; + sandbox + .trace_client + .export(export_trace_request) + .await + .unwrap(); + + sandbox + .wait_for_splits(OTEL_TRACES_INDEX_ID, Some(vec![SplitState::Published]), 1) + .await + .unwrap(); + + sandbox + .shutdown_services(&HashSet::from_iter([QuickwitService::Indexer])) + .await + .unwrap(); + { // Test `GetServices` let get_services_request = GetServicesRequest {}; - let get_services_response = jaeger_service + let get_services_response = sandbox + .jaeger_client .get_services(tonic::Request::new(get_services_request)) .await .unwrap() @@ -123,7 +267,8 @@ async fn test_otel_jaeger_integration() { service: "quickwit".to_string(), span_kind: "".to_string(), }; - let get_operations_response = jaeger_service + let get_operations_response = sandbox + .jaeger_client .get_operations(tonic::Request::new(get_operations_request)) .await .unwrap() @@ -155,7 +300,8 @@ async fn test_otel_jaeger_integration() { service: "quickwit".to_string(), span_kind: "server".to_string(), }; - let get_operations_response = jaeger_service + let get_operations_response = sandbox + .jaeger_client .get_operations(tonic::Request::new(get_operations_request)) .await .unwrap() @@ -184,7 +330,8 @@ async fn test_otel_jaeger_integration() { num_traces: 10, }; let find_trace_ids_request = FindTraceIDsRequest { query: Some(query) }; - let find_trace_ids_response = jaeger_service + let find_trace_ids_response = sandbox + .jaeger_client .find_trace_i_ds(tonic::Request::new(find_trace_ids_request)) .await .unwrap() @@ -204,7 +351,8 @@ async fn test_otel_jaeger_integration() { num_traces: 10, }; let find_trace_ids_request = FindTraceIDsRequest { query: Some(query) }; - let find_trace_ids_response = jaeger_service + let find_trace_ids_response = sandbox + .jaeger_client .find_trace_i_ds(tonic::Request::new(find_trace_ids_request)) .await .unwrap() @@ -224,7 +372,8 @@ async fn test_otel_jaeger_integration() { num_traces: 10, }; let find_trace_ids_request = FindTraceIDsRequest { query: Some(query) }; - let find_trace_ids_response = jaeger_service + let find_trace_ids_response = sandbox + .jaeger_client .find_trace_i_ds(tonic::Request::new(find_trace_ids_request)) .await .unwrap() @@ -244,7 +393,8 @@ async fn test_otel_jaeger_integration() { num_traces: 10, }; let find_trace_ids_request = FindTraceIDsRequest { query: Some(query) }; - let find_trace_ids_response = jaeger_service + let find_trace_ids_response = sandbox + .jaeger_client .find_trace_i_ds(tonic::Request::new(find_trace_ids_request)) .await .unwrap() @@ -264,7 +414,8 @@ async fn test_otel_jaeger_integration() { num_traces: 10, }; let find_trace_ids_request = FindTraceIDsRequest { query: Some(query) }; - let find_trace_ids_response = jaeger_service + let find_trace_ids_response = sandbox + .jaeger_client .find_trace_i_ds(tonic::Request::new(find_trace_ids_request)) .await .unwrap() @@ -277,7 +428,8 @@ async fn test_otel_jaeger_integration() { let get_trace_request = GetTraceRequest { trace_id: [1; 16].to_vec(), }; - let mut span_stream = jaeger_service + let mut span_stream = sandbox + .jaeger_client .get_trace(tonic::Request::new(get_trace_request)) .await .unwrap() @@ -293,136 +445,5 @@ async fn test_otel_jaeger_integration() { assert_eq!(process.tags[0].key, "tags"); assert_eq!(process.tags[0].v_str, r#"["foo"]"#); } - _indexer_handle.quit().await; - universe.assert_quit().await; -} - -async fn 
cluster_for_test() -> Cluster { - let transport = ChannelTransport::default(); - create_cluster_for_test( - Vec::new(), - &["metastore", "indexer", "searcher"], - &transport, - true, - ) - .await - .unwrap() -} - -async fn ingester_for_test( - universe: &Universe, - data_dir_path: &Path, -) -> (Mailbox, IngestServiceClient) { - let queues_dir_path = data_dir_path.join(QUEUES_DIR_NAME); - let ingester_service = init_ingest_api(universe, &queues_dir_path, &IngestApiConfig::default()) - .await - .unwrap(); - let ingester_client = IngestServiceClient::from_mailbox(ingester_service.clone()); - (ingester_service, ingester_client) -} - -async fn metastore_for_test(storage_resolver: &StorageResolver) -> MetastoreServiceClient { - let storage = storage_resolver - .resolve(&Uri::for_test("ram:///metastore")) - .await - .unwrap(); - MetastoreServiceClient::new(FileBackedMetastore::for_test(storage)) -} - -async fn indexer_for_test( - universe: &Universe, - data_dir_path: &Path, - cluster: Cluster, - metastore: MetastoreServiceClient, - storage_resolver: StorageResolver, - ingester_service: Mailbox, - ingester_pool: IngesterPool, -) -> (Mailbox, ActorHandle) { - let indexer_config = IndexerConfig::for_test().unwrap(); - let indexing_service = IndexingService::new( - NodeId::from("test-node"), - data_dir_path.to_path_buf(), - indexer_config, - 1, - cluster, - metastore, - Some(ingester_service), - universe.get_or_spawn_one::(), - ingester_pool, - storage_resolver, - EventBroker::default(), - ) - .await - .unwrap(); - universe.spawn_builder().spawn(indexing_service) -} - -async fn searcher_for_test( - cluster: &Cluster, - metastore: MetastoreServiceClient, - storage_resolver: StorageResolver, -) -> Arc { - let searcher_config = SearcherConfig::default(); - let searcher_pool = SearcherPool::default(); - let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); - let searcher_context = Arc::new(SearcherContext::new(searcher_config, None)); - let searcher_service = start_searcher_service( - metastore, - storage_resolver, - search_job_placer, - searcher_context, - ) - .await - .unwrap(); - let grpc_advertise_addr = cluster - .ready_members() - .await - .first() - .unwrap() - .grpc_advertise_addr; - let searcher_client = - SearchServiceClient::from_service(searcher_service.clone(), grpc_advertise_addr); - searcher_pool.insert(grpc_advertise_addr, searcher_client); - searcher_service -} - -async fn setup_traces_index( - temp_dir: &TempDir, - metastore: MetastoreServiceClient, - ingester_service: &Mailbox, - indexer_service: &Mailbox, -) { - let index_root_uri: Uri = format!("{}", temp_dir.path().join("indexes").display()) - .parse() - .unwrap(); - let index_config = OtlpGrpcTracesService::index_config(&index_root_uri).unwrap(); - let index_id = index_config.index_id.clone(); - let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap(); - let index_uid: IndexUid = metastore - .create_index(create_index_request) - .await - .unwrap() - .index_uid() - .clone(); - let source_config = SourceConfig::ingest_api_default(); - let add_source_request = - AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap(); - metastore.add_source(add_source_request).await.unwrap(); - - let create_queue_request = CreateQueueRequest { - queue_id: index_id.clone(), - }; - ingester_service - .ask_for_res(create_queue_request) - .await - .unwrap(); - let spawn_pipeline_request = SpawnPipeline { - index_id: index_id.clone(), - source_config, - pipeline_uid: 
PipelineUid::default(), - }; - indexer_service - .ask_for_res(spawn_pipeline_request) - .await - .unwrap(); + sandbox.shutdown().await.unwrap(); } diff --git a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs index d2f86b00ff6..419b1f04f2f 100644 --- a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs @@ -47,7 +47,7 @@ fn create_mock_data_file(num_lines: usize) -> (NamedTempFile, Uri) { #[tokio::test] async fn test_sqs_single_node_cluster() { - tracing_subscriber::fmt::init(); + quickwit_common::setup_logging_for_tests(); let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); let index_id = "test-sqs-source-single-node-cluster"; let index_config = format!( diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs index fc42b2abf66..cf44ed8bc54 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs @@ -17,10 +17,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashSet; use std::time::Duration; -use quickwit_config::service::QuickwitService; use serde_json::{json, Value}; use super::assert_hits_unordered; @@ -37,17 +35,7 @@ async fn validate_search_across_doc_mapping_updates( query_and_expect: &[(&str, Result<&[Value], ()>)], ) { quickwit_common::setup_logging_for_tests(); - let nodes_services = vec![HashSet::from_iter([ - QuickwitService::Searcher, - QuickwitService::Metastore, - QuickwitService::Indexer, - QuickwitService::ControlPlane, - QuickwitService::Janitor, - ])]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) - .await - .unwrap(); - sandbox.wait_for_cluster_num_ready_nodes(1).await.unwrap(); + let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); { // Wait for indexer to fully start. 
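The quickwit-opentelemetry changes that follow swap the v1 `DocBatchBuilder` for the `JsonDocBatchV2Builder` introduced earlier in this patch and route the resulting `DocBatchV2` through the ingest router. A minimal sketch of the builder on its own (not part of the patch); the `LogDoc` struct and `build_batch` helper are illustrative:

use quickwit_ingest::JsonDocBatchV2Builder;
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::types::DocUidGenerator;
use serde::Serialize;

#[derive(Serialize)]
struct LogDoc {
    body: String,
}

fn build_batch(docs: Vec<LogDoc>) -> serde_json::Result<DocBatchV2> {
    let mut builder = JsonDocBatchV2Builder::default();
    let mut doc_uid_generator = DocUidGenerator::default();
    for doc in docs {
        // Each document is serialized directly into the shared buffer (no extra copy),
        // and its length is recorded so the batch can later be split back into documents.
        builder.add_doc(doc_uid_generator.next_doc_uid(), doc)?;
    }
    Ok(builder.build())
}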
diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs index fe663cd4d8c..19c7c63b8dc 100644 --- a/quickwit/quickwit-jaeger/src/lib.rs +++ b/quickwit/quickwit-jaeger/src/lib.rs @@ -60,8 +60,6 @@ use tracing::{debug, error, instrument, warn, Span as RuntimeSpan}; use crate::metrics::JAEGER_SERVICE_METRICS; -#[cfg(test)] -mod integration_tests; mod metrics; // OpenTelemetry to Jaeger Transformation diff --git a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs index e5b527ac0f2..a78f49fceea 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs @@ -26,14 +26,14 @@ use quickwit_common::rate_limited_error; use quickwit_common::thread_pool::run_cpu_intensive; use quickwit_common::uri::Uri; use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig}; -use quickwit_ingest::{ - CommitType, DocBatch, DocBatchBuilder, IngestRequest, IngestService, IngestServiceClient, -}; +use quickwit_ingest::{CommitType, JsonDocBatchV2Builder}; +use quickwit_proto::ingest::router::IngestRouterServiceClient; +use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsService; use quickwit_proto::opentelemetry::proto::collector::logs::v1::{ ExportLogsPartialSuccess, ExportLogsServiceRequest, ExportLogsServiceResponse, }; -use quickwit_proto::types::IndexId; +use quickwit_proto::types::{DocUidGenerator, IndexId}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use tonic::{Request, Response, Status}; @@ -41,8 +41,8 @@ use tracing::field::Empty; use tracing::{error, instrument, warn, Span as RuntimeSpan}; use super::{ - extract_otel_index_id_from_metadata, is_zero, parse_log_record_body, OtelSignal, SpanId, - TraceId, TryFromSpanIdError, TryFromTraceIdError, + extract_otel_index_id_from_metadata, ingest_doc_batch_v2, is_zero, parse_log_record_body, + OtelSignal, SpanId, TraceId, TryFromSpanIdError, TryFromTraceIdError, }; use crate::otlp::extract_attributes; use crate::otlp::metrics::OTLP_SERVICE_METRICS; @@ -226,7 +226,7 @@ impl PartialEq for OrdLogRecord { impl Eq for OrdLogRecord {} struct ParsedLogRecords { - doc_batch: DocBatch, + doc_batch: DocBatchV2, num_log_records: u64, num_parse_errors: u64, error_message: String, @@ -234,12 +234,12 @@ struct ParsedLogRecords { #[derive(Clone)] pub struct OtlpGrpcLogsService { - ingest_service: IngestServiceClient, + ingest_router: IngestRouterServiceClient, } impl OtlpGrpcLogsService { - pub fn new(ingest_service: IngestServiceClient) -> Self { - Self { ingest_service } + pub fn new(ingest_router: IngestRouterServiceClient) -> Self { + Self { ingest_router } } pub fn index_config(default_index_root_uri: &Uri) -> anyhow::Result { @@ -265,7 +265,7 @@ impl OtlpGrpcLogsService { error_message, } = run_cpu_intensive({ let parent_span = RuntimeSpan::current(); - || Self::parse_logs(request, parent_span, index_id) + || Self::parse_logs(request, parent_span) }) .await .map_err(|join_error| { @@ -276,7 +276,7 @@ impl OtlpGrpcLogsService { return Err(tonic::Status::internal(error_message)); } let num_bytes = doc_batch.num_bytes() as u64; - self.store_logs(doc_batch).await?; + self.store_logs(index_id, doc_batch).await?; OTLP_SERVICE_METRICS .ingested_log_records_total @@ -301,21 +301,22 @@ impl OtlpGrpcLogsService { fn parse_logs( request: ExportLogsServiceRequest, parent_span: RuntimeSpan, - index_id: IndexId, ) -> Result { 
let (log_records, mut num_parse_errors) = parse_otlp_logs(request)?; let num_log_records = log_records.len() as u64; let mut error_message = String::new(); - let mut doc_batch = DocBatchBuilder::new(index_id).json_writer(); + let mut doc_batch_builder = JsonDocBatchV2Builder::default(); + let mut doc_uid_generator = DocUidGenerator::default(); for log_record in log_records { - if let Err(error) = doc_batch.ingest_doc(&log_record.0) { + let doc_uid = doc_uid_generator.next_doc_uid(); + if let Err(error) = doc_batch_builder.add_doc(doc_uid, log_record.0) { error!(error=?error, "failed to JSON serialize span"); error_message = format!("failed to JSON serialize span: {error:?}"); num_parse_errors += 1; } } - let doc_batch = doc_batch.build(); + let doc_batch = doc_batch_builder.build(); let current_span = RuntimeSpan::current(); current_span.record("num_log_records", num_log_records); current_span.record("num_bytes", doc_batch.num_bytes()); @@ -331,12 +332,18 @@ impl OtlpGrpcLogsService { } #[instrument(skip_all, fields(num_bytes = doc_batch.num_bytes()))] - async fn store_logs(&mut self, doc_batch: DocBatch) -> Result<(), tonic::Status> { - let ingest_request = IngestRequest { - doc_batches: vec![doc_batch], - commit: CommitType::Auto.into(), - }; - self.ingest_service.ingest(ingest_request).await?; + async fn store_logs( + &mut self, + index_id: String, + doc_batch: DocBatchV2, + ) -> Result<(), tonic::Status> { + ingest_doc_batch_v2( + self.ingest_router.clone(), + index_id, + doc_batch, + CommitType::Auto, + ) + .await?; Ok(()) } diff --git a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs index 8ff9b4b8e62..d985c712f07 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs @@ -19,7 +19,12 @@ use std::collections::HashMap; -use quickwit_config::{validate_identifier, validate_index_id_pattern}; +use quickwit_config::{validate_identifier, validate_index_id_pattern, INGEST_V2_SOURCE_ID}; +use quickwit_ingest::{CommitType, IngestServiceError}; +use quickwit_proto::ingest::router::{ + IngestRequestV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest, +}; +use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtlpValue; use quickwit_proto::opentelemetry::proto::common::v1::{ AnyValue as OtlpAnyValue, ArrayValue as OtlpArrayValue, KeyValue as OtlpKeyValue, @@ -220,6 +225,40 @@ pub(crate) fn extract_otel_index_id_from_metadata( Ok(index_id.to_string()) } +async fn ingest_doc_batch_v2( + ingest_router: IngestRouterServiceClient, + index_id: String, + doc_batch: DocBatchV2, + commit_type: CommitType, +) -> Result<(), IngestServiceError> { + let subrequest = IngestSubrequest { + subrequest_id: 0, + index_id, + source_id: INGEST_V2_SOURCE_ID.to_string(), + doc_batch: Some(doc_batch), + }; + let request = IngestRequestV2 { + commit_type: commit_type.into(), + subrequests: vec![subrequest], + }; + let mut response = ingest_router + .ingest(request) + .await + .map_err(IngestServiceError::from)?; + let num_responses = response.successes.len() + response.failures.len(); + if num_responses != 1 { + return Err(IngestServiceError::Internal(format!( + "Expected a single failure/success, got {}.", + num_responses + ))); + } + if response.successes.pop().is_some() { + return Ok(()); + } + let ingest_failure = response.failures.pop().unwrap(); + Err(ingest_failure.into()) +} + #[cfg(test)] mod tests { use 
diff --git a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs
index c3c4e5e1e72..daa3cc46a1f 100644
--- a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs
+++ b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs
@@ -26,9 +26,9 @@ use prost::Message;
 use quickwit_common::thread_pool::run_cpu_intensive;
 use quickwit_common::uri::Uri;
 use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig};
-use quickwit_ingest::{
-    CommitType, DocBatch, DocBatchBuilder, IngestRequest, IngestService, IngestServiceClient,
-};
+use quickwit_ingest::{CommitType, JsonDocBatchV2Builder};
+use quickwit_proto::ingest::router::IngestRouterServiceClient;
+use quickwit_proto::ingest::DocBatchV2;
 use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceService;
 use quickwit_proto::opentelemetry::proto::collector::trace::v1::{
     ExportTracePartialSuccess, ExportTraceServiceRequest, ExportTraceServiceResponse,
@@ -38,7 +38,7 @@ use quickwit_proto::opentelemetry::proto::resource::v1::Resource as OtlpResource
 use quickwit_proto::opentelemetry::proto::trace::v1::span::Link as OtlpLink;
 use quickwit_proto::opentelemetry::proto::trace::v1::status::StatusCode as OtlpStatusCode;
 use quickwit_proto::opentelemetry::proto::trace::v1::{Span as OtlpSpan, Status as OtlpStatus};
-use quickwit_proto::types::IndexId;
+use quickwit_proto::types::{DocUidGenerator, IndexId};
 use serde::{Deserialize, Serialize};
 use serde_json::Value as JsonValue;
 use tonic::{Request, Response, Status};
@@ -46,8 +46,8 @@ use tracing::field::Empty;
 use tracing::{error, instrument, warn, Span as RuntimeSpan};
 
 use super::{
-    extract_otel_index_id_from_metadata, is_zero, OtelSignal, TryFromSpanIdError,
-    TryFromTraceIdError,
+    extract_otel_index_id_from_metadata, ingest_doc_batch_v2, is_zero, OtelSignal,
+    TryFromSpanIdError, TryFromTraceIdError,
 };
 use crate::otlp::metrics::OTLP_SERVICE_METRICS;
 use crate::otlp::{extract_attributes, SpanId, TraceId};
@@ -670,7 +670,7 @@ fn parse_otlp_spans(
 }
 
 struct ParsedSpans {
-    doc_batch: DocBatch,
+    doc_batch: DocBatchV2,
     num_spans: u64,
     num_parse_errors: u64,
     error_message: String,
@@ -678,14 +678,17 @@ struct ParsedSpans {
 
 #[derive(Debug, Clone)]
 pub struct OtlpGrpcTracesService {
-    ingest_service: IngestServiceClient,
+    ingest_router: IngestRouterServiceClient,
     commit_type: CommitType,
 }
 
 impl OtlpGrpcTracesService {
-    pub fn new(ingest_service: IngestServiceClient, commit_type_opt: Option<CommitType>) -> Self {
+    pub fn new(
+        ingest_router: IngestRouterServiceClient,
+        commit_type_opt: Option<CommitType>,
+    ) -> Self {
         Self {
-            ingest_service,
+            ingest_router,
             commit_type: commit_type_opt.unwrap_or_default(),
         }
     }
@@ -714,7 +717,7 @@ impl OtlpGrpcTracesService {
             error_message,
         } = run_cpu_intensive({
             let parent_span = RuntimeSpan::current();
-            || Self::parse_spans(request, parent_span, index_id)
+            || Self::parse_spans(request, parent_span)
         })
         .await
         .map_err(|join_error| {
@@ -728,7 +731,7 @@ impl OtlpGrpcTracesService {
             return Err(tonic::Status::internal(error_message));
         }
         let num_bytes = doc_batch.num_bytes() as u64;
-        self.store_spans(doc_batch).await?;
+        self.store_spans(index_id, doc_batch).await?;
 
         OTLP_SERVICE_METRICS
             .ingested_spans_total
@@ -753,16 +756,17 @@ impl OtlpGrpcTracesService {
     fn parse_spans(
         request: ExportTraceServiceRequest,
         parent_span: RuntimeSpan,
-        index_id: IndexId,
     ) -> tonic::Result<ParsedSpans> {
         let spans = parse_otlp_spans(request)?;
         let num_spans = spans.len() as u64;
         let mut num_parse_errors = 0;
         let mut error_message = String::new();
 
-        let mut doc_batch_builder = DocBatchBuilder::new(index_id).json_writer();
+        let mut doc_batch_builder = JsonDocBatchV2Builder::default();
+        let mut doc_uid_generator = DocUidGenerator::default();
         for span in spans {
-            if let Err(error) = doc_batch_builder.ingest_doc(&span.0) {
+            let doc_uid = doc_uid_generator.next_doc_uid();
+            if let Err(error) = doc_batch_builder.add_doc(doc_uid, span.0) {
                 error!(error=?error, "failed to JSON serialize span");
                 error_message = format!("failed to JSON serialize span: {error:?}");
                 num_parse_errors += 1;
@@ -784,12 +788,18 @@ impl OtlpGrpcTracesService {
     }
 
     #[instrument(skip_all, fields(num_bytes = doc_batch.num_bytes()))]
-    async fn store_spans(&mut self, doc_batch: DocBatch) -> Result<(), tonic::Status> {
-        let ingest_request = IngestRequest {
-            doc_batches: vec![doc_batch],
-            commit: self.commit_type.into(),
-        };
-        self.ingest_service.ingest(ingest_request).await?;
+    async fn store_spans(
+        &mut self,
+        index_id: String,
+        doc_batch: DocBatchV2,
+    ) -> Result<(), tonic::Status> {
+        ingest_doc_batch_v2(
+            self.ingest_router.clone(),
+            index_id,
+            doc_batch,
+            self.commit_type,
+        )
+        .await?;
         Ok(())
     }
 
diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
index 266bc2e6eb8..ee086902522 100644
--- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
@@ -24,10 +24,9 @@ use quickwit_ingest::{
     IngestService, IngestServiceClient, IngestServiceError, TailRequest,
 };
 use quickwit_proto::ingest::router::{
-    IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService,
-    IngestRouterServiceClient, IngestSubrequest,
+    IngestRequestV2, IngestResponseV2, IngestRouterService, IngestRouterServiceClient,
+    IngestSubrequest,
 };
-use quickwit_proto::ingest::RateLimitingCause;
 use quickwit_proto::types::{DocUidGenerator, IndexId};
 use serde::Deserialize;
 use warp::{Filter, Rejection};
@@ -167,39 +166,7 @@ fn convert_ingest_response_v2(
         });
     }
     let ingest_failure = response.failures.pop().unwrap();
-    let reason = ingest_failure.reason();
-    Err(match reason {
-        IngestFailureReason::Unspecified => {
-            IngestServiceError::Internal("unknown error".to_string())
-        }
-        IngestFailureReason::IndexNotFound => IngestServiceError::IndexNotFound {
-            index_id: ingest_failure.index_id,
-        },
-        IngestFailureReason::SourceNotFound => IngestServiceError::Internal(format!(
-            "Ingest v2 source not found for index {}",
-            ingest_failure.index_id
-        )),
-        IngestFailureReason::Internal => IngestServiceError::Internal("internal error".to_string()),
-        IngestFailureReason::NoShardsAvailable => {
-            IngestServiceError::Unavailable("no shards available".to_string())
-        }
-        IngestFailureReason::ShardRateLimited => {
-            IngestServiceError::RateLimited(RateLimitingCause::ShardRateLimiting)
-        }
-        IngestFailureReason::WalFull => IngestServiceError::RateLimited(RateLimitingCause::WalFull),
-        IngestFailureReason::Timeout => {
-            IngestServiceError::Internal("request timed out".to_string())
-        }
-        IngestFailureReason::RouterLoadShedding => {
-            IngestServiceError::RateLimited(RateLimitingCause::RouterLoadShedding)
-        }
-        IngestFailureReason::LoadShedding => {
-            IngestServiceError::RateLimited(RateLimitingCause::LoadShedding)
-        }
-        IngestFailureReason::CircuitBreaker => {
-            IngestServiceError::RateLimited(RateLimitingCause::CircuitBreaker)
-        }
-    })
+    Err(ingest_failure.into())
 }
 
 #[utoipa::path(
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index a00c012956e..fa8df1d9237 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -655,7 +655,7 @@ pub async fn serve_quickwit(
     let otlp_logs_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer)
         && node_config.indexer_config.enable_otlp_endpoint
     {
-        Some(OtlpGrpcLogsService::new(ingest_service.clone()))
+        Some(OtlpGrpcLogsService::new(ingest_router_service.clone()))
     } else {
         None
     };
@@ -663,7 +663,10 @@ pub async fn serve_quickwit(
     let otlp_traces_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer)
         && node_config.indexer_config.enable_otlp_endpoint
     {
-        Some(OtlpGrpcTracesService::new(ingest_service.clone(), None))
+        Some(OtlpGrpcTracesService::new(
+            ingest_router_service.clone(),
+            None,
+        ))
     } else {
         None
     };
diff --git a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs
index 07c5f57afcb..6e3bac742c8 100644
--- a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs
@@ -201,10 +201,13 @@ mod tests {
     use flate2::write::GzEncoder;
     use flate2::Compression;
     use prost::Message;
-    use quickwit_ingest::{CommitType, IngestResponse, IngestServiceClient, MockIngestService};
+    use quickwit_ingest::CommitType;
     use quickwit_opentelemetry::otlp::{
         make_resource_spans_for_test, OtlpGrpcLogsService, OtlpGrpcTracesService,
     };
+    use quickwit_proto::ingest::router::{
+        IngestResponseV2, IngestRouterServiceClient, IngestSuccess, MockIngestRouterService,
+    };
     use quickwit_proto::opentelemetry::proto::collector::logs::v1::{
         ExportLogsServiceRequest, ExportLogsServiceResponse,
     };
@@ -226,23 +229,27 @@ mod tests {
 
     #[tokio::test]
     async fn test_otlp_ingest_logs_handler() {
-        let mut mock_ingest_service = MockIngestService::new();
-        mock_ingest_service
+        let mut mock_ingest_router = MockIngestRouterService::new();
+        mock_ingest_router
             .expect_ingest()
             .withf(|request| {
-                request.doc_batches.len() == 1
+                request.subrequests.len() == 1
+                    && request.subrequests[0].doc_batch.is_some()
                     // && request.commit == CommitType::Auto as i32
-                    && request.doc_batches[0].doc_lengths.len() == 1
+                    && request.subrequests[0].doc_batch.as_ref().unwrap().doc_lengths.len() == 1
             })
             .returning(|_| {
-                Ok(IngestResponse {
-                    num_docs_for_processing: 1,
+                Ok(IngestResponseV2 {
+                    successes: vec![IngestSuccess {
+                        num_ingested_docs: 1,
+                        ..Default::default()
+                    }],
+                    failures: Vec::new(),
                 })
             });
-        let ingest_service_client = IngestServiceClient::from_mock(mock_ingest_service);
-        let logs_service = OtlpGrpcLogsService::new(ingest_service_client.clone());
-        let traces_service =
-            OtlpGrpcTracesService::new(ingest_service_client, Some(CommitType::Force));
+        let ingest_router = IngestRouterServiceClient::from_mock(mock_ingest_router);
+        let logs_service = OtlpGrpcLogsService::new(ingest_router.clone());
+        let traces_service = OtlpGrpcTracesService::new(ingest_router, Some(CommitType::Force));
         let export_logs_request = ExportLogsServiceRequest {
             resource_logs: vec![ResourceLogs {
                 resource: Some(Resource {
@@ -339,23 +346,27 @@ mod tests {
 
     #[tokio::test]
     async fn test_otlp_ingest_traces_handler() {
-        let mut mock_ingest_service = MockIngestService::new();
-        mock_ingest_service
+        let mut mock_ingest_router = MockIngestRouterService::new();
+        mock_ingest_router
             .expect_ingest()
             .withf(|request| {
-                request.doc_batches.len() == 1
-                    && request.commit == CommitType::Force as i32
-                    && request.doc_batches[0].doc_lengths.len() == 5
+                request.subrequests.len() == 1
+                    && request.subrequests[0].doc_batch.is_some()
+                    // && request.commit == CommitType::Auto as i32
+                    && request.subrequests[0].doc_batch.as_ref().unwrap().doc_lengths.len() == 5
             })
             .returning(|_| {
-                Ok(IngestResponse {
-                    num_docs_for_processing: 1,
+                Ok(IngestResponseV2 {
+                    successes: vec![IngestSuccess {
+                        num_ingested_docs: 1,
+                        ..Default::default()
+                    }],
+                    failures: Vec::new(),
                 })
             });
-        let ingest_service_client = IngestServiceClient::from_mock(mock_ingest_service);
-        let logs_service = OtlpGrpcLogsService::new(ingest_service_client.clone());
-        let traces_service =
-            OtlpGrpcTracesService::new(ingest_service_client, Some(CommitType::Force));
+        let ingest_router = IngestRouterServiceClient::from_mock(mock_ingest_router);
+        let logs_service = OtlpGrpcLogsService::new(ingest_router.clone());
+        let traces_service = OtlpGrpcTracesService::new(ingest_router, Some(CommitType::Force));
         let export_trace_request = ExportTraceServiceRequest {
             resource_spans: make_resource_spans_for_test(),
         };