Skip to content

Commit

Permalink
Exposing the node capacity via chitchat
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Nov 6, 2023
1 parent a36180a commit dacaa38
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 58 deletions.
5 changes: 3 additions & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ base64 = "0.21"
byte-unit = { version = "4", default-features = false, features = ["serde", "std"] }
bytes = { version = "1", features = ["serde"] }
bytestring = "1.3.0"
chitchat = "0.6"
chitchat = "0.7"
chrono = { version = "0.4.23", default-features = false, features = ["clock", "std"] }
clap = { version = "4.4.1", features = ["env", "string"] }
colored = "2.0.0"
Expand Down
18 changes: 10 additions & 8 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,15 +931,16 @@ impl ThroughputCalculator {

async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
let node_id: NodeId = config.node_id.clone().into();
let self_node = ClusterMember::new(
let self_node = ClusterMember {
node_id,
quickwit_cluster::GenerationId::now(),
false,
HashSet::new(),
config.gossip_advertise_addr,
config.grpc_advertise_addr,
Vec::new(),
);
generation_id: quickwit_cluster::GenerationId::now(),
is_ready: false,
enabled_services: HashSet::new(),
gossip_advertise_addr: config.gossip_advertise_addr,
grpc_advertise_addr: config.grpc_advertise_addr,
indexing_capacity: 0,
indexing_tasks: Vec::new(),
};
let cluster = Cluster::join(
config.cluster_id.clone(),
self_node,
Expand All @@ -949,5 +950,6 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
&ChannelTransport::default(),
)
.await?;

Ok(cluster)
}
19 changes: 10 additions & 9 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::fmt::{Debug, Display};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -229,7 +229,7 @@ impl Cluster {
}

