From 34a162f28f3ab116ac7cbf469dfdc97041e0b3ce Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 2 Sep 2024 14:27:27 +0200 Subject: [PATCH] Fix flaky integ tests (#5348) * Pass TcpListener to quickwit_serve in integ tests * Use static dispatch instead of enum * Simplify cluster sandbox API * Increase wait_for_split to match higher commit timeouts --- quickwit/quickwit-cli/src/service.rs | 2 + .../quickwit-config/src/node_config/mod.rs | 10 +- .../src/node_config/serialize.rs | 9 +- .../quickwit-integration-tests/Cargo.toml | 2 +- .../src/test_utils/cluster_sandbox.rs | 424 +++++++++--------- .../src/test_utils/mod.rs | 3 +- .../src/test_utils/shutdown.rs | 73 +++ .../src/tests/basic_tests.rs | 35 +- .../src/tests/ingest_tests.rs | 97 ++-- .../src/tests/otlp_tests.rs | 56 ++- .../src/tests/sqs_tests.rs | 6 +- .../tests/update_tests/doc_mapping_tests.rs | 4 +- .../update_tests/search_settings_tests.rs | 22 +- quickwit/quickwit-serve/Cargo.toml | 2 +- quickwit/quickwit-serve/src/grpc.rs | 12 +- quickwit/quickwit-serve/src/lib.rs | 7 +- quickwit/quickwit-serve/src/rest.rs | 10 +- quickwit/quickwit-serve/src/tcp_listener.rs | 81 ++++ 18 files changed, 488 insertions(+), 367 deletions(-) create mode 100644 quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs create mode 100644 quickwit/quickwit-serve/src/tcp_listener.rs diff --git a/quickwit/quickwit-cli/src/service.rs b/quickwit/quickwit-cli/src/service.rs index 889c0455d5a..40693e9d4b5 100644 --- a/quickwit/quickwit-cli/src/service.rs +++ b/quickwit/quickwit-cli/src/service.rs @@ -27,6 +27,7 @@ use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::uri::{Protocol, Uri}; use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; +use quickwit_serve::tcp_listener::DefaultTcpListenerResolver; use quickwit_serve::{serve_quickwit, BuildInfo, EnvFilterReloadFn}; use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent}; use tokio::signal; @@ -114,6 +115,7 @@ impl RunCliCommand { runtimes_config, metastore_resolver, storage_resolver, + DefaultTcpListenerResolver, shutdown_signal, env_filter_reload_fn, ) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index f8a5611d75e..2321bd0399a 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -480,9 +480,17 @@ impl NodeConfig { self.storage_configs.redact(); } + /// Creates a config with defaults suitable for testing. + /// + /// Uses the default ports without ensuring that they are available. #[cfg(any(test, feature = "testsuite"))] pub fn for_test() -> Self { - serialize::node_config_for_test() + serialize::node_config_for_tests_from_ports(7280, 7281) + } + + #[cfg(any(test, feature = "testsuite"))] + pub fn for_test_from_ports(rest_listen_port: u16, grpc_listen_port: u16) -> Self { + serialize::node_config_for_tests_from_ports(rest_listen_port, grpc_listen_port) } } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 208a929badc..81b9260f01d 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -411,13 +411,13 @@ impl RestConfigBuilder { } #[cfg(any(test, feature = "testsuite"))] -pub fn node_config_for_test() -> NodeConfig { - use quickwit_common::net::find_available_tcp_port; - +pub fn node_config_for_tests_from_ports( + rest_listen_port: u16, + grpc_listen_port: u16, +) -> NodeConfig { let node_id = NodeId::new(default_node_id().unwrap()); let enabled_services = QuickwitService::supported_services(); let listen_address = Host::default(); - let rest_listen_port = find_available_tcp_port().expect("OS should find an available port"); let rest_listen_addr = listen_address .with_port(rest_listen_port) .to_socket_addr() @@ -426,7 +426,6 @@ pub fn node_config_for_test() -> NodeConfig { .with_port(rest_listen_port) .to_socket_addr() .expect("default host should be an IP address"); - let grpc_listen_port = find_available_tcp_port().expect("OS should find an available port"); let grpc_listen_addr = listen_address .with_port(grpc_listen_port) .to_socket_addr() diff --git a/quickwit/quickwit-integration-tests/Cargo.toml b/quickwit/quickwit-integration-tests/Cargo.toml index ecfafd9b5a8..9a2892c3ad4 100644 --- a/quickwit/quickwit-integration-tests/Cargo.toml +++ b/quickwit/quickwit-integration-tests/Cargo.toml @@ -41,5 +41,5 @@ quickwit-metastore = { workspace = true, features = ["testsuite"] } quickwit-opentelemetry = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true, features = ["testsuite"] } quickwit-rest-client = { workspace = true } -quickwit-serve = { workspace = true } +quickwit-serve = { workspace = true, features = ["testsuite"] } quickwit-storage = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 2ccf38cc1a6..cfa86ab9e91 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -20,7 +20,6 @@ use std::collections::{HashMap, HashSet}; use std::io::Write; use std::net::SocketAddr; -use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; @@ -31,8 +30,7 @@ use quickwit_actors::ActorExitStatus; use quickwit_cli::tool::{local_ingest_docs_cli, LocalIngestDocsArgs}; use quickwit_common::new_coolid; use quickwit_common::runtimes::RuntimesConfig; -use quickwit_common::test_utils::{wait_for_server_ready, wait_until_predicate}; -use quickwit_common::tower::BoxFutureInfaillible; +use quickwit_common::test_utils::wait_until_predicate; use quickwit_common::uri::Uri as QuickwitUri; use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; @@ -45,151 +43,150 @@ use quickwit_rest_client::models::IngestSource; use quickwit_rest_client::rest_client::{ CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL, }; +use quickwit_serve::tcp_listener::for_tests::TestTcpListenerResolver; use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString}; use quickwit_storage::StorageResolver; use reqwest::Url; use serde_json::Value; use tempfile::TempDir; -use tokio::sync::watch::{self, Receiver, Sender}; -use tokio::task::JoinHandle; +use tokio::net::TcpListener; use tonic::transport::channel; use tracing::debug; -/// Configuration of a node made of a [`NodeConfig`] and a -/// set of services. -#[derive(Clone)] +use super::shutdown::NodeShutdownHandle; + pub struct TestNodeConfig { - pub node_config: NodeConfig, pub services: HashSet, + pub enable_otlp: bool, } -type NodeJoinHandle = JoinHandle, anyhow::Error>>; - -struct NodeShutdownHandle { - sender: Sender<()>, - receiver: Receiver<()>, - node_services: HashSet, - node_id: NodeId, - join_handle_opt: Option, +pub struct ClusterSandboxBuilder { + temp_dir: TempDir, + node_configs: Vec, } -impl NodeShutdownHandle { - fn new(node_id: NodeId, node_services: HashSet) -> Self { - let (sender, receiver) = watch::channel(()); +impl Default for ClusterSandboxBuilder { + fn default() -> Self { Self { - sender, - receiver, - node_id, - node_services, - join_handle_opt: None, + temp_dir: tempfile::tempdir().unwrap(), + node_configs: Vec::new(), } } +} - fn shutdown_signal(&self) -> BoxFutureInfaillible<()> { - let receiver = self.receiver.clone(); - Box::pin(async move { - receiver.clone().changed().await.unwrap(); - }) +impl ClusterSandboxBuilder { + pub fn add_node(mut self, services: impl IntoIterator) -> Self { + self.node_configs.push(TestNodeConfig { + services: HashSet::from_iter(services), + enable_otlp: false, + }); + self } - fn set_node_join_handle(&mut self, join_handle: NodeJoinHandle) { - self.join_handle_opt = Some(join_handle); + pub fn add_node_with_otlp( + mut self, + services: impl IntoIterator, + ) -> Self { + self.node_configs.push(TestNodeConfig { + services: HashSet::from_iter(services), + enable_otlp: true, + }); + self } - /// Initiate node shutdown and wait for it to complete - async fn shutdown(self) -> anyhow::Result> { - self.sender.send(()).unwrap(); - self.join_handle_opt - .expect("node join handle was not set before shutdown") - .await - .unwrap() + /// Builds a list of of [`NodeConfig`] from the node definitions added to + /// builder. For each node, a [`NodeConfig`] is built with the right + /// parameters such that we will be able to run `quickwit_serve` on them and + /// form a Quickwit cluster. For each node, we set: + /// - `data_dir_path` defined by `root_data_dir/node_id`. + /// - `metastore_uri` defined by `root_data_dir/metastore`. + /// - `default_index_root_uri` defined by `root_data_dir/indexes`. + /// - `peers` defined by others nodes `gossip_advertise_addr`. + async fn build_config(self) -> ResolvedClusterConfig { + let root_data_dir = self.temp_dir.path().to_path_buf(); + let cluster_id = new_coolid("test-cluster"); + let mut resolved_node_configs = Vec::new(); + let mut peers: Vec = Vec::new(); + let unique_dir_name = new_coolid("test-dir"); + let tcp_listener_resolver = TestTcpListenerResolver::default(); + for (node_idx, node_builder) in self.node_configs.iter().enumerate() { + let socket: SocketAddr = ([127, 0, 0, 1], 0u16).into(); + let rest_tcp_listener = TcpListener::bind(socket).await.unwrap(); + let grpc_tcp_listener = TcpListener::bind(socket).await.unwrap(); + let mut config = NodeConfig::for_test_from_ports( + rest_tcp_listener.local_addr().unwrap().port(), + grpc_tcp_listener.local_addr().unwrap().port(), + ); + tcp_listener_resolver.add_listener(rest_tcp_listener).await; + tcp_listener_resolver.add_listener(grpc_tcp_listener).await; + config.indexer_config.enable_otlp_endpoint = node_builder.enable_otlp; + config.enabled_services.clone_from(&node_builder.services); + config.jaeger_config.enable_endpoint = true; + config.cluster_id.clone_from(&cluster_id); + config.node_id = NodeId::new(format!("test-node-{node_idx}")); + config.data_dir_path = root_data_dir.join(config.node_id.as_str()); + config.metastore_uri = + QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/metastore")).unwrap(); + config.default_index_root_uri = + QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/indexes")).unwrap(); + peers.push(config.gossip_advertise_addr.to_string()); + resolved_node_configs.push((config, node_builder.services.clone())); + } + for node_config in resolved_node_configs.iter_mut() { + node_config.0.peer_seeds = peers + .clone() + .into_iter() + .filter(|seed| *seed != node_config.0.gossip_advertise_addr.to_string()) + .collect_vec(); + } + ResolvedClusterConfig { + temp_dir: self.temp_dir, + node_configs: resolved_node_configs, + tcp_listener_resolver, + } } -} - -/// Creates a Cluster Test environment. -/// -/// The goal is to start several nodes and use the gRPC or REST clients to -/// test it. -/// -/// WARNING: Currently, we cannot start an indexer in a different test as it will -/// will share the same `INGEST_API_SERVICE_INSTANCE`. The ingest API will be -/// dropped by the first running test and the other tests will fail. -pub struct ClusterSandbox { - pub node_configs: Vec, - pub searcher_rest_client: QuickwitClient, - pub indexer_rest_client: QuickwitClient, - pub trace_client: TraceServiceClient, - pub logs_client: LogsServiceClient, - pub jaeger_client: SpanReaderPluginClient, - _temp_dir: TempDir, - node_shutdown_handle: Vec, -} -fn transport_url(addr: SocketAddr) -> Url { - let mut url = Url::parse(DEFAULT_BASE_URL).unwrap(); - url.set_ip_host(addr.ip()).unwrap(); - url.set_port(Some(addr.port())).unwrap(); - url -} + pub async fn build_and_start(self) -> ClusterSandbox { + self.build_config().await.start().await + } -#[macro_export] -macro_rules! ingest_json { - ($($json:tt)+) => { - quickwit_rest_client::models::IngestSource::Str(json!($($json)+).to_string()) - }; + pub async fn build_and_start_standalone() -> ClusterSandbox { + ClusterSandboxBuilder::default() + .add_node(QuickwitService::supported_services()) + .build_config() + .await + .start() + .await + } } -pub(crate) async fn ingest_with_retry( - client: &QuickwitClient, - index_id: &str, - ingest_source: IngestSource, - commit_type: CommitType, -) -> anyhow::Result<()> { - wait_until_predicate( - || { - let commit_type_clone = commit_type; - let ingest_source_clone = ingest_source.clone(); - async move { - // Index one record. - if let Err(err) = client - .ingest(index_id, ingest_source_clone, None, None, commit_type_clone) - .await - { - debug!(index=%index_id, err=%err, "failed to ingest"); - false - } else { - true - } - } - }, - Duration::from_secs(10), - Duration::from_millis(100), - ) - .await?; - Ok(()) +/// Intermediate state where the ports of all the the test cluster nodes have +/// been reserved and the configurations have been generated. +struct ResolvedClusterConfig { + temp_dir: TempDir, + node_configs: Vec<(NodeConfig, HashSet)>, + tcp_listener_resolver: TestTcpListenerResolver, } -impl ClusterSandbox { - pub async fn start_cluster_with_configs( - temp_dir: TempDir, - node_configs: Vec, - ) -> anyhow::Result { +impl ResolvedClusterConfig { + /// Start a cluster using this config and waits for the nodes to be ready + pub async fn start(self) -> ClusterSandbox { + let mut node_shutdown_handles = Vec::new(); let runtimes_config = RuntimesConfig::light_for_tests(); let storage_resolver = StorageResolver::unconfigured(); let metastore_resolver = MetastoreResolver::unconfigured(); - let mut node_shutdown_handlers = Vec::new(); - for node_config in node_configs.iter() { - let mut shutdown_handler = NodeShutdownHandle::new( - node_config.node_config.node_id.clone(), - node_config.services.clone(), - ); + let cluster_size = self.node_configs.len(); + for node_config in self.node_configs.iter() { + let mut shutdown_handler = + NodeShutdownHandle::new(node_config.0.node_id.clone(), node_config.1.clone()); let shutdown_signal = shutdown_handler.shutdown_signal(); let join_handle = tokio::spawn({ - let node_config = node_config.node_config.clone(); + let node_config = node_config.0.clone(); let node_id = node_config.node_id.clone(); let services = node_config.enabled_services.clone(); let metastore_resolver = metastore_resolver.clone(); let storage_resolver = storage_resolver.clone(); + let tcp_listener_resolver = self.tcp_listener_resolver.clone(); async move { let result = serve_quickwit( @@ -197,6 +194,7 @@ impl ClusterSandbox { runtimes_config, metastore_resolver, storage_resolver, + tcp_listener_resolver, shutdown_signal, quickwit_serve::do_nothing_env_filter_reload_fn(), ) @@ -206,92 +204,117 @@ impl ClusterSandbox { } }); shutdown_handler.set_node_join_handle(join_handle); - node_shutdown_handlers.push(shutdown_handler); + node_shutdown_handles.push(shutdown_handler); } - let searcher_config = node_configs + let searcher_config = self + .node_configs .iter() - .find(|node_config| node_config.services.contains(&QuickwitService::Searcher)) + .find(|node_config| node_config.1.contains(&QuickwitService::Searcher)) .cloned() .unwrap(); - let indexer_config = node_configs + let indexer_config = self + .node_configs .iter() - .find(|node_config| node_config.services.contains(&QuickwitService::Indexer)) + .find(|node_config| node_config.1.contains(&QuickwitService::Indexer)) .cloned() .unwrap(); - if node_configs.len() == 1 { - // We have only one node, so we can just wait for it to get started - wait_for_server_ready(node_configs[0].node_config.grpc_listen_addr).await?; - } else { - // Wait for a duration greater than chitchat GOSSIP_INTERVAL (50ms) so that the cluster - // is formed. - tokio::time::sleep(Duration::from_millis(100)).await; - } - let indexer_channel = channel::Endpoint::from_str(&format!( - "http://{}", - indexer_config.node_config.grpc_listen_addr - )) - .unwrap() - .connect_lazy(); - let searcher_channel = channel::Endpoint::from_str(&format!( - "http://{}", - searcher_config.node_config.grpc_listen_addr - )) - .unwrap() - .connect_lazy(); - Ok(Self { - node_configs, + let indexer_channel = + channel::Endpoint::from_str(&format!("http://{}", indexer_config.0.grpc_listen_addr)) + .unwrap() + .connect_lazy(); + let searcher_channel = + channel::Endpoint::from_str(&format!("http://{}", searcher_config.0.grpc_listen_addr)) + .unwrap() + .connect_lazy(); + + let sandbox = ClusterSandbox { + node_configs: self.node_configs, searcher_rest_client: QuickwitClientBuilder::new(transport_url( - searcher_config.node_config.rest_config.listen_addr, + searcher_config.0.rest_config.listen_addr, )) .build(), indexer_rest_client: QuickwitClientBuilder::new(transport_url( - indexer_config.node_config.rest_config.listen_addr, + indexer_config.0.rest_config.listen_addr, )) .build(), trace_client: TraceServiceClient::new(indexer_channel.clone()), logs_client: LogsServiceClient::new(indexer_channel), jaeger_client: SpanReaderPluginClient::new(searcher_channel), - _temp_dir: temp_dir, - node_shutdown_handle: node_shutdown_handlers, - }) + _temp_dir: self.temp_dir, + node_shutdown_handles, + }; + sandbox + .wait_for_cluster_num_ready_nodes(cluster_size) + .await + .unwrap(); + sandbox } +} - pub fn enable_ingest_v2(&mut self) { - self.indexer_rest_client.enable_ingest_v2(); - self.searcher_rest_client.enable_ingest_v2(); - } +fn transport_url(addr: SocketAddr) -> Url { + let mut url = Url::parse(DEFAULT_BASE_URL).unwrap(); + url.set_ip_host(addr.ip()).unwrap(); + url.set_port(Some(addr.port())).unwrap(); + url +} - // Starts one node that runs all the services and wait for it to be ready - pub async fn start_standalone_node() -> anyhow::Result { - let sandbox = Self::start_cluster_nodes(&[QuickwitService::supported_services()]).await?; - sandbox.wait_for_cluster_num_ready_nodes(1).await?; - Ok(sandbox) - } +#[macro_export] +macro_rules! ingest_json { + ($($json:tt)+) => { + quickwit_rest_client::models::IngestSource::Str(json!($($json)+).to_string()) + }; +} - pub async fn start_cluster_with_otlp_service( - nodes_services: &[HashSet], - ) -> anyhow::Result { - let temp_dir = tempfile::tempdir()?; - let mut node_configs = build_node_configs(temp_dir.path().to_path_buf(), nodes_services); - // Set OTLP endpoint for indexers. - for node_config in node_configs.iter_mut() { - if node_config.services.contains(&QuickwitService::Indexer) { - node_config.node_config.indexer_config.enable_otlp_endpoint = true; +pub(crate) async fn ingest_with_retry( + client: &QuickwitClient, + index_id: &str, + ingest_source: IngestSource, + commit_type: CommitType, +) -> anyhow::Result<()> { + wait_until_predicate( + || { + let commit_type_clone = commit_type; + let ingest_source_clone = ingest_source.clone(); + async move { + // Index one record. + if let Err(err) = client + .ingest(index_id, ingest_source_clone, None, None, commit_type_clone) + .await + { + debug!(index=%index_id, err=%err, "failed to ingest"); + false + } else { + true + } } - } - Self::start_cluster_with_configs(temp_dir, node_configs).await - } + }, + Duration::from_secs(10), + Duration::from_millis(100), + ) + .await?; + Ok(()) +} - // Starts nodes with corresponding services given by `nodes_services`. - pub async fn start_cluster_nodes( - nodes_services: &[HashSet], - ) -> anyhow::Result { - let temp_dir = tempfile::tempdir()?; - let node_configs = build_node_configs(temp_dir.path().to_path_buf(), nodes_services); - Self::start_cluster_with_configs(temp_dir, node_configs).await +/// A test environment where you can start a Quickwit cluster and use the gRPC +/// or REST clients to test it. +pub struct ClusterSandbox { + pub node_configs: Vec<(NodeConfig, HashSet)>, + pub searcher_rest_client: QuickwitClient, + pub indexer_rest_client: QuickwitClient, + pub trace_client: TraceServiceClient, + pub logs_client: LogsServiceClient, + pub jaeger_client: SpanReaderPluginClient, + _temp_dir: TempDir, + node_shutdown_handles: Vec, +} + +impl ClusterSandbox { + pub fn enable_ingest_v2(&mut self) { + self.indexer_rest_client.enable_ingest_v2(); + self.searcher_rest_client.enable_ingest_v2(); } - pub async fn wait_for_cluster_num_ready_nodes( + async fn wait_for_cluster_num_ready_nodes( &self, expected_num_ready_nodes: usize, ) -> anyhow::Result<()> { @@ -394,7 +417,7 @@ impl ClusterSandbox { } } }, - Duration::from_secs(10), + Duration::from_secs(15), Duration::from_millis(500), ) .await?; @@ -405,7 +428,7 @@ impl ClusterSandbox { let test_conf = self .node_configs .iter() - .find(|config| config.services.contains(&QuickwitService::Indexer)) + .find(|config| config.1.contains(&QuickwitService::Indexer)) .ok_or(anyhow::anyhow!("No indexer node found"))?; // NodeConfig cannot be serialized, we write our own simplified config let mut tmp_config_file = tempfile::Builder::new().suffix(".yaml").tempfile().unwrap(); @@ -415,7 +438,7 @@ impl ClusterSandbox { metastore_uri: {} data_dir: {:?} "#, - test_conf.node_config.metastore_uri, test_conf.node_config.data_dir_path + test_conf.0.metastore_uri, test_conf.0.data_dir_path ); tmp_config_file.write_all(node_config.as_bytes())?; tmp_config_file.flush()?; @@ -479,10 +502,10 @@ impl ClusterSandbox { let mut shutdown_futures = Vec::new(); let mut shutdown_nodes = HashMap::new(); let mut i = 0; - while i < self.node_shutdown_handle.len() { - let handler_services = &self.node_shutdown_handle[i].node_services; + while i < self.node_shutdown_handles.len() { + let handler_services = &self.node_shutdown_handles[i].node_services; if handler_services.is_subset(shutdown_services) { - let handler_to_shutdown = self.node_shutdown_handle.remove(i); + let handler_to_shutdown = self.node_shutdown_handles.remove(i); shutdown_nodes.insert( handler_to_shutdown.node_id.clone(), handler_to_shutdown.node_services.clone(), @@ -508,48 +531,3 @@ impl ClusterSandbox { .await } } - -/// Builds a list of [`NodeConfig`] given a list of Quickwit services. -/// Each element of `nodes_services` defines the services of a given node. -/// For each node, a `NodeConfig` is built with the right parameters -/// such that we will be able to run `quickwit_serve` on them and form -/// a quickwit cluster. -/// For each node, we set: -/// - `data_dir_path` defined by `root_data_dir/node_id`. -/// - `metastore_uri` defined by `root_data_dir/metastore`. -/// - `default_index_root_uri` defined by `root_data_dir/indexes`. -/// - `peers` defined by others nodes `gossip_advertise_addr`. -pub fn build_node_configs( - root_data_dir: PathBuf, - nodes_services: &[HashSet], -) -> Vec { - let cluster_id = new_coolid("test-cluster"); - let mut node_configs = Vec::new(); - let mut peers: Vec = Vec::new(); - let unique_dir_name = new_coolid("test-dir"); - for (node_idx, node_services) in nodes_services.iter().enumerate() { - let mut config = NodeConfig::for_test(); - config.enabled_services.clone_from(node_services); - config.jaeger_config.enable_endpoint = true; - config.cluster_id.clone_from(&cluster_id); - config.node_id = NodeId::new(format!("test-node-{node_idx}")); - config.data_dir_path = root_data_dir.join(config.node_id.as_str()); - config.metastore_uri = - QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/metastore")).unwrap(); - config.default_index_root_uri = - QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/indexes")).unwrap(); - peers.push(config.gossip_advertise_addr.to_string()); - node_configs.push(TestNodeConfig { - node_config: config, - services: node_services.clone(), - }); - } - for node_config in node_configs.iter_mut() { - node_config.node_config.peer_seeds = peers - .clone() - .into_iter() - .filter(|seed| *seed != node_config.node_config.gossip_advertise_addr.to_string()) - .collect_vec(); - } - node_configs -} diff --git a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs index 82d2213068f..00bbdf73e82 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs @@ -18,5 +18,6 @@ // along with this program. If not, see . mod cluster_sandbox; +mod shutdown; -pub(crate) use cluster_sandbox::{ingest_with_retry, ClusterSandbox}; +pub(crate) use cluster_sandbox::{ingest_with_retry, ClusterSandbox, ClusterSandboxBuilder}; diff --git a/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs b/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs new file mode 100644 index 00000000000..0e9c1aa4e38 --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs @@ -0,0 +1,73 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::{HashMap, HashSet}; + +use quickwit_actors::ActorExitStatus; +use quickwit_common::tower::BoxFutureInfaillible; +use quickwit_config::service::QuickwitService; +use quickwit_proto::types::NodeId; +use tokio::sync::watch::{self, Receiver, Sender}; +use tokio::task::JoinHandle; + +type NodeJoinHandle = JoinHandle, anyhow::Error>>; + +pub(crate) struct NodeShutdownHandle { + sender: Sender<()>, + receiver: Receiver<()>, + pub node_services: HashSet, + pub node_id: NodeId, + join_handle_opt: Option, +} + +impl NodeShutdownHandle { + pub(crate) fn new(node_id: NodeId, node_services: HashSet) -> Self { + let (sender, receiver) = watch::channel(()); + Self { + sender, + receiver, + node_id, + node_services, + join_handle_opt: None, + } + } + + pub(crate) fn shutdown_signal(&self) -> BoxFutureInfaillible<()> { + let receiver = self.receiver.clone(); + Box::pin(async move { + receiver.clone().changed().await.unwrap(); + }) + } + + pub(crate) fn set_node_join_handle(&mut self, join_handle: NodeJoinHandle) { + self.join_handle_opt = Some(join_handle); + } + + /// Initiate node shutdown and wait for it to complete + + pub(crate) async fn shutdown( + self, + ) -> anyhow::Result> { + self.sender.send(()).unwrap(); + self.join_handle_opt + .expect("node join handle was not set before shutdown") + .await + .unwrap() + } +} diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs index ee6f18eafe0..66b71c6738b 100644 --- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashSet; use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; @@ -28,7 +27,7 @@ use quickwit_rest_client::models::IngestSource; use quickwit_rest_client::rest_client::CommitType; use quickwit_serve::SearchRequestQueryString; -use crate::test_utils::{ingest_with_retry, ClusterSandbox}; +use crate::test_utils::{ingest_with_retry, ClusterSandboxBuilder}; fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String { format!( @@ -41,18 +40,15 @@ fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String { #[tokio::test] async fn test_ui_redirect_on_get() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let node_config = sandbox.node_configs.first().unwrap(); let client = hyper::Client::builder() .pool_idle_timeout(Duration::from_secs(30)) .http2_only(true) .build_http(); - let root_uri = format!( - "http://{}/", - node_config.node_config.rest_config.listen_addr - ) - .parse::() - .unwrap(); + let root_uri = format!("http://{}/", node_config.0.rest_config.listen_addr) + .parse::() + .unwrap(); let response = client.get(root_uri.clone()).await.unwrap(); assert_eq!(response.status(), StatusCode::MOVED_PERMANENTLY); let post_request = Request::builder() @@ -68,7 +64,7 @@ async fn test_ui_redirect_on_get() { #[tokio::test] async fn test_standalone_server() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; { // The indexing service should be running. let counters = sandbox @@ -125,17 +121,14 @@ async fn test_standalone_server() { #[tokio::test] async fn test_multi_nodes_cluster() { quickwit_common::setup_logging_for_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) - .await - .unwrap(); - sandbox.wait_for_cluster_num_ready_nodes(5).await.unwrap(); + let sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build_and_start() + .await; { // Wait for indexer to fully start. diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs index b4c3eab3325..e109facf105 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashSet; use std::time::Duration; use hyper::StatusCode; @@ -29,7 +28,7 @@ use quickwit_rest_client::rest_client::CommitType; use serde_json::json; use crate::ingest_json; -use crate::test_utils::{ingest_with_retry, ClusterSandbox}; +use crate::test_utils::{ingest_with_retry, ClusterSandboxBuilder}; fn initialize_tests() { quickwit_common::setup_logging_for_tests(); @@ -39,7 +38,7 @@ fn initialize_tests() { #[tokio::test] async fn test_single_node_cluster() { initialize_tests(); - let mut sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let index_id = "test-single-node-cluster"; let index_config = format!( r#" @@ -59,7 +58,6 @@ async fn test_single_node_cluster() { index_id ); sandbox.enable_ingest_v2(); - sandbox.wait_for_cluster_num_ready_nodes(1).await.unwrap(); // Create the index. let current_index_metadata = sandbox @@ -203,20 +201,17 @@ async fn test_single_node_cluster() { #[tokio::test] async fn test_ingest_v2_index_not_found() { initialize_tests(); - let nodes_services = &[ - HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), - HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), - HashSet::from_iter([ + let mut sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([ QuickwitService::ControlPlane, QuickwitService::Metastore, QuickwitService::Searcher, - ]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services[..]) - .await - .unwrap(); + ]) + .build_and_start() + .await; sandbox.enable_ingest_v2(); - sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap(); let missing_index_err: Error = sandbox .indexer_rest_client .ingest( @@ -241,21 +236,17 @@ async fn test_ingest_v2_index_not_found() { #[tokio::test] async fn test_ingest_v2_happy_path() { initialize_tests(); - - let nodes_services = &[ - HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), - HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), - HashSet::from_iter([ + let mut sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([ QuickwitService::ControlPlane, QuickwitService::Metastore, QuickwitService::Searcher, - ]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services[..]) - .await - .unwrap(); + ]) + .build_and_start() + .await; sandbox.enable_ingest_v2(); - sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap(); let index_id = "test_happy_path"; let index_config = format!( r#" @@ -328,7 +319,7 @@ async fn test_ingest_v2_happy_path() { #[tokio::test] async fn test_commit_modes() { initialize_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let index_id = "test_commit_modes"; let index_config = format!( r#" @@ -396,7 +387,10 @@ async fn test_commit_modes() { sandbox.assert_hit_count(index_id, "body:auto", 0).await; - tokio::time::sleep(Duration::from_secs(3)).await; + sandbox + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 3) + .await + .unwrap(); sandbox.assert_hit_count(index_id, "body:auto", 1).await; @@ -407,18 +401,15 @@ async fn test_commit_modes() { #[tokio::test] async fn test_very_large_index_name() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) - .await - .unwrap(); + let mut sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build_and_start() + .await; sandbox.enable_ingest_v2(); - sandbox.wait_for_cluster_num_ready_nodes(5).await.unwrap(); let index_id = "its_very_very_very_very_very_very_very_very_very_very_very_\ very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_\ @@ -509,7 +500,7 @@ async fn test_very_large_index_name() { #[tokio::test] async fn test_shutdown_single_node() { initialize_tests(); - let mut sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let index_id = "test_shutdown_single_node"; sandbox.enable_ingest_v2(); @@ -571,18 +562,16 @@ async fn test_shutdown_single_node() { #[tokio::test] async fn test_shutdown_control_plane_early_shutdown() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([ + let sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Indexer]) + .add_node([ QuickwitService::ControlPlane, QuickwitService::Searcher, QuickwitService::Metastore, QuickwitService::Janitor, - ]), - ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) - .await - .unwrap(); + ]) + .build_and_start() + .await; let index_id = "test_shutdown_separate_indexer"; // TODO: make this test work with ingest v2 (#5068) @@ -632,18 +621,16 @@ async fn test_shutdown_control_plane_early_shutdown() { #[tokio::test] async fn test_shutdown_separate_indexer() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([ + let sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Indexer]) + .add_node([ QuickwitService::ControlPlane, QuickwitService::Searcher, QuickwitService::Metastore, QuickwitService::Janitor, - ]), - ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) - .await - .unwrap(); + ]) + .build_and_start() + .await; let index_id = "test_shutdown_separate_indexer"; // TODO: make this test work with ingest v2 (#5068) diff --git a/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs index 752fe45acf3..31df6858fd6 100644 --- a/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs @@ -37,7 +37,7 @@ use quickwit_proto::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs, Sc use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span}; use tonic::codec::CompressionEncoding; -use crate::test_utils::ClusterSandbox; +use crate::test_utils::ClusterSandboxBuilder; fn initialize_tests() { quickwit_common::setup_logging_for_tests(); @@ -47,16 +47,14 @@ fn initialize_tests() { #[tokio::test] async fn test_ingest_traces_with_otlp_grpc_api() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) - .await - .unwrap(); + let mut sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node_with_otlp([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build_and_start() + .await; // Wait for the pipelines to start (one for logs and one for traces) sandbox.wait_for_indexing_pipelines(2).await.unwrap(); @@ -142,16 +140,14 @@ async fn test_ingest_traces_with_otlp_grpc_api() { #[tokio::test] async fn test_ingest_logs_with_otlp_grpc_api() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) - .await - .unwrap(); + let mut sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node_with_otlp([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build_and_start() + .await; // Wait fo the pipelines to start (one for logs and one for traces) sandbox.wait_for_indexing_pipelines(2).await.unwrap(); @@ -218,16 +214,14 @@ async fn test_ingest_logs_with_otlp_grpc_api() { #[tokio::test] async fn test_jaeger_api() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) - .await - .unwrap(); + let mut sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node_with_otlp([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build_and_start() + .await; // Wait fo the pipelines to start (one for logs and one for traces) sandbox.wait_for_indexing_pipelines(2).await.unwrap(); diff --git a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs index 419b1f04f2f..9ae9bd90e21 100644 --- a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs @@ -32,7 +32,7 @@ use quickwit_serve::SearchRequestQueryString; use tempfile::NamedTempFile; use tracing::info; -use crate::test_utils::ClusterSandbox; +use crate::test_utils::ClusterSandboxBuilder; fn create_mock_data_file(num_lines: usize) -> (NamedTempFile, Uri) { let mut temp_file = tempfile::NamedTempFile::new().unwrap(); @@ -48,7 +48,7 @@ fn create_mock_data_file(num_lines: usize) -> (NamedTempFile, Uri) { #[tokio::test] async fn test_sqs_single_node_cluster() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let index_id = "test-sqs-source-single-node-cluster"; let index_config = format!( r#" @@ -68,8 +68,6 @@ async fn test_sqs_single_node_cluster() { let sqs_client = sqs_test_helpers::get_localstack_sqs_client().await.unwrap(); let queue_url = sqs_test_helpers::create_queue(&sqs_client, "test-single-node-cluster").await; - sandbox.wait_for_cluster_num_ready_nodes(1).await.unwrap(); - info!("create index"); sandbox .indexer_rest_client diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs index cf44ed8bc54..c1d55860b20 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs @@ -22,7 +22,7 @@ use std::time::Duration; use serde_json::{json, Value}; use super::assert_hits_unordered; -use crate::test_utils::ClusterSandbox; +use crate::test_utils::ClusterSandboxBuilder; /// Update the doc mapping between 2 calls to local-ingest (forces separate indexing pipelines) and /// assert the number of hits for the given query @@ -35,7 +35,7 @@ async fn validate_search_across_doc_mapping_updates( query_and_expect: &[(&str, Result<&[Value], ()>)], ) { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; { // Wait for indexer to fully start. diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs index 52d43627f22..9da970fbb98 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashSet; use std::time::Duration; use quickwit_config::service::QuickwitService; @@ -26,22 +25,19 @@ use serde_json::json; use super::assert_hits_unordered; use crate::ingest_json; -use crate::test_utils::{ingest_with_retry, ClusterSandbox}; +use crate::test_utils::{ingest_with_retry, ClusterSandboxBuilder}; #[tokio::test] async fn test_update_search_settings_on_multi_nodes_cluster() { quickwit_common::setup_logging_for_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) - .await - .unwrap(); - sandbox.wait_for_cluster_num_ready_nodes(5).await.unwrap(); + let sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build_and_start() + .await; { // Wait for indexer to fully start. diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 22db2523d05..b82db775761 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -97,4 +97,4 @@ quickwit-storage = { workspace = true, features = ["testsuite"] } pprof = [ "dep:pprof" ] - +testsuite = [] diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index 7a1c24691ee..403ae46d853 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -18,7 +18,7 @@ // along with this program. If not, see . use std::collections::BTreeSet; -use std::net::SocketAddr; +use std::error::Error; use std::sync::Arc; use bytesize::ByteSize; @@ -32,7 +32,9 @@ use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_serv use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceServiceServer; use quickwit_proto::search::search_service_server::SearchServiceServer; use quickwit_proto::tonic::codegen::CompressionEncoding; +use quickwit_proto::tonic::transport::server::TcpIncoming; use quickwit_proto::tonic::transport::Server; +use tokio::net::TcpListener; use tracing::*; use crate::developer_api::DeveloperApiServer; @@ -41,7 +43,7 @@ use crate::{QuickwitServices, INDEXING_GRPC_SERVER_METRICS_LAYER}; /// Starts and binds gRPC services to `grpc_listen_addr`. pub(crate) async fn start_grpc_server( - grpc_listen_addr: SocketAddr, + tcp_listener: TcpListener, max_message_size: ByteSize, services: Arc, readiness_trigger: BoxFutureInfaillible<()>, @@ -186,12 +188,16 @@ pub(crate) async fn start_grpc_server( .add_optional_service(otlp_trace_grpc_service) .add_optional_service(search_grpc_service); + let grpc_listen_addr = tcp_listener.local_addr()?; info!( enabled_grpc_services=?enabled_grpc_services, grpc_listen_addr=?grpc_listen_addr, "Starting gRPC server listening on {grpc_listen_addr}." ); - let serve_fut = server_router.serve_with_shutdown(grpc_listen_addr, shutdown_signal); + // nodelay=true and keepalive=None are the default values for Server::builder() + let tcp_incoming = TcpIncoming::from_listener(tcp_listener, true, None) + .map_err(|err: Box| anyhow::anyhow!(err))?; + let serve_fut = server_router.serve_with_incoming_shutdown(tcp_incoming, shutdown_signal); let (serve_res, _trigger_res) = tokio::join!(serve_fut, readiness_trigger); serve_res?; Ok(()) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 2b5b7f123f9..4de79fe5d1b 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -43,6 +43,7 @@ mod rest; mod rest_api_response; mod search_api; pub(crate) mod simple_list; +pub mod tcp_listener; mod template_api; mod ui_handler; @@ -115,6 +116,7 @@ use quickwit_search::{ SearchServiceClient, SearcherContext, SearcherPool, }; use quickwit_storage::{SplitCache, StorageResolver}; +use tcp_listener::TcpListenerResolver; use tokio::sync::oneshot; use tower::timeout::Timeout; use tower::ServiceBuilder; @@ -387,6 +389,7 @@ pub async fn serve_quickwit( runtimes_config: RuntimesConfig, metastore_resolver: MetastoreResolver, storage_resolver: StorageResolver, + tcp_listener_resolver: impl TcpListenerResolver, shutdown_signal: BoxFutureInfaillible<()>, env_filter_reload_fn: EnvFilterReloadFn, ) -> anyhow::Result> { @@ -712,7 +715,7 @@ pub async fn serve_quickwit( } }); let grpc_server = grpc::start_grpc_server( - grpc_listen_addr, + tcp_listener_resolver.resolve(grpc_listen_addr).await?, grpc_config.max_message_size, quickwit_services.clone(), grpc_readiness_trigger, @@ -732,7 +735,7 @@ pub async fn serve_quickwit( } }); let rest_server = rest::start_rest_server( - rest_listen_addr, + tcp_listener_resolver.resolve(rest_listen_addr).await?, quickwit_services, rest_readiness_trigger, rest_shutdown_signal, diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 483961a7378..a021da9c6f7 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -18,13 +18,13 @@ // along with this program. If not, see . use std::fmt::Formatter; -use std::net::SocketAddr; use std::sync::Arc; use hyper::http::HeaderValue; use hyper::{http, Method, StatusCode}; use quickwit_common::tower::BoxFutureInfaillible; use quickwit_search::SearchService; +use tokio::net::TcpListener; use tower::make::Shared; use tower::ServiceBuilder; use tower_http::compression::predicate::{NotForContentType, Predicate, SizeAbove}; @@ -123,7 +123,7 @@ impl Predicate for CompressionPredicate { /// Starts REST services. pub(crate) async fn start_rest_server( - rest_listen_addr: SocketAddr, + tcp_listener: TcpListener, quickwit_services: Arc, readiness_trigger: BoxFutureInfaillible<()>, shutdown_signal: BoxFutureInfaillible<()>, @@ -209,24 +209,26 @@ pub(crate) async fn start_rest_server( .layer(cors) .service(warp_service); + let rest_listen_addr = tcp_listener.local_addr()?; info!( rest_listen_addr=?rest_listen_addr, "Starting REST server listening on {rest_listen_addr}." ); + let rest_listener_std = tcp_listener.into_std()?; // `graceful_shutdown()` seems to be blocking in presence of existing connections. // The following approach of dropping the serve supposedly is not bullet proof, but it seems to // work in our unit test. // // See more of the discussion here: // https://github.com/hyperium/hyper/issues/2386 + let serve_fut = async move { tokio::select! { - res = hyper::Server::bind(&rest_listen_addr).serve(Shared::new(service)) => { res } + res = hyper::Server::from_tcp(rest_listener_std)?.serve(Shared::new(service)) => { res } _ = shutdown_signal => { Ok(()) } } }; - let (serve_res, _trigger_res) = tokio::join!(serve_fut, readiness_trigger); serve_res?; Ok(()) diff --git a/quickwit/quickwit-serve/src/tcp_listener.rs b/quickwit/quickwit-serve/src/tcp_listener.rs new file mode 100644 index 00000000000..749690c648d --- /dev/null +++ b/quickwit/quickwit-serve/src/tcp_listener.rs @@ -0,0 +1,81 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::net::SocketAddr; + +use quickwit_proto::tonic; +use tokio::net::TcpListener; +use tonic::async_trait; + +/// Resolve `SocketAddr` into `TcpListener` instances. +/// +/// This trait can be used to inject existing [`TcpListener`] instances to the +/// Quickwit REST and gRPC servers when running them in tests. +#[async_trait] +pub trait TcpListenerResolver: Clone + Send + 'static { + async fn resolve(&self, addr: SocketAddr) -> anyhow::Result; +} + +#[derive(Clone)] +pub struct DefaultTcpListenerResolver; + +#[async_trait] +impl TcpListenerResolver for DefaultTcpListenerResolver { + async fn resolve(&self, addr: SocketAddr) -> anyhow::Result { + TcpListener::bind(addr) + .await + .map_err(|err| anyhow::anyhow!(err)) + } +} + +#[cfg(any(test, feature = "testsuite"))] +pub mod for_tests { + use std::collections::HashMap; + use std::sync::Arc; + + use anyhow::Context; + use tokio::sync::Mutex; + + use super::*; + + #[derive(Clone, Default)] + pub struct TestTcpListenerResolver { + listeners: Arc>>, + } + + #[async_trait] + impl TcpListenerResolver for TestTcpListenerResolver { + async fn resolve(&self, addr: SocketAddr) -> anyhow::Result { + self.listeners + .lock() + .await + .remove(&addr) + .context(format!("No listener found for address {}", addr)) + } + } + + impl TestTcpListenerResolver { + pub async fn add_listener(&self, listener: TcpListener) { + self.listeners + .lock() + .await + .insert(listener.local_addr().unwrap(), listener); + } + } +}