From 429e618296503e7d3c2bf76c1505e7abcafc5289 Mon Sep 17 00:00:00 2001 From: Kamalesh Palanisamy Date: Thu, 12 Oct 2023 10:35:27 -0500 Subject: [PATCH] Modify ShardTableEntry to differentiate between local shard and remote shards (#3935) --- .../quickwit-ingest/src/ingest_v2/router.rs | 5 +- .../src/ingest_v2/shard_table.rs | 111 +++++++++++++++--- 2 files changed, 96 insertions(+), 20 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index a0f7aead424..24a0af547dc 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -72,7 +72,10 @@ impl IngestRouter { replication_factor: usize, ) -> Self { let state = RouterState { - shard_table: ShardTable::default(), + shard_table: ShardTable { + self_node_id: self_node_id.clone(), + table: HashMap::default(), + }, }; Self { self_node_id, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs index 3ab58792031..a1089155b1f 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs @@ -22,13 +22,15 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use quickwit_proto::ingest::Shard; use quickwit_proto::types::SourceId; -use quickwit_proto::IndexId; +use quickwit_proto::{IndexId, NodeId}; /// A set of open shards for a given index and source. #[derive(Debug, Default)] pub(super) struct ShardTableEntry { - shards: Vec, - round_robin_idx: AtomicUsize, + local_shards: Vec, + local_shards_round_robin_idx: AtomicUsize, + remote_shards: Vec, + remote_shards_round_robin_idx: AtomicUsize, } impl ShardTableEntry { @@ -37,40 +39,59 @@ impl ShardTableEntry { /// # Panics /// /// Panics if `shards` is empty after filtering out closed shards and deduplicating by shard ID. - pub fn new(mut shards: Vec) -> Self { + pub fn new(mut shards: Vec, node_id: &NodeId) -> Self { shards.retain(|shard| shard.is_open()); shards.sort_unstable_by_key(|shard| shard.shard_id); shards.dedup_by_key(|shard| shard.shard_id); assert!(!shards.is_empty(), "`shards` should not be empty"); + let (local_shards, remote_shards) = shards + .into_iter() + .partition(|shard| node_id == &shard.leader_id.as_str()); + Self { - shards, - round_robin_idx: AtomicUsize::default(), + local_shards, + local_shards_round_robin_idx: AtomicUsize::default(), + remote_shards, + remote_shards_round_robin_idx: AtomicUsize::default(), } } /// Returns the next shard in round-robin order. pub fn next_shard_round_robin(&self) -> &Shard { - let shard_idx = self.round_robin_idx.fetch_add(1, Ordering::Relaxed); - &self.shards[shard_idx % self.shards.len()] + if !self.local_shards.is_empty() { + let shard_idx = self + .local_shards_round_robin_idx + .fetch_add(1, Ordering::Relaxed); + &self.local_shards[shard_idx % self.local_shards.len()] + } else { + let shard_idx = self + .remote_shards_round_robin_idx + .fetch_add(1, Ordering::Relaxed); + &self.remote_shards[shard_idx % self.remote_shards.len()] + } } #[cfg(test)] pub fn len(&self) -> usize { - self.shards.len() + self.local_shards.len() + self.remote_shards.len() } #[cfg(test)] - pub fn shards(&self) -> &[Shard] { - &self.shards + pub fn shards(&self) -> Vec<&Shard> { + let mut shards = Vec::with_capacity(self.len()); + shards.extend(&self.local_shards); + shards.extend(&self.remote_shards); + shards } } /// A table of shard entries indexed by index UID and source ID. -#[derive(Debug, Default)] +#[derive(Debug)] pub(super) struct ShardTable { - table: HashMap<(IndexId, SourceId), ShardTableEntry>, + pub(super) table: HashMap<(IndexId, SourceId), ShardTableEntry>, + pub(super) self_node_id: NodeId, } impl ShardTable { @@ -99,7 +120,8 @@ impl ShardTable { shards: Vec, ) { let key = (index_id.into(), source_id.into()); - self.table.insert(key, ShardTableEntry::new(shards)); + self.table + .insert(key, ShardTableEntry::new(shards, &self.self_node_id)); } #[cfg(test)] @@ -120,8 +142,11 @@ mod tests { use super::*; #[test] - fn test_shard_table() { - let mut table = ShardTable::default(); + fn test_shard_table_with_nonempty_local_shards() { + let mut table = ShardTable { + self_node_id: "node-0".into(), + table: HashMap::default(), + }; assert!(!table.contains_entry("test-index", "test-source")); table.insert_shards( @@ -158,9 +183,57 @@ mod tests { assert!(table.contains_entry("test-index", "test-source")); let entry = table.find_entry("test-index", "test-source").unwrap(); - assert_eq!(entry.shards.len(), 2); - assert_eq!(entry.shards[0].shard_id, 0); - assert_eq!(entry.shards[1].shard_id, 1); + assert_eq!(entry.len(), 2); + assert_eq!(entry.shards()[0].shard_id, 0); + assert_eq!(entry.shards()[1].shard_id, 1); + assert_eq!(entry.local_shards.len(), 1); + assert_eq!(entry.remote_shards.len(), 1); + + assert_eq!(entry.next_shard_round_robin().shard_id, 0); + assert_eq!(entry.next_shard_round_robin().shard_id, 0); + } + + #[test] + fn test_shard_table_with_empty_local_shards() { + let mut table = ShardTable { + self_node_id: "node-0".into(), + table: HashMap::default(), + }; + assert!(!table.contains_entry("test-index", "test-source")); + + table.insert_shards( + "test-index", + "test-source", + vec![ + Shard { + index_uid: "test-index:0".to_string(), + shard_id: 0, + leader_id: "node-1".to_string(), + ..Default::default() + }, + Shard { + index_uid: "test-index:0".to_string(), + shard_id: 1, + leader_id: "node-2".to_string(), + ..Default::default() + }, + Shard { + index_uid: "test-index:0".to_string(), + shard_id: 2, + leader_id: "node-2".to_string(), + shard_state: ShardState::Closed as i32, + ..Default::default() + }, + ], + ); + assert!(table.contains_entry("test-index", "test-source")); + + let entry = table.find_entry("test-index", "test-source").unwrap(); + assert_eq!(entry.len(), 2); + assert_eq!(entry.shards()[0].shard_id, 0); + assert_eq!(entry.shards()[1].shard_id, 1); + assert_eq!(entry.local_shards.len(), 0); + assert_eq!(entry.remote_shards.len(), 2); assert_eq!(entry.next_shard_round_robin().shard_id, 0); assert_eq!(entry.next_shard_round_robin().shard_id, 1);