Skip to content

Commit

Permalink
minor improvments for simian army
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Dec 8, 2023
1 parent baf224c commit d1befa0
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 47 deletions.
7 changes: 4 additions & 3 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::time::{Duration, Instant};
use std::{fmt, io};

use anyhow::{anyhow, bail, Context};
use bytes::Bytes;
use bytesize::ByteSize;
use clap::{arg, Arg, ArgAction, ArgMatches, Command};
use colored::{ColoredString, Colorize};
Expand Down Expand Up @@ -466,6 +465,9 @@ pub async fn create_index_cli(args: CreateIndexArgs) -> anyhow::Result<()> {
println!("❯ Creating index...");
let storage_resolver = StorageResolver::unconfigured();
let file_content = load_file(&storage_resolver, &args.index_config_uri).await?;
let index_config_str: String = std::str::from_utf8(&file_content)
.with_context(|| format!("Invalid utf8: `{}`", args.index_config_uri))?
.to_string();
let config_format = ConfigFormat::sniff_from_uri(&args.index_config_uri)?;
let qw_client = args.client_args.client();
// TODO: nice to have: check first if the index exists by send a GET request, if we get a 404,
Expand All @@ -479,10 +481,9 @@ pub async fn create_index_cli(args: CreateIndexArgs) -> anyhow::Result<()> {
return Ok(());
}
}
let bytes = Bytes::from(file_content.to_vec());
qw_client
.indexes()
.create(bytes, config_format, args.overwrite)
.create(&index_config_str, config_format, args.overwrite)
.await?;
println!("{} Index successfully created.", "✔".color(GREEN_COLOR));
Ok(())
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use std::str::FromStr;

use anyhow::{bail, Context};
use bytes::Bytes;
use clap::{arg, ArgMatches, Command};
use colored::Colorize;
use itertools::Itertools;
Expand Down Expand Up @@ -330,11 +329,13 @@ async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> {
println!("❯ Creating source...");
let storage_resolver = StorageResolver::unconfigured();
let source_config_content = load_file(&storage_resolver, &args.source_config_uri).await?;
let source_config_str: &str = std::str::from_utf8(&source_config_content)
.with_context(|| format!("source config is not utf-8: {}", args.source_config_uri))?;
let config_format = ConfigFormat::sniff_from_uri(&args.source_config_uri)?;
let qw_client = args.client_args.client();
qw_client
.sources(&args.index_id)
.create(Bytes::from(source_config_content.to_vec()), config_format)
.create(source_config_str, config_format)
.await?;
println!("{} Source successfully created.", "✔".color(GREEN_COLOR));
Ok(())
Expand Down
15 changes: 15 additions & 0 deletions quickwit/quickwit-common/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ impl RateLimiter {
}
}

/// Acquires some permits from the rate limiter.
/// If the permits are not available, returns the duration to wait before trying again.
pub fn acquire_with_duration(&mut self, num_permits: u64) -> Result<(), Duration> {
if self.acquire_inner(num_permits) {
return Ok(());
}
self.refill(Instant::now());
if self.acquire_inner(num_permits) {
return Ok(());
}
let missing = num_permits - self.available_permits;
let wait = Duration::from_micros(missing * self.refill_period_micros / self.refill_amount);
Err(wait)
}

