diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index bcaddd5a4e1..73cc428139a 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -785,9 +785,9 @@ impl Handler for ControlPlane { .get_or_create_open_shards(request, &mut self.model, ctx.progress()) .await { - Ok(resp) => { + Ok(response) => { let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx); - Ok(Ok(resp)) + Ok(Ok(response)) } Err(metastore_error) => convert_metastore_error(metastore_error), } @@ -1055,7 +1055,7 @@ mod tests { ListShardsResponse, ListShardsSubresponse, MetastoreError, MockMetastoreService, OpenShardSubresponse, OpenShardsResponse, SourceType, }; - use quickwit_proto::types::Position; + use quickwit_proto::types::{DocMappingUid, Position}; use tokio::sync::Mutex; use super::*; @@ -1702,7 +1702,6 @@ mod tests { #[tokio::test] async fn test_fill_shard_table_position_from_metastore_on_startup() { - quickwit_common::setup_logging_for_tests(); let universe = Universe::with_accelerated_time(); let node_id = NodeId::new("control-plane-node".to_string()); let indexer_pool = IndexerPool::default(); @@ -1917,6 +1916,7 @@ mod tests { leader_id: "node1".to_string(), follower_id: None, shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: None, publish_token: None, }], @@ -2046,6 +2046,7 @@ mod tests { leader_id: "node1".to_string(), follower_id: None, shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: None, publish_token: None, }], @@ -2333,6 +2334,7 @@ mod tests { leader_id: "test-ingester".to_string(), follower_id: None, shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: None, }), @@ -2486,6 +2488,7 @@ mod tests { leader_id: "test-ingester".to_string(), follower_id: None, shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: None, }), diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 6084d30db86..5a229f89130 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -130,17 +130,17 @@ impl fmt::Debug for IngestController { /// It is up to the client to check the error type and see if the control plane actor should be /// restarted. async fn open_shards_on_metastore_and_model( - open_shards_subrequests: Vec, + open_shard_subrequests: Vec, metastore: &mut MetastoreServiceClient, model: &mut ControlPlaneModel, ) -> MetastoreResult { - if open_shards_subrequests.is_empty() { + if open_shard_subrequests.is_empty() { return Ok(OpenShardsResponse { subresponses: Vec::new(), }); } let open_shards_request = OpenShardsRequest { - subrequests: open_shards_subrequests, + subrequests: open_shard_subrequests, }; let open_shards_response = metastore.open_shards(open_shards_request).await?; for open_shard_subresponse in &open_shards_response.subresponses { @@ -163,7 +163,7 @@ fn get_open_shard_from_model( return Err(GetOrCreateOpenShardsFailureReason::IndexNotFound); }; let Some(open_shard_entries) = model.find_open_shards( - &index_uid, + index_uid, &get_open_shards_subrequest.source_id, unavailable_leaders, ) else { @@ -179,7 +179,7 @@ fn get_open_shard_from_model( .collect(); Ok(Some(GetOrCreateOpenShardsSuccess { subrequest_id: get_open_shards_subrequest.subrequest_id, - index_uid: index_uid.into(), + index_uid: Some(index_uid.clone()), source_id: get_open_shards_subrequest.source_id.clone(), open_shards, })) @@ -336,7 +336,7 @@ impl IngestController { let unavailable_leaders: FnvHashSet = get_open_shards_request .unavailable_leaders .into_iter() - .map(Into::into) + .map(NodeId::from) .collect(); // We do a first pass to identify the shards that are missing from the model and need to be @@ -350,7 +350,8 @@ impl IngestController { // create them after this loop. let index_uid = model .index_uid(&get_open_shards_subrequest.index_id) - .unwrap(); + .expect("index should exist") + .clone(); let source_uid = SourceUid { index_uid, source_id: get_open_shards_subrequest.source_id.clone(), @@ -381,7 +382,6 @@ impl IngestController { error!(error=?metastore_error, "failed to open shards on the metastore"); } } - for get_open_shards_subrequest in get_open_shards_request.subrequests { match get_open_shard_from_model( &get_open_shards_subrequest, @@ -403,11 +403,11 @@ impl IngestController { } } } - - Ok(GetOrCreateOpenShardsResponse { + let response = GetOrCreateOpenShardsResponse { successes: get_or_create_open_shards_successes, failures: get_or_create_open_shards_failures, - }) + }; + Ok(response) } /// Allocates and assigns new shards to ingesters. @@ -510,7 +510,7 @@ impl IngestController { let mut failures = Vec::new(); let mut per_leader_shards_to_init: HashMap> = - HashMap::default(); + HashMap::new(); for init_shard_subrequest in init_shard_subrequests { let leader_id = init_shard_subrequest.shard().leader_id.clone(); @@ -593,25 +593,25 @@ impl IngestController { { return Ok(()); } - let new_num_open_shards = shard_stats.num_open_shards + 1; - let new_shard_source_uids: HashMap = - std::iter::once((source_uid.clone(), 1)).collect(); + HashMap::from_iter([(source_uid.clone(), 1)]); let successful_source_uids_res = self .try_open_shards(new_shard_source_uids, model, &Default::default(), progress) .await; + match successful_source_uids_res { Ok(successful_source_uids) => { assert!(successful_source_uids.len() <= 1); + if successful_source_uids.is_empty() { // We did not manage to create the shard. // We can release our permit. model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); warn!( index_uid=%source_uid.index_uid, - source_id=%source_uid.source_id, - "scaling up number of shards to {new_num_open_shards} failed: shard initialization failure" + source_id=%source_uid.source_id, + "scaling up number of shards to {new_num_open_shards} failed: shard initialization failure" ); } else { info!( @@ -660,12 +660,11 @@ impl IngestController { unavailable_leaders: &FnvHashSet, progress: &Progress, ) -> MetastoreResult> { - let num_shards = source_uids.values().sum::(); + let num_shards: usize = source_uids.values().sum(); if num_shards == 0 { return Ok(HashMap::default()); } - // TODO unavailable leaders let Some(leader_follower_pairs) = self.allocate_shards(num_shards, unavailable_leaders, model) @@ -683,14 +682,22 @@ impl IngestController { .zip(leader_follower_pairs) .enumerate() { + let shard_id = ShardId::from(Ulid::new()); + + let index_metadata = model + .index_metadata(&source_uid.index_uid) + .expect("index should exist"); + let doc_mapping_uid = index_metadata.index_config.doc_mapping.doc_mapping_uid; + let shard = Shard { index_uid: Some(source_uid.index_uid.clone()), source_id: source_uid.source_id.clone(), - shard_id: Some(ShardId::from(Ulid::new())), + shard_id: Some(shard_id), leader_id: leader_id.to_string(), follower_id: follower_id_opt.as_ref().map(ToString::to_string), shard_state: ShardState::Open as i32, - publish_position_inclusive: Some(quickwit_proto::types::Position::default()), + doc_mapping_uid: Some(doc_mapping_uid), + publish_position_inclusive: Some(Position::Beginning), publish_token: None, }; let init_shard_subrequest = InitShardSubrequest { @@ -703,12 +710,13 @@ impl IngestController { // Let's first attempt to initialize these shards. let init_shards_response = self.init_shards(init_shard_subrequests, progress).await; - let open_shards_subrequests = init_shards_response + let open_shard_subrequests = init_shards_response .successes .into_iter() .enumerate() .map(|(subrequest_id, init_shard_success)| { let shard = init_shard_success.shard(); + OpenShardSubrequest { subrequest_id: subrequest_id as u32, index_uid: shard.index_uid.clone(), @@ -716,25 +724,27 @@ impl IngestController { shard_id: shard.shard_id.clone(), leader_id: shard.leader_id.clone(), follower_id: shard.follower_id.clone(), + doc_mapping_uid: shard.doc_mapping_uid, } }) .collect(); let OpenShardsResponse { subresponses } = progress .protect_future(open_shards_on_metastore_and_model( - open_shards_subrequests, + open_shard_subrequests, &mut self.metastore, model, )) .await?; - let mut open_shards_count = HashMap::default(); + let mut per_source_num_open_shards: HashMap = HashMap::new(); + for open_shard_subresponse in subresponses { let source_uid = open_shard_subresponse.open_shard().source_uid(); - *open_shards_count.entry(source_uid).or_default() += 1; + *per_source_num_open_shards.entry(source_uid).or_default() += 1; } - Ok(open_shards_count) + Ok(per_source_num_open_shards) } /// Attempts to decrease the number of shards. This operation is rate limited to avoid closing @@ -770,11 +780,12 @@ impl IngestController { return Ok(()); }; let shard_pkeys = vec![ShardPKey { - index_uid: source_uid.index_uid.clone().into(), + index_uid: Some(source_uid.index_uid.clone()), source_id: source_uid.source_id.clone(), shard_id: Some(shard_id.clone()), }]; let close_shards_request = CloseShardsRequest { shard_pkeys }; + if let Err(error) = progress .protect_future(ingester.close_shards(close_shards_request)) .await @@ -888,7 +899,7 @@ impl IngestController { return Vec::new(); } - let mut per_leader_open_shards: HashMap<&str, Vec<&ShardEntry>> = HashMap::default(); + let mut per_leader_open_shards: HashMap<&str, Vec<&ShardEntry>> = HashMap::new(); for shard in model.all_shards() { if shard.is_open() { @@ -948,7 +959,7 @@ impl IngestController { let num_shards_to_move = shards_to_move.len(); info!("rebalancing {} shards", num_shards_to_move); - let mut new_shards_source_uids: HashMap = HashMap::default(); + let mut new_shards_source_uids: HashMap = HashMap::new(); for shard in &shards_to_move { *new_shards_source_uids .entry(shard.source_uid()) @@ -969,13 +980,14 @@ impl IngestController { let mut shards_to_close = Vec::new(); for shard in shards_to_move { let source_uid = shard.source_uid(); - let Some(count) = successfully_source_uids.get_mut(&source_uid) else { + let Some(num_open_shards) = successfully_source_uids.get_mut(&source_uid) else { continue; }; - if *count == 0 { + if *num_open_shards == 0 { continue; }; - *count -= 1; + *num_open_shards -= 1; + let leader_id = NodeId::from(shard.leader_id.clone()); let shard_pkey = ShardPKey { index_uid: shard.index_uid.clone(), @@ -1139,7 +1151,7 @@ mod tests { use quickwit_proto::metastore::{ self, MetastoreError, MockMetastoreService, OpenShardSubresponse, }; - use quickwit_proto::types::{Position, SourceId}; + use quickwit_proto::types::{DocMappingUid, Position, SourceId}; use super::*; @@ -1148,13 +1160,21 @@ mod tests { let source_id: &'static str = "test-source"; let index_id_0 = "test-index-0"; - let index_metadata_0 = IndexMetadata::for_test(index_id_0, "ram://indexes/test-index-0"); + let mut index_metadata_0 = + IndexMetadata::for_test(index_id_0, "ram://indexes/test-index-0"); let index_uid_0 = index_metadata_0.index_uid.clone(); + let doc_mapping_uid_0 = DocMappingUid::random(); + index_metadata_0.index_config.doc_mapping.doc_mapping_uid = doc_mapping_uid_0; + let index_id_1 = "test-index-1"; - let index_metadata_1 = IndexMetadata::for_test(index_id_1, "ram://indexes/test-index-1"); + let mut index_metadata_1 = + IndexMetadata::for_test(index_id_1, "ram://indexes/test-index-1"); let index_uid_1 = index_metadata_1.index_uid.clone(); + let doc_mapping_uid_1 = DocMappingUid::random(); + index_metadata_1.index_config.doc_mapping.doc_mapping_uid = doc_mapping_uid_1; + let progress = Progress::default(); let mut mock_metastore = MockMetastoreService::new(); @@ -1164,7 +1184,8 @@ mod tests { move |request| { assert_eq!(request.subrequests.len(), 1); assert_eq!(request.subrequests[0].index_uid(), &index_uid_1); - assert_eq!(&request.subrequests[0].source_id, source_id); + assert_eq!(request.subrequests[0].source_id, source_id); + assert_eq!(request.subrequests[0].doc_mapping_uid(), doc_mapping_uid_1); let subresponses = vec![metastore::OpenShardSubresponse { subrequest_id: 1, @@ -1174,6 +1195,7 @@ mod tests { shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, leader_id: "test-ingester-2".to_string(), + doc_mapping_uid: Some(doc_mapping_uid_1), ..Default::default() }), }]; @@ -1240,6 +1262,7 @@ mod tests { shard_id: Some(ShardId::from(1)), leader_id: "test-ingester-0".to_string(), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(doc_mapping_uid_0), ..Default::default() }, Shard { @@ -1248,6 +1271,7 @@ mod tests { shard_id: Some(ShardId::from(2)), leader_id: "test-ingester-1".to_string(), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(doc_mapping_uid_0), ..Default::default() }, ]; @@ -1311,6 +1335,7 @@ mod tests { assert_eq!(success.open_shards.len(), 1); assert_eq!(success.open_shards[0].shard_id(), ShardId::from(2)); assert_eq!(success.open_shards[0].leader_id, "test-ingester-1"); + assert_eq!(success.open_shards[0].doc_mapping_uid(), doc_mapping_uid_0); let success = &response.successes[1]; assert_eq!(success.subrequest_id, 1); @@ -1319,6 +1344,7 @@ mod tests { assert_eq!(success.open_shards.len(), 1); assert_eq!(success.open_shards[0].shard_id(), ShardId::from(1)); assert_eq!(success.open_shards[0].leader_id, "test-ingester-2"); + assert_eq!(success.open_shards[0].doc_mapping_uid(), doc_mapping_uid_1); let failure = &response.failures[0]; assert_eq!(failure.subrequest_id, 2); @@ -1754,7 +1780,7 @@ mod tests { // - ingester 2 will time out; // - ingester 3 will be unavailable. - let init_shard_sub_requests: Vec = vec![ + let init_shard_subrequests: Vec = vec![ InitShardSubrequest { subrequest_id: 0, shard: Some(Shard { @@ -1812,7 +1838,7 @@ mod tests { }, ]; let init_shards_response = ingest_controller - .init_shards(init_shard_sub_requests, &Progress::default()) + .init_shards(init_shard_subrequests, &Progress::default()) .await; assert_eq!(init_shards_response.successes.len(), 1); assert_eq!(init_shards_response.failures.len(), 4); @@ -1862,19 +1888,20 @@ mod tests { index_uid: subrequest.index_uid.clone(), source_id: subrequest.source_id.clone(), shard_id: subrequest.shard_id.clone(), + shard_state: ShardState::Open as i32, leader_id: subrequest.leader_id.clone(), follower_id: subrequest.follower_id.clone(), - shard_state: ShardState::Open as i32, + doc_mapping_uid: subrequest.doc_mapping_uid, publish_position_inclusive: Some(Position::Beginning), publish_token: None, }; - let resp = OpenShardsResponse { + let response = OpenShardsResponse { subresponses: vec![OpenShardSubresponse { subrequest_id: subrequest.subrequest_id, open_shard: Some(shard), }], }; - Ok(resp) + Ok(response) }); let metastore = MetastoreServiceClient::from_mock(mock_metastore); let ingester_pool = IngesterPool::default(); @@ -1884,6 +1911,7 @@ mod tests { IngestController::new(metastore, ingester_pool.clone(), replication_factor); let index_uid = IndexUid::for_test("test-index", 0); + let index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index"); let source_id: SourceId = "test-source".into(); let source_uid = SourceUid { @@ -1891,6 +1919,7 @@ mod tests { source_id: source_id.clone(), }; let mut model = ControlPlaneModel::default(); + model.add_index(index_metadata); let progress = Progress::default(); let shards = vec![Shard { diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 8948eb0cfca..4883a9bed25 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -152,8 +152,12 @@ impl ControlPlaneModel { Ok(()) } - pub fn index_uid(&self, index_id: &str) -> Option { - self.index_uid_table.get(index_id).cloned() + pub fn index_uid(&self, index_id: &str) -> Option<&IndexUid> { + self.index_uid_table.get(index_id) + } + + pub fn index_metadata(&self, index_uid: &IndexUid) -> Option<&IndexMetadata> { + self.index_table.get(index_uid) } fn update_metrics(&self) { @@ -472,9 +476,9 @@ mod tests { .unwrap(); assert_eq!(model.index_table.len(), 3); - assert_eq!(model.index_uid("test-index-0").unwrap(), index_uid); - assert_eq!(model.index_uid("test-index-1").unwrap(), index_uid2); - assert_eq!(model.index_uid("test-index-2").unwrap(), index_uid3); + assert_eq!(*model.index_uid("test-index-0").unwrap(), index_uid); + assert_eq!(*model.index_uid("test-index-1").unwrap(), index_uid2); + assert_eq!(*model.index_uid("test-index-2").unwrap(), index_uid3); assert_eq!(model.shard_table.num_shards(), 1); @@ -515,7 +519,7 @@ mod tests { assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata); assert_eq!(model.index_uid_table.len(), 1); - assert_eq!(model.index_uid("test-index").unwrap(), index_uid); + assert_eq!(*model.index_uid("test-index").unwrap(), index_uid); } #[test] @@ -533,7 +537,7 @@ mod tests { assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata); assert_eq!(model.index_uid_table.len(), 1); - assert_eq!(model.index_uid("test-index").unwrap(), index_uid); + assert_eq!(*model.index_uid("test-index").unwrap(), index_uid); assert_eq!(model.shard_table.num_sources(), 1); diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index 3d4b85190d4..1d1855bbaed 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -684,7 +684,7 @@ mod tests { }; use quickwit_proto::ingest::{IngestV2Error, MRecordBatch, Shard, ShardState}; use quickwit_proto::metastore::{AcquireShardsResponse, MockMetastoreService}; - use quickwit_proto::types::{IndexUid, PipelineUid}; + use quickwit_proto::types::{DocMappingUid, IndexUid, PipelineUid}; use quickwit_storage::StorageResolver; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::watch; @@ -726,6 +726,7 @@ mod tests { follower_id: None, shard_id: Some(ShardId::from(0)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(10u64)), publish_token: Some(publish_token.to_string()), }], @@ -748,6 +749,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(11u64)), publish_token: Some(publish_token.to_string()), }], @@ -771,6 +773,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(11u64)), publish_token: Some(publish_token.to_string()), }, @@ -781,6 +784,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(12u64)), publish_token: Some(publish_token.to_string()), }, @@ -1070,6 +1074,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::eof(11u64)), publish_token: Some(publish_token.to_string()), }, @@ -1080,6 +1085,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning.as_eof()), publish_token: Some(publish_token.to_string()), }, @@ -1210,6 +1216,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(11u64)), publish_token: Some(publish_token.to_string()), }, @@ -1220,6 +1227,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Closed as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::eof(22u64)), publish_token: Some(publish_token.to_string()), }, @@ -1560,6 +1568,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some(publish_token.to_string()), }], diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 8b61438e656..7750e6f5742 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -1263,7 +1263,7 @@ mod tests { use quickwit_proto::ingest::{ DocBatchV2, ShardIdPosition, ShardIdPositions, ShardIds, ShardPKey, }; - use quickwit_proto::types::{queue_id, ShardId, SourceUid}; + use quickwit_proto::types::{queue_id, DocMappingUid, ShardId, SourceUid}; use tokio::task::yield_now; use tokio::time::timeout; use tonic::transport::{Endpoint, Server}; @@ -1585,6 +1585,7 @@ mod tests { shard_state: ShardState::Open as i32, leader_id: ingester_ctx.node_id.to_string(), follower_id: None, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: None, publish_token: None, }; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-uid-field.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-uid-field.down.sql new file mode 100644 index 00000000000..ad769d52f6d --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-uid-field.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE shards + DROP IF EXISTS COLUMN doc_mapping_uid; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-uid-field.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-uid-field.up.sql new file mode 100644 index 00000000000..7fca109b010 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-uid-field.up.sql @@ -0,0 +1,12 @@ +ALTER TABLE shards + ADD COLUMN IF NOT EXISTS doc_mapping_uid VARCHAR(26); + +-- Index metadata has been stable for quite a while, so we allow ourselves to do this, +-- but please, reader of the future, do not reapply this pattern without careful consideration. +UPDATE + shards +SET + doc_mapping_uid = '00000000000000000000000000'; + +ALTER TABLE shards + ALTER COLUMN doc_mapping_uid SET NOT NULL; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 68a6dc56a77..dc5a873fffb 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -82,7 +82,7 @@ pub(crate) struct FileBackedIndex { impl quickwit_config::TestableForRegression for FileBackedIndex { fn sample_for_regression() -> Self { use quickwit_proto::ingest::{Shard, ShardState}; - use quickwit_proto::types::{Position, ShardId}; + use quickwit_proto::types::{DocMappingUid, Position, ShardId}; let index_metadata = IndexMetadata::sample_for_regression(); let index_uid = index_metadata.index_uid.clone(); @@ -104,6 +104,7 @@ impl quickwit_config::TestableForRegression for FileBackedIndex { shard_state: ShardState::Open as i32, leader_id: "leader-ingester".to_string(), follower_id: Some("follower-ingester".to_string()), + doc_mapping_uid: Some(DocMappingUid::for_test(1)), publish_position_inclusive: Some(Position::Beginning), ..Default::default() }; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/serialize.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/serialize.rs index 5d0b79c1501..3c10e686dbb 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/serialize.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/serialize.rs @@ -22,7 +22,7 @@ use std::collections::HashMap; use itertools::Itertools; use quickwit_proto::ingest::Shard; use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::SourceId; +use quickwit_proto::types::{DocMappingUid, SourceId}; use serde::{Deserialize, Serialize}; use super::shards::Shards; @@ -34,6 +34,7 @@ use crate::{IndexMetadata, Split}; #[serde(tag = "version")] pub(crate) enum VersionedFileBackedIndex { #[serde(rename = "0.9")] + V0_9(FileBackedIndexV0_8), // Retro compatibility. #[serde(alias = "0.8")] #[serde(alias = "0.7")] @@ -42,14 +43,22 @@ pub(crate) enum VersionedFileBackedIndex { impl From for VersionedFileBackedIndex { fn from(index: FileBackedIndex) -> Self { - VersionedFileBackedIndex::V0_8(index.into()) + VersionedFileBackedIndex::V0_9(index.into()) } } impl From for FileBackedIndex { fn from(index: VersionedFileBackedIndex) -> Self { match index { - VersionedFileBackedIndex::V0_8(v0_8) => v0_8.into(), + VersionedFileBackedIndex::V0_8(mut v0_8) => { + for shards in v0_8.shards.values_mut() { + for shard in shards { + shard.doc_mapping_uid = Some(DocMappingUid::default()); + } + } + v0_8.into() + } + VersionedFileBackedIndex::V0_9(v0_8) => v0_8.into(), } } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index 3f88cc382a7..3c636afbac4 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -117,7 +117,7 @@ impl Shards { ) -> MetastoreResult> { let mut mutation_occurred = false; - let shard_id = subrequest.shard_id(); + let shard_id = subrequest.shard_id().clone(); let entry = self.shards.entry(shard_id.clone()); let shard = match entry { Entry::Occupied(entry) => entry.get().clone(), @@ -127,8 +127,9 @@ impl Shards { source_id: self.source_id.clone(), shard_id: Some(shard_id.clone()), shard_state: ShardState::Open as i32, - leader_id: subrequest.leader_id.clone(), - follower_id: subrequest.follower_id.clone(), + leader_id: subrequest.leader_id, + follower_id: subrequest.follower_id, + doc_mapping_uid: subrequest.doc_mapping_uid, publish_position_inclusive: Some(Position::Beginning), publish_token: None, }; @@ -307,6 +308,7 @@ impl Shards { #[cfg(test)] mod tests { use quickwit_proto::ingest::ShardState; + use quickwit_proto::types::DocMappingUid; use super::*; @@ -332,6 +334,7 @@ mod tests { shard_id: Some(ShardId::from(1)), leader_id: "leader_id".to_string(), follower_id: None, + doc_mapping_uid: Some(DocMappingUid::default()), }; let MutationOccurred::Yes(subresponse) = shards.open_shard(subrequest.clone()).unwrap() else { @@ -363,6 +366,7 @@ mod tests { shard_id: Some(ShardId::from(2)), leader_id: "leader_id".to_string(), follower_id: Some("follower_id".to_string()), + doc_mapping_uid: Some(DocMappingUid::default()), }; let MutationOccurred::Yes(subresponse) = shards.open_shard(subrequest).unwrap() else { panic!("Expected `MutationOccured::No`"); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 9fbe443229b..6577938624f 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -1618,6 +1618,7 @@ async fn open_or_fetch_shard<'e>( .bind(subrequest.shard_id().as_str()) .bind(&subrequest.leader_id) .bind(&subrequest.follower_id) + .bind(subrequest.doc_mapping_uid) .fetch_optional(executor.clone()) .await?; @@ -1777,10 +1778,11 @@ mod tests { sqlx::query(INSERT_SHARD_QUERY) .bind(index_uid) .bind(source_id) - .bind(shard.shard_id().as_str()) + .bind(shard.shard_id()) .bind(shard.shard_state().as_json_str_name()) .bind(&shard.leader_id) .bind(&shard.follower_id) + .bind(shard.doc_mapping_uid) .bind(&shard.publish_position_inclusive().to_string()) .bind(&shard.publish_token) .execute(&self.connection_pool) @@ -1800,7 +1802,7 @@ mod tests { "#, ) .bind(index_uid) - .bind(source_id.as_str()) + .bind(source_id) .fetch_all(&self.connection_pool) .await .unwrap(); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs index 1b10190715a..1b8124f30e5 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs @@ -24,7 +24,7 @@ use std::str::FromStr; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{DeleteQuery, DeleteTask, MetastoreError, MetastoreResult}; -use quickwit_proto::types::{IndexId, IndexUid, ShardId, SourceId, SplitId}; +use quickwit_proto::types::{DocMappingUid, IndexId, IndexUid, ShardId, SourceId, SplitId}; use sea_query::{Iden, Write}; use tracing::error; @@ -259,6 +259,8 @@ pub(super) struct PgShard { pub leader_id: String, pub follower_id: Option, pub shard_state: PgShardState, + #[sqlx(try_from = "String")] + pub doc_mapping_uid: DocMappingUid, pub publish_position_inclusive: String, pub publish_token: Option, } @@ -272,6 +274,7 @@ impl From for Shard { shard_state: ShardState::from(pg_shard.shard_state) as i32, leader_id: pg_shard.leader_id, follower_id: pg_shard.follower_id, + doc_mapping_uid: Some(pg_shard.doc_mapping_uid), publish_position_inclusive: Some(pg_shard.publish_position_inclusive.into()), publish_token: pg_shard.publish_token, } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql index d1a7909b630..9981580a405 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql @@ -1,2 +1,2 @@ -INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, publish_position_inclusive, publish_token) - VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8) +INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_uid, publish_position_inclusive, publish_token) + VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql index c867fa244d5..d6747fcb9f1 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql @@ -1,5 +1,5 @@ -INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id) - VALUES ($1, $2, $3, $4, $5) +INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_uid) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING RETURNING diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index b40b43d5525..1bbbd1e010a 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -26,7 +26,7 @@ use quickwit_proto::metastore::{ ListShardsRequest, ListShardsSubrequest, MetastoreError, MetastoreService, OpenShardSubrequest, OpenShardsRequest, PublishSplitsRequest, }; -use quickwit_proto::types::{IndexUid, Position, ShardId, SourceId}; +use quickwit_proto::types::{DocMappingUid, IndexUid, Position, ShardId, SourceId}; use super::DefaultForTest; use crate::checkpoint::{IndexCheckpointDelta, PartitionId, SourceCheckpointDelta}; @@ -133,6 +133,7 @@ pub async fn test_metastore_open_shards< shard_id: Some(ShardId::from(1)), leader_id: "test-ingester-foo".to_string(), follower_id: Some("test-ingester-bar".to_string()), + doc_mapping_uid: Some(DocMappingUid::default()), }], }; let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap(); @@ -148,6 +149,7 @@ pub async fn test_metastore_open_shards< assert_eq!(shard.shard_state(), ShardState::Open); assert_eq!(shard.leader_id, "test-ingester-foo"); assert_eq!(shard.follower_id(), "test-ingester-bar"); + assert_eq!(shard.doc_mapping_uid(), DocMappingUid::default(),); assert_eq!(shard.publish_position_inclusive(), Position::Beginning); assert!(shard.publish_token.is_none()); @@ -160,6 +162,7 @@ pub async fn test_metastore_open_shards< shard_id: Some(ShardId::from(1)), leader_id: "test-ingester-foo".to_string(), follower_id: Some("test-ingester-bar".to_string()), + doc_mapping_uid: Some(DocMappingUid::default()), }], }; let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap(); @@ -201,6 +204,7 @@ pub async fn test_metastore_acquire_shards< shard_state: ShardState::Closed as i32, leader_id: "test-ingester-foo".to_string(), follower_id: Some("test-ingester-bar".to_string()), + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-foo".to_string()), }, @@ -211,6 +215,7 @@ pub async fn test_metastore_acquire_shards< shard_state: ShardState::Open as i32, leader_id: "test-ingester-bar".to_string(), follower_id: Some("test-ingester-qux".to_string()), + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-bar".to_string()), }, @@ -221,6 +226,7 @@ pub async fn test_metastore_acquire_shards< shard_state: ShardState::Open as i32, leader_id: "test-ingester-qux".to_string(), follower_id: Some("test-ingester-baz".to_string()), + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: None, }, @@ -231,6 +237,7 @@ pub async fn test_metastore_acquire_shards< shard_state: ShardState::Open as i32, leader_id: "test-ingester-baz".to_string(), follower_id: Some("test-ingester-tux".to_string()), + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: None, }, @@ -321,6 +328,7 @@ pub async fn test_metastore_list_shards< shard_state: ShardState::Open as i32, leader_id: "test-ingester-foo".to_string(), follower_id: Some("test-ingester-bar".to_string()), + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-foo".to_string()), }, @@ -331,6 +339,7 @@ pub async fn test_metastore_list_shards< shard_state: ShardState::Closed as i32, leader_id: "test-ingester-bar".to_string(), follower_id: Some("test-ingester-qux".to_string()), + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-bar".to_string()), }, @@ -459,6 +468,7 @@ pub async fn test_metastore_delete_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), ..Default::default() }, @@ -467,6 +477,7 @@ pub async fn test_metastore_delete_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Closed as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), ..Default::default() }, @@ -475,6 +486,7 @@ pub async fn test_metastore_delete_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(3)), shard_state: ShardState::Closed as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Eof(None)), ..Default::default() }, @@ -601,6 +613,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_single_shard< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(0)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-bar".to_string()), ..Default::default() @@ -716,6 +729,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(0)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(0u64)), publish_token: Some("test-publish-token-foo".to_string()), ..Default::default() @@ -725,6 +739,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(1u64)), publish_token: Some("test-publish-token-foo".to_string()), ..Default::default() @@ -734,6 +749,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(2u64)), publish_token: Some("test-publish-token-foo".to_string()), ..Default::default() @@ -743,6 +759,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(3)), shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::offset(3u64)), publish_token: Some("test-publish-token-bar".to_string()), ..Default::default() diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json index 831a543694f..7ea56fa7af8 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json @@ -146,13 +146,14 @@ "shards": { "_ingest-source": [ { - "follower_id": "follower-ingester", "index_uid": "my-index:00000000000000000000000000", - "leader_id": "leader-ingester", - "publish_position_inclusive": "", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, - "source_id": "_ingest-source" + "leader_id": "leader-ingester", + "follower_id": "follower-ingester", + "doc_mapping_uid": "00000000000000000000000000", + "publish_position_inclusive": "" } ] }, diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.json index 6ab7aefdff1..a72a954d581 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.json @@ -146,8 +146,8 @@ "_ingest-source": [ { "index_uid": "my-index:00000000000000000000000000", - "source_id": "_ingest-source", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, "leader_id": "leader-ingester", "follower_id": "follower-ingester", diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json index 831a543694f..7ea56fa7af8 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json @@ -146,13 +146,14 @@ "shards": { "_ingest-source": [ { - "follower_id": "follower-ingester", "index_uid": "my-index:00000000000000000000000000", - "leader_id": "leader-ingester", - "publish_position_inclusive": "", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, - "source_id": "_ingest-source" + "leader_id": "leader-ingester", + "follower_id": "follower-ingester", + "doc_mapping_uid": "00000000000000000000000000", + "publish_position_inclusive": "" } ] }, diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.json index 147abe615a6..bc8d731ad4a 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.json @@ -145,13 +145,13 @@ "shards": { "_ingest-source": [ { - "follower_id": "follower-ingester", "index_uid": "my-index:00000000000000000000000000", - "leader_id": "leader-ingester", - "publish_position_inclusive": "", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, - "source_id": "_ingest-source" + "leader_id": "leader-ingester", + "follower_id": "follower-ingester", + "publish_position_inclusive": "" } ] }, diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json index 9f97b2b9570..fbd61156d9f 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json @@ -146,13 +146,14 @@ "shards": { "_ingest-source": [ { - "follower_id": "follower-ingester", "index_uid": "my-index:00000000000000000000000001", - "leader_id": "leader-ingester", - "publish_position_inclusive": "", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, - "source_id": "_ingest-source" + "leader_id": "leader-ingester", + "follower_id": "follower-ingester", + "doc_mapping_uid": "00000000000000000000000001", + "publish_position_inclusive": "" } ] }, diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json index 9f97b2b9570..fbd61156d9f 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json @@ -146,13 +146,14 @@ "shards": { "_ingest-source": [ { - "follower_id": "follower-ingester", "index_uid": "my-index:00000000000000000000000001", - "leader_id": "leader-ingester", - "publish_position_inclusive": "", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, - "source_id": "_ingest-source" + "leader_id": "leader-ingester", + "follower_id": "follower-ingester", + "doc_mapping_uid": "00000000000000000000000001", + "publish_position_inclusive": "" } ] }, diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs index ffe8ad25271..c4e2a3a3de4 100644 --- a/quickwit/quickwit-proto/build.rs +++ b/quickwit/quickwit-proto/build.rs @@ -38,7 +38,12 @@ fn main() -> Result<(), Box> { // Control plane. let mut prost_config = prost_build::Config::default(); - prost_config.extern_path(".quickwit.common.IndexUid", "crate::types::IndexUid"); + prost_config + .extern_path( + ".quickwit.common.DocMappingUid", + "crate::types::DocMappingUid", + ) + .extern_path(".quickwit.common.IndexUid", "crate::types::IndexUid"); Codegen::builder() .with_prost_config(prost_config) @@ -91,8 +96,12 @@ fn main() -> Result<(), Box> { "IndexesMetadataResponse.indexes_metadata_json_zstd", "ListIndexesMetadataResponse.indexes_metadata_json_zstd", ]) - .extern_path(".quickwit.ingest.ShardId", "crate::types::ShardId") + .extern_path( + ".quickwit.common.DocMappingUid", + "crate::types::DocMappingUid", + ) .extern_path(".quickwit.common.IndexUid", "crate::types::IndexUid") + .extern_path(".quickwit.ingest.ShardId", "crate::types::ShardId") .field_attribute("DeleteQuery.index_uid", "#[serde(alias = \"index_id\")]") .field_attribute("DeleteQuery.index_uid", "#[schema(value_type = String)]") .field_attribute("DeleteQuery.query_ast", "#[serde(alias = \"query\")]") @@ -125,9 +134,13 @@ fn main() -> Result<(), Box> { "MRecordBatch.mrecord_buffer", "Position.position", ]) + .extern_path( + ".quickwit.common.DocMappingUid", + "crate::types::DocMappingUid", + ) + .extern_path(".quickwit.common.IndexUid", "crate::types::IndexUid") .extern_path(".quickwit.ingest.Position", "crate::types::Position") .extern_path(".quickwit.ingest.ShardId", "crate::types::ShardId") - .extern_path(".quickwit.common.IndexUid", "crate::types::IndexUid") .type_attribute("Shard", "#[derive(Eq)]") .field_attribute( "Shard.follower_id", diff --git a/quickwit/quickwit-proto/protos/quickwit/common.proto b/quickwit/quickwit-proto/protos/quickwit/common.proto index d8f068f3fd4..c7782e77f1e 100644 --- a/quickwit/quickwit-proto/protos/quickwit/common.proto +++ b/quickwit/quickwit-proto/protos/quickwit/common.proto @@ -21,6 +21,15 @@ syntax = "proto3"; package quickwit.common; +// The corresponding Rust struct [`crate::types::DocMappingUid`] is defined manually and +// externally provided during code generation (see `build.rs`). +// +// Modify at your own risk. +message DocMappingUid { + // ULID encoded as a sequence of 16 bytes (big-endian u128). + bytes doc_mapping_uid = 1; +} + // The corresponding Rust struct [`crate::types::IndexUid`] is defined manually and // externally provided during code generation (see `build.rs`). // diff --git a/quickwit/quickwit-proto/protos/quickwit/ingest.proto b/quickwit/quickwit-proto/protos/quickwit/ingest.proto index bf950698541..f4d24cace5d 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingest.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingest.proto @@ -39,6 +39,7 @@ message ShardId { bytes shard_id = 1; } +// Shard primary key. message ShardPKey { quickwit.common.IndexUid index_uid = 1; string source_id = 2; @@ -93,6 +94,9 @@ message Shard { // A publish token that ensures only one indexer works on a given shard at a time. // For instance, if an indexer goes rogue, eventually the control plane will detect it and assign the shard to another indexer, which will override the publish token. optional string publish_token = 10; + + // The UID of the index doc mapping when the shard was created. + quickwit.common.DocMappingUid doc_mapping_uid = 11; } // A group of shards belonging to the same index and source. diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 084dccad417..fe7df81f6c4 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -405,6 +405,7 @@ message OpenShardSubrequest { quickwit.ingest.ShardId shard_id = 4; string leader_id = 5; optional string follower_id = 6; + quickwit.common.DocMappingUid doc_mapping_uid = 7; } message OpenShardsResponse { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.common.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.common.rs index f67ee55d271..04e2e3f2c3b 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.common.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.common.rs @@ -1,789 +1,12 @@ +/// The corresponding Rust struct \[`crate::types::DocMappingUid`\] is defined manually and +/// externally provided during code generation (see `build.rs`). +/// +/// Modify at your own risk. #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetDebugInfoRequest {} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetDebugInfoResponse { +pub struct DocMappingUid { + /// ULID encoded as a sequence of 16 bytes (big-endian u128). #[prost(bytes = "vec", tag = "1")] - pub diagnostic_info_json: ::prost::alloc::vec::Vec, -} -/// BEGIN quickwit-codegen -#[allow(unused_imports)] -use std::str::FromStr; -use tower::{Layer, Service, ServiceExt}; -use quickwit_common::tower::RpcName; -impl RpcName for GetDebugInfoRequest { - fn rpc_name() -> &'static str { - "get_diagnostic_info" - } -} -#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)] -#[async_trait::async_trait] -pub trait DiagnosticService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static { - async fn get_diagnostic_info( - &mut self, - request: GetDebugInfoRequest, - ) -> crate::ingest::IngestV2Result; -} -dyn_clone::clone_trait_object!(DiagnosticService); -#[cfg(any(test, feature = "testsuite"))] -impl Clone for MockDiagnosticService { - fn clone(&self) -> Self { - MockDiagnosticService::new() - } -} -#[derive(Debug, Clone)] -pub struct DiagnosticServiceClient { - inner: Box, -} -impl DiagnosticServiceClient { - pub fn new(instance: T) -> Self - where - T: DiagnosticService, - { - #[cfg(any(test, feature = "testsuite"))] - assert!( - std::any::TypeId::of:: < T > () != std::any::TypeId::of:: < - MockDiagnosticService > (), - "`MockDiagnosticService` must be wrapped in a `MockDiagnosticServiceWrapper`. Use `MockDiagnosticService::from(mock)` to instantiate the client." - ); - Self { inner: Box::new(instance) } - } - pub fn as_grpc_service( - &self, - max_message_size: bytesize::ByteSize, - ) -> diagnostic_service_grpc_server::DiagnosticServiceGrpcServer< - DiagnosticServiceGrpcServerAdapter, - > { - let adapter = DiagnosticServiceGrpcServerAdapter::new(self.clone()); - diagnostic_service_grpc_server::DiagnosticServiceGrpcServer::new(adapter) - .max_decoding_message_size(max_message_size.0 as usize) - .max_encoding_message_size(max_message_size.0 as usize) - } - pub fn from_channel( - addr: std::net::SocketAddr, - channel: tonic::transport::Channel, - max_message_size: bytesize::ByteSize, - ) -> Self { - let (_, connection_keys_watcher) = tokio::sync::watch::channel( - std::collections::HashSet::from_iter([addr]), - ); - let client = diagnostic_service_grpc_client::DiagnosticServiceGrpcClient::new( - channel, - ) - .max_decoding_message_size(max_message_size.0 as usize) - .max_encoding_message_size(max_message_size.0 as usize); - let adapter = DiagnosticServiceGrpcClientAdapter::new( - client, - connection_keys_watcher, - ); - Self::new(adapter) - } - pub fn from_balance_channel( - balance_channel: quickwit_common::tower::BalanceChannel, - max_message_size: bytesize::ByteSize, - ) -> DiagnosticServiceClient { - let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let client = diagnostic_service_grpc_client::DiagnosticServiceGrpcClient::new( - balance_channel, - ) - .max_decoding_message_size(max_message_size.0 as usize) - .max_encoding_message_size(max_message_size.0 as usize); - let adapter = DiagnosticServiceGrpcClientAdapter::new( - client, - connection_keys_watcher, - ); - Self::new(adapter) - } - pub fn from_mailbox(mailbox: quickwit_actors::Mailbox) -> Self - where - A: quickwit_actors::Actor + std::fmt::Debug + Send + 'static, - DiagnosticServiceMailbox: DiagnosticService, - { - DiagnosticServiceClient::new(DiagnosticServiceMailbox::new(mailbox)) - } - pub fn tower() -> DiagnosticServiceTowerLayerStack { - DiagnosticServiceTowerLayerStack::default() - } - #[cfg(any(test, feature = "testsuite"))] - pub fn from_mock(mock: MockDiagnosticService) -> Self { - let mock_wrapper = mock_diagnostic_service::MockDiagnosticServiceWrapper { - inner: std::sync::Arc::new(tokio::sync::Mutex::new(mock)), - }; - Self::new(mock_wrapper) - } - #[cfg(any(test, feature = "testsuite"))] - pub fn mocked() -> Self { - Self::from_mock(MockDiagnosticService::new()) - } -} -#[async_trait::async_trait] -impl DiagnosticService for DiagnosticServiceClient { - async fn get_diagnostic_info( - &mut self, - request: GetDebugInfoRequest, - ) -> crate::ingest::IngestV2Result { - self.inner.get_diagnostic_info(request).await - } -} -#[cfg(any(test, feature = "testsuite"))] -pub mod mock_diagnostic_service { - use super::*; - #[derive(Debug, Clone)] - pub struct MockDiagnosticServiceWrapper { - pub(super) inner: std::sync::Arc>, - } - #[async_trait::async_trait] - impl DiagnosticService for MockDiagnosticServiceWrapper { - async fn get_diagnostic_info( - &mut self, - request: super::GetDebugInfoRequest, - ) -> crate::ingest::IngestV2Result { - self.inner.lock().await.get_diagnostic_info(request).await - } - } -} -pub type BoxFuture = std::pin::Pin< - Box> + Send + 'static>, ->; -impl tower::Service for Box { - type Response = GetDebugInfoResponse; - type Error = crate::ingest::IngestV2Error; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } - fn call(&mut self, request: GetDebugInfoRequest) -> Self::Future { - let mut svc = self.clone(); - let fut = async move { svc.get_diagnostic_info(request).await }; - Box::pin(fut) - } -} -/// A tower service stack is a set of tower services. -#[derive(Debug)] -struct DiagnosticServiceTowerServiceStack { - inner: Box, - get_diagnostic_info_svc: quickwit_common::tower::BoxService< - GetDebugInfoRequest, - GetDebugInfoResponse, - crate::ingest::IngestV2Error, - >, -} -impl Clone for DiagnosticServiceTowerServiceStack { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - get_diagnostic_info_svc: self.get_diagnostic_info_svc.clone(), - } - } -} -#[async_trait::async_trait] -impl DiagnosticService for DiagnosticServiceTowerServiceStack { - async fn get_diagnostic_info( - &mut self, - request: GetDebugInfoRequest, - ) -> crate::ingest::IngestV2Result { - self.get_diagnostic_info_svc.ready().await?.call(request).await - } -} -type GetDebugInfoLayer = quickwit_common::tower::BoxLayer< - quickwit_common::tower::BoxService< - GetDebugInfoRequest, - GetDebugInfoResponse, - crate::ingest::IngestV2Error, - >, - GetDebugInfoRequest, - GetDebugInfoResponse, - crate::ingest::IngestV2Error, ->; -#[derive(Debug, Default)] -pub struct DiagnosticServiceTowerLayerStack { - get_diagnostic_info_layers: Vec, -} -impl DiagnosticServiceTowerLayerStack { - pub fn stack_layer(mut self, layer: L) -> Self - where - L: tower::Layer< - quickwit_common::tower::BoxService< - GetDebugInfoRequest, - GetDebugInfoResponse, - crate::ingest::IngestV2Error, - >, - > + Clone + Send + Sync + 'static, - , - >>::Service: tower::Service< - GetDebugInfoRequest, - Response = GetDebugInfoResponse, - Error = crate::ingest::IngestV2Error, - > + Clone + Send + Sync + 'static, - <, - >>::Service as tower::Service>::Future: Send + 'static, - { - self.get_diagnostic_info_layers - .push(quickwit_common::tower::BoxLayer::new(layer.clone())); - self - } - pub fn stack_get_diagnostic_info_layer(mut self, layer: L) -> Self - where - L: tower::Layer< - quickwit_common::tower::BoxService< - GetDebugInfoRequest, - GetDebugInfoResponse, - crate::ingest::IngestV2Error, - >, - > + Send + Sync + 'static, - L::Service: tower::Service< - GetDebugInfoRequest, - Response = GetDebugInfoResponse, - Error = crate::ingest::IngestV2Error, - > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - { - self.get_diagnostic_info_layers - .push(quickwit_common::tower::BoxLayer::new(layer)); - self - } - pub fn build(self, instance: T) -> DiagnosticServiceClient - where - T: DiagnosticService, - { - self.build_from_boxed(Box::new(instance)) - } - pub fn build_from_channel( - self, - addr: std::net::SocketAddr, - channel: tonic::transport::Channel, - max_message_size: bytesize::ByteSize, - ) -> DiagnosticServiceClient { - self.build_from_boxed( - Box::new( - DiagnosticServiceClient::from_channel(addr, channel, max_message_size), - ), - ) - } - pub fn build_from_balance_channel( - self, - balance_channel: quickwit_common::tower::BalanceChannel, - max_message_size: bytesize::ByteSize, - ) -> DiagnosticServiceClient { - self.build_from_boxed( - Box::new( - DiagnosticServiceClient::from_balance_channel( - balance_channel, - max_message_size, - ), - ), - ) - } - pub fn build_from_mailbox( - self, - mailbox: quickwit_actors::Mailbox, - ) -> DiagnosticServiceClient - where - A: quickwit_actors::Actor + std::fmt::Debug + Send + 'static, - DiagnosticServiceMailbox: DiagnosticService, - { - self.build_from_boxed(Box::new(DiagnosticServiceMailbox::new(mailbox))) - } - #[cfg(any(test, feature = "testsuite"))] - pub fn build_from_mock( - self, - mock: MockDiagnosticService, - ) -> DiagnosticServiceClient { - self.build_from_boxed(Box::new(DiagnosticServiceClient::from_mock(mock))) - } - fn build_from_boxed( - self, - boxed_instance: Box, - ) -> DiagnosticServiceClient { - let get_diagnostic_info_svc = self - .get_diagnostic_info_layers - .into_iter() - .rev() - .fold( - quickwit_common::tower::BoxService::new(boxed_instance.clone()), - |svc, layer| layer.layer(svc), - ); - let tower_svc_stack = DiagnosticServiceTowerServiceStack { - inner: boxed_instance.clone(), - get_diagnostic_info_svc, - }; - DiagnosticServiceClient::new(tower_svc_stack) - } -} -#[derive(Debug, Clone)] -struct MailboxAdapter { - inner: quickwit_actors::Mailbox, - phantom: std::marker::PhantomData, -} -impl std::ops::Deref for MailboxAdapter -where - A: quickwit_actors::Actor, -{ - type Target = quickwit_actors::Mailbox; - fn deref(&self) -> &Self::Target { - &self.inner - } -} -#[derive(Debug)] -pub struct DiagnosticServiceMailbox { - inner: MailboxAdapter, -} -impl DiagnosticServiceMailbox { - pub fn new(instance: quickwit_actors::Mailbox) -> Self { - let inner = MailboxAdapter { - inner: instance, - phantom: std::marker::PhantomData, - }; - Self { inner } - } -} -impl Clone for DiagnosticServiceMailbox { - fn clone(&self) -> Self { - let inner = MailboxAdapter { - inner: self.inner.clone(), - phantom: std::marker::PhantomData, - }; - Self { inner } - } -} -impl tower::Service for DiagnosticServiceMailbox -where - A: quickwit_actors::Actor - + quickwit_actors::DeferableReplyHandler> + Send - + 'static, - M: std::fmt::Debug + Send + 'static, - T: Send + 'static, - E: std::fmt::Debug + Send + 'static, - crate::ingest::IngestV2Error: From>, -{ - type Response = T; - type Error = crate::ingest::IngestV2Error; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - //! This does not work with balance middlewares such as `tower::balance::pool::Pool` because - //! this always returns `Poll::Ready`. The fix is to acquire a permit from the - //! mailbox in `poll_ready` and consume it in `call`. - std::task::Poll::Ready(Ok(())) - } - fn call(&mut self, message: M) -> Self::Future { - let mailbox = self.inner.clone(); - let fut = async move { - mailbox.ask_for_res(message).await.map_err(|error| error.into()) - }; - Box::pin(fut) - } -} -#[async_trait::async_trait] -impl DiagnosticService for DiagnosticServiceMailbox -where - A: quickwit_actors::Actor + std::fmt::Debug, - DiagnosticServiceMailbox< - A, - >: tower::Service< - GetDebugInfoRequest, - Response = GetDebugInfoResponse, - Error = crate::ingest::IngestV2Error, - Future = BoxFuture, - >, -{ - async fn get_diagnostic_info( - &mut self, - request: GetDebugInfoRequest, - ) -> crate::ingest::IngestV2Result { - self.call(request).await - } -} -#[derive(Debug, Clone)] -pub struct DiagnosticServiceGrpcClientAdapter { - inner: T, - #[allow(dead_code)] - connection_addrs_rx: tokio::sync::watch::Receiver< - std::collections::HashSet, - >, -} -impl DiagnosticServiceGrpcClientAdapter { - pub fn new( - instance: T, - connection_addrs_rx: tokio::sync::watch::Receiver< - std::collections::HashSet, - >, - ) -> Self { - Self { - inner: instance, - connection_addrs_rx, - } - } -} -#[async_trait::async_trait] -impl DiagnosticService -for DiagnosticServiceGrpcClientAdapter< - diagnostic_service_grpc_client::DiagnosticServiceGrpcClient, -> -where - T: tonic::client::GrpcService + std::fmt::Debug + Clone + Send - + Sync + 'static, - T::ResponseBody: tonic::codegen::Body + Send + 'static, - ::Error: Into - + Send, - T::Future: Send, -{ - async fn get_diagnostic_info( - &mut self, - request: GetDebugInfoRequest, - ) -> crate::ingest::IngestV2Result { - self.inner - .get_diagnostic_info(request) - .await - .map(|response| response.into_inner()) - .map_err(|status| crate::error::grpc_status_to_service_error( - status, - GetDebugInfoRequest::rpc_name(), - )) - } -} -#[derive(Debug)] -pub struct DiagnosticServiceGrpcServerAdapter { - inner: Box, -} -impl DiagnosticServiceGrpcServerAdapter { - pub fn new(instance: T) -> Self - where - T: DiagnosticService, - { - Self { inner: Box::new(instance) } - } -} -#[async_trait::async_trait] -impl diagnostic_service_grpc_server::DiagnosticServiceGrpc -for DiagnosticServiceGrpcServerAdapter { - async fn get_diagnostic_info( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - self.inner - .clone() - .get_diagnostic_info(request.into_inner()) - .await - .map(tonic::Response::new) - .map_err(crate::error::grpc_error_to_grpc_status) - } -} -/// Generated client implementations. -pub mod diagnostic_service_grpc_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - use tonic::codegen::http::Uri; - #[derive(Debug, Clone)] - pub struct DiagnosticServiceGrpcClient { - inner: tonic::client::Grpc, - } - impl DiagnosticServiceGrpcClient { - /// Attempt to create a new client by connecting to a given endpoint. - pub async fn connect(dst: D) -> Result - where - D: TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; - Ok(Self::new(conn)) - } - } - impl DiagnosticServiceGrpcClient - where - T: tonic::client::GrpcService, - T::Error: Into, - T::ResponseBody: Body + Send + 'static, - ::Error: Into + Send, - { - pub fn new(inner: T) -> Self { - let inner = tonic::client::Grpc::new(inner); - Self { inner } - } - pub fn with_origin(inner: T, origin: Uri) -> Self { - let inner = tonic::client::Grpc::with_origin(inner, origin); - Self { inner } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> DiagnosticServiceGrpcClient> - where - F: tonic::service::Interceptor, - T::ResponseBody: Default, - T: tonic::codegen::Service< - http::Request, - Response = http::Response< - >::ResponseBody, - >, - >, - , - >>::Error: Into + Send + Sync, - { - DiagnosticServiceGrpcClient::new(InterceptedService::new(inner, interceptor)) - } - /// Compress requests with the given encoding. - /// - /// This requires the server to support it otherwise it might respond with an - /// error. - #[must_use] - pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.inner = self.inner.send_compressed(encoding); - self - } - /// Enable decompressing responses. - #[must_use] - pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.inner = self.inner.accept_compressed(encoding); - self - } - /// Limits the maximum size of a decoded message. - /// - /// Default: `4MB` - #[must_use] - pub fn max_decoding_message_size(mut self, limit: usize) -> Self { - self.inner = self.inner.max_decoding_message_size(limit); - self - } - /// Limits the maximum size of an encoded message. - /// - /// Default: `usize::MAX` - #[must_use] - pub fn max_encoding_message_size(mut self, limit: usize) -> Self { - self.inner = self.inner.max_encoding_message_size(limit); - self - } - pub async fn get_diagnostic_info( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.common.DiagnosticService/get_diagnostic_info", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "quickwit.common.DiagnosticService", - "get_diagnostic_info", - ), - ); - self.inner.unary(req, path, codec).await - } - } -} -/// Generated server implementations. -pub mod diagnostic_service_grpc_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with DiagnosticServiceGrpcServer. - #[async_trait] - pub trait DiagnosticServiceGrpc: Send + Sync + 'static { - async fn get_diagnostic_info( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; - } - #[derive(Debug)] - pub struct DiagnosticServiceGrpcServer { - inner: _Inner, - accept_compression_encodings: EnabledCompressionEncodings, - send_compression_encodings: EnabledCompressionEncodings, - max_decoding_message_size: Option, - max_encoding_message_size: Option, - } - struct _Inner(Arc); - impl DiagnosticServiceGrpcServer { - pub fn new(inner: T) -> Self { - Self::from_arc(Arc::new(inner)) - } - pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); - Self { - inner, - accept_compression_encodings: Default::default(), - send_compression_encodings: Default::default(), - max_decoding_message_size: None, - max_encoding_message_size: None, - } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService - where - F: tonic::service::Interceptor, - { - InterceptedService::new(Self::new(inner), interceptor) - } - /// Enable decompressing requests with the given encoding. - #[must_use] - pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.accept_compression_encodings.enable(encoding); - self - } - /// Compress responses with the given encoding, if the client supports it. - #[must_use] - pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.send_compression_encodings.enable(encoding); - self - } - /// Limits the maximum size of a decoded message. - /// - /// Default: `4MB` - #[must_use] - pub fn max_decoding_message_size(mut self, limit: usize) -> Self { - self.max_decoding_message_size = Some(limit); - self - } - /// Limits the maximum size of an encoded message. - /// - /// Default: `usize::MAX` - #[must_use] - pub fn max_encoding_message_size(mut self, limit: usize) -> Self { - self.max_encoding_message_size = Some(limit); - self - } - } - impl tonic::codegen::Service> - for DiagnosticServiceGrpcServer - where - T: DiagnosticServiceGrpc, - B: Body + Send + 'static, - B::Error: Into + Send + 'static, - { - type Response = http::Response; - type Error = std::convert::Infallible; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); - match req.uri().path() { - "/quickwit.common.DiagnosticService/get_diagnostic_info" => { - #[allow(non_camel_case_types)] - struct get_diagnostic_infoSvc(pub Arc); - impl< - T: DiagnosticServiceGrpc, - > tonic::server::UnaryService - for get_diagnostic_infoSvc { - type Response = super::GetDebugInfoResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - (*inner).get_diagnostic_info(request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = get_diagnostic_infoSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } - } - } - } - impl Clone for DiagnosticServiceGrpcServer { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - Self { - inner, - accept_compression_encodings: self.accept_compression_encodings, - send_compression_encodings: self.send_compression_encodings, - max_decoding_message_size: self.max_decoding_message_size, - max_encoding_message_size: self.max_encoding_message_size, - } - } - } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::server::NamedService - for DiagnosticServiceGrpcServer { - const NAME: &'static str = "quickwit.common.DiagnosticService"; - } + pub doc_mapping_uid: ::prost::alloc::vec::Vec, } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs index 3660892d429..97592b90e5f 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs @@ -1,3 +1,4 @@ +/// Shard primary key. #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -61,6 +62,9 @@ pub struct Shard { #[prost(string, optional, tag = "10")] #[serde(default, skip_serializing_if = "Option::is_none")] pub publish_token: ::core::option::Option<::prost::alloc::string::String>, + /// The UID of the index doc mapping when the shard was created. + #[prost(message, optional, tag = "11")] + pub doc_mapping_uid: ::core::option::Option, } /// A group of shards belonging to the same index and source. #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 4c1a3021894..cd057f12a2b 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -335,6 +335,8 @@ pub struct OpenShardSubrequest { pub leader_id: ::prost::alloc::string::String, #[prost(string, optional, tag = "6")] pub follower_id: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, optional, tag = "7")] + pub doc_mapping_uid: ::core::option::Option, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs index b2aae066164..85caadfdb28 100644 --- a/quickwit/quickwit-proto/src/control_plane/mod.rs +++ b/quickwit/quickwit-proto/src/control_plane/mod.rs @@ -131,25 +131,25 @@ impl GetOrCreateOpenShardsFailureReason { &self, subrequest: impl Into, ) -> GetOrCreateOpenShardsFailure { - let sub_request = subrequest.into(); + let subrequest = subrequest.into(); + GetOrCreateOpenShardsFailure { - subrequest_id: sub_request.subrequest_id, - index_id: sub_request.index_id, - source_id: sub_request.source_id, + subrequest_id: subrequest.subrequest_id, + index_id: subrequest.index_id, + source_id: subrequest.source_id, reason: *self as i32, } } } impl From for GetOrCreateOpenShardsSubrequest { - fn from(metastore_open_shard_sub_request: OpenShardSubrequest) -> Self { - GetOrCreateOpenShardsSubrequest { - subrequest_id: metastore_open_shard_sub_request.subrequest_id, - index_id: metastore_open_shard_sub_request - .index_uid() - .index_id - .clone(), - source_id: metastore_open_shard_sub_request.source_id, + fn from(metastore_open_shard_subrequest: OpenShardSubrequest) -> Self { + let index_id = metastore_open_shard_subrequest.index_uid().index_id.clone(); + + Self { + subrequest_id: metastore_open_shard_subrequest.subrequest_id, + index_id, + source_id: metastore_open_shard_subrequest.source_id, } } } diff --git a/quickwit/quickwit-proto/src/getters.rs b/quickwit/quickwit-proto/src/getters.rs index 1cf2a06a3ac..0ab50d89fe2 100644 --- a/quickwit/quickwit-proto/src/getters.rs +++ b/quickwit/quickwit-proto/src/getters.rs @@ -56,6 +56,14 @@ macro_rules! generate_copy_getters { } } +// [`DocMappingUid`] getters +generate_copy_getters!( + impl fn doc_mapping_uid() -> DocMappingUid {} for + + OpenShardSubrequest, + Shard +); + // [`IndexUid`] getters generate_getters! { impl fn index_uid() -> &IndexUid {} for diff --git a/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs b/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs index 50f9cd23cae..cbe6e873749 100644 --- a/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs +++ b/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs @@ -19,7 +19,9 @@ use std::borrow::Cow; use std::fmt; +use std::str::FromStr; +use anyhow::Context; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize, Serializer}; pub use ulid::Ulid; @@ -63,9 +65,8 @@ impl DocMappingUid { impl<'de> Deserialize<'de> for DocMappingUid { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de> { - let ulid_str: Cow<'de, str> = Cow::deserialize(deserializer)?; - let ulid = Ulid::from_string(&ulid_str).map_err(D::Error::custom)?; - Ok(Self(ulid)) + let doc_mapping_uid_str: Cow<'de, str> = Cow::deserialize(deserializer)?; + doc_mapping_uid_str.parse().map_err(D::Error::custom) } } @@ -131,6 +132,39 @@ impl prost::Message for DocMappingUid { } } +impl FromStr for DocMappingUid { + type Err = anyhow::Error; + + fn from_str(doc_mapping_uid_str: &str) -> Result { + Ulid::from_string(doc_mapping_uid_str) + .map(Self) + .with_context(|| format!("failed to parse doc mapping UID `{doc_mapping_uid_str}`")) + } +} + +#[cfg(feature = "postgres")] +impl TryFrom for DocMappingUid { + type Error = anyhow::Error; + + fn try_from(doc_mapping_uid_str: String) -> Result { + doc_mapping_uid_str.parse() + } +} + +#[cfg(feature = "postgres")] +impl sqlx::Type for DocMappingUid { + fn type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("VARCHAR(26)") + } +} + +#[cfg(feature = "postgres")] +impl sqlx::Encode<'_, sqlx::Postgres> for DocMappingUid { + fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull { + sqlx::Encode::::encode(&self.0.to_string(), buf) + } +} + #[cfg(test)] mod tests { use bytes::Bytes;