Skip to content

Commit

Permalink
Modify ShardTableEntry to differentiate between local shard and remote shards (#3935)
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalesh0406 authored Oct 12, 2023
1 parent a99dfd9 commit 429e618
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 20 deletions.
5 changes: 4 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ impl IngestRouter {
replication_factor: usize,
) -> Self {
let state = RouterState {
shard_table: ShardTable::default(),
shard_table: ShardTable {
self_node_id: self_node_id.clone(),
table: HashMap::default(),
},
};
Self {
self_node_id,
Expand Down
111 changes: 92 additions & 19 deletions quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use std::sync::atomic::{AtomicUsize, Ordering};

use quickwit_proto::ingest::Shard;
use quickwit_proto::types::SourceId;
use quickwit_proto::IndexId;
use quickwit_proto::{IndexId, NodeId};

/// A set of open shards for a given index and source.
///
/// The shards are held in two lists, local and remote. `ShardTableEntry::new` decides which
/// list a shard goes into by comparing the shard's `leader_id` with the node ID it is given.
/// Each list has its own round-robin counter, used by `next_shard_round_robin`.
// NOTE(review): this text is a rendered diff without +/- markers. `shards` and
// `round_robin_idx` look like the pre-change fields that the four fields after them
// replace — confirm against the commit before treating them as live fields.
#[derive(Debug, Default)]
pub(super) struct ShardTableEntry {
shards: Vec<Shard>,
round_robin_idx: AtomicUsize,
/// Shards whose `leader_id` matched the node ID passed to `new`.
local_shards: Vec<Shard>,
/// Monotonic counter; taken modulo `local_shards.len()` to pick the next local shard.
local_shards_round_robin_idx: AtomicUsize,
/// Shards whose `leader_id` did not match the node ID passed to `new`.
remote_shards: Vec<Shard>,
/// Monotonic counter; taken modulo `remote_shards.len()` to pick the next remote shard.
remote_shards_round_robin_idx: AtomicUsize,
}

impl ShardTableEntry {
Expand All @@ -37,40 +39,59 @@ impl ShardTableEntry {
/// # Panics
///
/// Panics if `shards` is empty after filtering out closed shards and deduplicating by shard ID.
// NOTE(review): two signatures appear here because this is a diff rendering without +/-
// markers. The one taking `node_id` looks like the post-change signature, since the body
// below reads `node_id` — confirm against the commit.
pub fn new(mut shards: Vec<Shard>) -> Self {
pub fn new(mut shards: Vec<Shard>, node_id: &NodeId) -> Self {
// Drop shards that are not open.
shards.retain(|shard| shard.is_open());
// `dedup_by_key` only removes *consecutive* duplicates, so sort by shard ID first.
// The sort is unstable, so when several shards share an ID, which one survives the
// dedup is not tied to their order in the input.
shards.sort_unstable_by_key(|shard| shard.shard_id);
shards.dedup_by_key(|shard| shard.shard_id);

// Checked after filtering and deduplication: an input made only of closed shards panics.
assert!(!shards.is_empty(), "`shards` should not be empty");

// Split by leader: shards whose `leader_id` equals `node_id` go to `local_shards`,
// every other shard goes to `remote_shards`. Relative order (ascending shard ID) is
// preserved inside each list.
let (local_shards, remote_shards) = shards
.into_iter()
.partition(|shard| node_id == &shard.leader_id.as_str());

Self {
// NOTE(review): `shards` / `round_robin_idx` look like pre-change diff lines; `shards`
// has already been consumed by `into_iter()` above.
shards,
round_robin_idx: AtomicUsize::default(),
local_shards,
local_shards_round_robin_idx: AtomicUsize::default(),
remote_shards,
remote_shards_round_robin_idx: AtomicUsize::default(),
}
}

/// Returns the next shard in round-robin order.
///
/// Local shards take priority: as long as `local_shards` is non-empty, only local shards
/// are returned and the remote list is never consulted. Remote shards are cycled through
/// only when there is no local shard.
///
/// # Panics
///
/// Panics with a remainder-by-zero if both lists are empty. `new` asserts that at least
/// one shard remains, but the struct also derives `Default`, which yields two empty lists.
pub fn next_shard_round_robin(&self) -> &Shard {
// NOTE(review): the next two lines look like the pre-change body (single `shards` list)
// left in by the unmarked diff rendering — confirm against the commit.
let shard_idx = self.round_robin_idx.fetch_add(1, Ordering::Relaxed);
&self.shards[shard_idx % self.shards.len()]
if !self.local_shards.is_empty() {
// `fetch_add` returns the value *before* the increment, so the first call yields index 0.
let shard_idx = self
.local_shards_round_robin_idx
.fetch_add(1, Ordering::Relaxed);
&self.local_shards[shard_idx % self.local_shards.len()]
} else {
// Separate counter, so the remote rotation is independent of the local one.
let shard_idx = self
.remote_shards_round_robin_idx
.fetch_add(1, Ordering::Relaxed);
&self.remote_shards[shard_idx % self.remote_shards.len()]
}
}

/// Returns the total number of shards in the entry: local plus remote. Test-only helper.
#[cfg(test)]
pub fn len(&self) -> usize {
// NOTE(review): the next line looks like the pre-change body from the unmarked diff.
self.shards.len()
self.local_shards.len() + self.remote_shards.len()
}

/// Returns references to every shard in the entry. Test-only helper.
///
/// The result lists all local shards first, then all remote shards. Each group keeps its
/// own order, but the combined list is not sorted by shard ID across the two groups.
#[cfg(test)]
// NOTE(review): two signatures and the `&self.shards` body appear because this is an
// unmarked diff; the `Vec<&Shard>` variant matches the body that follows.
pub fn shards(&self) -> &[Shard] {
&self.shards
pub fn shards(&self) -> Vec<&Shard> {
// Allocate once for both groups.
let mut shards = Vec::with_capacity(self.len());
shards.extend(&self.local_shards);
shards.extend(&self.remote_shards);
shards
}
}

/// A table of shard entries indexed by index ID and source ID.
// NOTE(review): two `derive` lines and two `table` declarations appear because this is a
// diff rendering without +/- markers. The variant without `Default` and with `pub(super)`
// visibility looks like the post-change one: the test module and `router.rs` build the
// struct with a literal that sets `self_node_id` — confirm against the commit.
#[derive(Debug, Default)]
#[derive(Debug)]
pub(super) struct ShardTable {
table: HashMap<(IndexId, SourceId), ShardTableEntry>,
/// Entries keyed by `(IndexId, SourceId)`.
pub(super) table: HashMap<(IndexId, SourceId), ShardTableEntry>,
/// ID of the node owning this table; passed to `ShardTableEntry::new` when shards are
/// inserted.
pub(super) self_node_id: NodeId,
}

impl ShardTable {
Expand Down Expand Up @@ -99,7 +120,8 @@ impl ShardTable {
shards: Vec<Shard>,
) {
let key = (index_id.into(), source_id.into());
self.table.insert(key, ShardTableEntry::new(shards));
self.table
.insert(key, ShardTableEntry::new(shards, &self.self_node_id));
}

#[cfg(test)]
Expand All @@ -120,8 +142,11 @@ mod tests {
use super::*;

#[test]
fn test_shard_table() {
let mut table = ShardTable::default();
fn test_shard_table_with_nonempty_local_shards() {
let mut table = ShardTable {
self_node_id: "node-0".into(),
table: HashMap::default(),
};
assert!(!table.contains_entry("test-index", "test-source"));

table.insert_shards(
Expand Down Expand Up @@ -158,9 +183,57 @@ mod tests {
assert!(table.contains_entry("test-index", "test-source"));

let entry = table.find_entry("test-index", "test-source").unwrap();
assert_eq!(entry.shards.len(), 2);
assert_eq!(entry.shards[0].shard_id, 0);
assert_eq!(entry.shards[1].shard_id, 1);
assert_eq!(entry.len(), 2);
assert_eq!(entry.shards()[0].shard_id, 0);
assert_eq!(entry.shards()[1].shard_id, 1);
assert_eq!(entry.local_shards.len(), 1);
assert_eq!(entry.remote_shards.len(), 1);

assert_eq!(entry.next_shard_round_robin().shard_id, 0);
assert_eq!(entry.next_shard_round_robin().shard_id, 0);
}

#[test]
fn test_shard_table_with_empty_local_shards() {
let mut table = ShardTable {
self_node_id: "node-0".into(),
table: HashMap::default(),
};
assert!(!table.contains_entry("test-index", "test-source"));

table.insert_shards(
"test-index",
"test-source",
vec![
Shard {
index_uid: "test-index:0".to_string(),
shard_id: 0,
leader_id: "node-1".to_string(),
..Default::default()
},
Shard {
index_uid: "test-index:0".to_string(),
shard_id: 1,
leader_id: "node-2".to_string(),
..Default::default()
},
Shard {
index_uid: "test-index:0".to_string(),
shard_id: 2,
leader_id: "node-2".to_string(),
shard_state: ShardState::Closed as i32,
..Default::default()
},
],
);
assert!(table.contains_entry("test-index", "test-source"));

let entry = table.find_entry("test-index", "test-source").unwrap();
assert_eq!(entry.len(), 2);
assert_eq!(entry.shards()[0].shard_id, 0);
assert_eq!(entry.shards()[1].shard_id, 1);
assert_eq!(entry.local_shards.len(), 0);
assert_eq!(entry.remote_shards.len(), 2);

assert_eq!(entry.next_shard_round_robin().shard_id, 0);
assert_eq!(entry.next_shard_round_robin().shard_id, 1);
Expand Down

0 comments on commit 429e618

Please sign in to comment.