Skip to content

Commit

Permalink
Plug ingest v2 into ES bulk API (#4374)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Jan 11, 2024
1 parent f6a09db commit bb7043a
Show file tree
Hide file tree
Showing 20 changed files with 780 additions and 142 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use source_config::{
load_source_config_from_user_config, FileSourceParams, GcpPubSubSourceParams,
KafkaSourceParams, KinesisSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint,
SourceConfig, SourceInputFormat, SourceParams, TransformConfig, VecSourceParams,
VoidSourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID,
VoidSourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use tracing::warn;

Expand All @@ -64,8 +64,8 @@ pub use crate::metastore_config::{
MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig,
};
pub use crate::node_config::{
IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits,
DEFAULT_QW_CONFIG_PATH,
enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig,
SplitCacheLimits, DEFAULT_QW_CONFIG_PATH,
};
use crate::source_config::serialize::{SourceConfigV0_7, VersionedSourceConfig};
pub use crate::storage_config::{
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::time::Duration;
use anyhow::{bail, ensure};
use bytesize::ByteSize;
use http::HeaderMap;
use once_cell::sync::Lazy;
use quickwit_common::net::HostAddr;
use quickwit_common::uri::Uri;
use quickwit_proto::indexing::CpuCapacity;
Expand Down Expand Up @@ -212,6 +213,12 @@ impl Default for IngestApiConfig {
}
}

/// Returns true if the ingest API v2 is enabled.
pub fn enable_ingest_v2() -> bool {
static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok());
*ENABLE_INGEST_V2
}

impl IngestApiConfig {
pub fn replication_factor(&self) -> anyhow::Result<NonZeroUsize> {
if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") {
Expand Down
15 changes: 9 additions & 6 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use serialize::load_source_config_from_user_config;
// For backward compatibility.
use serialize::VersionedSourceConfig;

use crate::TestableForRegression;
use crate::{enable_ingest_v2, TestableForRegression};

/// Reserved source ID for the `quickwit index ingest` CLI command.
pub const CLI_INGEST_SOURCE_ID: &str = "_ingest-cli-source";
Expand All @@ -44,10 +44,13 @@ pub const INGEST_API_SOURCE_ID: &str = "_ingest-api-source";

/// Reserved source ID used for native Quickwit ingest.
/// (this is for ingest v2)
pub const INGEST_SOURCE_ID: &str = "_ingest-source";
pub const INGEST_V2_SOURCE_ID: &str = "_ingest-source";

pub const RESERVED_SOURCE_IDS: &[&str] =
&[CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID];
pub const RESERVED_SOURCE_IDS: &[&str] = &[
CLI_INGEST_SOURCE_ID,
INGEST_API_SOURCE_ID,
INGEST_V2_SOURCE_ID,
];

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(into = "VersionedSourceConfig")]
Expand Down Expand Up @@ -125,10 +128,10 @@ impl SourceConfig {
/// Creates an ingest source v2.
pub fn ingest_v2_default() -> Self {
Self {
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: false,
enabled: enable_ingest_v2(),
source_params: SourceParams::Ingest,
transform_config: None,
input_format: SourceInputFormat::Json,
Expand Down
34 changes: 17 additions & 17 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ impl EventSubscriber<ShardPositionsUpdate> for ControlPlaneEventSubscriber {
mod tests {
use mockall::Sequence;
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
use quickwit_config::{IndexConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_config::{IndexConfig, SourceParams, INGEST_V2_SOURCE_ID};
use quickwit_indexing::IndexingService;
use quickwit_metastore::{
CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt,
Expand Down Expand Up @@ -893,14 +893,14 @@ mod tests {

let subrequest = &request.subrequests[0];
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, INGEST_SOURCE_ID);
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);

let subresponses = vec![ListShardsSubresponse {
index_uid: "test-index:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![Shard {
index_uid: "test-index:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: 1,
shard_state: ShardState::Open as i32,
..Default::default()
Expand All @@ -925,7 +925,7 @@ mod tests {
subrequests: vec![GetOrCreateOpenShardsSubrequest {
subrequest_id: 0,
index_id: "test-index".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
}],
closed_shards: Vec::new(),
unavailable_leaders: Vec::new(),
Expand All @@ -939,7 +939,7 @@ mod tests {

let subresponse = &get_open_shards_response.successes[0];
assert_eq!(subresponse.index_uid, "test-index:0");
assert_eq!(subresponse.source_id, INGEST_SOURCE_ID);
assert_eq!(subresponse.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subresponse.open_shards.len(), 1);
assert_eq!(subresponse.open_shards[0].shard_id, 1);

Expand Down Expand Up @@ -1111,15 +1111,15 @@ mod tests {
assert_eq!(delete_shards_request.subrequests.len(), 1);
let subrequest = &delete_shards_request.subrequests[0];
assert_eq!(subrequest.index_uid, index_uid_clone);
assert_eq!(subrequest.source_id, INGEST_SOURCE_ID);
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(&subrequest.shard_ids[..], &[17]);
Ok(DeleteShardsResponse {})
},
);

let mut shard = Shard {
index_uid: index_0.index_uid.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: 17,
leader_id: "test_node".to_string(),
..Default::default()
Expand All @@ -1132,7 +1132,7 @@ mod tests {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![shard],
next_shard_id: 18,
}],
Expand All @@ -1152,7 +1152,7 @@ mod tests {
);
let source_uid = SourceUid {
index_uid: index_0.index_uid.clone(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
};

// This update should not triggeer anything in the control plane.
Expand Down Expand Up @@ -1246,7 +1246,7 @@ mod tests {
assert_eq!(delete_shards_request.subrequests.len(), 1);
let subrequest = &delete_shards_request.subrequests[0];
assert_eq!(subrequest.index_uid, index_uid_clone);
assert_eq!(subrequest.source_id, INGEST_SOURCE_ID);
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(&subrequest.shard_ids[..], &[17]);
Ok(DeleteShardsResponse {})
},
Expand All @@ -1258,7 +1258,7 @@ mod tests {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![],
next_shard_id: 18,
}],
Expand All @@ -1278,7 +1278,7 @@ mod tests {
);
let source_uid = SourceUid {
index_uid: index_0.index_uid.clone(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
};

// This update should not triggeer anything in the control plane.
Expand Down Expand Up @@ -1332,7 +1332,7 @@ mod tests {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![Shard {
index_uid: index_uid_clone.to_string(),
source_id: source.source_id.to_string(),
Expand Down Expand Up @@ -1428,7 +1428,7 @@ mod tests {
mock_metastore.expect_delete_source().return_once(
move |delete_source_request: DeleteSourceRequest| {
assert_eq!(delete_source_request.index_uid, index_uid_clone.to_string());
assert_eq!(&delete_source_request.source_id, INGEST_SOURCE_ID);
assert_eq!(&delete_source_request.source_id, INGEST_V2_SOURCE_ID);
Ok(EmptyResponse {})
},
);
Expand All @@ -1454,7 +1454,7 @@ mod tests {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![Shard {
index_uid: index_uid_clone.to_string(),
source_id: source.source_id.to_string(),
Expand Down Expand Up @@ -1485,7 +1485,7 @@ mod tests {
control_plane_mailbox
.ask(DeleteSourceRequest {
index_uid: index_0.index_uid.to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
})
.await
.unwrap()
Expand Down
16 changes: 8 additions & 8 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ mod tests {

use std::collections::BTreeSet;

use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
use quickwit_ingest::{RateMibPerSec, ShardInfo};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest;
Expand Down Expand Up @@ -1344,7 +1344,7 @@ mod tests {
.returning(|request| {
assert_eq!(request.subrequests.len(), 1);
assert_eq!(request.subrequests[0].index_uid, "test-index:0");
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(request.subrequests[0].leader_id, "test-ingester");
assert_eq!(request.subrequests[0].next_shard_id, 1);

Expand All @@ -1355,17 +1355,17 @@ mod tests {
mock_metastore.expect_open_shards().returning(|request| {
assert_eq!(request.subrequests.len(), 1);
assert_eq!(request.subrequests[0].index_uid, "test-index:0");
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(request.subrequests[0].leader_id, "test-ingester");
assert_eq!(request.subrequests[0].next_shard_id, 1);

let subresponses = vec![metastore::OpenShardsSubresponse {
subrequest_id: 0,
index_uid: "test-index:0".into(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
opened_shards: vec![Shard {
index_uid: "test-index:0".into(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: 1,
leader_id: "test-ingester".to_string(),
shard_state: ShardState::Open as i32,
Expand All @@ -1387,7 +1387,7 @@ mod tests {
);

let index_uid: IndexUid = "test-index:0".into();
let source_id: SourceId = INGEST_SOURCE_ID.to_string();
let source_id: SourceId = INGEST_V2_SOURCE_ID.to_string();

let source_uid = SourceUid {
index_uid: index_uid.clone(),
Expand Down Expand Up @@ -1425,7 +1425,7 @@ mod tests {
.returning(|request| {
assert_eq!(request.shards.len(), 1);
assert_eq!(request.shards[0].index_uid, "test-index:0");
assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.shards[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(request.shards[0].shard_id, 1);
assert_eq!(request.shards[0].leader_id, "test-ingester");

Expand All @@ -1434,7 +1434,7 @@ mod tests {
ingester_mock.expect_init_shards().returning(|request| {
assert_eq!(request.shards.len(), 1);
assert_eq!(request.shards[0].index_uid, "test-index:0");
assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.shards[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(request.shards[0].shard_id, 1);
assert_eq!(request.shards[0].leader_id, "test-ingester");

Expand Down
16 changes: 8 additions & 8 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl ControlPlaneModel {

#[cfg(test)]
mod tests {
use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::ListIndexesMetadataResponse;
Expand Down Expand Up @@ -381,14 +381,14 @@ mod tests {
assert_eq!(request.subrequests.len(), 2);

assert_eq!(request.subrequests[0].index_uid, "test-index-0:0");
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(
request.subrequests[0].shard_state(),
ShardState::Unspecified
);

assert_eq!(request.subrequests[1].index_uid, "test-index-1:0");
assert_eq!(request.subrequests[1].source_id, INGEST_SOURCE_ID);
assert_eq!(request.subrequests[1].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(
request.subrequests[1].shard_state(),
ShardState::Unspecified
Expand All @@ -397,11 +397,11 @@ mod tests {
let subresponses = vec![
metastore::ListShardsSubresponse {
index_uid: "test-index-0:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![Shard {
shard_id: 42,
index_uid: "test-index-0:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_state: ShardState::Open as i32,
leader_id: "node1".to_string(),
..Default::default()
Expand All @@ -410,7 +410,7 @@ mod tests {
},
metastore::ListShardsSubresponse {
index_uid: "test-index-1:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: Vec::new(),
next_shard_id: 1,
},
Expand Down Expand Up @@ -443,7 +443,7 @@ mod tests {

let source_uid_0 = SourceUid {
index_uid: "test-index-0:0".into(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
};
let shards: Vec<&ShardEntry> = model
.shard_table
Expand All @@ -458,7 +458,7 @@ mod tests {

let source_uid_1 = SourceUid {
index_uid: "test-index-1:0".into(),
source_id: INGEST_SOURCE_ID.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
};
let shards: Vec<&ShardEntry> = model
.shard_table
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ bytes = { workspace = true }
bytesize = { workspace = true }
dyn-clone = { workspace = true }
flume = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
Expand Down
Loading

0 comments on commit bb7043a

Please sign in to comment.