From b4ca7dff31c9d17dbc9c64411df64ca45a7851b5 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 2 Nov 2023 17:15:30 +0900 Subject: [PATCH] Added a --v2 hidden in the ingest client. --- quickwit/quickwit-cli/src/index.rs | 6 +++ quickwit/quickwit-cli/src/lib.rs | 11 +++++ quickwit/quickwit-cli/src/main.rs | 30 +++++++++++- .../src/test_utils/cluster_sandbox.rs | 2 +- .../src/test_utils/mod.rs | 2 +- .../quickwit-rest-client/src/rest_client.rs | 48 ++++++++++--------- 6 files changed, 73 insertions(+), 26 deletions(-) diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index.rs index ce07f236224..1e461aaae1d 100644 --- a/quickwit/quickwit-cli/src/index.rs +++ b/quickwit/quickwit-cli/src/index.rs @@ -130,6 +130,12 @@ pub fn build_index_command() -> Command { .short('w') .help("Wait for all documents to be commited and available for search before exiting") .action(ArgAction::SetTrue), + // TODO remove me after Quickwit 0.7. + Arg::new("v2") + .long("v2") + .help("Ingest v2 (experimental! Do not use me.)") + .hide(true) + .action(ArgAction::SetTrue), Arg::new("force") .long("force") .short('f') diff --git a/quickwit/quickwit-cli/src/lib.rs b/quickwit/quickwit-cli/src/lib.rs index 3c5cdafc366..675aa20245e 100644 --- a/quickwit/quickwit-cli/src/lib.rs +++ b/quickwit/quickwit-cli/src/lib.rs @@ -106,6 +106,7 @@ pub struct ClientArgs { pub connect_timeout: Option, pub timeout: Option, pub commit_timeout: Option, + pub ingest_v2: bool, } impl Default for ClientArgs { @@ -115,6 +116,7 @@ impl Default for ClientArgs { connect_timeout: None, timeout: None, commit_timeout: None, + ingest_v2: false, } } } @@ -128,6 +130,9 @@ impl ClientArgs { if let Some(timeout) = self.timeout { builder = builder.timeout(timeout); } + if self.ingest_v2 { + builder = builder.enable_ingest_v2(); + } builder.build() } @@ -180,6 +185,11 @@ impl ClientArgs { } else { None }; + let ingest_v2 = if process_ingest { + matches.get_flag("v2") + } else { + false + }; let commit_timeout = if process_ingest { if let Some(duration) = matches.remove_one::("commit-timeout") { Some(parse_duration_or_none(&duration)?) @@ -194,6 +204,7 @@ impl ClientArgs { connect_timeout, timeout, commit_timeout, + ingest_v2, }) } } diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 5d8ec6676da..ef757430e60 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -181,6 +181,32 @@ mod tests { Ok(()) } + #[test] + fn test_parse_ingest_v2_args() { + let app = build_cli().no_binary_name(true); + let matches = app + .try_get_matches_from(["index", "ingest", "--index", "wikipedia", "--v2"]) + .unwrap(); + let command = CliCommand::parse_cli_args(matches).unwrap(); + assert!(matches!( + command, + CliCommand::Index(IndexCliCommand::Ingest( + IngestDocsArgs { + client_args, + index_id, + input_path_opt: None, + batch_size_limit_opt: None, + commit_type: CommitType::Auto, + })) if &index_id == "wikipedia" + && client_args.timeout.is_none() + && client_args.connect_timeout.is_none() + && client_args.commit_timeout.is_none() + && client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap() + && client_args.ingest_v2 + + )); + } + #[test] fn test_parse_ingest_args() -> anyhow::Result<()> { let app = build_cli().no_binary_name(true); @@ -207,6 +233,7 @@ mod tests { && client_args.connect_timeout.is_none() && client_args.commit_timeout.is_none() && client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:8000").unwrap() + && !client_args.ingest_v2 )); let app = build_cli().no_binary_name(true); @@ -234,8 +261,8 @@ mod tests { && client_args.timeout.is_none() && client_args.connect_timeout.is_none() && client_args.commit_timeout.is_none() + && !client_args.ingest_v2 && batch_size_limit == Byte::from_str("8MB").unwrap() - )); let app = build_cli().no_binary_name(true); @@ -263,6 +290,7 @@ mod tests { && client_args.timeout.is_none() && client_args.connect_timeout.is_none() && client_args.commit_timeout.is_none() + && !client_args.ingest_v2 && batch_size_limit == Byte::from_str("4KB").unwrap() )); diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 4c94f5d9851..dc08cbd2186 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -108,7 +108,7 @@ macro_rules! ingest_json { }; } -pub async fn ingest_with_retry( +pub(crate) async fn ingest_with_retry( client: &QuickwitClient, index_id: &str, ingest_source: IngestSource, diff --git a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs index 24be4084f9b..79c23571c88 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs @@ -19,4 +19,4 @@ mod cluster_sandbox; -pub use cluster_sandbox::{build_node_configs, ingest_with_retry, ClusterSandbox}; +pub(crate) use cluster_sandbox::{ingest_with_retry, ClusterSandbox}; diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index 3125c318702..52ffb9e00ee 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -31,6 +31,7 @@ use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; use reqwest::{Client, ClientBuilder, Method, StatusCode, Url}; use serde::Serialize; use serde_json::json; +use tracing::warn; use crate::error::Error; use crate::models::{ApiResponse, IngestSource, Timeout}; @@ -119,6 +120,8 @@ pub struct QuickwitClientBuilder { ingest_timeout: Timeout, /// Timeout for the ingest operations that require waiting for commit. commit_timeout: Timeout, + /// Experimental: if true, use the ingest v2 endpoint. + ingest_v2: bool, } impl QuickwitClientBuilder { @@ -130,6 +133,7 @@ impl QuickwitClientBuilder { search_timeout: DEFAULT_CLIENT_SEARCH_TIMEOUT, ingest_timeout: DEFAULT_CLIENT_INGEST_TIMEOUT, commit_timeout: DEFAULT_CLIENT_COMMIT_TIMEOUT, + ingest_v2: false, } } @@ -143,6 +147,12 @@ impl QuickwitClientBuilder { self } + pub fn enable_ingest_v2(mut self) -> Self { + warn!("ingest v2 experimental feature enabled!"); + self.ingest_v2 = true; + self + } + pub fn search_timeout(mut self, timeout: Timeout) -> Self { self.search_timeout = timeout; self @@ -160,13 +170,14 @@ impl QuickwitClientBuilder { pub fn build(self) -> QuickwitClient { let transport = Transport::new(self.base_url, self.connect_timeout); - QuickwitClient::new( + QuickwitClient { transport, - self.timeout, - self.search_timeout, - self.ingest_timeout, - self.commit_timeout, - ) + timeout: self.timeout, + search_timeout: self.search_timeout, + ingest_timeout: self.ingest_timeout, + commit_timeout: self.commit_timeout, + ingest_v2: self.ingest_v2, + } } } @@ -181,25 +192,12 @@ pub struct QuickwitClient { ingest_timeout: Timeout, /// Timeout for the ingest operations that require waiting for commit. commit_timeout: Timeout, + // TODO remove me after Quickwit 0.7 release. + // If true, rely on ingest v2 + ingest_v2: bool, } impl QuickwitClient { - fn new( - transport: Transport, - timeout: Timeout, - search_timeout: Timeout, - ingest_timeout: Timeout, - commit_timeout: Timeout, - ) -> Self { - Self { - transport, - timeout, - search_timeout, - ingest_timeout, - commit_timeout, - } - } - pub async fn search( &self, index_id: &str, @@ -258,7 +256,11 @@ impl QuickwitClient { on_ingest_event: Option<&(dyn Fn(IngestEvent) + Sync)>, last_block_commit: CommitType, ) -> Result<(), Error> { - let ingest_path = format!("{index_id}/ingest"); + let ingest_path = if self.ingest_v2 { + format!("{index_id}/ingest-v2") + } else { + format!("{index_id}/ingest") + }; let batch_size_limit = batch_size_limit_opt.unwrap_or(INGEST_CONTENT_LENGTH_LIMIT); let mut batch_reader = match ingest_source { IngestSource::File(filepath) => {