Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate documents #5046

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-codegen/example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ utoipa = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-common = { workspace = true }
quickwit-proto ={ workspace = true }
quickwit-proto = { workspace = true }

[dev-dependencies]
mockall = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-common/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl ThreadPool {
/// but is not running yet "cancellable".
pub fn run_cpu_intensive<F, R>(
&self,
cpu_heavy_task: F,
cpu_intensive_fn: F,
) -> impl Future<Output = Result<R, Panicked>>
where
F: FnOnce() -> R + Send + 'static,
Expand All @@ -103,7 +103,7 @@ impl ThreadPool {
let _guard = span.enter();
let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks);
ongoing_task_guard.add(1i64);
let result = cpu_heavy_task();
let result = cpu_intensive_fn();
let _ = tx.send(result);
});
rx.map_err(|_| Panicked)
Expand All @@ -118,7 +118,7 @@ impl ThreadPool {
///
/// Disclaimer: The function will no be executed if the Future is dropped.
#[must_use = "run_cpu_intensive will not run if the future it returns is dropped"]
pub fn run_cpu_intensive<F, R>(cpu_heavy_task: F) -> impl Future<Output = Result<R, Panicked>>
pub fn run_cpu_intensive<F, R>(cpu_intensive_fn: F) -> impl Future<Output = Result<R, Panicked>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
Expand All @@ -129,7 +129,7 @@ where
let num_threads: usize = (crate::num_cpus() / 3).max(2);
ThreadPool::new("small_tasks", Some(num_threads))
})
.run_cpu_intensive(cpu_heavy_task)
.run_cpu_intensive(cpu_intensive_fn)
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
Expand Down
38 changes: 23 additions & 15 deletions quickwit/quickwit-config/src/index_template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod serialize;

use anyhow::ensure;
use quickwit_common::uri::Uri;
use quickwit_proto::types::IndexId;
use quickwit_proto::types::{DocMappingUid, IndexId};
use serde::{Deserialize, Serialize};
pub use serialize::{IndexTemplateV0_8, VersionedIndexTemplate};

Expand Down Expand Up @@ -68,10 +68,14 @@ impl IndexTemplate {
.unwrap_or(default_index_root_uri)
.join(&index_id)?;

// Ensure that the doc mapping UID is truly unique per index.
let mut doc_mapping = self.doc_mapping.clone();
doc_mapping.doc_mapping_uid = DocMappingUid::random();

let index_config = IndexConfig {
index_id,
index_uri,
doc_mapping: self.doc_mapping.clone(),
doc_mapping,
indexing_settings: self.indexing_settings.clone(),
search_settings: self.search_settings.clone(),
retention_policy_opt: self.retention_policy_opt.clone(),
Expand Down Expand Up @@ -235,33 +239,37 @@ mod tests {
});
let default_index_root_uri = Uri::for_test("s3://test-bucket/indexes");

let index_config = index_template
.apply_template("test-index".to_string(), &default_index_root_uri)
let index_config_foo = index_template
.apply_template("test-index-foo".to_string(), &default_index_root_uri)
.unwrap();

assert_eq!(index_config.index_id, "test-index");
assert_eq!(index_config.index_uri, "ram:///indexes/test-index");
assert_eq!(index_config_foo.index_id, "test-index-foo");
assert_eq!(index_config_foo.index_uri, "ram:///indexes/test-index-foo");

assert_eq!(index_config.doc_mapping.timestamp_field.unwrap(), "ts");
assert_eq!(index_config.indexing_settings.commit_timeout_secs, 42);
assert_eq!(index_config_foo.doc_mapping.timestamp_field.unwrap(), "ts");
assert_eq!(index_config_foo.indexing_settings.commit_timeout_secs, 42);
assert_eq!(
index_config.search_settings.default_search_fields,
index_config_foo.search_settings.default_search_fields,
["message"]
);
let retention_policy = index_config.retention_policy_opt.unwrap();
let retention_policy = index_config_foo.retention_policy_opt.unwrap();
assert_eq!(retention_policy.retention_period, "42 days");
assert_eq!(retention_policy.evaluation_schedule, "hourly");

index_template.index_root_uri = None;

let index_config = index_template
.apply_template("test-index".to_string(), &default_index_root_uri)
let index_config_bar = index_template
.apply_template("test-index-bar".to_string(), &default_index_root_uri)
.unwrap();

assert_eq!(index_config.index_id, "test-index");
assert_eq!(index_config_bar.index_id, "test-index-bar");
assert_eq!(
index_config.index_uri,
"s3://test-bucket/indexes/test-index"
index_config_bar.index_uri,
"s3://test-bucket/indexes/test-index-bar"
);
assert_ne!(
index_config_foo.doc_mapping.doc_mapping_uid,
index_config_bar.doc_mapping.doc_mapping_uid
);
}

Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2401,7 +2401,6 @@ mod tests {
control_plane_mailbox.ask(callback).await.unwrap();

let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap();
println!("{:?}", control_plane_debug_info);
let shard =
&control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0];
assert_eq!(shard["shard_id"], "00000000000000000000");
Expand Down
Loading
Loading