Skip to content

Commit

Permalink
Use ingest v2 by default
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jan 7, 2025
1 parent 7af85e8 commit 96f1d1d
Show file tree
Hide file tree
Showing 18 changed files with 438 additions and 231 deletions.
2 changes: 1 addition & 1 deletion quickwit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ test-all:
QW_S3_FORCE_PATH_STYLE_ACCESS=1 \
QW_TEST_DATABASE_URL=postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev \
RUST_MIN_STACK=67108864 \
cargo nextest run --all-features --retries 1
cargo nextest run --all-features --retries 5
cargo nextest run --test failpoints --features fail/failpoints

test-failpoints:
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/actor_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<A: Actor> Supervisable for ActorHandle<A> {
{
Health::Healthy
} else {
error!(actor = self.name(), "actor-timeout");
error!(actor = self.name(), "actor timed out");
Health::FailureOrUnhealthy
}
}
Expand Down
11 changes: 0 additions & 11 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ pub struct ClientArgs {
pub connect_timeout: Option<Timeout>,
pub timeout: Option<Timeout>,
pub commit_timeout: Option<Timeout>,
pub ingest_v2: bool,
}

impl Default for ClientArgs {
Expand All @@ -118,7 +117,6 @@ impl Default for ClientArgs {
connect_timeout: None,
timeout: None,
commit_timeout: None,
ingest_v2: false,
}
}
}
Expand All @@ -137,9 +135,6 @@ impl ClientArgs {
if let Some(commit_timeout) = self.commit_timeout {
builder = builder.commit_timeout(commit_timeout);
}
if self.ingest_v2 {
builder = builder.enable_ingest_v2();
}
builder.build()
}

Expand Down Expand Up @@ -167,11 +162,6 @@ impl ClientArgs {
} else {
None
};
let ingest_v2 = if process_ingest {
matches.get_flag("v2")
} else {
false
};
let commit_timeout = if process_ingest {
if let Some(duration) = matches.remove_one::<String>("commit-timeout") {
Some(parse_duration_or_none(&duration)?)
Expand All @@ -186,7 +176,6 @@ impl ClientArgs {
connect_timeout,
timeout,
commit_timeout,
ingest_v2,
})
}
}
Expand Down
4 changes: 0 additions & 4 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ mod tests {
&& client_args.connect_timeout.is_none()
&& client_args.commit_timeout.is_none()
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap()
&& client_args.ingest_v2

));
}
Expand Down Expand Up @@ -273,7 +272,6 @@ mod tests {
&& client_args.connect_timeout.is_none()
&& client_args.commit_timeout.is_none()
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:8000").unwrap()
&& !client_args.ingest_v2
));

let app = build_cli().no_binary_name(true);
Expand Down Expand Up @@ -301,7 +299,6 @@ mod tests {
&& client_args.timeout.is_none()
&& client_args.connect_timeout.is_none()
&& client_args.commit_timeout.is_none()
&& !client_args.ingest_v2
&& batch_size_limit == ByteSize::mb(8)
));

Expand Down Expand Up @@ -330,7 +327,6 @@ mod tests {
&& client_args.timeout.is_none()
&& client_args.connect_timeout.is_none()
&& client_args.commit_timeout.is_none()
&& !client_args.ingest_v2
&& batch_size_limit == ByteSize::kb(4)
));

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub use crate::storage_config::{
/// Returns true if the ingest API v2 is enabled.
pub fn enable_ingest_v2() -> bool {
static ENABLE_INGEST_V2: Lazy<bool> =
Lazy::new(|| get_bool_from_env("QW_ENABLE_INGEST_V2", false));
Lazy::new(|| get_bool_from_env("QW_ENABLE_INGEST_V2", true));
*ENABLE_INGEST_V2
}

Expand Down
31 changes: 24 additions & 7 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,10 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
};
info!(%index_uid, source_id, enabled=enable, "toggled source");

let mutation_occurred = self.model.toggle_source(&index_uid, &source_id, enable)?;
let mutation_occurred = self
.model
.toggle_source(&index_uid, &source_id, enable)
.context("failed to toggle source")?;

if mutation_occurred {
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
Expand Down Expand Up @@ -1213,8 +1216,11 @@ mod tests {
let indexer_pool = IndexerPool::default();
let ingester_pool = IngesterPool::default();

let index_metadata = IndexMetadata::for_test("test-index", "ram://test");
let index_uid = index_metadata.index_uid.clone();
let mut index_metadata = IndexMetadata::for_test("test-index", "ram://test");
index_metadata
.add_source(SourceConfig::ingest_v2())
.unwrap();

let mut mock_metastore = MockMetastoreService::new();
mock_metastore
.expect_add_source()
Expand All @@ -1226,14 +1232,16 @@ mod tests {
true
})
.returning(|_| Ok(EmptyResponse {}));
let index_metadata = IndexMetadata::for_test("test-index", "ram://test");
mock_metastore
.expect_list_indexes_metadata()
.returning(move |_| {
.return_once(move |_| {
Ok(ListIndexesMetadataResponse::for_test(vec![
index_metadata.clone()
]))
});
mock_metastore
.expect_list_shards()
.return_once(move |_| Ok(ListShardsResponse::default()));

let cluster_config = ClusterConfig::for_test();
let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default();
Expand All @@ -1246,6 +1254,7 @@ mod tests {
ingester_pool,
MetastoreServiceClient::from_mock(mock_metastore),
);
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let source_config = SourceConfig::for_test("test-source", SourceParams::void());
let add_source_request = AddSourceRequest {
index_uid: Some(index_uid),
Expand All @@ -1268,15 +1277,23 @@ mod tests {
let indexer_pool = IndexerPool::default();
let ingester_pool = IngesterPool::default();

let mut mock_metastore = MockMetastoreService::new();
let mut index_metadata = IndexMetadata::for_test("test-index", "ram://toto");
index_metadata
.add_source(SourceConfig::ingest_v2())
.unwrap();

let test_source_config = SourceConfig::for_test("test-source", SourceParams::void());
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
index_metadata.add_source(test_source_config).unwrap();

let mut mock_metastore = MockMetastoreService::new();
mock_metastore
.expect_list_indexes_metadata()
.return_once(|_| Ok(ListIndexesMetadataResponse::for_test(vec![index_metadata])));
mock_metastore
.expect_list_shards()
.return_once(move |_| Ok(ListShardsResponse::default()));

let index_uid = IndexUid::for_test("test-index", 0);
let index_uid_clone = index_uid.clone();
mock_metastore
.expect_toggle_source()
Expand Down
Loading

0 comments on commit 96f1d1d

Please sign in to comment.