pub fn acquire_bytes(&mut self, bytes: ByteSize) -> bool {
self.acquire(bytes.as_u64())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn transport_url(addr: SocketAddr) -> Url {
#[macro_export]
macro_rules! ingest_json {
($($json:tt)+) => {
quickwit_rest_client::models::IngestSource::Bytes(json!($($json)+).to_string().into())
quickwit_rest_client::models::IngestSource::Str(json!($($json)+).to_string())
};
}

Expand Down
6 changes: 2 additions & 4 deletions quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ async fn test_standalone_server() {
field_mappings:
- name: body
type: text
"#
.into(),
"#,
quickwit_config::ConfigFormat::Yaml,
false,
)
Expand Down Expand Up @@ -178,8 +177,7 @@ async fn test_multi_nodes_cluster() {
type: text
indexing_settings:
commit_timeout_secs: 1
"#
.into(),
"#,
quickwit_config::ConfigFormat::Yaml,
false,
)
Expand Down
18 changes: 7 additions & 11 deletions quickwit/quickwit-integration-tests/src/tests/index_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use std::collections::HashSet;
use std::time::Duration;

use bytes::Bytes;
use quickwit_common::test_utils::wait_until_predicate;
use quickwit_config::service::QuickwitService;
use quickwit_config::ConfigFormat;
Expand All @@ -40,7 +39,7 @@ async fn test_restarting_standalone_server() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandbox::start_standalone_node().await.unwrap();
let index_id = "test-index-with-restarting";
let index_config = Bytes::from(format!(
let index_config = format!(
r#"
version: 0.6
index_id: {}
Expand All @@ -56,7 +55,7 @@ async fn test_restarting_standalone_server() {
max_merge_factor: 3
"#,
index_id
));
);

// Create the index.
sandbox
Expand Down Expand Up @@ -287,7 +286,7 @@ async fn test_ingest_v2_happy_path() {
sandbox
.indexer_rest_client
.indexes()
.create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false)
.create(TEST_INDEX_CONFIG, ConfigFormat::Yaml, false)
.await
.unwrap();
sandbox
Expand Down Expand Up @@ -330,7 +329,7 @@ async fn test_commit_modes() {
sandbox
.indexer_rest_client
.indexes()
.create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false)
.create(TEST_INDEX_CONFIG, ConfigFormat::Yaml, false)
.await
.unwrap();

Expand Down Expand Up @@ -475,8 +474,7 @@ async fn test_very_large_index_name() {
- name: body
type: text
"#,
)
.into(),
),
ConfigFormat::Yaml,
false,
)
Expand Down Expand Up @@ -531,8 +529,7 @@ async fn test_very_large_index_name() {
- name: body
type: text
"#,
)
.into(),
),
ConfigFormat::Yaml,
false,
)
Expand Down Expand Up @@ -568,8 +565,7 @@ async fn test_shutdown() {
type: text
indexing_settings:
commit_timeout_secs: 1
"#
.into(),
"#,
ConfigFormat::Yaml,
false,
)
Expand Down
25 changes: 15 additions & 10 deletions quickwit/quickwit-rest-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub mod error;
pub mod models;
pub mod rest_client;

// re-exports
pub use quickwit_config::ConfigFormat;
pub use reqwest::Url;

