improve placing algorithm #5051

Merged 7 commits on Jun 11, 2024
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/test_utils.rs
@@ -286,7 +286,7 @@ pub fn mock_split_meta(split_id: &str, index_uid: &IndexUid) -> SplitMetadata {
         index_uid: index_uid.clone(),
         split_id: split_id.to_string(),
         partition_id: 13u64,
-        num_docs: 10,
+        num_docs: if split_id == "split1" { 1_000_000 } else { 10 },
         uncompressed_docs_size_in_bytes: 256,
         time_range: Some(121000..=130198),
         create_timestamp: 0,
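With this tweak, "split1" is the only mock split with a large document count, so under the new cost formula introduced below it costs 15 units (5 + 1_000_000 / 100_000) while every other mock split costs 5. This gives the placement tests heterogeneous job costs to exercise.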
7 changes: 4 additions & 3 deletions quickwit/quickwit-search/src/root.rs
@@ -1406,9 +1406,10 @@ async fn assign_client_fetch_docs_jobs(
 }
 
 // Measure the cost associated to searching in a given split metadata.
-fn compute_split_cost(_split_metadata: &SplitMetadata) -> usize {
-    // TODO: Have a smarter cost, by smoothing the number of docs.
-    1
+fn compute_split_cost(split_metadata: &SplitMetadata) -> usize {
+    // TODO: This formula could be tuned a lot more. The general idea is that there is a fixed
+    // cost to searching a split, plus a somewhat linear cost depending on the size of the split.
+    5 + split_metadata.num_docs / 100_000
 }
 
 /// Builds a LeafSearchRequest to one node, from a list of [`SearchJob`].
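To make the new cost model concrete, here is a minimal standalone sketch; `split_cost` is a hypothetical mirror of `compute_split_cost` without the `SplitMetadata` type:

```rust
/// Hypothetical mirror of compute_split_cost: a fixed cost of 5 units per
/// split, plus one extra unit per 100k documents.
fn split_cost(num_docs: usize) -> usize {
    5 + num_docs / 100_000
}

fn main() {
    assert_eq!(split_cost(10), 5); // tiny split: the fixed cost dominates
    assert_eq!(split_cost(1_000_000), 15); // 1M-doc split: 3x a tiny one
    // Under the old implementation every split cost exactly 1, so a node
    // could receive all the million-doc splits and still look balanced.
}
```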
87 changes: 79 additions & 8 deletions quickwit/quickwit-search/src/search_job_placer.rs
@@ -28,6 +28,7 @@
 use async_trait::async_trait;
 use quickwit_common::pubsub::EventSubscriber;
 use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
 use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
+use tracing::warn;
 
 use crate::{SearchJob, SearchServiceClient, SearcherPool, SEARCH_METRICS};
@@ -177,13 +178,29 @@ impl SearchJobPlacer {
         let mut job_assignments: HashMap<SocketAddr, (SearchServiceClient, Vec<J>)> =
             HashMap::with_capacity(num_nodes);
 
+        let total_load: usize = jobs.iter().map(|job| job.cost()).sum();
+
+        // Allow around 5% disparity. Round up so we never end up in a case where
+        // target_load * num_nodes < total_load.
+        // Some of our tests need 2 splits to be put on 2 different searchers. It makes sense for
+        // these tests to keep doing so (testing root merge). Either we can make the allowed
+        // difference stricter, find the right split names ("split6" instead of "split2" works),
+        // or modify mock_split_meta() so that not all splits have the same job cost.
+        // For now I went with the mock_split_meta() changes.
+        const ALLOWED_DIFFERENCE: usize = 105;
+        let target_load = (total_load * ALLOWED_DIFFERENCE).div_ceil(num_nodes * 100);
         for job in jobs {
             sort_by_rendez_vous_hash(&mut candidate_nodes, job.split_id());
-            // Select the least loaded node.
-            let chosen_node_idx = if candidate_nodes.len() >= 2 {
-                usize::from(candidate_nodes[0].load > candidate_nodes[1].load)
+
+            let (chosen_node_idx, chosen_node) = if let Some((idx, node)) = candidate_nodes
+                .iter_mut()
+                .enumerate()
+                .find(|(_pos, node)| node.load < target_load)
+            {
+                (idx, node)
             } else {
-                0
+                warn!("found no lightly loaded searcher for split, this should never happen");
+                (0, &mut candidate_nodes[0])
             };
             let metric_node_idx = match chosen_node_idx {
                 0 => "0",
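A worked example of the target-load computation (the values are illustrative, not from the PR): with 3 nodes and jobs costing 5, 15, and 10, total_load = 30 and target_load = ceil(30 * 105 / 300) = 11, so any node already holding 11 units of work is skipped in favor of the next node in rendezvous order.

```rust
// Minimal sketch of the target-load computation. usize::div_ceil is stable
// since Rust 1.73; the values below are illustrative.
const ALLOWED_DIFFERENCE: usize = 105; // allow ~5% disparity between nodes

fn target_load(total_load: usize, num_nodes: usize) -> usize {
    (total_load * ALLOWED_DIFFERENCE).div_ceil(num_nodes * 100)
}

fn main() {
    // 30 units of work over 3 nodes: ceil(30 * 105 / 300) = ceil(10.5) = 11.
    assert_eq!(target_load(30, 3), 11);
    // Rounding up guarantees target_load * num_nodes >= total_load, so the
    // warn! fallback branch above should be unreachable.
    assert_eq!(target_load(1000, 5), 210);
}
```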
@@ -194,8 +211,6 @@
                 .job_assigned_total
                 .with_label_values([metric_node_idx])
                 .inc();
-
-            let chosen_node = &mut candidate_nodes[chosen_node_idx];
             chosen_node.load += job.cost();
 
             job_assignments
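Putting the pieces together, the per-job selection now works roughly like the sketch below. Everything here is a simplified stand-in: `Node`, `sort_by_affinity`, `place`, and the addresses are illustrative names, and `DefaultHasher` substitutes for quickwit's actual rendezvous hash.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

struct Node {
    addr: &'static str,
    load: usize,
}

// Rendezvous hashing: rank nodes by hash(node, split), highest first, giving
// each split a stable preference order over nodes that barely changes when
// searchers join or leave the pool.
fn sort_by_affinity(nodes: &mut [Node], split_id: &str) {
    nodes.sort_by_key(|node| {
        let mut hasher = DefaultHasher::new();
        node.addr.hash(&mut hasher);
        split_id.hash(&mut hasher);
        std::cmp::Reverse(hasher.finish())
    });
}

fn place(nodes: &mut [Node], split_id: &str, cost: usize, target_load: usize) -> &'static str {
    sort_by_affinity(nodes, split_id);
    // New behavior: walk the affinity order and take the first node still
    // under target_load, rather than only comparing the top two candidates.
    let idx = nodes
        .iter()
        .position(|node| node.load < target_load)
        .unwrap_or(0); // should be unreachable thanks to the ceiling division
    nodes[idx].load += cost;
    nodes[idx].addr
}

fn main() {
    let mut nodes = vec![
        Node { addr: "10.0.0.1:7280", load: 0 },
        Node { addr: "10.0.0.2:7280", load: 0 },
    ];
    // Place two splits with a target load of 11 (from the example above).
    let first = place(&mut nodes, "split1", 10, 11);
    let second = place(&mut nodes, "split2", 10, 11);
    println!("split1 -> {first}, split2 -> {second}");
}
```

Because each split's preference order depends only on (node, split) hashes, most splits keep their preferred searcher when the pool changes, preserving cache locality; the target_load check merely skips nodes that are already saturated.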
@@ -406,19 +421,75 @@ mod tests {
                     vec![
                         SearchJob::for_test("split5", 5),
                         SearchJob::for_test("split4", 4),
-                        SearchJob::for_test("split2", 2),
+                        SearchJob::for_test("split3", 3),
                     ],
                 ),
                 (
                     expected_searcher_addr_2,
                     vec![
                         SearchJob::for_test("split6", 6),
-                        SearchJob::for_test("split3", 3),
+                        SearchJob::for_test("split2", 2),
                         SearchJob::for_test("split1", 1),
                     ],
                 ),
             ];
             assert_eq!(assigned_jobs, expected_assigned_jobs);
         }
+        {
+            let searcher_pool = searcher_pool_for_test([
+                ("127.0.0.1:1001", MockSearchService::new()),
+                ("127.0.0.1:1002", MockSearchService::new()),
+            ]);
+            let search_job_placer = SearchJobPlacer::new(searcher_pool);
+            let jobs = vec![
+                SearchJob::for_test("split1", 1000),
+                SearchJob::for_test("split2", 1),
+            ];
+            let mut assigned_jobs: Vec<(SocketAddr, Vec<SearchJob>)> = search_job_placer
+                .assign_jobs(jobs, &HashSet::default())
+                .await
+                .unwrap()
+                .map(|(client, jobs)| (client.grpc_addr(), jobs))
+                .collect();
+            assigned_jobs.sort_unstable_by_key(|(node_uid, _)| *node_uid);
+
+            let expected_searcher_addr_1: SocketAddr = ([127, 0, 0, 1], 1001).into();
+            let expected_searcher_addr_2: SocketAddr = ([127, 0, 0, 1], 1002).into();
+            let expected_assigned_jobs = vec![
+                (
+                    expected_searcher_addr_1,
+                    vec![SearchJob::for_test("split1", 1000)],
+                ),
+                (
+                    expected_searcher_addr_2,
+                    vec![SearchJob::for_test("split2", 1)],
+                ),
+            ];
+            assert_eq!(assigned_jobs, expected_assigned_jobs);
+        }
     }
+
+    #[tokio::test]
+    async fn test_search_job_placer_many_splits() {
+        let searcher_pool = searcher_pool_for_test([
+            ("127.0.0.1:1001", MockSearchService::new()),
+            ("127.0.0.1:1002", MockSearchService::new()),
+            ("127.0.0.1:1003", MockSearchService::new()),
+            ("127.0.0.1:1004", MockSearchService::new()),
+            ("127.0.0.1:1005", MockSearchService::new()),
+        ]);
+        let search_job_placer = SearchJobPlacer::new(searcher_pool);
+        let jobs = (0..1000)
+            .map(|id| SearchJob::for_test(&format!("split{id}"), 1))
+            .collect();
+        let jobs_len: Vec<usize> = search_job_placer
+            .assign_jobs(jobs, &HashSet::default())
+            .await
+            .unwrap()
+            .map(|(_, jobs)| jobs.len())
+            .collect();
+        for job_len in jobs_len {
+            assert!(job_len <= 1050 / 5);
+        }
+    }
 }
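The 1050 / 5 bound in the last assertion is exactly the target load: 1000 unit-cost jobs across 5 searchers give total_load = 1000, and ceil(1000 * 105 / (5 * 100)) = 210 = 1050 / 5. In other words, even with the 5% disparity allowance, no searcher should be assigned more than 210 of the 1000 splits.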