diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index cd164c35b38..7b100fd40a2 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5688,6 +5688,7 @@ dependencies = [ "bytesize", "dyn-clone", "flume", + "fnv", "futures", "http", "hyper", diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 13fe40c4976..c637f329a92 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -53,7 +53,7 @@ pub use source_config::{ load_source_config_from_user_config, FileSourceParams, GcpPubSubSourceParams, KafkaSourceParams, KinesisSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat, SourceParams, TransformConfig, VecSourceParams, - VoidSourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID, + VoidSourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID, }; use tracing::warn; @@ -64,8 +64,8 @@ pub use crate::metastore_config::{ MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig, }; pub use crate::node_config::{ - IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits, - DEFAULT_QW_CONFIG_PATH, + enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, + SplitCacheLimits, DEFAULT_QW_CONFIG_PATH, }; use crate::source_config::serialize::{SourceConfigV0_7, VersionedSourceConfig}; pub use crate::storage_config::{ diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 8d66384ceec..46acb060bc9 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -29,6 +29,7 @@ use std::time::Duration; use anyhow::{bail, ensure}; use bytesize::ByteSize; use http::HeaderMap; +use once_cell::sync::Lazy; use quickwit_common::net::HostAddr; use quickwit_common::uri::Uri; use quickwit_proto::indexing::CpuCapacity; @@ -212,6 +213,12 @@ impl Default for IngestApiConfig { } } +/// Returns true if the ingest API v2 is enabled. +pub fn enable_ingest_v2() -> bool { + static ENABLE_INGEST_V2: Lazy = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok()); + *ENABLE_INGEST_V2 +} + impl IngestApiConfig { pub fn replication_factor(&self) -> anyhow::Result { if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") { diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index d0182219c2f..e55e83a05b9 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -34,7 +34,7 @@ pub use serialize::load_source_config_from_user_config; // For backward compatibility. use serialize::VersionedSourceConfig; -use crate::TestableForRegression; +use crate::{enable_ingest_v2, TestableForRegression}; /// Reserved source ID for the `quickwit index ingest` CLI command. pub const CLI_INGEST_SOURCE_ID: &str = "_ingest-cli-source"; @@ -44,10 +44,13 @@ pub const INGEST_API_SOURCE_ID: &str = "_ingest-api-source"; /// Reserved source ID used for native Quickwit ingest. /// (this is for ingest v2) -pub const INGEST_SOURCE_ID: &str = "_ingest-source"; +pub const INGEST_V2_SOURCE_ID: &str = "_ingest-source"; -pub const RESERVED_SOURCE_IDS: &[&str] = - &[CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID]; +pub const RESERVED_SOURCE_IDS: &[&str] = &[ + CLI_INGEST_SOURCE_ID, + INGEST_API_SOURCE_ID, + INGEST_V2_SOURCE_ID, +]; #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(into = "VersionedSourceConfig")] @@ -125,10 +128,10 @@ impl SourceConfig { /// Creates an ingest source v2. pub fn ingest_v2_default() -> Self { Self { - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), - enabled: false, + enabled: enable_ingest_v2(), source_params: SourceParams::Ingest, transform_config: None, input_format: SourceInputFormat::Json, diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 3370aa7868e..823ea42a9eb 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -569,7 +569,7 @@ impl EventSubscriber for ControlPlaneEventSubscriber { mod tests { use mockall::Sequence; use quickwit_actors::{AskError, Observe, SupervisorMetrics}; - use quickwit_config::{IndexConfig, SourceParams, INGEST_SOURCE_ID}; + use quickwit_config::{IndexConfig, SourceParams, INGEST_V2_SOURCE_ID}; use quickwit_indexing::IndexingService; use quickwit_metastore::{ CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt, @@ -893,14 +893,14 @@ mod tests { let subrequest = &request.subrequests[0]; assert_eq!(subrequest.index_uid, "test-index:0"); - assert_eq!(subrequest.source_id, INGEST_SOURCE_ID); + assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID); let subresponses = vec![ListShardsSubresponse { index_uid: "test-index:0".to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shards: vec![Shard { index_uid: "test-index:0".to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shard_id: 1, shard_state: ShardState::Open as i32, ..Default::default() @@ -925,7 +925,7 @@ mod tests { subrequests: vec![GetOrCreateOpenShardsSubrequest { subrequest_id: 0, index_id: "test-index".to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), }], closed_shards: Vec::new(), unavailable_leaders: Vec::new(), @@ -939,7 +939,7 @@ mod tests { let subresponse = &get_open_shards_response.successes[0]; assert_eq!(subresponse.index_uid, "test-index:0"); - assert_eq!(subresponse.source_id, INGEST_SOURCE_ID); + assert_eq!(subresponse.source_id, INGEST_V2_SOURCE_ID); assert_eq!(subresponse.open_shards.len(), 1); assert_eq!(subresponse.open_shards[0].shard_id, 1); @@ -1111,7 +1111,7 @@ mod tests { assert_eq!(delete_shards_request.subrequests.len(), 1); let subrequest = &delete_shards_request.subrequests[0]; assert_eq!(subrequest.index_uid, index_uid_clone); - assert_eq!(subrequest.source_id, INGEST_SOURCE_ID); + assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID); assert_eq!(&subrequest.shard_ids[..], &[17]); Ok(DeleteShardsResponse {}) }, @@ -1119,7 +1119,7 @@ mod tests { let mut shard = Shard { index_uid: index_0.index_uid.to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shard_id: 17, leader_id: "test_node".to_string(), ..Default::default() @@ -1132,7 +1132,7 @@ mod tests { let list_shards_resp = ListShardsResponse { subresponses: vec![ListShardsSubresponse { index_uid: index_uid_clone.to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shards: vec![shard], next_shard_id: 18, }], @@ -1152,7 +1152,7 @@ mod tests { ); let source_uid = SourceUid { index_uid: index_0.index_uid.clone(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), }; // This update should not triggeer anything in the control plane. @@ -1246,7 +1246,7 @@ mod tests { assert_eq!(delete_shards_request.subrequests.len(), 1); let subrequest = &delete_shards_request.subrequests[0]; assert_eq!(subrequest.index_uid, index_uid_clone); - assert_eq!(subrequest.source_id, INGEST_SOURCE_ID); + assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID); assert_eq!(&subrequest.shard_ids[..], &[17]); Ok(DeleteShardsResponse {}) }, @@ -1258,7 +1258,7 @@ mod tests { let list_shards_resp = ListShardsResponse { subresponses: vec![ListShardsSubresponse { index_uid: index_uid_clone.to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shards: vec![], next_shard_id: 18, }], @@ -1278,7 +1278,7 @@ mod tests { ); let source_uid = SourceUid { index_uid: index_0.index_uid.clone(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), }; // This update should not triggeer anything in the control plane. @@ -1332,7 +1332,7 @@ mod tests { let list_shards_resp = ListShardsResponse { subresponses: vec![ListShardsSubresponse { index_uid: index_uid_clone.to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shards: vec![Shard { index_uid: index_uid_clone.to_string(), source_id: source.source_id.to_string(), @@ -1428,7 +1428,7 @@ mod tests { mock_metastore.expect_delete_source().return_once( move |delete_source_request: DeleteSourceRequest| { assert_eq!(delete_source_request.index_uid, index_uid_clone.to_string()); - assert_eq!(&delete_source_request.source_id, INGEST_SOURCE_ID); + assert_eq!(&delete_source_request.source_id, INGEST_V2_SOURCE_ID); Ok(EmptyResponse {}) }, ); @@ -1454,7 +1454,7 @@ mod tests { let list_shards_resp = ListShardsResponse { subresponses: vec![ListShardsSubresponse { index_uid: index_uid_clone.to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shards: vec![Shard { index_uid: index_uid_clone.to_string(), source_id: source.source_id.to_string(), @@ -1485,7 +1485,7 @@ mod tests { control_plane_mailbox .ask(DeleteSourceRequest { index_uid: index_0.index_uid.to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), }) .await .unwrap() diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 5cd46ce71a8..ebdbbb2477a 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -672,7 +672,7 @@ mod tests { use std::collections::BTreeSet; - use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID}; + use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID}; use quickwit_ingest::{RateMibPerSec, ShardInfo}; use quickwit_metastore::IndexMetadata; use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest; @@ -1344,7 +1344,7 @@ mod tests { .returning(|request| { assert_eq!(request.subrequests.len(), 1); assert_eq!(request.subrequests[0].index_uid, "test-index:0"); - assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID); assert_eq!(request.subrequests[0].leader_id, "test-ingester"); assert_eq!(request.subrequests[0].next_shard_id, 1); @@ -1355,17 +1355,17 @@ mod tests { mock_metastore.expect_open_shards().returning(|request| { assert_eq!(request.subrequests.len(), 1); assert_eq!(request.subrequests[0].index_uid, "test-index:0"); - assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID); assert_eq!(request.subrequests[0].leader_id, "test-ingester"); assert_eq!(request.subrequests[0].next_shard_id, 1); let subresponses = vec![metastore::OpenShardsSubresponse { subrequest_id: 0, index_uid: "test-index:0".into(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), opened_shards: vec![Shard { index_uid: "test-index:0".into(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shard_id: 1, leader_id: "test-ingester".to_string(), shard_state: ShardState::Open as i32, @@ -1387,7 +1387,7 @@ mod tests { ); let index_uid: IndexUid = "test-index:0".into(); - let source_id: SourceId = INGEST_SOURCE_ID.to_string(); + let source_id: SourceId = INGEST_V2_SOURCE_ID.to_string(); let source_uid = SourceUid { index_uid: index_uid.clone(), @@ -1425,7 +1425,7 @@ mod tests { .returning(|request| { assert_eq!(request.shards.len(), 1); assert_eq!(request.shards[0].index_uid, "test-index:0"); - assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.shards[0].source_id, INGEST_V2_SOURCE_ID); assert_eq!(request.shards[0].shard_id, 1); assert_eq!(request.shards[0].leader_id, "test-ingester"); @@ -1434,7 +1434,7 @@ mod tests { ingester_mock.expect_init_shards().returning(|request| { assert_eq!(request.shards.len(), 1); assert_eq!(request.shards[0].index_uid, "test-index:0"); - assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.shards[0].source_id, INGEST_V2_SOURCE_ID); assert_eq!(request.shards[0].shard_id, 1); assert_eq!(request.shards[0].leader_id, "test-ingester"); diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index b7e2d9642ab..aee54dcc9d4 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -345,7 +345,7 @@ impl ControlPlaneModel { #[cfg(test)] mod tests { - use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID}; + use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID}; use quickwit_metastore::IndexMetadata; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::ListIndexesMetadataResponse; @@ -381,14 +381,14 @@ mod tests { assert_eq!(request.subrequests.len(), 2); assert_eq!(request.subrequests[0].index_uid, "test-index-0:0"); - assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID); assert_eq!( request.subrequests[0].shard_state(), ShardState::Unspecified ); assert_eq!(request.subrequests[1].index_uid, "test-index-1:0"); - assert_eq!(request.subrequests[1].source_id, INGEST_SOURCE_ID); + assert_eq!(request.subrequests[1].source_id, INGEST_V2_SOURCE_ID); assert_eq!( request.subrequests[1].shard_state(), ShardState::Unspecified @@ -397,11 +397,11 @@ mod tests { let subresponses = vec![ metastore::ListShardsSubresponse { index_uid: "test-index-0:0".to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shards: vec![Shard { shard_id: 42, index_uid: "test-index-0:0".to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shard_state: ShardState::Open as i32, leader_id: "node1".to_string(), ..Default::default() @@ -410,7 +410,7 @@ mod tests { }, metastore::ListShardsSubresponse { index_uid: "test-index-1:0".to_string(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), shards: Vec::new(), next_shard_id: 1, }, @@ -443,7 +443,7 @@ mod tests { let source_uid_0 = SourceUid { index_uid: "test-index-0:0".into(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), }; let shards: Vec<&ShardEntry> = model .shard_table @@ -458,7 +458,7 @@ mod tests { let source_uid_1 = SourceUid { index_uid: "test-index-1:0".into(), - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), }; let shards: Vec<&ShardEntry> = model .shard_table diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index 06c46b43cd2..6734633886d 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -12,6 +12,7 @@ bytes = { workspace = true } bytesize = { workspace = true } dyn-clone = { workspace = true } flume = { workspace = true } +fnv = { workspace = true } futures = { workspace = true } http = { workspace = true } hyper = { workspace = true } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 2dfae0f937e..4fe92ba156f 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -36,11 +36,14 @@ use std::fmt; use std::ops::{Add, AddAssign}; pub use broadcast::{setup_local_shards_update_listener, LocalShardsUpdate, ShardInfo, ShardInfos}; +use bytes::{BufMut, BytesMut}; use bytesize::ByteSize; +use fnv::FnvHashMap; use quickwit_common::tower::Pool; use quickwit_proto::ingest::ingester::IngesterServiceClient; -use quickwit_proto::ingest::DocBatchV2; -use quickwit_proto::types::NodeId; +use quickwit_proto::ingest::router::{IngestRequestV2, IngestSubrequest}; +use quickwit_proto::ingest::{CommitTypeV2, DocBatchV2}; +use quickwit_proto::types::{IndexId, NodeId}; pub use self::fetch::{FetchStreamError, MultiFetchStream}; pub use self::ingester::{wait_for_ingester_decommission, Ingester}; @@ -57,6 +60,80 @@ pub type LeaderId = NodeId; pub type FollowerId = NodeId; +/// Helper struct to build a [`DocBatchV2`]`. +#[derive(Debug, Default)] +pub struct DocBatchV2Builder { + doc_buffer: BytesMut, + doc_lengths: Vec, +} + +impl DocBatchV2Builder { + /// Adds a document to the batch. + pub fn add_doc(&mut self, doc: &[u8]) { + self.doc_lengths.push(doc.len() as u32); + self.doc_buffer.put(doc); + } + + /// Builds the [`DocBatchV2`], returning `None` if the batch is empty. + pub fn build(self) -> Option { + if self.doc_lengths.is_empty() { + return None; + } + let doc_batch = DocBatchV2 { + doc_buffer: self.doc_buffer.freeze(), + doc_lengths: self.doc_lengths, + }; + Some(doc_batch) + } +} + +/// Helper struct to build an [`IngestRequestV2`]. +#[derive(Debug, Default)] +pub struct IngestRequestV2Builder { + per_index_id_doc_batch_builders: FnvHashMap, +} + +impl IngestRequestV2Builder { + /// Adds a document to the request. + pub fn add_doc(&mut self, index_id: IndexId, doc: &[u8]) { + let doc_batch_builder = self + .per_index_id_doc_batch_builders + .entry(index_id) + .or_default(); + doc_batch_builder.add_doc(doc); + } + + /// Builds the [`IngestRequestV2`], returning `None` if the request is empty. + pub fn build(self, source_id: &str, commit_type: CommitTypeV2) -> Option { + let subrequests: Vec = self + .per_index_id_doc_batch_builders + .into_iter() + .enumerate() + .flat_map(|(subrequest_id, (index_id, doc_batch_builder))| { + let Some(doc_batch) = doc_batch_builder.build() else { + return None; + }; + let ingest_subrequest = IngestSubrequest { + subrequest_id: subrequest_id as u32, + index_id, + source_id: source_id.to_string(), + doc_batch: Some(doc_batch), + }; + Some(ingest_subrequest) + }) + .collect(); + + if subrequests.is_empty() { + return None; + } + let ingest_request = IngestRequestV2 { + subrequests, + commit_type: commit_type as i32, + }; + Some(ingest_request) + } +} + pub(super) fn estimate_size(doc_batch: &DocBatchV2) -> ByteSize { let estimate = doc_batch.num_bytes() + doc_batch.num_docs() * MRECORD_HEADER_LEN; ByteSize(estimate as u64) @@ -95,8 +172,119 @@ impl AddAssign for RateMibPerSec { #[cfg(test)] mod tests { + use bytes::Bytes; + use super::*; + #[test] + fn test_doc_batch_builder() { + let doc_batch_builder = DocBatchV2Builder::default(); + let doc_batch_opt = doc_batch_builder.build(); + assert!(doc_batch_opt.is_none()); + + let mut doc_batch_builder = DocBatchV2Builder::default(); + doc_batch_builder.add_doc(b"Hello, "); + doc_batch_builder.add_doc(b"World!"); + let doc_batch = doc_batch_builder.build().unwrap(); + + assert_eq!(doc_batch.num_docs(), 2); + assert_eq!(doc_batch.num_bytes(), 13); + assert_eq!(doc_batch.doc_lengths, [7, 6]); + assert_eq!(doc_batch.doc_buffer, Bytes::from(&b"Hello, World!"[..])); + } + + #[test] + fn test_ingest_request_builder() { + let ingest_request_builder = IngestRequestV2Builder::default(); + let ingest_request_opt = ingest_request_builder.build("test-source", CommitTypeV2::Auto); + assert!(ingest_request_opt.is_none()); + + let mut ingest_request_builder = IngestRequestV2Builder::default(); + ingest_request_builder.add_doc("test-index-foo".to_string(), b"Hello, "); + ingest_request_builder.add_doc("test-index-foo".to_string(), b"World!"); + + ingest_request_builder.add_doc("test-index-bar".to_string(), b"Hola, "); + ingest_request_builder.add_doc("test-index-bar".to_string(), b"Mundo!"); + let mut ingest_request = ingest_request_builder + .build("test-source", CommitTypeV2::Auto) + .unwrap(); + + ingest_request + .subrequests + .sort_by(|left, right| left.index_id.cmp(&right.index_id).reverse()); + + assert_eq!(ingest_request.subrequests.len(), 2); + assert_eq!(ingest_request.subrequests[0].index_id, "test-index-foo"); + assert_eq!(ingest_request.subrequests[0].source_id, "test-source"); + assert_eq!( + ingest_request.subrequests[0] + .doc_batch + .as_ref() + .unwrap() + .num_docs(), + 2 + ); + assert_eq!( + ingest_request.subrequests[0] + .doc_batch + .as_ref() + .unwrap() + .num_bytes(), + 13 + ); + assert_eq!( + ingest_request.subrequests[0] + .doc_batch + .as_ref() + .unwrap() + .doc_lengths, + [7, 6] + ); + assert_eq!( + ingest_request.subrequests[0] + .doc_batch + .as_ref() + .unwrap() + .doc_buffer, + Bytes::from(&b"Hello, World!"[..]) + ); + + assert_eq!(ingest_request.subrequests[1].index_id, "test-index-bar"); + assert_eq!(ingest_request.subrequests[1].source_id, "test-source"); + assert_eq!( + ingest_request.subrequests[1] + .doc_batch + .as_ref() + .unwrap() + .num_docs(), + 2 + ); + assert_eq!( + ingest_request.subrequests[1] + .doc_batch + .as_ref() + .unwrap() + .num_bytes(), + 12 + ); + assert_eq!( + ingest_request.subrequests[1] + .doc_batch + .as_ref() + .unwrap() + .doc_lengths, + [6, 6] + ); + assert_eq!( + ingest_request.subrequests[1] + .doc_batch + .as_ref() + .unwrap() + .doc_buffer, + Bytes::from(&b"Hola, Mundo!"[..]) + ); + } + #[test] fn test_estimate_size() { let doc_batch = DocBatchV2 { diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs index 0ff9fd40926..be6a97f5bf2 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs @@ -30,7 +30,7 @@ use std::ops::Bound; use itertools::Itertools; use quickwit_common::PrettySample; -use quickwit_config::{SourceConfig, INGEST_SOURCE_ID}; +use quickwit_config::{SourceConfig, INGEST_V2_SOURCE_ID}; use quickwit_proto::metastore::{ AcquireShardsSubrequest, AcquireShardsSubresponse, DeleteQuery, DeleteShardsSubrequest, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, @@ -85,7 +85,7 @@ impl quickwit_config::TestableForRegression for FileBackedIndex { let index_metadata = IndexMetadata::sample_for_regression(); let index_uid = index_metadata.index_uid.clone(); - let source_id = INGEST_SOURCE_ID.to_string(); + let source_id = INGEST_V2_SOURCE_ID.to_string(); let split_metadata = SplitMetadata::sample_for_regression(); let split = Split { @@ -353,7 +353,7 @@ impl FileBackedIndex { if let Some(checkpoint_delta) = checkpoint_delta_opt { let source_id = checkpoint_delta.source_id.clone(); - if source_id == INGEST_SOURCE_ID { + if source_id == INGEST_V2_SOURCE_ID { let publish_token = publish_token_opt.ok_or_else(|| { let message = format!( "publish token is required for publishing splits for source `{source_id}`" diff --git a/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs b/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs index 69e8d46f2d4..f1a71ad2ea5 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs @@ -18,17 +18,22 @@ // along with this program. If not, see . use std::collections::HashMap; +use std::time::Instant; use bytes::Bytes; use hyper::StatusCode; +use quickwit_config::enable_ingest_v2; use quickwit_ingest::{ - CommitType, DocBatchBuilder, IngestRequest, IngestResponse, IngestService, IngestServiceClient, + CommitType, DocBatchBuilder, IngestRequest, IngestService, IngestServiceClient, }; +use quickwit_proto::ingest::router::IngestRouterServiceClient; +use quickwit_proto::types::IndexId; use warp::{Filter, Rejection}; +use super::bulk_v2::{elastic_bulk_ingest_v2, ElasticBulkResponse}; use crate::elastic_search_api::filter::{elastic_bulk_filter, elastic_index_bulk_filter}; use crate::elastic_search_api::make_elastic_api_response; -use crate::elastic_search_api::model::{BulkAction, ElasticIngestOptions, ElasticSearchError}; +use crate::elastic_search_api::model::{BulkAction, ElasticBulkOptions, ElasticSearchError}; use crate::format::extract_format_from_qs; use crate::ingest_api::lines; use crate::with_arg; @@ -36,11 +41,13 @@ use crate::with_arg; /// POST `_elastic/_bulk` pub fn es_compat_bulk_handler( ingest_service: IngestServiceClient, + ingest_router: IngestRouterServiceClient, ) -> impl Filter + Clone { elastic_bulk_filter() .and(with_arg(ingest_service)) - .then(|body, ingest_option, ingest_service| { - elastic_ingest_bulk(None, body, ingest_option, ingest_service) + .and(with_arg(ingest_router)) + .then(|body, bulk_options, ingest_service, ingest_router| { + elastic_ingest_bulk(None, body, bulk_options, ingest_service, ingest_router) }) .and(extract_format_from_qs()) .map(make_elastic_api_response) @@ -49,22 +56,37 @@ pub fn es_compat_bulk_handler( /// POST `_elastic//_bulk` pub fn es_compat_index_bulk_handler( ingest_service: IngestServiceClient, + ingest_router: IngestRouterServiceClient, ) -> impl Filter + Clone { elastic_index_bulk_filter() .and(with_arg(ingest_service)) - .then(|index, body, ingest_option, ingest_service| { - elastic_ingest_bulk(Some(index), body, ingest_option, ingest_service) - }) + .and(with_arg(ingest_router)) + .then( + |index_id, body, bulk_options, ingest_service, ingest_router| { + elastic_ingest_bulk( + Some(index_id), + body, + bulk_options, + ingest_service, + ingest_router, + ) + }, + ) .and(extract_format_from_qs()) .map(make_elastic_api_response) } async fn elastic_ingest_bulk( - index: Option, + default_index_id: Option, body: Bytes, - ingest_options: ElasticIngestOptions, + bulk_options: ElasticBulkOptions, mut ingest_service: IngestServiceClient, -) -> Result { + ingest_router: IngestRouterServiceClient, +) -> Result { + if enable_ingest_v2() { + return elastic_bulk_ingest_v2(default_index_id, body, bulk_options, ingest_router).await; + } + let now = Instant::now(); let mut doc_batch_builders = HashMap::new(); let mut lines = lines(&body).enumerate(); @@ -85,8 +107,8 @@ async fn elastic_ingest_bulk( // ES honors it and create the doc in the requested index. That is, `my-index` is a default // value in case _index: is missing, but not a constraint on each sub-action. let index_id = action - .into_index() - .or_else(|| index.clone()) + .into_index_id() + .or_else(|| default_index_id.clone()) .ok_or_else(|| { ElasticSearchError::new( StatusCode::BAD_REQUEST, @@ -103,13 +125,19 @@ async fn elastic_ingest_bulk( .into_values() .map(|builder| builder.build()) .collect(); - let commit_type: CommitType = ingest_options.refresh.into(); + let commit_type: CommitType = bulk_options.refresh.into(); let ingest_request = IngestRequest { doc_batches, commit: commit_type.into(), }; - let ingest_response = ingest_service.ingest(ingest_request).await?; - Ok(ingest_response) + ingest_service.ingest(ingest_request).await?; + let took_millis = now.elapsed().as_millis() as u64; + let errors = false; + let bulk_response = ElasticBulkResponse { + took_millis, + errors, + }; + Ok(bulk_response) } #[cfg(test)] @@ -119,11 +147,11 @@ mod tests { use hyper::StatusCode; use quickwit_config::{IngestApiConfig, NodeConfig}; - use quickwit_ingest::{ - FetchRequest, IngestResponse, IngestServiceClient, SuggestTruncateRequest, - }; + use quickwit_ingest::{FetchRequest, IngestServiceClient, SuggestTruncateRequest}; + use quickwit_proto::ingest::router::IngestRouterServiceClient; use quickwit_search::MockSearchService; + use crate::elastic_search_api::bulk_v2::ElasticBulkResponse; use crate::elastic_search_api::elastic_api_handlers; use crate::elastic_search_api::model::ElasticSearchError; use crate::ingest_api::setup_ingest_service; @@ -134,7 +162,9 @@ mod tests { let search_service = Arc::new(MockSearchService::new()); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await; - let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service); + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let elastic_api_handlers = + elastic_api_handlers(config, search_service, ingest_service, ingest_router); let payload = r#" { "create" : { "_index" : "my-index", "_id" : "1"} } {"id": 1, "message": "push"} @@ -156,7 +186,9 @@ mod tests { let search_service = Arc::new(MockSearchService::new()); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await; - let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service); + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let elastic_api_handlers = + elastic_api_handlers(config, search_service, ingest_service, ingest_router); let payload = r#" { "create" : { "_index" : "my-index-1", "_id" : "1"} } {"id": 1, "message": "push"} @@ -171,8 +203,8 @@ mod tests { .reply(&elastic_api_handlers) .await; assert_eq!(resp.status(), 200); - let ingest_response: IngestResponse = serde_json::from_slice(resp.body()).unwrap(); - assert_eq!(ingest_response.num_docs_for_processing, 3); + let bulk_response: ElasticBulkResponse = serde_json::from_slice(resp.body()).unwrap(); + assert!(!bulk_response.errors); universe.assert_quit().await; } @@ -182,7 +214,9 @@ mod tests { let search_service = Arc::new(MockSearchService::new()); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index-1"], &IngestApiConfig::default()).await; - let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service); + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let elastic_api_handlers = + elastic_api_handlers(config, search_service, ingest_service, ingest_router); let payload = " {\"create\": {\"_index\": \"my-index-1\", \"_id\": \"1674834324802805760\"}} \u{20}\u{20}\u{20}\u{20}\n @@ -194,8 +228,8 @@ mod tests { .reply(&elastic_api_handlers) .await; assert_eq!(resp.status(), 200); - let ingest_response: IngestResponse = serde_json::from_slice(resp.body()).unwrap(); - assert_eq!(ingest_response.num_docs_for_processing, 1); + let bulk_response: ElasticBulkResponse = serde_json::from_slice(resp.body()).unwrap(); + assert!(!bulk_response.errors); universe.assert_quit().await; } @@ -205,7 +239,9 @@ mod tests { let search_service = Arc::new(MockSearchService::new()); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await; - let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service); + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let elastic_api_handlers = + elastic_api_handlers(config, search_service, ingest_service, ingest_router); let payload = r#" { "create" : { "_index" : "my-index-1", "_id" : "1"} } {"id": 1, "message": "push"} @@ -220,8 +256,8 @@ mod tests { .reply(&elastic_api_handlers) .await; assert_eq!(resp.status(), 200); - let ingest_response: IngestResponse = serde_json::from_slice(resp.body()).unwrap(); - assert_eq!(ingest_response.num_docs_for_processing, 3); + let bulk_response: ElasticBulkResponse = serde_json::from_slice(resp.body()).unwrap(); + assert!(!bulk_response.errors); universe.assert_quit().await; } @@ -231,7 +267,9 @@ mod tests { let search_service = Arc::new(MockSearchService::new()); let (universe, _temp_dir, ingest_service, ingest_service_mailbox) = setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await; - let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service); + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let elastic_api_handlers = + elastic_api_handlers(config, search_service, ingest_service, ingest_router); let payload = r#" { "create" : { "_index" : "my-index-1", "_id" : "1"} } {"id": 1, "message": "push"} @@ -248,8 +286,8 @@ mod tests { .await; assert_eq!(resp.status(), 200); - let ingest_response: IngestResponse = serde_json::from_slice(resp.body()).unwrap(); - assert_eq!(ingest_response.num_docs_for_processing, 3); + let bulk_response: ElasticBulkResponse = serde_json::from_slice(resp.body()).unwrap(); + assert!(!bulk_response.errors); }); universe.sleep(Duration::from_secs(10)).await; assert!(!handle.is_finished()); @@ -308,7 +346,9 @@ mod tests { let search_service = Arc::new(MockSearchService::new()); let (universe, _temp_dir, ingest_service, ingest_service_mailbox) = setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await; - let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service); + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let elastic_api_handlers = + elastic_api_handlers(config, search_service, ingest_service, ingest_router); let payload = r#" { "create" : { "_index" : "my-index-1", "_id" : "1"} } {"id": 1, "message": "push"} @@ -325,8 +365,8 @@ mod tests { .await; assert_eq!(resp.status(), 200); - let ingest_response: IngestResponse = serde_json::from_slice(resp.body()).unwrap(); - assert_eq!(ingest_response.num_docs_for_processing, 3); + let bulk_response: ElasticBulkResponse = serde_json::from_slice(resp.body()).unwrap(); + assert!(!bulk_response.errors); }); universe.sleep(Duration::from_secs(10)).await; assert!(!handle.is_finished()); @@ -383,7 +423,9 @@ mod tests { let config = Arc::new(NodeConfig::for_test()); let search_service = Arc::new(MockSearchService::new()); let ingest_service = IngestServiceClient::from(IngestServiceClient::mock()); - let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service); + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let elastic_api_handlers = + elastic_api_handlers(config, search_service, ingest_service, ingest_router); let payload = r#" {"create": {"_index": "my-index", "_id": "1"},} {"id": 1, "message": "my-doc"}"#; diff --git a/quickwit/quickwit-serve/src/elastic_search_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elastic_search_api/bulk_v2.rs new file mode 100644 index 00000000000..493d21ab230 --- /dev/null +++ b/quickwit/quickwit-serve/src/elastic_search_api/bulk_v2.rs @@ -0,0 +1,341 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::time::Instant; + +use bytes::Bytes; +use hyper::StatusCode; +use quickwit_config::INGEST_V2_SOURCE_ID; +use quickwit_ingest::IngestRequestV2Builder; +use quickwit_proto::ingest::router::{IngestRouterService, IngestRouterServiceClient}; +use quickwit_proto::ingest::CommitTypeV2; +use quickwit_proto::types::IndexId; +use serde::{Deserialize, Serialize}; +use tracing::warn; + +use crate::elastic_search_api::model::{BulkAction, ElasticBulkOptions, ElasticSearchError}; +use crate::ingest_api::lines; + +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct ElasticBulkResponse { + #[serde(rename = "took")] + pub took_millis: u64, + pub errors: bool, +} + +pub(crate) async fn elastic_bulk_ingest_v2( + default_index_id: Option, + body: Bytes, + bulk_options: ElasticBulkOptions, + mut ingest_router: IngestRouterServiceClient, +) -> Result { + let now = Instant::now(); + let mut ingest_request_builder = IngestRequestV2Builder::default(); + let mut lines = lines(&body).enumerate(); + + while let Some((line_no, line)) = lines.next() { + let action = serde_json::from_slice::(line).map_err(|error| { + ElasticSearchError::new( + StatusCode::BAD_REQUEST, + format!("unsupported or malformed action on line #{line_no}: `{error}`"), + ) + })?; + let (_, source) = lines.next().ok_or_else(|| { + ElasticSearchError::new( + StatusCode::BAD_REQUEST, + format!("associated source data with action on line #{line_no} is missing"), + ) + })?; + // When ingesting into `/my-index/_bulk`, if `_index` is set to something other than + // `my-index`, ES honors it and creates the doc for the requested index. That is, + // `my-index` is a default value in case `_index`` is missing, but not a constraint on + // each sub-action. + let index_id = action + .into_index_id() + .or_else(|| default_index_id.clone()) + .ok_or_else(|| { + ElasticSearchError::new( + StatusCode::BAD_REQUEST, + format!("`_index` field of action on line #{line_no} is missing"), + ) + })?; + ingest_request_builder.add_doc(index_id, source); + } + let commit_type: CommitTypeV2 = bulk_options.refresh.into(); + + if commit_type != CommitTypeV2::Auto { + warn!("ingest API v2 does not support the `refresh` parameter (yet)"); + } + let ingest_request_opt = ingest_request_builder.build(INGEST_V2_SOURCE_ID, commit_type); + + if let Some(ingest_request) = ingest_request_opt { + let ingest_response_v2 = ingest_router.ingest(ingest_request).await?; + let took_millis = now.elapsed().as_millis() as u64; + let errors = !ingest_response_v2.failures.is_empty(); + let bulk_response = ElasticBulkResponse { + took_millis, + errors, + }; + Ok(bulk_response) + } else { + Ok(ElasticBulkResponse::default()) + } +} + +#[cfg(test)] +mod tests { + use quickwit_proto::ingest::router::{ + IngestFailure, IngestFailureReason, IngestResponseV2, IngestSuccess, + }; + use quickwit_proto::types::Position; + use warp::{Filter, Rejection, Reply}; + + use super::*; + use crate::elastic_search_api::bulk_v2::ElasticBulkResponse; + use crate::elastic_search_api::filter::elastic_bulk_filter; + use crate::elastic_search_api::make_elastic_api_response; + use crate::elastic_search_api::model::ElasticSearchError; + use crate::format::extract_format_from_qs; + use crate::with_arg; + + fn es_compat_bulk_handler_v2( + ingest_router: IngestRouterServiceClient, + ) -> impl Filter + Clone { + elastic_bulk_filter() + .and(with_arg(ingest_router)) + .then(|body, bulk_options, ingest_router| { + elastic_bulk_ingest_v2(None, body, bulk_options, ingest_router) + }) + .and(extract_format_from_qs()) + .map(make_elastic_api_response) + } + + #[tokio::test] + async fn test_bulk_api_happy_path() { + let mut ingest_router_mock = IngestRouterServiceClient::mock(); + ingest_router_mock + .expect_ingest() + .once() + .returning(|ingest_request| { + assert_eq!(ingest_request.subrequests.len(), 3); + assert_eq!(ingest_request.commit_type(), CommitTypeV2::Auto); + + let mut subrequests = ingest_request.subrequests; + assert_eq!(subrequests[0].subrequest_id, 0); + assert_eq!(subrequests[1].subrequest_id, 1); + assert_eq!(subrequests[2].subrequest_id, 2); + + subrequests.sort_by(|left, right| left.index_id.cmp(&right.index_id)); + + assert_eq!(subrequests[0].index_id, "my-index-1"); + assert_eq!(subrequests[0].source_id, INGEST_V2_SOURCE_ID); + assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_docs(), 2); + assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_bytes(), 96); + + assert_eq!(subrequests[1].index_id, "my-index-2"); + assert_eq!(subrequests[1].source_id, INGEST_V2_SOURCE_ID); + assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_docs(), 1); + assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 48); + + assert_eq!(subrequests[2].index_id, "my-index-3"); + assert_eq!(subrequests[2].source_id, INGEST_V2_SOURCE_ID); + assert_eq!(subrequests[2].doc_batch.as_ref().unwrap().num_docs(), 1); + assert_eq!(subrequests[2].doc_batch.as_ref().unwrap().num_bytes(), 48); + + Ok(IngestResponseV2 { + successes: vec![ + IngestSuccess { + subrequest_id: 0, + index_uid: "my-index-1:0".to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), + shard_id: 1, + replication_position_inclusive: Some(Position::offset(1u64)), + }, + IngestSuccess { + subrequest_id: 1, + index_uid: "my-index-2:0".to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), + shard_id: 1, + replication_position_inclusive: Some(Position::offset(0u64)), + }, + ], + failures: vec![IngestFailure { + subrequest_id: 2, + index_id: "my-index-3".to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), + reason: IngestFailureReason::IndexNotFound as i32, + }], + }) + }); + let ingest_router = IngestRouterServiceClient::from(ingest_router_mock); + let handler = es_compat_bulk_handler_v2(ingest_router); + + let payload = r#" + {"create": {"_index": "my-index-1", "_id" : "1"}} + {"ts": 1, "message": "my-message-1"} + {"create": {"_index": "my-index-2", "_id" : "1"}} + {"ts": 1, "message": "my-message-1"} + {"create": {"_index": "my-index-1"}} + {"ts": 2, "message": "my-message-2"} + {"create": {"_index": "my-index-3"}} + {"ts": 1, "message": "my-message-1"} + "#; + let response = warp::test::request() + .path("/_elastic/_bulk") + .method("POST") + .body(payload) + .reply(&handler) + .await; + assert_eq!(response.status(), 200); + + let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap(); + assert!(bulk_response.errors); + } + + #[tokio::test] + async fn test_bulk_api_accepts_empty_requests() { + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let handler = es_compat_bulk_handler_v2(ingest_router); + + let response = warp::test::request() + .path("/_elastic/_bulk") + .method("POST") + .body("") + .reply(&handler) + .await; + assert_eq!(response.status(), 200); + + let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap(); + assert!(!bulk_response.errors) + } + + #[tokio::test] + async fn test_bulk_api_ignores_blank_lines() { + let mut ingest_router_mock = IngestRouterServiceClient::mock(); + ingest_router_mock + .expect_ingest() + .once() + .returning(|ingest_request| { + assert_eq!(ingest_request.subrequests.len(), 1); + assert_eq!(ingest_request.commit_type(), CommitTypeV2::Auto); + + let subrequest_0 = &ingest_request.subrequests[0]; + + assert_eq!(subrequest_0.index_id, "my-index-1"); + assert_eq!(subrequest_0.source_id, INGEST_V2_SOURCE_ID); + assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_docs(), 1); + assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_bytes(), 48); + + Ok(IngestResponseV2 { + successes: vec![IngestSuccess { + subrequest_id: 0, + index_uid: "my-index-1:0".to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), + shard_id: 1, + replication_position_inclusive: Some(Position::offset(0u64)), + }], + failures: Vec::new(), + }) + }); + let ingest_router = IngestRouterServiceClient::from(ingest_router_mock); + let handler = es_compat_bulk_handler_v2(ingest_router); + + let payload = r#" + + {"create": {"_index": "my-index-1", "_id" : "1"}} + + {"ts": 1, "message": "my-message-1"} + "#; + let response = warp::test::request() + .path("/_elastic/_bulk") + .method("POST") + .body(payload) + .reply(&handler) + .await; + assert_eq!(response.status(), 200); + + let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap(); + assert!(!bulk_response.errors); + } + + #[tokio::test] + async fn test_bulk_api_handles_malformed_requests() { + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let handler = es_compat_bulk_handler_v2(ingest_router); + + let payload = r#" + {"create": {"_index": "my-index-1", "_id" : "1"},} + {"ts": 1, "message": "my-message-1"} + "#; + let response = warp::test::request() + .path("/_elastic/_bulk") + .method("POST") + .body(payload) + .reply(&handler) + .await; + assert_eq!(response.status(), 400); + + let es_error: ElasticSearchError = serde_json::from_slice(response.body()).unwrap(); + assert_eq!(es_error.status, StatusCode::BAD_REQUEST); + + let reason = es_error.error.reason.unwrap(); + assert_eq!( + reason, + "unsupported or malformed action on line #0: `expected value at line 1 column 60`" + ); + + let payload = r#" + {"create": {"_index": "my-index-1", "_id" : "1"}} + "#; + let response = warp::test::request() + .path("/_elastic/_bulk") + .method("POST") + .body(payload) + .reply(&handler) + .await; + assert_eq!(response.status(), 400); + + let es_error: ElasticSearchError = serde_json::from_slice(response.body()).unwrap(); + assert_eq!(es_error.status, StatusCode::BAD_REQUEST); + + let reason = es_error.error.reason.unwrap(); + assert_eq!( + reason, + "associated source data with action on line #0 is missing" + ); + + let payload = r#" + {"create": {"_id" : "1"}} + {"ts": 1, "message": "my-message-1"} + "#; + let response = warp::test::request() + .path("/_elastic/_bulk") + .method("POST") + .body(payload) + .reply(&handler) + .await; + assert_eq!(response.status(), 400); + + let es_error: ElasticSearchError = serde_json::from_slice(response.body()).unwrap(); + assert_eq!(es_error.status, StatusCode::BAD_REQUEST); + + let reason = es_error.error.reason.unwrap(); + assert_eq!(reason, "`_index` field of action on line #0 is missing"); + } +} diff --git a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs index 9edc0575e8b..8f5b6660fd7 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs @@ -27,11 +27,11 @@ use super::model::{ FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams, }; use crate::elastic_search_api::model::{ - ElasticIngestOptions, ScrollQueryParams, SearchBody, SearchQueryParams, + ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams, }; use crate::search_api::extract_index_id_patterns; -const BODY_LENGTH_LIMIT: ByteSize = ByteSize::mb(1); +const BODY_LENGTH_LIMIT: ByteSize = ByteSize::mib(1); const CONTENT_LENGTH_LIMIT: ByteSize = ByteSize::mib(10); // TODO: Make all elastic endpoint models `utoipa` compatible @@ -69,7 +69,7 @@ pub(crate) fn elastic_search_filter( ) )] pub(crate) fn elastic_bulk_filter( -) -> impl Filter + Clone { +) -> impl Filter + Clone { warp::path!("_elastic" / "_bulk") .and(warp::post()) .and(warp::body::content_length_limit( @@ -142,14 +142,14 @@ pub(crate) fn elastic_index_search_filter( ) )] pub(crate) fn elastic_index_bulk_filter( -) -> impl Filter + Clone { +) -> impl Filter + Clone { warp::path!("_elastic" / String / "_bulk") .and(warp::post()) .and(warp::body::content_length_limit( CONTENT_LENGTH_LIMIT.as_u64(), )) .and(warp::body::bytes()) - .and(serde_qs::warp::query::( + .and(serde_qs::warp::query::( serde_qs::Config::default(), )) } diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index d24185f59a8..40f77c64583 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . mod bulk; +mod bulk_v2; mod filter; mod model; mod rest_handler; @@ -29,6 +30,7 @@ pub use filter::ElasticCompatibleApi; use hyper::StatusCode; use quickwit_config::NodeConfig; use quickwit_ingest::IngestServiceClient; +use quickwit_proto::ingest::router::IngestRouterServiceClient; use quickwit_search::SearchService; use rest_handler::{ es_compat_cluster_info_handler, es_compat_index_multi_search_handler, @@ -50,6 +52,7 @@ pub fn elastic_api_handlers( node_config: Arc, search_service: Arc, ingest_service: IngestServiceClient, + ingest_router: IngestRouterServiceClient, ) -> impl Filter + Clone { es_compat_cluster_info_handler(node_config, BuildInfo::get()) .or(es_compat_search_handler(search_service.clone())) @@ -59,8 +62,11 @@ pub fn elastic_api_handlers( .or(es_compat_index_search_handler(search_service.clone())) .or(es_compat_scroll_handler(search_service.clone())) .or(es_compat_index_multi_search_handler(search_service)) - .or(es_compat_bulk_handler(ingest_service.clone())) - .or(es_compat_index_bulk_handler(ingest_service)) + .or(es_compat_bulk_handler( + ingest_service.clone(), + ingest_router.clone(), + )) + .or(es_compat_index_bulk_handler(ingest_service, ingest_router)) // Register newly created handlers here. } @@ -113,6 +119,7 @@ mod tests { use mockall::predicate; use quickwit_config::NodeConfig; use quickwit_ingest::{IngestApiService, IngestServiceClient}; + use quickwit_proto::ingest::router::IngestRouterServiceClient; use quickwit_search::MockSearchService; use serde_json::Value as JsonValue; use warp::Filter; @@ -147,10 +154,12 @@ mod tests { }, )) .returning(|_| Ok(Default::default())); + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), + ingest_router, ); let msearch_payload = r#" {"index":"index-1"} @@ -194,10 +203,13 @@ mod tests { )) } }); + + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), + ingest_router, ); let msearch_payload = r#" {"index":"index-1"} @@ -229,10 +241,13 @@ mod tests { async fn test_msearch_api_return_400_with_malformed_request_header() { let config = Arc::new(NodeConfig::for_test()); let mock_search_service = MockSearchService::new(); + + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), + ingest_router, ); let msearch_payload = r#" {"index":"index-1" @@ -257,10 +272,13 @@ mod tests { async fn test_msearch_api_return_400_with_malformed_request_body() { let config = Arc::new(NodeConfig::for_test()); let mock_search_service = MockSearchService::new(); + + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); let es_search_api_handler = elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), + ingest_router, ); let msearch_payload = r#" {"index":"index-1"} @@ -285,10 +303,13 @@ mod tests { async fn test_msearch_api_return_400_with_only_a_header_request() { let config = Arc::new(NodeConfig::for_test()); let mock_search_service = MockSearchService::new(); + + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), + ingest_router, ); let msearch_payload = r#" {"index":"index-1"} @@ -312,10 +333,13 @@ mod tests { async fn test_msearch_api_return_400_with_no_index() { let config = Arc::new(NodeConfig::for_test()); let mock_search_service = MockSearchService::new(); + + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), + ingest_router, ); let msearch_payload = r#" {} @@ -352,10 +376,12 @@ mod tests { )) } }); + let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), + ingest_router, ); let msearch_payload = r#" {"index": ["index-1", "index-2"]} diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/bulk_body.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/bulk_body.rs index 32b7de82db5..d4c65e56dad 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/bulk_body.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/bulk_body.rs @@ -17,17 +17,18 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use quickwit_proto::types::IndexId; use serde::Deserialize; #[derive(Clone, Debug, Deserialize, PartialEq)] #[serde(rename_all(deserialize = "lowercase"))] pub enum BulkAction { - Index(BulkActionMeta), Create(BulkActionMeta), + Index(BulkActionMeta), } impl BulkAction { - pub fn into_index(self) -> Option { + pub fn into_index_id(self) -> Option { match self { BulkAction::Index(meta) => meta.index_id, BulkAction::Create(meta) => meta.index_id, @@ -39,7 +40,7 @@ impl BulkAction { pub struct BulkActionMeta { #[serde(alias = "_index")] #[serde(default)] - pub index_id: Option, + pub index_id: Option, #[serde(alias = "_id")] #[serde(default)] pub doc_id: Option, @@ -47,7 +48,8 @@ pub struct BulkActionMeta { #[cfg(test)] mod tests { - use crate::elastic_search_api::model::{BulkAction, BulkActionMeta}; + use crate::elastic_search_api::model::bulk_body::BulkActionMeta; + use crate::elastic_search_api::model::BulkAction; #[test] fn test_bulk_action_serde() { diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/bulk_query_params.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/bulk_query_params.rs index 568b135cd68..593c6364bda 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/bulk_query_params.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/bulk_query_params.rs @@ -18,10 +18,11 @@ // along with this program. If not, see . use quickwit_ingest::CommitType; +use quickwit_proto::ingest::CommitTypeV2; use serde::Deserialize; -#[derive(Clone, Debug, Default, Deserialize, PartialEq)] -pub struct ElasticIngestOptions { +#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq)] +pub struct ElasticBulkOptions { #[serde(default)] pub refresh: ElasticRefresh, } @@ -32,7 +33,7 @@ pub struct ElasticIngestOptions { /// - Absence of ?refresh parameter or ?refresh=false means no refresh /// - Presence of ?refresh parameter without any values or ?refresh=true means force refresh /// - ?refresh=wait_for means wait for refresh -#[derive(Clone, Debug, Deserialize, PartialEq, utoipa::ToSchema)] +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, utoipa::ToSchema)] #[serde(rename_all(deserialize = "snake_case"))] #[derive(Default)] pub enum ElasticRefresh { @@ -53,51 +54,62 @@ pub enum ElasticRefresh { impl From for CommitType { fn from(val: ElasticRefresh) -> Self { match val { - ElasticRefresh::False => CommitType::Auto, - ElasticRefresh::True => CommitType::Force, - ElasticRefresh::WaitFor => CommitType::WaitFor, + ElasticRefresh::False => Self::Auto, + ElasticRefresh::True => Self::Force, + ElasticRefresh::WaitFor => Self::WaitFor, + } + } +} + +impl From for CommitTypeV2 { + fn from(val: ElasticRefresh) -> Self { + match val { + ElasticRefresh::False => Self::Auto, + ElasticRefresh::True => Self::Force, + ElasticRefresh::WaitFor => Self::Wait, } } } #[cfg(test)] mod tests { - use crate::elastic_search_api::model::{ElasticIngestOptions, ElasticRefresh}; + use crate::elastic_search_api::model::bulk_query_params::ElasticRefresh; + use crate::elastic_search_api::model::ElasticBulkOptions; #[test] fn test_elastic_refresh_parsing() { assert_eq!( - serde_qs::from_str::("") + serde_qs::from_str::("") .unwrap() .refresh, ElasticRefresh::False ); assert_eq!( - serde_qs::from_str::("refresh=true") + serde_qs::from_str::("refresh=true") .unwrap() .refresh, ElasticRefresh::True ); assert_eq!( - serde_qs::from_str::("refresh=false") + serde_qs::from_str::("refresh=false") .unwrap() .refresh, ElasticRefresh::False ); assert_eq!( - serde_qs::from_str::("refresh=wait_for") + serde_qs::from_str::("refresh=wait_for") .unwrap() .refresh, ElasticRefresh::WaitFor ); assert_eq!( - serde_qs::from_str::("refresh") + serde_qs::from_str::("refresh") .unwrap() .refresh, ElasticRefresh::True ); assert_eq!( - serde_qs::from_str::("refresh=wait") + serde_qs::from_str::("refresh=wait") .unwrap_err() .to_string(), "unknown variant `wait`, expected one of `false`, `true`, `wait_for`" diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/error.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/error.rs index 5e3f1120612..4e6f3efc21f 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/error.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/error.rs @@ -20,6 +20,7 @@ use elasticsearch_dsl::search::ErrorCause; use hyper::StatusCode; use quickwit_ingest::IngestServiceError; +use quickwit_proto::ingest::IngestV2Error; use quickwit_proto::ServiceError; use quickwit_search::SearchError; use serde::{Deserialize, Serialize}; @@ -38,9 +39,9 @@ impl ElasticSearchError { error: ErrorCause { reason: Some(reason_string), caused_by: None, - root_cause: vec![], + root_cause: Vec::new(), stack_trace: None, - suppressed: vec![], + suppressed: Vec::new(), ty: None, additional_details: Default::default(), }, @@ -55,9 +56,9 @@ impl From for ElasticSearchError { let reason = ErrorCause { reason: Some(search_error.to_string()), caused_by: None, - root_cause: vec![], + root_cause: Vec::new(), stack_trace: None, - suppressed: vec![], + suppressed: Vec::new(), ty: None, additional_details: Default::default(), }; @@ -75,9 +76,29 @@ impl From for ElasticSearchError { let reason = ErrorCause { reason: Some(ingest_service_error.to_string()), caused_by: None, - root_cause: vec![], + root_cause: Vec::new(), stack_trace: None, - suppressed: vec![], + suppressed: Vec::new(), + ty: None, + additional_details: Default::default(), + }; + ElasticSearchError { + status, + error: reason, + } + } +} + +impl From for ElasticSearchError { + fn from(ingest_error: IngestV2Error) -> Self { + let status = ingest_error.error_code().to_http_status_code(); + + let reason = ErrorCause { + reason: Some(ingest_error.to_string()), + caused_by: None, + root_cause: Vec::new(), + stack_trace: None, + suppressed: Vec::new(), ty: None, additional_details: Default::default(), }; diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs index 7ff0337af40..d5731ff8806 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs @@ -26,8 +26,8 @@ mod scroll; mod search_body; mod search_query_params; -pub use bulk_body::{BulkAction, BulkActionMeta}; -pub use bulk_query_params::{ElasticIngestOptions, ElasticRefresh}; +pub use bulk_body::BulkAction; +pub use bulk_query_params::ElasticBulkOptions; pub use error::ElasticSearchError; pub use field_capability::{ build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index c21e4b246db..1511fe47a06 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -17,17 +17,16 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use quickwit_config::{IngestApiConfig, INGEST_SOURCE_ID}; +use bytes::{Buf, Bytes}; +use quickwit_config::{IngestApiConfig, INGEST_V2_SOURCE_ID}; use quickwit_ingest::{ - CommitType, DocBatchBuilder, FetchResponse, IngestRequest, IngestResponse, IngestService, - IngestServiceClient, IngestServiceError, TailRequest, + CommitType, DocBatchBuilder, DocBatchV2Builder, FetchResponse, IngestRequest, IngestResponse, + IngestService, IngestServiceClient, IngestServiceError, TailRequest, }; use quickwit_proto::ingest::router::{ IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest, }; -use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::types::IndexId; use serde::Deserialize; use thiserror::Error; @@ -128,22 +127,23 @@ async fn ingest_v2( ingest_options: IngestOptions, mut ingest_router: IngestRouterServiceClient, ) -> Result { - let mut doc_buffer = BytesMut::new(); - let mut doc_lengths = Vec::new(); + let mut doc_batch_builder = DocBatchV2Builder::default(); - for line in lines(&body) { - doc_lengths.push(line.len() as u32); - doc_buffer.put(line); + for doc in lines(&body) { + doc_batch_builder.add_doc(doc); } - let num_docs = doc_lengths.len(); - let doc_batch = DocBatchV2 { - doc_buffer: doc_buffer.freeze(), - doc_lengths, + let doc_batch_opt = doc_batch_builder.build(); + + let Some(doc_batch) = doc_batch_opt else { + let response = IngestResponse::default(); + return Ok(response); }; + let num_docs = doc_batch.num_docs(); + let subrequest = IngestSubrequest { subrequest_id: 0, index_id, - source_id: INGEST_SOURCE_ID.to_string(), + source_id: INGEST_V2_SOURCE_ID.to_string(), doc_batch: Some(doc_batch), }; let request = IngestRequestV2 { @@ -259,8 +259,12 @@ async fn tail_endpoint( pub(crate) fn lines(body: &Bytes) -> impl Iterator { body.split(|byte| byte == &b'\n') - .filter(|line| !line.iter().all(|&b| b.is_ascii_whitespace())) - .filter(|line| !line.is_empty()) + .filter(|line| !is_empty_or_blank_line(line)) +} + +#[inline] +fn is_empty_or_blank_line(line: &[u8]) -> bool { + line.is_empty() || line.iter().all(|ch| ch.is_ascii_whitespace()) } #[cfg(test)] diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 47ae8164016..8b9fc868e07 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -154,17 +154,6 @@ pub(crate) async fn start_rest_server( fn api_v1_routes( quickwit_services: Arc, ) -> impl Filter + Clone { - if !quickwit_services - .node_config - .rest_config - .extra_headers - .is_empty() - { - info!( - "Extra headers will be added to all responses: {:?}", - quickwit_services.node_config.rest_config.extra_headers - ); - } let api_v1_root_url = warp::path!("api" / "v1" / ..); api_v1_root_url.and( cluster_handler(quickwit_services.cluster.clone()) @@ -206,6 +195,7 @@ fn api_v1_routes( quickwit_services.node_config.clone(), quickwit_services.search_service.clone(), quickwit_services.ingest_service.clone(), + quickwit_services.ingest_router_service.clone(), )), ) }