From 735e38400544f9d3f0646688e93b38cc16cb6af4 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 9 Dec 2023 00:33:47 +0900 Subject: [PATCH] minor improvements for simian army (#4249) --- quickwit/quickwit-cli/src/index.rs | 7 +++-- quickwit/quickwit-cli/src/source.rs | 5 +-- quickwit/quickwit-common/src/rate_limiter.rs | 15 +++++++++ .../src/test_utils/cluster_sandbox.rs | 2 +- .../src/tests/basic_tests.rs | 6 ++-- .../src/tests/index_tests.rs | 18 +++++------ quickwit/quickwit-rest-client/src/lib.rs | 25 +++++++++------ quickwit/quickwit-rest-client/src/models.rs | 3 +- .../quickwit-rest-client/src/rest_client.rs | 31 ++++++++++--------- 9 files changed, 65 insertions(+), 47 deletions(-) diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index.rs index 38fca8a0f1b..1a2e72f0e66 100644 --- a/quickwit/quickwit-cli/src/index.rs +++ b/quickwit/quickwit-cli/src/index.rs @@ -28,7 +28,6 @@ use std::time::{Duration, Instant}; use std::{fmt, io}; use anyhow::{anyhow, bail, Context}; -use bytes::Bytes; use bytesize::ByteSize; use clap::{arg, Arg, ArgAction, ArgMatches, Command}; use colored::{ColoredString, Colorize}; @@ -466,6 +465,9 @@ pub async fn create_index_cli(args: CreateIndexArgs) -> anyhow::Result<()> { println!("❯ Creating index..."); let storage_resolver = StorageResolver::unconfigured(); let file_content = load_file(&storage_resolver, &args.index_config_uri).await?; + let index_config_str: String = std::str::from_utf8(&file_content) + .with_context(|| format!("Invalid utf8: `{}`", args.index_config_uri))? + .to_string(); let config_format = ConfigFormat::sniff_from_uri(&args.index_config_uri)?; let qw_client = args.client_args.client(); // TODO: nice to have: check first if the index exists by send a GET request, if we get a 404, @@ -479,10 +481,9 @@ pub async fn create_index_cli(args: CreateIndexArgs) -> anyhow::Result<()> { return Ok(()); } } - let bytes = Bytes::from(file_content.to_vec()); qw_client .indexes() - .create(bytes, config_format, args.overwrite) + .create(&index_config_str, config_format, args.overwrite) .await?; println!("{} Index successfully created.", "✔".color(GREEN_COLOR)); Ok(()) diff --git a/quickwit/quickwit-cli/src/source.rs b/quickwit/quickwit-cli/src/source.rs index cecb7586434..175ea7fc4c6 100644 --- a/quickwit/quickwit-cli/src/source.rs +++ b/quickwit/quickwit-cli/src/source.rs @@ -20,7 +20,6 @@ use std::str::FromStr; use anyhow::{bail, Context}; -use bytes::Bytes; use clap::{arg, ArgMatches, Command}; use colored::Colorize; use itertools::Itertools; @@ -330,11 +329,13 @@ async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> { println!("❯ Creating source..."); let storage_resolver = StorageResolver::unconfigured(); let source_config_content = load_file(&storage_resolver, &args.source_config_uri).await?; + let source_config_str: &str = std::str::from_utf8(&source_config_content) + .with_context(|| format!("source config is not utf-8: {}", args.source_config_uri))?; let config_format = ConfigFormat::sniff_from_uri(&args.source_config_uri)?; let qw_client = args.client_args.client(); qw_client .sources(&args.index_id) - .create(Bytes::from(source_config_content.to_vec()), config_format) + .create(source_config_str, config_format) .await?; println!("{} Source successfully created.", "✔".color(GREEN_COLOR)); Ok(()) diff --git a/quickwit/quickwit-common/src/rate_limiter.rs b/quickwit/quickwit-common/src/rate_limiter.rs index ddcb3c1baae..a1e1e2e2d37 100644 --- a/quickwit/quickwit-common/src/rate_limiter.rs +++ b/quickwit/quickwit-common/src/rate_limiter.rs @@ -101,6 +101,21 @@ impl RateLimiter { } } + /// Acquires some permits from the rate limiter. + /// If the permits are not available, returns the duration to wait before trying again. + pub fn acquire_with_duration(&mut self, num_permits: u64) -> Result<(), Duration> { + if self.acquire_inner(num_permits) { + return Ok(()); + } + self.refill(Instant::now()); + if self.acquire_inner(num_permits) { + return Ok(()); + } + let missing = num_permits - self.available_permits; + let wait = Duration::from_micros(missing * self.refill_period_micros / self.refill_amount); + Err(wait) + } + pub fn acquire_bytes(&mut self, bytes: ByteSize) -> bool { self.acquire(bytes.as_u64()) } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 0f1cb22ef59..9466bcbaf32 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -104,7 +104,7 @@ fn transport_url(addr: SocketAddr) -> Url { #[macro_export] macro_rules! ingest_json { ($($json:tt)+) => { - quickwit_rest_client::models::IngestSource::Bytes(json!($($json)+).to_string().into()) + quickwit_rest_client::models::IngestSource::Str(json!($($json)+).to_string()) }; } diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs index 15cb0abc04f..f0264c7368b 100644 --- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -106,8 +106,7 @@ async fn test_standalone_server() { field_mappings: - name: body type: text - "# - .into(), + "#, quickwit_config::ConfigFormat::Yaml, false, ) @@ -178,8 +177,7 @@ async fn test_multi_nodes_cluster() { type: text indexing_settings: commit_timeout_secs: 1 - "# - .into(), + "#, quickwit_config::ConfigFormat::Yaml, false, ) diff --git a/quickwit/quickwit-integration-tests/src/tests/index_tests.rs b/quickwit/quickwit-integration-tests/src/tests/index_tests.rs index 688114cdb98..4eeaf7c53b5 100644 --- a/quickwit/quickwit-integration-tests/src/tests/index_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/index_tests.rs @@ -20,7 +20,6 @@ use std::collections::HashSet; use std::time::Duration; -use bytes::Bytes; use quickwit_common::test_utils::wait_until_predicate; use quickwit_config::service::QuickwitService; use quickwit_config::ConfigFormat; @@ -40,7 +39,7 @@ async fn test_restarting_standalone_server() { quickwit_common::setup_logging_for_tests(); let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); let index_id = "test-index-with-restarting"; - let index_config = Bytes::from(format!( + let index_config = format!( r#" version: 0.6 index_id: {} @@ -56,7 +55,7 @@ async fn test_restarting_standalone_server() { max_merge_factor: 3 "#, index_id - )); + ); // Create the index. sandbox @@ -287,7 +286,7 @@ async fn test_ingest_v2_happy_path() { sandbox .indexer_rest_client .indexes() - .create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false) + .create(TEST_INDEX_CONFIG, ConfigFormat::Yaml, false) .await .unwrap(); sandbox @@ -330,7 +329,7 @@ async fn test_commit_modes() { sandbox .indexer_rest_client .indexes() - .create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false) + .create(TEST_INDEX_CONFIG, ConfigFormat::Yaml, false) .await .unwrap(); @@ -475,8 +474,7 @@ async fn test_very_large_index_name() { - name: body type: text "#, - ) - .into(), + ), ConfigFormat::Yaml, false, ) @@ -531,8 +529,7 @@ async fn test_very_large_index_name() { - name: body type: text "#, - ) - .into(), + ), ConfigFormat::Yaml, false, ) @@ -568,8 +565,7 @@ async fn test_shutdown() { type: text indexing_settings: commit_timeout_secs: 1 - "# - .into(), + "#, ConfigFormat::Yaml, false, ) diff --git a/quickwit/quickwit-rest-client/src/lib.rs b/quickwit/quickwit-rest-client/src/lib.rs index 6c6d442d188..12f8faac537 100644 --- a/quickwit/quickwit-rest-client/src/lib.rs +++ b/quickwit/quickwit-rest-client/src/lib.rs @@ -29,6 +29,10 @@ pub mod error; pub mod models; pub mod rest_client; +// re-exports +pub use quickwit_config::ConfigFormat; +pub use reqwest::Url; + pub(crate) struct BatchLineReader { buf_reader: BufReader>, buffer: Vec, @@ -107,10 +111,11 @@ impl BatchLineReader { self.has_next } - fn from_bytes(bytes: Bytes, max_batch_num_bytes: usize) -> Self { - use std::io::Cursor; - - Self::new(Box::new(Cursor::new(bytes.to_vec())), max_batch_num_bytes) + fn from_string(payload: impl ToString, max_batch_num_bytes: usize) -> Self { + Self::new( + Box::new(std::io::Cursor::new(payload.to_string().into_bytes())), + max_batch_num_bytes, + ) } } @@ -121,12 +126,12 @@ mod tests { #[tokio::test] async fn test_batch_reader() { { - let mut batch_reader = BatchLineReader::from_bytes("".into(), 10); + let mut batch_reader = BatchLineReader::from_string("".to_string(), 10); assert!(batch_reader.next_batch().await.unwrap().is_none()); assert!(batch_reader.next_batch().await.unwrap().is_none()); } { - let mut batch_reader = BatchLineReader::from_bytes("foo\n".into(), 10); + let mut batch_reader = BatchLineReader::from_string("foo\n", 10); assert_eq!( &batch_reader.next_batch().await.unwrap().unwrap()[..], b"foo\n" @@ -135,7 +140,7 @@ mod tests { assert!(batch_reader.next_batch().await.unwrap().is_none()); } { - let mut batch_reader = BatchLineReader::from_bytes("foo\nbar\nqux\n".into(), 10); + let mut batch_reader = BatchLineReader::from_string("foo\nbar\nqux\n", 10); assert_eq!( &batch_reader.next_batch().await.unwrap().unwrap()[..], b"foo\nbar\n" @@ -148,7 +153,7 @@ mod tests { assert!(batch_reader.next_batch().await.unwrap().is_none()); } { - let mut batch_reader = BatchLineReader::from_bytes("fooo\nbaar\nqux\n".into(), 10); + let mut batch_reader = BatchLineReader::from_string("fooo\nbaar\nqux\n", 10); assert_eq!( &batch_reader.next_batch().await.unwrap().unwrap()[..], b"fooo\nbaar\n" @@ -162,7 +167,7 @@ mod tests { } { let mut batch_reader = - BatchLineReader::from_bytes("foobarquxbaz\nfoo\nbar\nqux\n".into(), 10); + BatchLineReader::from_string("foobarquxbaz\nfoo\nbar\nqux\n", 10); assert_eq!( &batch_reader.next_batch().await.unwrap().unwrap()[..], b"foo\nbar\n" @@ -176,7 +181,7 @@ mod tests { } { let mut batch_reader = - BatchLineReader::from_bytes("foo\nbar\nfoobarquxbaz\nqux\n".into(), 10); + BatchLineReader::from_string("foo\nbar\nfoobarquxbaz\nqux\n", 10); assert_eq!( &batch_reader.next_batch().await.unwrap().unwrap()[..], b"foo\nbar\n" diff --git a/quickwit/quickwit-rest-client/src/models.rs b/quickwit/quickwit-rest-client/src/models.rs index dd1577abd25..4ed8d7d92c9 100644 --- a/quickwit/quickwit-rest-client/src/models.rs +++ b/quickwit/quickwit-rest-client/src/models.rs @@ -20,7 +20,6 @@ use std::path::PathBuf; use std::time::Duration; -use bytes::Bytes; use reqwest::StatusCode; use serde::de::DeserializeOwned; @@ -79,7 +78,7 @@ impl ApiResponse { #[derive(Clone)] pub enum IngestSource { - Bytes(Bytes), + Str(String), File(PathBuf), Stdin, } diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index 6b333131b93..109c0b7c15e 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -232,11 +232,11 @@ impl QuickwitClient { IndexClient::new(&self.transport, self.timeout) } - pub fn splits<'a, 'b: 'a>(&'a self, index_id: &'b str) -> SplitClient { + pub fn splits<'a>(&'a self, index_id: &'a str) -> SplitClient { SplitClient::new(&self.transport, self.timeout, index_id) } - pub fn sources<'a, 'b: 'a>(&'a self, index_id: &'b str) -> SourceClient { + pub fn sources<'a>(&'a self, index_id: &'a str) -> SourceClient { SourceClient::new(&self.transport, self.timeout, index_id) } @@ -271,7 +271,9 @@ impl QuickwitClient { BatchLineReader::from_file(&filepath, batch_size_limit).await? } IngestSource::Stdin => BatchLineReader::from_stdin(batch_size_limit), - IngestSource::Bytes(bytes) => BatchLineReader::from_bytes(bytes, batch_size_limit), + IngestSource::Str(ingest_payload) => { + BatchLineReader::from_string(ingest_payload, batch_size_limit) + } }; while let Some(batch) = batch_reader.next_batch().await? { loop { @@ -329,11 +331,12 @@ impl<'a> IndexClient<'a> { pub async fn create( &self, - body: Bytes, + index_config: impl ToString, config_format: ConfigFormat, overwrite: bool, ) -> Result { let header_map = header_from_config_format(config_format); + let body = Bytes::from(index_config.to_string()); let response = self .transport .send( @@ -449,14 +452,14 @@ impl<'a, 'b> SplitClient<'a, 'b> { } /// Client for source APIs. -pub struct SourceClient<'a, 'b> { +pub struct SourceClient<'a> { transport: &'a Transport, timeout: Timeout, - index_id: &'b str, + index_id: &'a str, } -impl<'a, 'b> SourceClient<'a, 'b> { - fn new(transport: &'a Transport, timeout: Timeout, index_id: &'b str) -> Self { +impl<'a> SourceClient<'a> { + fn new(transport: &'a Transport, timeout: Timeout, index_id: &'a str) -> Self { Self { transport, timeout, @@ -470,10 +473,11 @@ impl<'a, 'b> SourceClient<'a, 'b> { pub async fn create( &self, - body: Bytes, + source_config_input: impl ToString, config_format: ConfigFormat, ) -> Result { let header_map = header_from_config_format(config_format); + let source_config_bytes: Bytes = Bytes::from(source_config_input.to_string()); let response = self .transport .send::<()>( @@ -481,7 +485,7 @@ impl<'a, 'b> SourceClient<'a, 'b> { &self.sources_root_url(), Some(header_map), None, - Some(body), + Some(source_config_bytes), self.timeout, ) .await?; @@ -651,7 +655,6 @@ mod test { use std::path::PathBuf; use std::str::FromStr; - use bytes::Bytes; use quickwit_config::{ConfigFormat, SourceConfig}; use quickwit_indexing::mock_split; use quickwit_ingest::CommitType; @@ -886,7 +889,7 @@ mod test { .up_to_n_times(1) .mount(&mock_server) .await; - let post_body = Bytes::from(serde_json::to_vec(&index_config_to_create).unwrap()); + let post_body = serde_json::to_string(&index_config_to_create).unwrap(); assert_eq!( qw_client .indexes() @@ -909,7 +912,7 @@ mod test { assert_eq!( qw_client .indexes() - .create(Bytes::from("".as_bytes()), ConfigFormat::Yaml, false) + .create("", ConfigFormat::Yaml, false) .await .unwrap(), index_metadata @@ -1044,7 +1047,7 @@ mod test { assert_eq!( qw_client .sources("my-index") - .create(Bytes::from("".as_bytes()), ConfigFormat::Toml) + .create("", ConfigFormat::Toml) .await .unwrap(), source_config