Skip to content

Commit

Permalink
Add doc_mapping_json field to Shard
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed May 27, 2024
1 parent 7b62b40 commit 982a4dd
Show file tree
Hide file tree
Showing 22 changed files with 139 additions and 34 deletions.
4 changes: 4 additions & 0 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,7 @@ mod tests {
leader_id: "node1".to_string(),
follower_id: None,
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: None,
publish_token: None,
}],
Expand Down Expand Up @@ -2039,6 +2040,7 @@ mod tests {
leader_id: "node1".to_string(),
follower_id: None,
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: None,
publish_token: None,
}],
Expand Down Expand Up @@ -2326,6 +2328,7 @@ mod tests {
leader_id: "test-ingester".to_string(),
follower_id: None,
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
}),
Expand Down Expand Up @@ -2479,6 +2482,7 @@ mod tests {
leader_id: "test-ingester".to_string(),
follower_id: None,
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
}),
Expand Down
37 changes: 32 additions & 5 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use quickwit_actors::Mailbox;
use quickwit_common::pretty::PrettySample;
use quickwit_common::Progress;
use quickwit_ingest::{IngesterPool, LeaderId, LocalShardsUpdate};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneResult,
GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest,
Expand Down Expand Up @@ -269,7 +270,10 @@ impl IngestController {
let mut open_shards_subrequests = Vec::new();

for get_open_shards_subrequest in get_open_shards_request.subrequests {
let Some(index_uid) = model.index_uid(&get_open_shards_subrequest.index_id) else {
let Some(index_metadata) = model
.index_uid(&get_open_shards_subrequest.index_id)
.and_then(|index_uid| model.index_metadata(index_uid))
else {
let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_id: get_open_shards_subrequest.index_id,
Expand All @@ -280,7 +284,7 @@ impl IngestController {
continue;
};
let Some(open_shard_entries) = model.find_open_shards(
&index_uid,
&index_metadata.index_uid,
&get_open_shards_subrequest.source_id,
&unavailable_leaders,
) else {
Expand All @@ -300,18 +304,20 @@ impl IngestController {
.collect();
let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_uid: index_uid.into(),
index_uid: Some(index_metadata.index_uid.clone()),
source_id: get_open_shards_subrequest.source_id,
open_shards,
};
get_or_create_open_shards_successes.push(get_or_create_open_shards_success);
} else {
let shard_id = ShardId::from(Ulid::new());
let doc_mapping_json = get_doc_mapping_json(index_metadata);
let open_shard_subrequest = metastore::OpenShardSubrequest {
subrequest_id: get_open_shards_subrequest.subrequest_id,
index_uid: index_uid.into(),
index_uid: Some(index_metadata.index_uid.clone()),
source_id: get_open_shards_subrequest.source_id,
shard_id: Some(shard_id),
doc_mapping_json,
// These attributes will be overwritten in the next stage.
leader_id: "".to_string(),
follower_id: None,
Expand Down Expand Up @@ -586,6 +592,12 @@ impl IngestController {
model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS);
return;
};
let Some(index_metadata) = model.index_metadata(&source_uid.index_uid) else {
warn!("failed to scale up number of shards: index not found");
model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS);
return;
};
let doc_mapping_json = get_doc_mapping_json(index_metadata);
let shard_id = ShardId::from(Ulid::new());
let open_shard_subrequest = metastore::OpenShardSubrequest {
subrequest_id: 0,
Expand All @@ -594,6 +606,7 @@ impl IngestController {
shard_id: Some(shard_id),
leader_id: leader_id.into(),
follower_id: follower_id.map(Into::into),
doc_mapping_json,
};
let open_shards_request = metastore::OpenShardsRequest {
subrequests: vec![open_shard_subrequest],
Expand Down Expand Up @@ -828,6 +841,7 @@ impl IngestController {
shard_id: Some(shard_id.clone()),
leader_id: leader_id.into(),
follower_id: follower_id_opt.map(Into::into),
doc_mapping_json: "{}".to_string(),
};
open_shards_subrequests.push(open_shard_subrequest);

Expand Down Expand Up @@ -1003,6 +1017,12 @@ fn find_scale_down_candidate(
})
}

/// Returns the JSON representation of the doc mapping stored in the index config of
/// `index_metadata`.
///
/// # Panics
///
/// Panics if the doc mapping cannot be serialized to JSON.
fn get_doc_mapping_json(index_metadata: &IndexMetadata) -> String {
    let doc_mapping = &index_metadata.index_config.doc_mapping;
    let serialization_result = serde_json::to_string(doc_mapping);
    serialization_result.expect("`DocMapping` should be JSON serializable")
}

#[cfg(test)]
mod tests {

Expand Down Expand Up @@ -1041,6 +1061,8 @@ mod tests {
let index_metadata_1 = IndexMetadata::for_test(index_id_1, "ram://indexes/test-index-1");
let index_uid_1 = index_metadata_1.index_uid.clone();

let doc_mapping_json = get_doc_mapping_json(&index_metadata_1);

let progress = Progress::default();

let mut mock_metastore = MockMetastoreService::new();
Expand All @@ -1051,6 +1073,7 @@ mod tests {
assert_eq!(request.subrequests.len(), 1);
assert_eq!(request.subrequests[0].index_uid(), &index_uid_1);
assert_eq!(&request.subrequests[0].source_id, source_id);
assert_eq!(&request.subrequests[0].doc_mapping_json, &doc_mapping_json);

let subresponses = vec![metastore::OpenShardSubresponse {
subrequest_id: 1,
Expand All @@ -1060,6 +1083,7 @@ mod tests {
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
leader_id: "test-ingester-2".to_string(),
doc_mapping_json: doc_mapping_json.clone(),
..Default::default()
}),
}];
Expand Down Expand Up @@ -1657,21 +1681,24 @@ mod tests {
IngestController::new(metastore, ingester_pool.clone(), replication_factor);

let index_uid = IndexUid::for_test("test-index", 0);
let index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index");
let source_id: SourceId = "test-source".into();

let source_uid = SourceUid {
index_uid: index_uid.clone(),
source_id: source_id.clone(),
};
let mut model = ControlPlaneModel::default();
model.add_index(index_metadata);

let progress = Progress::default();

let shards = vec![Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(1)),
leader_id: "test-ingester".to_string(),
shard_state: ShardState::Open as i32,
leader_id: "test-ingester".to_string(),
..Default::default()
}];
model.insert_shards(&index_uid, &source_id, shards);
Expand Down
18 changes: 11 additions & 7 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ impl ControlPlaneModel {
Ok(())
}

pub fn index_uid(&self, index_id: &str) -> Option<IndexUid> {
self.index_uid_table.get(index_id).cloned()
pub fn index_uid(&self, index_id: &str) -> Option<&IndexUid> {
self.index_uid_table.get(index_id)
}

/// Looks up the metadata of the index identified by `index_uid` in `index_table`.
///
/// Returns a borrowed `IndexMetadata`, or `None` when the table holds no entry for that UID.
pub fn index_metadata(&self, index_uid: &IndexUid) -> Option<&IndexMetadata> {
self.index_table.get(index_uid)
}

fn update_metrics(&self) {
Expand Down Expand Up @@ -457,9 +461,9 @@ mod tests {
.unwrap();

assert_eq!(model.index_table.len(), 3);
assert_eq!(model.index_uid("test-index-0").unwrap(), index_uid);
assert_eq!(model.index_uid("test-index-1").unwrap(), index_uid2);
assert_eq!(model.index_uid("test-index-2").unwrap(), index_uid3);
assert_eq!(*model.index_uid("test-index-0").unwrap(), index_uid);
assert_eq!(*model.index_uid("test-index-1").unwrap(), index_uid2);
assert_eq!(*model.index_uid("test-index-2").unwrap(), index_uid3);

assert_eq!(model.shard_table.num_shards(), 1);

Expand Down Expand Up @@ -500,7 +504,7 @@ mod tests {
assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata);

assert_eq!(model.index_uid_table.len(), 1);
assert_eq!(model.index_uid("test-index").unwrap(), index_uid);
assert_eq!(*model.index_uid("test-index").unwrap(), index_uid);
}

#[test]
Expand All @@ -518,7 +522,7 @@ mod tests {
assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata);

assert_eq!(model.index_uid_table.len(), 1);
assert_eq!(model.index_uid("test-index").unwrap(), index_uid);
assert_eq!(*model.index_uid("test-index").unwrap(), index_uid);

assert_eq!(model.shard_table.num_sources(), 1);

Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ mod tests {
follower_id: None,
shard_id: Some(ShardId::from(0)),
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::offset(10u64)),
publish_token: Some(publish_token.to_string()),
}],
Expand All @@ -749,6 +750,7 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
}],
Expand All @@ -772,6 +774,7 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
},
Expand All @@ -782,6 +785,7 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(2)),
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::offset(12u64)),
publish_token: Some(publish_token.to_string()),
},
Expand Down Expand Up @@ -1071,6 +1075,7 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::eof(11u64)),
publish_token: Some(publish_token.to_string()),
},
Expand All @@ -1081,6 +1086,7 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(2)),
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::Beginning.as_eof()),
publish_token: Some(publish_token.to_string()),
},
Expand Down Expand Up @@ -1211,6 +1217,7 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::offset(11u64)),
publish_token: Some(publish_token.to_string()),
},
Expand All @@ -1221,6 +1228,7 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(2)),
shard_state: ShardState::Closed as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::eof(22u64)),
publish_token: Some(publish_token.to_string()),
},
Expand Down Expand Up @@ -1561,6 +1569,7 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: Some(Position::Beginning),
publish_token: Some(publish_token.to_string()),
}],
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,7 @@ mod tests {
shard_state: ShardState::Open as i32,
leader_id: ingester_ctx.node_id.to_string(),
follower_id: None,
doc_mapping_json: "{}".to_string(),
publish_position_inclusive: None,
publish_token: None,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Down migration: removes the `doc_mapping_json` column from the `shards` table.
-- PostgreSQL's grammar is `DROP [ COLUMN ] [ IF EXISTS ] column_name`, so `IF EXISTS`
-- must follow the `COLUMN` keyword; the reverse order fails to parse.
ALTER TABLE shards
    DROP COLUMN IF EXISTS doc_mapping_json;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Up migration: adds a `doc_mapping_json` column to `shards` in three steps.

-- Step 1: add the column without a NOT NULL constraint so that existing rows can be backfilled.
ALTER TABLE shards
ADD COLUMN IF NOT EXISTS doc_mapping_json VARCHAR;

-- Step 2: backfill each shard with the `doc_mapping` object found under `index_config` in the
-- JSON metadata of the index row that has the same `index_uid`.
-- Index metadata has been stable for quite a while, so we allow ourselves to do this,
-- but please, reader of the future, do not reapply this pattern without careful consideration.
UPDATE
shards
SET
doc_mapping_json = (indexes.index_metadata_json::json ->> 'index_config')::json ->> 'doc_mapping'
FROM indexes
WHERE
shards.index_uid = indexes.index_uid;

-- Step 3: enforce NOT NULL now that rows are populated.
-- NOTE(review): this fails if any shard row was left NULL by the backfill (no matching index row,
-- or no `doc_mapping` key in its metadata) -- presumably prevented by existing constraints; confirm.
ALTER TABLE shards
ALTER COLUMN doc_mapping_json SET NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl quickwit_config::TestableForRegression for FileBackedIndex {
shard_state: ShardState::Open as i32,
leader_id: "leader-ingester".to_string(),
follower_id: Some("follower-ingester".to_string()),
doc_mapping_json: r#"{"mode": "dynamic", "timestamp_field": "ts"}"#.to_string(),
publish_position_inclusive: Some(Position::Beginning),
..Default::default()
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Shards {
) -> MetastoreResult<MutationOccurred<OpenShardSubresponse>> {
let mut mutation_occurred = false;

let shard_id = subrequest.shard_id();
let shard_id = subrequest.shard_id().clone();
let entry = self.shards.entry(shard_id.clone());
let shard = match entry {
Entry::Occupied(entry) => entry.get().clone(),
Expand All @@ -127,8 +127,9 @@ impl Shards {
source_id: self.source_id.clone(),
shard_id: Some(shard_id.clone()),
shard_state: ShardState::Open as i32,
leader_id: subrequest.leader_id.clone(),
follower_id: subrequest.follower_id.clone(),
leader_id: subrequest.leader_id,
follower_id: subrequest.follower_id,
doc_mapping_json: subrequest.doc_mapping_json,
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
};
Expand Down Expand Up @@ -332,6 +333,7 @@ mod tests {
shard_id: Some(ShardId::from(1)),
leader_id: "leader_id".to_string(),
follower_id: None,
doc_mapping_json: "{}".to_string(),
};
let MutationOccurred::Yes(subresponse) = shards.open_shard(subrequest.clone()).unwrap()
else {
Expand Down Expand Up @@ -363,6 +365,7 @@ mod tests {
shard_id: Some(ShardId::from(2)),
leader_id: "leader_id".to_string(),
follower_id: Some("follower_id".to_string()),
doc_mapping_json: "{}".to_string(),
};
let MutationOccurred::Yes(subresponse) = shards.open_shard(subrequest).unwrap() else {
panic!("Expected `MutationOccured::No`");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,7 @@ async fn open_or_fetch_shard<'e>(
.bind(subrequest.shard_id().as_str())
.bind(&subrequest.leader_id)
.bind(&subrequest.follower_id)
.bind(&subrequest.doc_mapping_json)
.fetch_optional(executor.clone())
.await?;

Expand Down Expand Up @@ -1799,10 +1800,11 @@ mod tests {
sqlx::query(INSERT_SHARD_QUERY)
.bind(index_uid)
.bind(source_id)
.bind(shard.shard_id().as_str())
.bind(shard.shard_id())
.bind(shard.shard_state().as_json_str_name())
.bind(&shard.leader_id)
.bind(&shard.follower_id)
.bind(&shard.doc_mapping_json)
.bind(&shard.publish_position_inclusive().to_string())
.bind(&shard.publish_token)
.execute(&self.connection_pool)
Expand All @@ -1822,7 +1824,7 @@ mod tests {
"#,
)
.bind(index_uid)
.bind(source_id.as_str())
.bind(source_id)
.fetch_all(&self.connection_pool)
.await
.unwrap();
Expand Down
Loading

0 comments on commit 982a4dd

Please sign in to comment.