From 982a4ddfbc1881f4edc5268036189dcd09126336 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Fri, 17 May 2024 11:24:44 -0400 Subject: [PATCH] Add `doc_mapping_json` field to `Shard` --- .../src/control_plane.rs | 4 ++ .../src/ingest/ingest_controller.rs | 37 ++++++++++++++++--- .../quickwit-control-plane/src/model/mod.rs | 18 +++++---- .../src/source/ingest/mod.rs | 9 +++++ .../quickwit-ingest/src/ingest_v2/ingester.rs | 1 + ..._add-shard-doc-mapping-json-field.down.sql | 2 + ...20_add-shard-doc-mapping-json-field.up.sql | 15 ++++++++ .../file_backed/file_backed_index/mod.rs | 1 + .../file_backed/file_backed_index/shards.rs | 9 +++-- .../src/metastore/postgres/metastore.rs | 6 ++- .../src/metastore/postgres/model.rs | 2 + .../postgres/queries/shards/insert.sql | 4 +- .../postgres/queries/shards/open.sql | 4 +- .../quickwit-metastore/src/tests/shard.rs | 20 ++++++++++ .../file-backed-index/v0.7.expected.json | 9 +++-- .../test-data/file-backed-index/v0.7.json | 3 +- .../file-backed-index/v0.8.expected.json | 9 +++-- .../test-data/file-backed-index/v0.8.json | 9 +++-- .../protos/quickwit/ingest.proto | 4 ++ .../protos/quickwit/metastore.proto | 1 + .../src/codegen/quickwit/quickwit.ingest.rs | 4 ++ .../codegen/quickwit/quickwit.metastore.rs | 2 + 22 files changed, 139 insertions(+), 34 deletions(-) create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-json-field.down.sql create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-json-field.up.sql diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 51676895363..2b49e431642 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -1910,6 +1910,7 @@ mod tests { leader_id: "node1".to_string(), follower_id: None, shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), 
publish_position_inclusive: None, publish_token: None, }], @@ -2039,6 +2040,7 @@ mod tests { leader_id: "node1".to_string(), follower_id: None, shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: None, publish_token: None, }], @@ -2326,6 +2328,7 @@ mod tests { leader_id: "test-ingester".to_string(), follower_id: None, shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: None, }), @@ -2479,6 +2482,7 @@ mod tests { leader_id: "test-ingester".to_string(), follower_id: None, shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: None, }), diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 2566721e438..f19f0a707d1 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -32,6 +32,7 @@ use quickwit_actors::Mailbox; use quickwit_common::pretty::PrettySample; use quickwit_common::Progress; use quickwit_ingest::{IngesterPool, LeaderId, LocalShardsUpdate}; +use quickwit_metastore::IndexMetadata; use quickwit_proto::control_plane::{ AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneResult, GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest, @@ -269,7 +270,10 @@ impl IngestController { let mut open_shards_subrequests = Vec::new(); for get_open_shards_subrequest in get_open_shards_request.subrequests { - let Some(index_uid) = model.index_uid(&get_open_shards_subrequest.index_id) else { + let Some(index_metadata) = model + .index_uid(&get_open_shards_subrequest.index_id) + .and_then(|index_uid| model.index_metadata(index_uid)) + else { let get_or_create_open_shards_failure = 
GetOrCreateOpenShardsFailure { subrequest_id: get_open_shards_subrequest.subrequest_id, index_id: get_open_shards_subrequest.index_id, @@ -280,7 +284,7 @@ impl IngestController { continue; }; let Some(open_shard_entries) = model.find_open_shards( - &index_uid, + &index_metadata.index_uid, &get_open_shards_subrequest.source_id, &unavailable_leaders, ) else { @@ -300,18 +304,20 @@ impl IngestController { .collect(); let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess { subrequest_id: get_open_shards_subrequest.subrequest_id, - index_uid: index_uid.into(), + index_uid: Some(index_metadata.index_uid.clone()), source_id: get_open_shards_subrequest.source_id, open_shards, }; get_or_create_open_shards_successes.push(get_or_create_open_shards_success); } else { let shard_id = ShardId::from(Ulid::new()); + let doc_mapping_json = get_doc_mapping_json(index_metadata); let open_shard_subrequest = metastore::OpenShardSubrequest { subrequest_id: get_open_shards_subrequest.subrequest_id, - index_uid: index_uid.into(), + index_uid: Some(index_metadata.index_uid.clone()), source_id: get_open_shards_subrequest.source_id, shard_id: Some(shard_id), + doc_mapping_json, // These attributes will be overwritten in the next stage. 
leader_id: "".to_string(), follower_id: None, @@ -586,6 +592,12 @@ impl IngestController { model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); return; }; + let Some(index_metadata) = model.index_metadata(&source_uid.index_uid) else { + warn!("failed to scale up number of shards: index not found"); + model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + return; + }; + let doc_mapping_json = get_doc_mapping_json(index_metadata); let shard_id = ShardId::from(Ulid::new()); let open_shard_subrequest = metastore::OpenShardSubrequest { subrequest_id: 0, @@ -594,6 +606,7 @@ impl IngestController { shard_id: Some(shard_id), leader_id: leader_id.into(), follower_id: follower_id.map(Into::into), + doc_mapping_json, }; let open_shards_request = metastore::OpenShardsRequest { subrequests: vec![open_shard_subrequest], @@ -828,6 +841,7 @@ impl IngestController { shard_id: Some(shard_id.clone()), leader_id: leader_id.into(), follower_id: follower_id_opt.map(Into::into), + doc_mapping_json: "{}".to_string(), }; open_shards_subrequests.push(open_shard_subrequest); @@ -1003,6 +1017,12 @@ fn find_scale_down_candidate( }) } +/// Serializes the `DocMapping` of an index metadata to JSON. 
+fn get_doc_mapping_json(index_metadata: &IndexMetadata) -> String { + serde_json::to_string(&index_metadata.index_config.doc_mapping) + .expect("`DocMapping` should be JSON serializable") +} + #[cfg(test)] mod tests { @@ -1041,6 +1061,8 @@ mod tests { let index_metadata_1 = IndexMetadata::for_test(index_id_1, "ram://indexes/test-index-1"); let index_uid_1 = index_metadata_1.index_uid.clone(); + let doc_mapping_json = get_doc_mapping_json(&index_metadata_1); + let progress = Progress::default(); let mut mock_metastore = MockMetastoreService::new(); @@ -1051,6 +1073,7 @@ mod tests { assert_eq!(request.subrequests.len(), 1); assert_eq!(request.subrequests[0].index_uid(), &index_uid_1); assert_eq!(&request.subrequests[0].source_id, source_id); + assert_eq!(&request.subrequests[0].doc_mapping_json, &doc_mapping_json); let subresponses = vec![metastore::OpenShardSubresponse { subrequest_id: 1, @@ -1060,6 +1083,7 @@ mod tests { shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, leader_id: "test-ingester-2".to_string(), + doc_mapping_json: doc_mapping_json.clone(), ..Default::default() }), }]; @@ -1657,6 +1681,7 @@ mod tests { IngestController::new(metastore, ingester_pool.clone(), replication_factor); let index_uid = IndexUid::for_test("test-index", 0); + let index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index"); let source_id: SourceId = "test-source".into(); let source_uid = SourceUid { @@ -1664,14 +1689,16 @@ mod tests { source_id: source_id.clone(), }; let mut model = ControlPlaneModel::default(); + model.add_index(index_metadata); + let progress = Progress::default(); let shards = vec![Shard { index_uid: Some(index_uid.clone()), source_id: source_id.clone(), shard_id: Some(ShardId::from(1)), - leader_id: "test-ingester".to_string(), shard_state: ShardState::Open as i32, + leader_id: "test-ingester".to_string(), ..Default::default() }]; model.insert_shards(&index_uid, &source_id, shards); diff --git 
a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 229f9275558..4c43120388c 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -152,8 +152,12 @@ impl ControlPlaneModel { Ok(()) } - pub fn index_uid(&self, index_id: &str) -> Option { - self.index_uid_table.get(index_id).cloned() + pub fn index_uid(&self, index_id: &str) -> Option<&IndexUid> { + self.index_uid_table.get(index_id) + } + + pub fn index_metadata(&self, index_uid: &IndexUid) -> Option<&IndexMetadata> { + self.index_table.get(index_uid) } fn update_metrics(&self) { @@ -457,9 +461,9 @@ mod tests { .unwrap(); assert_eq!(model.index_table.len(), 3); - assert_eq!(model.index_uid("test-index-0").unwrap(), index_uid); - assert_eq!(model.index_uid("test-index-1").unwrap(), index_uid2); - assert_eq!(model.index_uid("test-index-2").unwrap(), index_uid3); + assert_eq!(*model.index_uid("test-index-0").unwrap(), index_uid); + assert_eq!(*model.index_uid("test-index-1").unwrap(), index_uid2); + assert_eq!(*model.index_uid("test-index-2").unwrap(), index_uid3); assert_eq!(model.shard_table.num_shards(), 1); @@ -500,7 +504,7 @@ mod tests { assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata); assert_eq!(model.index_uid_table.len(), 1); - assert_eq!(model.index_uid("test-index").unwrap(), index_uid); + assert_eq!(*model.index_uid("test-index").unwrap(), index_uid); } #[test] @@ -518,7 +522,7 @@ mod tests { assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata); assert_eq!(model.index_uid_table.len(), 1); - assert_eq!(model.index_uid("test-index").unwrap(), index_uid); + assert_eq!(*model.index_uid("test-index").unwrap(), index_uid); assert_eq!(model.shard_table.num_sources(), 1); diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index 9eb5d6d5b69..2678576b2c2 100644 --- 
a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -727,6 +727,7 @@ mod tests { follower_id: None, shard_id: Some(ShardId::from(0)), shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::offset(10u64)), publish_token: Some(publish_token.to_string()), }], @@ -749,6 +750,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::offset(11u64)), publish_token: Some(publish_token.to_string()), }], @@ -772,6 +774,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::offset(11u64)), publish_token: Some(publish_token.to_string()), }, @@ -782,6 +785,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::offset(12u64)), publish_token: Some(publish_token.to_string()), }, @@ -1071,6 +1075,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::eof(11u64)), publish_token: Some(publish_token.to_string()), }, @@ -1081,6 +1086,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::Beginning.as_eof()), publish_token: Some(publish_token.to_string()), }, @@ -1211,6 +1217,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + 
doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::offset(11u64)), publish_token: Some(publish_token.to_string()), }, @@ -1221,6 +1228,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Closed as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::eof(22u64)), publish_token: Some(publish_token.to_string()), }, @@ -1561,6 +1569,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: Some(publish_token.to_string()), }], diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index f7fc40a3177..69fdd9b2d34 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -1568,6 +1568,7 @@ mod tests { shard_state: ShardState::Open as i32, leader_id: ingester_ctx.node_id.to_string(), follower_id: None, + doc_mapping_json: "{}".to_string(), publish_position_inclusive: None, publish_token: None, }; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-json-field.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-json-field.down.sql new file mode 100644 index 00000000000..5be9d35bb2e --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-json-field.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE shards + DROP COLUMN IF EXISTS doc_mapping_json; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-json-field.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-json-field.up.sql new file mode 100644 index 00000000000..94e8001ca8b --- /dev/null +++ 
b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-doc-mapping-json-field.up.sql @@ -0,0 +1,15 @@ +ALTER TABLE shards + ADD COLUMN IF NOT EXISTS doc_mapping_json VARCHAR; + +-- Index metadata has been stable for quite a while, so we allow ourselves to do this, +-- but please, reader of the future, do not reapply this pattern without careful consideration. +UPDATE + shards +SET + doc_mapping_json = (indexes.index_metadata_json::json ->> 'index_config')::json ->> 'doc_mapping' +FROM indexes +WHERE + shards.index_uid = indexes.index_uid; + +ALTER TABLE shards + ALTER COLUMN doc_mapping_json SET NOT NULL; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 567ae67918f..8f6c4f12912 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -102,6 +102,7 @@ impl quickwit_config::TestableForRegression for FileBackedIndex { shard_state: ShardState::Open as i32, leader_id: "leader-ingester".to_string(), follower_id: Some("follower-ingester".to_string()), + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), ..Default::default() }; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index ba43cbc995d..4a2c7a4ef36 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -117,7 +117,7 @@ impl Shards { ) -> MetastoreResult> { let mut mutation_occurred = false; - let shard_id = subrequest.shard_id(); + let shard_id = subrequest.shard_id().clone(); let entry = 
self.shards.entry(shard_id.clone()); let shard = match entry { Entry::Occupied(entry) => entry.get().clone(), @@ -127,8 +127,9 @@ impl Shards { source_id: self.source_id.clone(), shard_id: Some(shard_id.clone()), shard_state: ShardState::Open as i32, - leader_id: subrequest.leader_id.clone(), - follower_id: subrequest.follower_id.clone(), + leader_id: subrequest.leader_id, + follower_id: subrequest.follower_id, + doc_mapping_json: subrequest.doc_mapping_json, publish_position_inclusive: Some(Position::Beginning), publish_token: None, }; @@ -332,6 +333,7 @@ mod tests { shard_id: Some(ShardId::from(1)), leader_id: "leader_id".to_string(), follower_id: None, + doc_mapping_json: "{}".to_string(), }; let MutationOccurred::Yes(subresponse) = shards.open_shard(subrequest.clone()).unwrap() else { @@ -363,6 +365,7 @@ mod tests { shard_id: Some(ShardId::from(2)), leader_id: "leader_id".to_string(), follower_id: Some("follower_id".to_string()), + doc_mapping_json: "{}".to_string(), }; let MutationOccurred::Yes(subresponse) = shards.open_shard(subrequest).unwrap() else { panic!("Expected `MutationOccured::No`"); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index c6fcfb45ada..ee6527338c8 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -1640,6 +1640,7 @@ async fn open_or_fetch_shard<'e>( .bind(subrequest.shard_id().as_str()) .bind(&subrequest.leader_id) .bind(&subrequest.follower_id) + .bind(&subrequest.doc_mapping_json) .fetch_optional(executor.clone()) .await?; @@ -1799,10 +1800,11 @@ mod tests { sqlx::query(INSERT_SHARD_QUERY) .bind(index_uid) .bind(source_id) - .bind(shard.shard_id().as_str()) + .bind(shard.shard_id()) .bind(shard.shard_state().as_json_str_name()) .bind(&shard.leader_id) .bind(&shard.follower_id) + .bind(&shard.doc_mapping_json) 
.bind(&shard.publish_position_inclusive().to_string()) .bind(&shard.publish_token) .execute(&self.connection_pool) @@ -1822,7 +1824,7 @@ mod tests { "#, ) .bind(index_uid) - .bind(source_id.as_str()) + .bind(source_id) .fetch_all(&self.connection_pool) .await .unwrap(); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs index 1b10190715a..3d3f56c164e 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs @@ -259,6 +259,7 @@ pub(super) struct PgShard { pub leader_id: String, pub follower_id: Option, pub shard_state: PgShardState, + pub doc_mapping_json: String, pub publish_position_inclusive: String, pub publish_token: Option, } @@ -272,6 +273,7 @@ impl From for Shard { shard_state: ShardState::from(pg_shard.shard_state) as i32, leader_id: pg_shard.leader_id, follower_id: pg_shard.follower_id, + doc_mapping_json: pg_shard.doc_mapping_json, publish_position_inclusive: Some(pg_shard.publish_position_inclusive.into()), publish_token: pg_shard.publish_token, } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql index d1a7909b630..b6ab4511891 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/insert.sql @@ -1,2 +1,2 @@ -INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, publish_position_inclusive, publish_token) - VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8) +INSERT INTO shards(index_uid, source_id, shard_id, shard_state, leader_id, follower_id, doc_mapping_json, publish_position_inclusive, publish_token) + VALUES ($1, $2, $3, CAST($4 AS SHARD_STATE), $5, $6, $7, $8, $9) diff --git 
a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql index c867fa244d5..1365cb4c98f 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/open.sql @@ -1,5 +1,5 @@ -INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id) - VALUES ($1, $2, $3, $4, $5) +INSERT INTO shards(index_uid, source_id, shard_id, leader_id, follower_id, doc_mapping_json) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING RETURNING diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index ec5e251fc42..6c4b200d885 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -138,6 +138,7 @@ pub async fn test_metastore_open_shards< shard_id: Some(ShardId::from(1)), leader_id: "test-ingester-foo".to_string(), follower_id: Some("test-ingester-bar".to_string()), + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), }], }; let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap(); @@ -153,6 +154,10 @@ pub async fn test_metastore_open_shards< assert_eq!(shard.shard_state(), ShardState::Open); assert_eq!(shard.leader_id, "test-ingester-foo"); assert_eq!(shard.follower_id(), "test-ingester-bar"); + assert_eq!( + shard.doc_mapping_json, + r#"{"mode": "dynamic", "timestamp_field": "ts"}"#, + ); assert_eq!(shard.publish_position_inclusive(), Position::Beginning); assert!(shard.publish_token.is_none()); @@ -165,6 +170,7 @@ pub async fn test_metastore_open_shards< shard_id: Some(ShardId::from(1)), leader_id: "test-ingester-foo".to_string(), follower_id: Some("test-ingester-bar".to_string()), + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), }], }; let open_shards_response = 
metastore.open_shards(open_shards_request).await.unwrap(); @@ -206,6 +212,7 @@ pub async fn test_metastore_acquire_shards< shard_state: ShardState::Closed as i32, leader_id: "test-ingester-foo".to_string(), follower_id: Some("test-ingester-bar".to_string()), + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-foo".to_string()), }, @@ -216,6 +223,7 @@ pub async fn test_metastore_acquire_shards< shard_state: ShardState::Open as i32, leader_id: "test-ingester-bar".to_string(), follower_id: Some("test-ingester-qux".to_string()), + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-bar".to_string()), }, @@ -226,6 +234,7 @@ pub async fn test_metastore_acquire_shards< shard_state: ShardState::Open as i32, leader_id: "test-ingester-qux".to_string(), follower_id: Some("test-ingester-baz".to_string()), + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: None, }, @@ -236,6 +245,7 @@ pub async fn test_metastore_acquire_shards< shard_state: ShardState::Open as i32, leader_id: "test-ingester-baz".to_string(), follower_id: Some("test-ingester-tux".to_string()), + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: None, }, @@ -326,6 +336,7 @@ pub async fn test_metastore_list_shards< shard_state: ShardState::Open as i32, leader_id: "test-ingester-foo".to_string(), follower_id: Some("test-ingester-bar".to_string()), + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-foo".to_string()), }, @@ -336,6 +347,7 @@ pub 
async fn test_metastore_list_shards< shard_state: ShardState::Closed as i32, leader_id: "test-ingester-bar".to_string(), follower_id: Some("test-ingester-qux".to_string()), + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-bar".to_string()), }, @@ -464,6 +476,7 @@ pub async fn test_metastore_delete_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), ..Default::default() }, @@ -472,6 +485,7 @@ pub async fn test_metastore_delete_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Closed as i32, + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), ..Default::default() }, @@ -480,6 +494,7 @@ pub async fn test_metastore_delete_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(3)), shard_state: ShardState::Closed as i32, + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Eof(None)), ..Default::default() }, @@ -606,6 +621,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_single_shard< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(0)), shard_state: ShardState::Open as i32, + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::Beginning), publish_token: Some("test-publish-token-bar".to_string()), ..Default::default() @@ -721,6 +737,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(0)), shard_state: 
ShardState::Open as i32, + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::offset(0u64)), publish_token: Some("test-publish-token-foo".to_string()), ..Default::default() @@ -730,6 +747,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::offset(1u64)), publish_token: Some("test-publish-token-foo".to_string()), ..Default::default() @@ -739,6 +757,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Open as i32, + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::offset(2u64)), publish_token: Some("test-publish-token-foo".to_string()), ..Default::default() @@ -748,6 +767,7 @@ pub async fn test_metastore_apply_checkpoint_delta_v2_multi_shards< source_id: test_index.source_id.clone(), shard_id: Some(ShardId::from(3)), shard_state: ShardState::Open as i32, + doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(), publish_position_inclusive: Some(Position::offset(3u64)), publish_token: Some("test-publish-token-bar".to_string()), ..Default::default() diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json index 643b3b8e4fe..9cc93dc3e08 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json @@ -145,13 +145,14 @@ "shards": { "_ingest-source": [ { - "follower_id": "follower-ingester", "index_uid": 
"my-index:00000000000000000000000000", - "leader_id": "leader-ingester", - "publish_position_inclusive": "", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, - "source_id": "_ingest-source" + "leader_id": "leader-ingester", + "follower_id": "follower-ingester", + "doc_mapping_json": "{\"mode\": \"dynamic\", \"timestamp_field\": \"ts\"}", + "publish_position_inclusive": "" } ] }, diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.json index 6ab7aefdff1..f14d16b99c6 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.json @@ -146,11 +146,12 @@ "_ingest-source": [ { "index_uid": "my-index:00000000000000000000000000", - "source_id": "_ingest-source", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, "leader_id": "leader-ingester", "follower_id": "follower-ingester", + "doc_mapping_json": "{\"mode\": \"dynamic\", \"timestamp_field\": \"ts\"}", "publish_position_inclusive": "" } ] diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json index 643b3b8e4fe..9cc93dc3e08 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json @@ -145,13 +145,14 @@ "shards": { "_ingest-source": [ { - "follower_id": "follower-ingester", "index_uid": "my-index:00000000000000000000000000", - "leader_id": "leader-ingester", - "publish_position_inclusive": "", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, - "source_id": "_ingest-source" + "leader_id": "leader-ingester", + "follower_id": "follower-ingester", + "doc_mapping_json": "{\"mode\": \"dynamic\", \"timestamp_field\": \"ts\"}", + 
"publish_position_inclusive": "" } ] }, diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.json index 643b3b8e4fe..9cc93dc3e08 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.json @@ -145,13 +145,14 @@ "shards": { "_ingest-source": [ { - "follower_id": "follower-ingester", "index_uid": "my-index:00000000000000000000000000", - "leader_id": "leader-ingester", - "publish_position_inclusive": "", "shard_id": "00000000000000000001", + "source_id": "_ingest-source", "shard_state": 1, - "source_id": "_ingest-source" + "leader_id": "leader-ingester", + "follower_id": "follower-ingester", + "doc_mapping_json": "{\"mode\": \"dynamic\", \"timestamp_field\": \"ts\"}", + "publish_position_inclusive": "" } ] }, diff --git a/quickwit/quickwit-proto/protos/quickwit/ingest.proto b/quickwit/quickwit-proto/protos/quickwit/ingest.proto index bf950698541..80fb5e97c2b 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingest.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingest.proto @@ -39,6 +39,7 @@ message ShardId { bytes shard_id = 1; } +// Shard primary key. message ShardPKey { quickwit.common.IndexUid index_uid = 1; string source_id = 2; @@ -93,6 +94,9 @@ message Shard { // A publish token that ensures only one indexer works on a given shard at a time. // For instance, if an indexer goes rogue, eventually the control plane will detect it and assign the shard to another indexer, which will override the publish token. optional string publish_token = 10; + + // The doc mapping of the index when the shard was created, stored as a JSON string. + string doc_mapping_json = 12; } // A group of shards belonging to the same index and source. 
diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index ac4e35b0817..cab8093d2f6 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -404,6 +404,7 @@ message OpenShardSubrequest { quickwit.ingest.ShardId shard_id = 4; string leader_id = 5; optional string follower_id = 6; + string doc_mapping_json = 7; } message OpenShardsResponse { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs index 3660892d429..24196f16e20 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs @@ -1,3 +1,4 @@ +/// Shard primary key. #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -61,6 +62,9 @@ pub struct Shard { #[prost(string, optional, tag = "10")] #[serde(default, skip_serializing_if = "Option::is_none")] pub publish_token: ::core::option::Option<::prost::alloc::string::String>, + /// The doc mapping of the index when the shard was created, stored as a JSON string. + #[prost(string, tag = "12")] + pub doc_mapping_json: ::prost::alloc::string::String, } /// A group of shards belonging to the same index and source. 
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index d3537e02799..431fedaf2c0 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -333,6 +333,8 @@ pub struct OpenShardSubrequest { pub leader_id: ::prost::alloc::string::String, #[prost(string, optional, tag = "6")] pub follower_id: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, tag = "7")] + pub doc_mapping_json: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)]