Skip to content

Commit

Permalink
Attempt to fix search cluster tests
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Feb 9, 2024
1 parent 6417974 commit 6bc3f80
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use std::time::{Duration, Instant};

use futures_util::future;
use itertools::Itertools;
Expand Down Expand Up @@ -220,7 +220,28 @@ impl ClusterSandbox {
let temp_dir = tempfile::tempdir()?;
let services = QuickwitService::supported_services();
let node_configs = build_node_configs(temp_dir.path().to_path_buf(), &[services]);
Self::start_cluster_with_configs(temp_dir, node_configs).await
let sandbox = Self::start_cluster_with_configs(temp_dir, node_configs).await?;

let now = Instant::now();
let mut tick = tokio::time::interval(Duration::from_millis(100));

loop {
tick.tick().await;

if now.elapsed() > Duration::from_secs(5) {
panic!("standlone node timed out");
}
if sandbox
.indexer_rest_client
.node_health()
.is_ready()
.await
.unwrap_or(false)
{
break;
}
}
Ok(sandbox)
}

pub async fn start_cluster_with_otlp_service(
Expand Down
13 changes: 0 additions & 13 deletions quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String {
async fn test_ui_redirect_on_get() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandbox::start_standalone_node().await.unwrap();
assert!(sandbox
.indexer_rest_client
.node_health()
.is_ready()
.await
.unwrap());

let node_config = sandbox.node_configs.first().unwrap();
let client = hyper::Client::builder()
.pool_idle_timeout(Duration::from_secs(30))
Expand Down Expand Up @@ -76,12 +69,6 @@ async fn test_ui_redirect_on_get() {
async fn test_standalone_server() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandbox::start_standalone_node().await.unwrap();
assert!(sandbox
.indexer_rest_client
.node_health()
.is_ready()
.await
.unwrap());
{
// The indexing service should be running.
let counters = sandbox
Expand Down
79 changes: 43 additions & 36 deletions quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,23 +709,24 @@ mod tests {
#[tokio::test]
async fn test_put_kv_happy_path() {
// 3 servers 1, 2, 3
// Targetted key has affinity [1, 2, 3].
// Targetted key has affinity [2, 3, 1].
//
// Put on 1 and 2 is successful
// Get succeeds on 1.
let mut mock_search_service_1 = MockSearchService::new();
mock_search_service_1
.expect_put_kv()
.once()
.returning(|_put_req: quickwit_proto::search::PutKvRequest| {});
mock_search_service_1.expect_get_kv().once().returning(
|_get_req: quickwit_proto::search::GetKvRequest| Some(b"my_payload".to_vec()),
);
// Put on 2 and 3 is successful
// Get succeeds on 2.
let mock_search_service_1 = MockSearchService::new();
let mut mock_search_service_2 = MockSearchService::new();
mock_search_service_2
.expect_put_kv()
.once()
.returning(|_put_req: quickwit_proto::search::PutKvRequest| {});
mock_search_service_2.expect_put_kv().once().returning(
|put_req: quickwit_proto::search::PutKvRequest| {
assert_eq!(put_req.key, b"my_key");
assert_eq!(put_req.payload, b"my_payload");
},
);
mock_search_service_2.expect_get_kv().once().returning(
|get_req: quickwit_proto::search::GetKvRequest| {
assert_eq!(get_req.key, b"my_key");
Some(b"my_payload".to_vec())
},
);
let mut mock_search_service_3 = MockSearchService::new();
// Due to the buffered call it is possible for the
// put request to 3 to be emitted too.
Expand Down Expand Up @@ -753,32 +754,38 @@ mod tests {
#[tokio::test]
async fn test_put_kv_failing_get() {
// 3 servers 1, 2, 3
// Targetted key has affinity [1, 2, 3].
// Targetted key has affinity [2, 3, 1].
//
// Put on 1 and 2 is successful
// Get fails on 1.
// Get succeeds on 2.
let mut mock_search_service_1 = MockSearchService::new();
mock_search_service_1
.expect_put_kv()
.once()
.returning(|_put_req: quickwit_proto::search::PutKvRequest| {});
mock_search_service_1
.expect_get_kv()
.once()
.returning(|_get_req: quickwit_proto::search::GetKvRequest| None);
// Put on 2 and 3 is successful
// Get fails on 2.
// Get succeeds on 3.
let mock_search_service_1 = MockSearchService::new();
let mut mock_search_service_2 = MockSearchService::new();
mock_search_service_2
.expect_put_kv()
.once()
.returning(|_put_req: quickwit_proto::search::PutKvRequest| {});
mock_search_service_2.expect_put_kv().once().returning(
|put_req: quickwit_proto::search::PutKvRequest| {
assert_eq!(put_req.key, b"my_key");
assert_eq!(put_req.payload, b"my_payload");
},
);
mock_search_service_2.expect_get_kv().once().returning(
|_get_req: quickwit_proto::search::GetKvRequest| Some(b"my_payload".to_vec()),
|get_req: quickwit_proto::search::GetKvRequest| {
assert_eq!(get_req.key, b"my_key");
None
},
);
let mut mock_search_service_3 = MockSearchService::new();
mock_search_service_3
.expect_put_kv()
.returning(|_leaf_search_req: quickwit_proto::search::PutKvRequest| {});
mock_search_service_3.expect_put_kv().once().returning(
|put_req: quickwit_proto::search::PutKvRequest| {
assert_eq!(put_req.key, b"my_key");
assert_eq!(put_req.payload, b"my_payload");
},
);
mock_search_service_3.expect_get_kv().once().returning(
|get_req: quickwit_proto::search::GetKvRequest| {
assert_eq!(get_req.key, b"my_key");
Some(b"my_payload".to_vec())
},
);
let searcher_pool = searcher_pool_for_test([
("127.0.0.1:1001", mock_search_service_1),
("127.0.0.1:1002", mock_search_service_2),
Expand Down
54 changes: 27 additions & 27 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2889,33 +2889,6 @@ mod tests {

let mut mock_search_service_1 = MockSearchService::new();
mock_search_service_1
.expect_leaf_search()
.times(1)
.returning(
|_leaf_search_req: quickwit_proto::search::LeafSearchRequest| {
Ok(quickwit_proto::search::LeafSearchResponse {
// requests from split 2 arrive here - simulate failure
num_hits: 0,
partial_hits: Vec::new(),
failed_splits: vec![SplitSearchError {
error: "mock_error".to_string(),
split_id: "split2".to_string(),
retryable_error: true,
}],
num_attempted_splits: 1,
..Default::default()
})
},
);
mock_search_service_1.expect_fetch_docs().returning(
|fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
Ok(quickwit_proto::search::FetchDocsResponse {
hits: get_doc_for_fetch_req(fetch_docs_req),
})
},
);
let mut mock_search_service_2 = MockSearchService::new();
mock_search_service_2
.expect_leaf_search()
.times(2)
.returning(
Expand Down Expand Up @@ -2950,6 +2923,33 @@ mod tests {
}
},
);
mock_search_service_1.expect_fetch_docs().returning(
|fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
Ok(quickwit_proto::search::FetchDocsResponse {
hits: get_doc_for_fetch_req(fetch_docs_req),
})
},
);
let mut mock_search_service_2 = MockSearchService::new();
mock_search_service_2
.expect_leaf_search()
.times(1)
.returning(
|_leaf_search_req: quickwit_proto::search::LeafSearchRequest| {
Ok(quickwit_proto::search::LeafSearchResponse {
// requests from split 2 arrive here - simulate failure
num_hits: 0,
partial_hits: Vec::new(),
failed_splits: vec![SplitSearchError {
error: "mock_error".to_string(),
split_id: "split2".to_string(),
retryable_error: true,
}],
num_attempted_splits: 1,
..Default::default()
})
},
);
mock_search_service_2.expect_fetch_docs().returning(
|fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
Ok(quickwit_proto::search::FetchDocsResponse {
Expand Down
12 changes: 6 additions & 6 deletions quickwit/quickwit-search/src/search_job_placer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,17 +305,17 @@ mod tests {
(
expected_searcher_addr_1,
vec![
SearchJob::for_test("split6", 6),
SearchJob::for_test("split3", 3),
SearchJob::for_test("split1", 1),
SearchJob::for_test("split5", 5),
SearchJob::for_test("split4", 4),
SearchJob::for_test("split2", 2),
],
),
(
expected_searcher_addr_2,
vec![
SearchJob::for_test("split5", 5),
SearchJob::for_test("split4", 4),
SearchJob::for_test("split2", 2),
SearchJob::for_test("split6", 6),
SearchJob::for_test("split3", 3),
SearchJob::for_test("split1", 1),
],
),
];
Expand Down

0 comments on commit 6bc3f80

Please sign in to comment.