pub(crate) struct BatchLineReader {
buf_reader: BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>,
buffer: Vec<u8>,
Expand Down Expand Up @@ -107,10 +111,11 @@ impl BatchLineReader {
self.has_next
}

fn from_bytes(bytes: Bytes, max_batch_num_bytes: usize) -> Self {
use std::io::Cursor;

Self::new(Box::new(Cursor::new(bytes.to_vec())), max_batch_num_bytes)
fn from_string(payload: impl ToString, max_batch_num_bytes: usize) -> Self {
Self::new(
Box::new(std::io::Cursor::new(payload.to_string().into_bytes())),
max_batch_num_bytes,
)
}
}

Expand All @@ -121,12 +126,12 @@ mod tests {
#[tokio::test]
async fn test_batch_reader() {
{
let mut batch_reader = BatchLineReader::from_bytes("".into(), 10);
let mut batch_reader = BatchLineReader::from_string("".to_string(), 10);
assert!(batch_reader.next_batch().await.unwrap().is_none());
assert!(batch_reader.next_batch().await.unwrap().is_none());
}
{
let mut batch_reader = BatchLineReader::from_bytes("foo\n".into(), 10);
let mut batch_reader = BatchLineReader::from_string("foo\n", 10);
assert_eq!(
&batch_reader.next_batch().await.unwrap().unwrap()[..],
b"foo\n"
Expand All @@ -135,7 +140,7 @@ mod tests {
assert!(batch_reader.next_batch().await.unwrap().is_none());
}
{
let mut batch_reader = BatchLineReader::from_bytes("foo\nbar\nqux\n".into(), 10);
let mut batch_reader = BatchLineReader::from_string("foo\nbar\nqux\n", 10);
assert_eq!(
&batch_reader.next_batch().await.unwrap().unwrap()[..],
b"foo\nbar\n"
Expand All @@ -148,7 +153,7 @@ mod tests {
assert!(batch_reader.next_batch().await.unwrap().is_none());
}
{
let mut batch_reader = BatchLineReader::from_bytes("fooo\nbaar\nqux\n".into(), 10);
let mut batch_reader = BatchLineReader::from_string("fooo\nbaar\nqux\n", 10);
assert_eq!(
&batch_reader.next_batch().await.unwrap().unwrap()[..],
b"fooo\nbaar\n"
Expand All @@ -162,7 +167,7 @@ mod tests {
}
{
let mut batch_reader =
BatchLineReader::from_bytes("foobarquxbaz\nfoo\nbar\nqux\n".into(), 10);
BatchLineReader::from_string("foobarquxbaz\nfoo\nbar\nqux\n", 10);
assert_eq!(
&batch_reader.next_batch().await.unwrap().unwrap()[..],
b"foo\nbar\n"
Expand All @@ -176,7 +181,7 @@ mod tests {
}
{
let mut batch_reader =
BatchLineReader::from_bytes("foo\nbar\nfoobarquxbaz\nqux\n".into(), 10);
BatchLineReader::from_string("foo\nbar\nfoobarquxbaz\nqux\n", 10);
assert_eq!(
&batch_reader.next_batch().await.unwrap().unwrap()[..],
b"foo\nbar\n"
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-rest-client/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use std::path::PathBuf;
use std::time::Duration;

use bytes::Bytes;
use reqwest::StatusCode;
use serde::de::DeserializeOwned;

Expand Down Expand Up @@ -79,7 +78,7 @@ impl ApiResponse {

#[derive(Clone)]
pub enum IngestSource {
Bytes(Bytes),
Str(String),
File(PathBuf),
Stdin,
}
Expand Down
31 changes: 17 additions & 14 deletions quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,11 @@ impl QuickwitClient {
IndexClient::new(&self.transport, self.timeout)
}

pub fn splits<'a, 'b: 'a>(&'a self, index_id: &'b str) -> SplitClient {
pub fn splits<'a>(&'a self, index_id: &'a str) -> SplitClient {
SplitClient::new(&self.transport, self.timeout, index_id)
}

pub fn sources<'a, 'b: 'a>(&'a self, index_id: &'b str) -> SourceClient {
pub fn sources<'a>(&'a self, index_id: &'a str) -> SourceClient {
SourceClient::new(&self.transport, self.timeout, index_id)
}

Expand Down Expand Up @@ -271,7 +271,9 @@ impl QuickwitClient {
BatchLineReader::from_file(&filepath, batch_size_limit).await?
}
IngestSource::Stdin => BatchLineReader::from_stdin(batch_size_limit),
IngestSource::Bytes(bytes) => BatchLineReader::from_bytes(bytes, batch_size_limit),
IngestSource::Str(ingest_payload) => {
BatchLineReader::from_string(ingest_payload, batch_size_limit)
}
};
while let Some(batch) = batch_reader.next_batch().await? {
loop {
Expand Down Expand Up @@ -329,11 +331,12 @@ impl<'a> IndexClient<'a> {

pub async fn create(
&self,
body: Bytes,
index_config: impl ToString,
config_format: ConfigFormat,
overwrite: bool,
) -> Result<IndexMetadata, Error> {
let header_map = header_from_config_format(config_format);
let body = Bytes::from(index_config.to_string());
let response = self
.transport
.send(
Expand Down Expand Up @@ -449,14 +452,14 @@ impl<'a, 'b> SplitClient<'a, 'b> {
}

/// Client for source APIs.
pub struct SourceClient<'a, 'b> {
pub struct SourceClient<'a> {
transport: &'a Transport,
timeout: Timeout,
index_id: &'b str,
index_id: &'a str,
}

impl<'a, 'b> SourceClient<'a, 'b> {
fn new(transport: &'a Transport, timeout: Timeout, index_id: &'b str) -> Self {
impl<'a> SourceClient<'a> {
fn new(transport: &'a Transport, timeout: Timeout, index_id: &'a str) -> Self {
Self {
transport,
timeout,
Expand All @@ -470,18 +473,19 @@ impl<'a, 'b> SourceClient<'a, 'b> {

pub async fn create(
&self,
body: Bytes,
source_config_input: impl ToString,
config_format: ConfigFormat,
) -> Result<SourceConfig, Error> {
let header_map = header_from_config_format(config_format);
let source_config_bytes: Bytes = Bytes::from(source_config_input.to_string());
let response = self
.transport
.send::<()>(
Method::POST,
&self.sources_root_url(),
Some(header_map),
None,
Some(body),
Some(source_config_bytes),
self.timeout,
)
.await?;
Expand Down Expand Up @@ -651,7 +655,6 @@ mod test {
use std::path::PathBuf;
use std::str::FromStr;

use bytes::Bytes;
use quickwit_config::{ConfigFormat, SourceConfig};
use quickwit_indexing::mock_split;
use quickwit_ingest::CommitType;
Expand Down Expand Up @@ -886,7 +889,7 @@ mod test {
.up_to_n_times(1)
.mount(&mock_server)
.await;
let post_body = Bytes::from(serde_json::to_vec(&index_config_to_create).unwrap());
let post_body = serde_json::to_string(&index_config_to_create).unwrap();
assert_eq!(
qw_client
.indexes()
Expand All @@ -909,7 +912,7 @@ mod test {
assert_eq!(
qw_client
.indexes()
.create(Bytes::from("".as_bytes()), ConfigFormat::Yaml, false)
.create("", ConfigFormat::Yaml, false)
.await
.unwrap(),
index_metadata
Expand Down Expand Up @@ -1044,7 +1047,7 @@ mod test {
assert_eq!(
qw_client
.sources("my-index")
.create(Bytes::from("".as_bytes()), ConfigFormat::Toml)
.create("", ConfigFormat::Toml)
.await
.unwrap(),
source_config
Expand Down

0 comments on commit d1befa0

Please sign in to comment.