/// Sets a key-value pair on the cluster node's state.
pub async fn set_self_key_value<K: Into<String>, V: Into<String>>(&self, key: K, value: V) {
pub async fn set_self_key_value(&self, key: impl Display, value: impl Display) {
self.chitchat()
.await
.lock()
Expand Down Expand Up @@ -519,15 +519,16 @@ pub async fn create_cluster_for_test_with_id(
) -> anyhow::Result<Cluster> {
let gossip_advertise_addr: SocketAddr = ([127, 0, 0, 1], node_id).into();
let node_id: NodeId = format!("node_{node_id}").into();
let self_node = ClusterMember::new(
let self_node = ClusterMember {
node_id,
crate::GenerationId(1),
self_node_readiness,
enabled_services.clone(),
generation_id: crate::GenerationId(1),
is_ready: self_node_readiness,
enabled_services: enabled_services.clone(),
gossip_advertise_addr,
grpc_addr_from_listen_addr_for_test(gossip_advertise_addr),
Vec::new(),
);
grpc_advertise_addr: grpc_addr_from_listen_addr_for_test(gossip_advertise_addr),
indexing_tasks: Vec::new(),
indexing_capacity: 1_000,
};
let failure_detector_config = create_failure_detector_config_for_test();
let cluster = Cluster::join(
cluster_id,
Expand Down
26 changes: 20 additions & 6 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use crate::change::ClusterChange;
#[cfg(any(test, feature = "testsuite"))]
pub use crate::cluster::{create_cluster_for_test, grpc_addr_from_listen_addr_for_test};
pub use crate::cluster::{Cluster, ClusterSnapshot, NodeIdSchema};
pub use crate::member::ClusterMember;
pub use crate::member::{ClusterMember, INDEXING_CAPACITY_KEY};
pub use crate::node::ClusterNode;

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down Expand Up @@ -68,15 +68,21 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
let node_id: NodeId = node_config.node_id.clone().into();
let generation_id = GenerationId::now();
let is_ready = false;
let self_node = ClusterMember::new(
let indexing_capacity = if node_config.is_service_enabled(QuickwitService::Indexer) {
node_config.indexer_config.capacity
} else {
0u32
};
let self_node = ClusterMember {
node_id,
generation_id,
is_ready,
node_config.enabled_services.clone(),
node_config.gossip_advertise_addr,
node_config.grpc_advertise_addr,
enabled_services: node_config.enabled_services.clone(),
gossip_advertise_addr: node_config.gossip_advertise_addr,
grpc_advertise_addr: node_config.grpc_advertise_addr,
indexing_tasks,
);
indexing_capacity,
};
let cluster = Cluster::join(
cluster_id,
self_node,
Expand All @@ -86,5 +92,13 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
&UdpTransport,
)
.await?;
if node_config
.enabled_services
.contains(&QuickwitService::Indexer)
{
cluster
.set_self_key_value(INDEXING_CAPACITY_KEY, indexing_capacity)
.await;
}
Ok(cluster)
}
51 changes: 25 additions & 26 deletions quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use chitchat::{ChitchatId, NodeState};
use itertools::Itertools;
use quickwit_proto::indexing::IndexingTask;
use quickwit_proto::types::NodeId;
use tracing::warn;
use tracing::{error, warn};

use crate::{GenerationId, QuickwitService};

Expand All @@ -43,6 +43,8 @@ pub(crate) const READINESS_KEY: &str = "readiness";
pub(crate) const READINESS_VALUE_READY: &str = "READY";
pub(crate) const READINESS_VALUE_NOT_READY: &str = "NOT_READY";

/// Key under which a node publishes its indexing capacity in its chitchat node state.
pub const INDEXING_CAPACITY_KEY: &str = "indexing_capacity";

pub(crate) trait NodeStateExt {
fn grpc_advertise_addr(&self) -> anyhow::Result<SocketAddr>;

Expand Down Expand Up @@ -91,30 +93,13 @@ pub struct ClusterMember {
/// Empty if the node is not an indexer or the indexer has not yet started some indexing
/// pipelines.
pub indexing_tasks: Vec<IndexingTask>,
/// Indexing capacity of the node expressed in milli-pipelines.
/// An indexer able to index 4 pipelines at full capacity should have a value of 4_000.
pub indexing_capacity: u32,
pub is_ready: bool,
}

impl ClusterMember {
pub fn new(
node_id: NodeId,
generation_id: GenerationId,
is_ready: bool,
enabled_services: HashSet<QuickwitService>,
gossip_advertise_addr: SocketAddr,
grpc_advertise_addr: SocketAddr,
indexing_tasks: Vec<IndexingTask>,
) -> Self {
Self {
node_id,
generation_id,
is_ready,
enabled_services,
gossip_advertise_addr,
grpc_advertise_addr,
indexing_tasks,
}
}

pub fn chitchat_id(&self) -> ChitchatId {
ChitchatId::new(
self.node_id.clone().into(),
Expand All @@ -130,6 +115,18 @@ impl From<ClusterMember> for ChitchatId {
}
}

fn parse_indexing_capacity(node_state: &NodeState) -> u32 {
let Some(indexing_capacity_str) = node_state.get(INDEXING_CAPACITY_KEY) else {
return 0;
};
if let Ok(indexing_capacity) = indexing_capacity_str.parse::<u32>() {
indexing_capacity
} else {
error!(indexing_capacity=?indexing_capacity_str, "Received an unparseable indexing capacity from node.");
0
}
}

// Builds a cluster member from a [`NodeState`].
pub(crate) fn build_cluster_member(
chitchat_id: ChitchatId,
Expand All @@ -150,15 +147,17 @@ pub(crate) fn build_cluster_member(
})?;
let grpc_advertise_addr = node_state.grpc_advertise_addr()?;
let indexing_tasks = parse_indexing_tasks(node_state, &chitchat_id.node_id);
let member = ClusterMember::new(
chitchat_id.node_id.into(),
chitchat_id.generation_id.into(),
let indexing_capacity = parse_indexing_capacity(node_state);
let member = ClusterMember {
node_id: chitchat_id.node_id.into(),
generation_id: chitchat_id.generation_id.into(),
is_ready,
enabled_services,
chitchat_id.gossip_advertise_addr,
gossip_advertise_addr: chitchat_id.gossip_advertise_addr,
grpc_advertise_addr,
indexing_tasks,
);
indexing_capacity,
};
Ok(member)
}

Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl ClusterNode {
enabled_services: member.enabled_services,
grpc_advertise_addr: member.grpc_advertise_addr,
indexing_tasks: member.indexing_tasks,
indexing_capacity: member.indexing_capacity,
is_ready: member.is_ready,
is_self_node,
};
Expand Down Expand Up @@ -112,6 +113,10 @@ impl ClusterNode {
&self.inner.indexing_tasks
}

/// Returns the indexing capacity of this node, as copied from the cluster member it was
/// built from (expressed in milli-pipelines).
pub fn indexing_capacity(&self) -> u32 {
    let inner_node = &self.inner;
    inner_node.indexing_capacity
}

pub fn is_ready(&self) -> bool {
self.inner.is_ready
}
Expand Down Expand Up @@ -149,6 +154,7 @@ struct InnerNode {
enabled_services: HashSet<QuickwitService>,
grpc_advertise_addr: SocketAddr,
indexing_tasks: Vec<IndexingTask>,
indexing_capacity: u32,
is_ready: bool,
is_self_node: bool,
}
1 change: 1 addition & 0 deletions quickwit/quickwit-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ humantime = { workspace = true }
itertools = { workspace = true }
json_comments = { workspace = true }
new_string_template = { workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
Expand Down
26 changes: 26 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct IndexerConfig {
pub enable_otlp_endpoint: bool,
#[serde(default = "IndexerConfig::default_enable_cooperative_indexing")]
pub enable_cooperative_indexing: bool,
#[serde(default = "IndexerConfig::default_capacity")]
pub capacity: u32,
}

impl IndexerConfig {
Expand All @@ -77,6 +79,13 @@ impl IndexerConfig {
12
}

/// Default indexing capacity, expressed in thousandths of a pipeline.
///
/// A value of 4_000 means 4 pipelines at full capacity. The default grants a quarter of
/// a pipeline (250) per logical CPU reported by `num_cpus::get()`.
// TODO add some validation.
fn default_capacity() -> u32 {
    let num_logical_cpus = num_cpus::get() as u32;
    let milli_pipelines_for_one_pipeline_per_cpu = num_logical_cpus * 1000u32;
    milli_pipelines_for_one_pipeline_per_cpu / 4u32
}

pub fn default_split_store_max_num_bytes() -> Byte {
Byte::from_bytes(100_000_000_000) // 100G
}
Expand All @@ -93,6 +102,7 @@ impl IndexerConfig {
split_store_max_num_bytes: Byte::from_bytes(1_000_000),
split_store_max_num_splits: 3,
max_concurrent_split_uploads: 4,
capacity: 1_000,
};
Ok(indexer_config)
}
Expand All @@ -106,6 +116,7 @@ impl Default for IndexerConfig {
split_store_max_num_bytes: Self::default_split_store_max_num_bytes(),
split_store_max_num_splits: Self::default_split_store_max_num_splits(),
max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(),
capacity: Self::default_capacity(),
}
}
}
Expand Down Expand Up @@ -368,3 +379,18 @@ impl NodeConfig {
serialize::node_config_for_test()
}
}

#[cfg(test)]
mod tests {
    use crate::IndexerConfig;

    /// Checks the invariants of the default indexing capacity that hold on every host.
    ///
    /// The default is a quarter of a pipeline (250 milli-pipelines) per logical CPU, so it
    /// must be a non-zero multiple of 250.
    #[test]
    fn test_default_indexing_capacity() {
        let default_indexer_config = IndexerConfig::default();
        let capacity = default_indexer_config.capacity;
        assert_eq!(capacity % 250, 0);
        // At least one logical CPU, hence at least a quarter of a pipeline.
        assert!(capacity >= 250);
        // No upper bound is asserted on purpose: the value scales with the number of
        // logical CPUs of the machine running the test, and a fixed ceiling (previously
        // 64 quarters of a pipeline) made the test fail on hosts with 64 or more CPUs.
    }
}
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ mod tests {
split_store_max_num_bytes: Byte::from_str("1T").unwrap(),
split_store_max_num_splits: 10_000,
max_concurrent_split_uploads: 8,
capacity: IndexerConfig::default_capacity(),
enable_cooperative_indexing: false,
}
);
Expand Down
8 changes: 3 additions & 5 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::indexing_scheduler::scheduling::{build_physical_indexing_plan, Load};
use crate::{IndexerNodeInfo, IndexerPool};

const PIPELINE_FULL_LOAD: Load = 1_000u32;
const LOAD_PER_NODE: Load = 4_000u32;

pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
if cfg!(any(test, feature = "testsuite")) {
Expand Down Expand Up @@ -185,7 +184,7 @@ impl IndexingScheduler {
// has happened.
pub(crate) fn schedule_indexing_plan_if_needed(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();
let mut indexers = self.get_indexers_from_indexer_pool();
let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();
if indexers.is_empty() {
warn!("No indexer available, cannot schedule an indexing plan.");
return;
Expand All @@ -195,9 +194,8 @@ impl IndexingScheduler {

let indexer_max_loads: FnvHashMap<String, Load> = indexers
.iter()
.map(|(indexer_id, _)| {
// TODO Get info from chitchat.
(indexer_id.to_string(), LOAD_PER_NODE)
.map(|(indexer_id, indexer_node_info)| {
(indexer_id.to_string(), indexer_node_info.indexing_capacity)
})
.collect();

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct SourceUid {
/// Per-indexer information stored as the value type of the [`IndexerPool`].
pub struct IndexerNodeInfo {
/// Client for the indexing service of this indexer.
pub client: IndexingServiceClient,
/// Indexing tasks associated with this indexer.
/// NOTE(review): presumably the tasks the node currently reports as running — confirm.
pub indexing_tasks: Vec<IndexingTask>,
/// Indexing capacity of this indexer; the indexing scheduler uses it as the indexer's
/// maximum load when building the indexing plan.
pub indexing_capacity: u32,
}

pub type IndexerPool = Pool<String, IndexerNodeInfo>;
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub fn test_indexer_change_stream(
IndexerNodeInfo {
client,
indexing_tasks,
indexing_capacity: 1_000,
},
))
}
Expand Down
Loading

0 comments on commit dacaa38

Please sign in to comment.