diff --git a/rs/cli/src/commands/subnet/replace.rs b/rs/cli/src/commands/subnet/replace.rs
index 481c072cc..8c69645c5 100644
--- a/rs/cli/src/commands/subnet/replace.rs
+++ b/rs/cli/src/commands/subnet/replace.rs
@@ -1,5 +1,6 @@
 use clap::{error::ErrorKind, Args};
 use ic_types::PrincipalId;
+use itertools::Itertools;
 
 use crate::{
     commands::{AuthRequirement, ExecutableCommand},
@@ -52,6 +53,9 @@ impl ExecutableCommand for Replace {
             _ => SubnetTarget::FromNodesIds(self.nodes.clone()),
         };
 
+        let runner = ctx.runner().await?;
+        let all_nodes = ctx.registry().await.nodes().await?.values().cloned().collect_vec();
+
         let subnet_manager = ctx.subnet_manager().await?;
         let subnet_change_response = subnet_manager
             .with_target(subnet_target)
@@ -62,11 +66,10 @@ impl ExecutableCommand for Replace {
                 self.exclude.clone().into(),
                 self.only.clone(),
                 self.include.clone().into(),
+                &all_nodes,
             )
             .await?;
 
-        let runner = ctx.runner().await?;
-
         if let Some(runner_proposal) = runner.propose_subnet_change(subnet_change_response, ctx.forum_post_link()).await? {
             let ic_admin = ctx.ic_admin().await?;
            ic_admin.propose_run(runner_proposal.cmd, runner_proposal.opts).await?;
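The command now snapshots the registry's node map once and passes the flattened list down to membership_replace. A minimal, self-contained sketch of that flatten step, using a stand-in Node type rather than the real ic_management_types::Node (only the map-to-Vec pattern is taken from the diff):

use indexmap::IndexMap;
use itertools::Itertools;

// Stand-in for ic_management_types::Node; only the field this sketch needs.
#[derive(Clone, Debug)]
struct Node {
    principal: u64,
}

// The registry hands back nodes keyed by principal; the callers in this diff want a flat Vec.
fn flatten(nodes_by_principal: &IndexMap<u64, Node>) -> Vec<Node> {
    // collect_vec() is the itertools shorthand for collect::<Vec<_>>().
    nodes_by_principal.values().cloned().collect_vec()
}

fn main() {
    let mut map = IndexMap::new();
    map.insert(1u64, Node { principal: 1 });
    map.insert(2u64, Node { principal: 2 });
    println!("{:?}", flatten(&map));
}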
diff --git a/rs/cli/src/runner.rs b/rs/cli/src/runner.rs
index 1ce4fad4e..3bb3d3aec 100644
--- a/rs/cli/src/runner.rs
+++ b/rs/cli/src/runner.rs
@@ -11,6 +11,7 @@ use decentralization::network::SubnetChangeRequest;
 use decentralization::network::SubnetQueryBy;
 use decentralization::subnets::NodesRemover;
 use decentralization::SubnetChangeResponse;
+use futures::future::try_join3;
 use futures::TryFutureExt;
 use futures_util::future::try_join;
 use ic_management_backend::health::HealthStatusQuerier;
@@ -141,6 +142,7 @@ impl Runner {
         replica_version: Option,
         other_args: Vec,
     ) -> anyhow::Result> {
+        let all_nodes = self.registry.nodes().await?.values().cloned().collect_vec();
         let health_of_nodes = self.health_of_nodes().await?;
 
         let subnet_creation_data = self
@@ -156,6 +158,7 @@
                     warn!("Will continue running as if no features were cordoned");
                     vec![]
                 }),
+                &all_nodes,
             )
             .await?;
         let subnet_creation_data = SubnetChangeResponse::new(&subnet_creation_data, &health_of_nodes, Some(motivation.clone()));
@@ -523,8 +526,13 @@
             })
             .map(|(id, subnet)| (*id, subnet.clone()))
             .collect::>();
-        let (available_nodes, health_of_nodes) =
-            try_join(self.registry.available_nodes().map_err(anyhow::Error::from), self.health_client.nodes()).await?;
+        let (all_nodes, available_nodes, health_of_nodes) = try_join3(
+            self.registry.nodes(),
+            self.registry.available_nodes().map_err(anyhow::Error::from),
+            self.health_client.nodes(),
+        )
+        .await?;
+        let all_nodes = all_nodes.values().cloned().collect::>();
 
         let subnets_change_responses = NetworkHealRequest::new(subnets_without_proposals)
             .heal_and_optimize(
@@ -535,6 +543,7 @@
                     warn!("Will continue running as if no features were cordoned");
                     vec![]
                 }),
+                &all_nodes,
             )
             .await?;
 
@@ -635,6 +644,7 @@ impl Runner {
         node: &Node,
         ensure_assigned: bool,
         cordoned_features: Vec,
+        all_nodes: &[Node],
     ) -> Option {
         let mut best_change: Option = None;
 
@@ -648,6 +658,7 @@
                 0,
                 health_of_nodes,
                 cordoned_features.clone(),
+                all_nodes,
             )
         } else {
             SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![], vec![node.clone()], vec![]).resize(
@@ -656,6 +667,7 @@
                 0,
                 health_of_nodes,
                 cordoned_features.clone(),
+                all_nodes,
             )
         };
 
@@ -706,15 +718,16 @@
         let mut subnets = self.get_subnets(skip_subnets).await?;
         let (mut available_nodes, health_of_nodes) = self.get_available_and_healthy_nodes().await?;
         let all_node_operators = self.registry.operators().await?;
-        let all_nodes = self.registry.nodes().await?;
+        let all_nodes_map = self.registry.nodes().await?;
+        let all_nodes = all_nodes_map.values().cloned().collect_vec();
         let all_nodes_grouped_by_operator = all_nodes
-            .values()
+            .iter()
             .cloned()
-            .into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal);
+            .into_group_map_by(|node| all_nodes_map.get(&node.principal).expect("Node should exist").operator.principal);
         let available_nodes_grouped_by_operator = available_nodes
             .iter()
             .map(|n| (*n).clone())
-            .into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal);
+            .into_group_map_by(|node| all_nodes_map.get(&node.principal).expect("Node should exist").operator.principal);
         let cordoned_features = self.cordoned_features_fetcher.fetch().await.unwrap_or_else(|e| {
             warn!("Failed to fetch cordoned features with error: {:?}", e);
             warn!("Will continue running as if no features were cordoned");
@@ -725,7 +738,7 @@
             &all_node_operators,
             &all_nodes_grouped_by_operator,
             &available_nodes_grouped_by_operator,
-            &all_nodes,
+            &all_nodes_map,
             &subnets,
             ensure_assigned,
         );
@@ -760,6 +773,7 @@
                     node,
                     ensure_assigned,
                     cordoned_features.clone(),
+                    &all_nodes,
                 )
                 .await;
 
@@ -872,9 +886,10 @@
             None => change_request,
         };
 
+        let all_nodes = self.registry.nodes().await?.values().cloned().collect_vec();
         let health_of_nodes = self.health_of_nodes().await?;
 
-        let change = &change_request.rescue(&health_of_nodes, self.cordoned_features_fetcher.fetch().await?)?;
+        let change = &change_request.rescue(&health_of_nodes, self.cordoned_features_fetcher.fetch().await?, &all_nodes)?;
         let change = SubnetChangeResponse::new(change, &health_of_nodes, Some("Recovering subnet".to_string()));
 
         if change.node_ids_added.is_empty() && change.node_ids_removed.is_empty() {
diff --git a/rs/cli/src/subnet_manager.rs b/rs/cli/src/subnet_manager.rs
index 19e9fc6f6..5068019ea 100644
--- a/rs/cli/src/subnet_manager.rs
+++ b/rs/cli/src/subnet_manager.rs
@@ -14,6 +14,7 @@ use ic_management_types::HealthStatus;
 use ic_management_types::Node;
 use ic_types::PrincipalId;
 use indexmap::IndexMap;
+use itertools::Itertools;
 use log::{info, warn};
 
 use crate::cordoned_feature_fetcher::CordonedFeatureFetcher;
@@ -123,6 +124,7 @@
         exclude: Option>,
         only: Vec,
         include: Option>,
+        all_nodes: &[Node],
     ) -> anyhow::Result {
         let subnet_query_by = self.get_subnet_query_by(self.target()?).await?;
         let mut motivations = vec![];
@@ -156,6 +158,7 @@
                 &to_be_replaced,
                 &health_of_nodes,
                 self.cordoned_features_fetcher.fetch().await?,
+                all_nodes,
             )?;
 
         for n in change.removed().iter().filter(|n| !node_ids_unhealthy.contains(&n.principal)) {
@@ -186,6 +189,7 @@
         health_of_nodes: &IndexMap,
     ) -> anyhow::Result {
         let registry = self.registry_instance.clone();
+        let all_nodes = registry.nodes().await?.values().cloned().collect_vec();
         let mut motivations = vec![];
 
         let change = registry
@@ -200,6 +204,7 @@
                 0,
                 health_of_nodes,
                 self.cordoned_features_fetcher.fetch().await?,
+                &all_nodes,
             )?;
 
         for n in change.removed().iter() {
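In runner.rs above, the full node list is now fetched alongside the available nodes and the health report using futures::future::try_join3. A small, self-contained sketch of that concurrent-fetch pattern; the fetch_* helpers are hypothetical stand-ins for the registry and health-client calls, not real APIs from this repository:

use futures::executor::block_on;
use futures::future::try_join3;

// Hypothetical stand-ins for registry.nodes(), registry.available_nodes() and health_client.nodes().
async fn fetch_all_nodes() -> anyhow::Result<Vec<String>> {
    Ok(vec!["node-a".into(), "node-b".into()])
}
async fn fetch_available_nodes() -> anyhow::Result<Vec<String>> {
    Ok(vec!["node-b".into()])
}
async fn fetch_health() -> anyhow::Result<Vec<String>> {
    Ok(vec!["healthy".into()])
}

fn main() -> anyhow::Result<()> {
    // The three lookups run concurrently; the first error short-circuits the whole join.
    let (all_nodes, available_nodes, health) =
        block_on(try_join3(fetch_all_nodes(), fetch_available_nodes(), fetch_health()))?;
    println!("{} total, {} available, {} health entries", all_nodes.len(), available_nodes.len(), health.len());
    Ok(())
}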
diff --git a/rs/cli/src/unit_tests/replace.rs b/rs/cli/src/unit_tests/replace.rs
index ad5e19e47..36a860fa6 100644
--- a/rs/cli/src/unit_tests/replace.rs
+++ b/rs/cli/src/unit_tests/replace.rs
@@ -242,6 +242,7 @@ fn should_skip_cordoned_nodes() {
     let mut failed_scenarios = vec![];
 
     let registry = Arc::new(registry);
+    let all_nodes = available_nodes.iter().chain(subnet.nodes.iter()).cloned().collect_vec();
     let health_client = Arc::new(health_client);
     for (cordoned_features, should_succeed) in scenarios {
         let cordoned_features_clone = cordoned_features.clone();
@@ -260,7 +261,7 @@
                 .with_target(crate::subnet_manager::SubnetTarget::FromNodesIds(vec![
                     subnet.nodes.first().unwrap().principal,
                 ]))
-                .membership_replace(false, None, None, None, vec![], None),
+                .membership_replace(false, None, None, None, vec![], None, &all_nodes),
         );
 
         // Assert
diff --git a/rs/decentralization/src/lib.rs b/rs/decentralization/src/lib.rs
index eed0eb5bf..995971e77 100644
--- a/rs/decentralization/src/lib.rs
+++ b/rs/decentralization/src/lib.rs
@@ -1,5 +1,7 @@
 pub mod nakamoto;
 pub mod network;
+#[cfg(test)]
+mod network_tests;
 pub mod subnets;
 use indexmap::IndexMap;
 use itertools::Itertools;
diff --git a/rs/decentralization/src/nakamoto/mod.rs b/rs/decentralization/src/nakamoto/mod.rs
index 21dd611e9..e56951f5e 100644
--- a/rs/decentralization/src/nakamoto/mod.rs
+++ b/rs/decentralization/src/nakamoto/mod.rs
@@ -656,8 +656,9 @@ mod tests {
         let subnet_initial = new_test_subnet(0, 12, 1);
         let nodes_initial = subnet_initial.nodes.clone();
         let nodes_available = new_test_nodes("spare", 1, 0);
+        let all_nodes = nodes_initial.iter().chain(nodes_available.iter()).cloned().collect::>();
 
-        let extended_subnet = subnet_initial.subnet_with_more_nodes(1, &nodes_available).unwrap();
+        let extended_subnet = subnet_initial.subnet_with_more_nodes(1, &nodes_available, &all_nodes).unwrap();
 
         assert_eq!(
             extended_subnet.nodes,
             nodes_initial.iter().chain(nodes_available.iter()).cloned().collect::>()
         )
@@ -687,11 +688,9 @@
             )
         );
         let nodes_available = new_test_nodes_with_overrides("spare", 13, 3, 0, (&NodeFeature::Country, &["US", "RO", "JP"]));
-        let health_of_nodes = nodes_available
-            .iter()
-            .chain(subnet_initial.nodes.iter())
-            .map(|n| (n.principal, HealthStatus::Healthy))
-            .collect::>();
+        let all_nodes = nodes_available.iter().chain(subnet_initial.nodes.iter()).cloned().collect::>();
+
+        let health_of_nodes = all_nodes.iter().map(|n| (n.principal, HealthStatus::Healthy)).collect::>();
 
         println!(
             "initial {} Countries {:?}",
@@ -704,7 +703,7 @@
         );
         let subnet_change_req = SubnetChangeRequest::new(subnet_initial, nodes_available, Vec::new(), Vec::new(), Vec::new());
-        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes, vec![]).unwrap();
+        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes, vec![], &all_nodes).unwrap();
 
         for log in subnet_change.after().run_log.iter() {
             println!("{}", log);
         }
@@ -747,10 +746,8 @@
             )
         );
         let nodes_available = new_test_nodes_with_overrides("spare", 7, 2, 0, (&NodeFeature::NodeProvider, &["NP6", "NP7"]));
-        let health_of_nodes = nodes_available
-            .iter()
-            .map(|n| (n.principal, HealthStatus::Healthy))
-            .collect::>();
+        let all_nodes = nodes_available.iter().chain(subnet_initial.nodes.iter()).cloned().collect::>();
+        let health_of_nodes = all_nodes.iter().map(|n| (n.principal, HealthStatus::Healthy)).collect::>();
 
         println!(
             "initial {} NPs {:?}",
@@ -763,7 +760,7 @@
         );
         let subnet_change_req = SubnetChangeRequest::new(subnet_initial, nodes_available, Vec::new(), Vec::new(), Vec::new());
-        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes, vec![]).unwrap();
+        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes, vec![], &all_nodes).unwrap();
         println!("Replacement run log:");
         for line in subnet_change.after().run_log.iter() {
             println!("{}", line);
         }
@@ -803,11 +800,8 @@
             )
         );
         // There are 2 spare nodes, but both are DFINITY
         let nodes_available = new_test_nodes_with_overrides("spare", 7, 2, 2, (&NodeFeature::NodeProvider, &["NP6", "NP7"]));
-        let health_of_nodes = nodes_available
-            .iter()
-            .chain(subnet_initial.nodes.iter())
-            .map(|n| (n.principal, HealthStatus::Healthy))
-            .collect::>();
+        let all_nodes = nodes_available.iter().chain(subnet_initial.nodes.iter()).cloned().collect::>();
+        let health_of_nodes = all_nodes.iter().map(|n| (n.principal, HealthStatus::Healthy)).collect::>();
 
         println!(
             "initial {} NPs {:?}",
@@ -820,7 +814,7 @@
         );
         let subnet_change_req = SubnetChangeRequest::new(subnet_initial, nodes_available, Vec::new(), Vec::new(), Vec::new());
-        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes, vec![]).unwrap();
+        let subnet_change = subnet_change_req.optimize(2, &[], &health_of_nodes, vec![], &all_nodes).unwrap();
 
         println!("Replacement run log:");
         for line in subnet_change.after().run_log.iter() {
@@ -880,6 +874,7 @@
             .filter(|n| n.subnet_id.is_none() && n.proposal.is_none())
             .cloned()
             .collect::>();
+        let all_nodes = available_nodes.iter().chain(subnet_all.nodes.iter()).cloned().collect::>();
 
         subnet_healthy.check_business_rules().expect("Check business rules failed");
 
@@ -888,7 +883,7 @@
         let nakamoto_score_before = subnet_healthy.nakamoto_score();
         println!("NakamotoScore before {}", nakamoto_score_before);
 
-        let extended_subnet = subnet_healthy.subnet_with_more_nodes(4, &available_nodes).unwrap();
+        let extended_subnet = subnet_healthy.subnet_with_more_nodes(4, &available_nodes, &all_nodes).unwrap();
         println!("{}", extended_subnet);
         let nakamoto_score_after = extended_subnet.nakamoto_score();
         println!("NakamotoScore after {}", nakamoto_score_after);
@@ -907,7 +902,7 @@
         let empty_subnet = DecentralizedSubnet::default();
         let want_subnet_size = 13;
 
-        let new_subnet_result = empty_subnet.subnet_with_more_nodes(want_subnet_size, &available_nodes);
+        let new_subnet_result = empty_subnet.subnet_with_more_nodes(want_subnet_size, &available_nodes, &available_nodes);
         assert!(new_subnet_result.is_ok(), "error: {:?}", new_subnet_result.err());
 
         let new_subnet = new_subnet_result.unwrap();
@@ -986,16 +981,15 @@
             .flat_map(PrincipalId::from_str)
             .collect_vec();
 
-        let health_of_nodes = subnet
-            .nodes
+        let all_nodes = nodes_available.iter().chain(subnet.nodes.iter()).cloned().collect::>();
+        let health_of_nodes = all_nodes
             .iter()
-            .map(|n| n.principal)
-            .chain(nodes_available.iter().map(|n| n.principal))
             .map(|n| {
-                if unhealthy_principals.contains(&n) {
-                    (n, HealthStatus::Dead)
+                let node_id = n.principal;
+                if unhealthy_principals.contains(&node_id) {
+                    (node_id, HealthStatus::Dead)
                 } else {
-                    (n, HealthStatus::Healthy)
+                    (node_id, HealthStatus::Healthy)
                 }
             })
             .collect::>();
@@ -1004,7 +998,7 @@
         important.insert(subnet.principal, subnet);
 
         let network_heal_response = NetworkHealRequest::new(important.clone())
-            .heal_and_optimize(nodes_available.clone(), &health_of_nodes, vec![])
+            .heal_and_optimize(nodes_available.clone(), &health_of_nodes, vec![], &all_nodes)
             .await
             .unwrap();
         let result = network_heal_response.first().unwrap().clone();
@@ -1018,18 +1012,15 @@ fn test_subnet_rescue() {
         let nodes_available = new_test_nodes("spare", 10, 1);
         let subnet_initial = new_test_subnet_with_overrides(0, 11, 7, 1, (&NodeFeature::Country, &["CH", "CA", "CA", "CA", "CA", "CA", "BE"]));
-        let health_of_nodes = nodes_available
-            .iter()
-            .chain(subnet_initial.nodes.iter())
-            .map(|n| (n.principal, HealthStatus::Healthy))
-            .collect::>();
+        let all_nodes = nodes_available.iter().chain(subnet_initial.nodes.iter()).cloned().collect::>();
+        let health_of_nodes = all_nodes.iter().map(|n| (n.principal, HealthStatus::Healthy)).collect::>();
 
         let change_initial = SubnetChangeRequest::new(subnet_initial.clone(), nodes_available, Vec::new(), Vec::new(), Vec::new());
 
         let with_keeping_features = change_initial
             .clone()
             .keeping_from_used(vec!["CH".to_string()])
-            .rescue(&health_of_nodes, vec![])
+            .rescue(&health_of_nodes, vec![], &all_nodes)
             .unwrap();
 
         assert_eq!(with_keeping_features.added().len(), 4);
@@ -1047,7 +1038,7 @@
         let with_keeping_principals = change_initial
             .clone()
             .keeping_from_used(vec!["CH".to_string()])
-            .rescue(&health_of_nodes, vec![])
+            .rescue(&health_of_nodes, vec![], &all_nodes)
             .unwrap();
 
         assert_eq!(with_keeping_principals.added().len(), 4);
@@ -1061,7 +1052,7 @@
             1
         );
 
-        let rescue_all = change_initial.clone().rescue(&health_of_nodes, vec![]).unwrap();
+        let rescue_all = change_initial.clone().rescue(&health_of_nodes, vec![], &all_nodes).unwrap();
 
         assert_eq!(rescue_all.added().len(), 5);
         assert_eq!(rescue_all.removed().len(), 5);
@@ -1070,14 +1061,11 @@
     #[test]
     fn test_resize() {
         let subnet_initial = new_test_subnet(0, 24, 0);
-        let health_of_nodes = subnet_initial
-            .nodes
-            .iter()
-            .map(|n| (n.principal, HealthStatus::Healthy))
-            .collect::>();
+        let all_nodes = subnet_initial.nodes.clone();
+        let health_of_nodes = all_nodes.iter().map(|n| (n.principal, HealthStatus::Healthy)).collect::>();
         let change_initial = SubnetChangeRequest::new(subnet_initial.clone(), Vec::new(), Vec::new(), Vec::new(), Vec::new());
-        let after_resize = change_initial.resize(2, 2, 0, &health_of_nodes, vec![]).unwrap();
+        let after_resize = change_initial.resize(2, 2, 0, &health_of_nodes, vec![], &all_nodes).unwrap();
 
         assert_eq!(subnet_initial.nodes.len(), after_resize.new_nodes.len());
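The test updates above all follow the same shape: all_nodes is the subnet's nodes plus the spares, and the health map is derived from that same set. A compact, self-contained sketch of the fixture pattern, using stand-in types and std::collections::HashMap in place of the IndexMap the real tests use:

use std::collections::HashMap; // stands in for the indexmap::IndexMap used by the real tests

#[derive(Clone, Debug)]
struct Node {
    principal: u64,
}

#[derive(Clone, Debug, PartialEq)]
enum HealthStatus {
    Healthy,
}

// Build the inputs the updated tests now share: the full node set (subnet nodes plus spares)
// and a health map derived from that same set.
fn test_fixture(subnet_nodes: &[Node], spare_nodes: &[Node]) -> (Vec<Node>, HashMap<u64, HealthStatus>) {
    let all_nodes: Vec<Node> = spare_nodes.iter().chain(subnet_nodes.iter()).cloned().collect();
    let health_of_nodes: HashMap<u64, HealthStatus> =
        all_nodes.iter().map(|n| (n.principal, HealthStatus::Healthy)).collect();
    (all_nodes, health_of_nodes)
}

fn main() {
    let subnet_nodes = vec![Node { principal: 1 }];
    let spare_nodes = vec![Node { principal: 2 }];
    let (all_nodes, health_of_nodes) = test_fixture(&subnet_nodes, &spare_nodes);
    assert_eq!(all_nodes.len(), health_of_nodes.len());
}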
diff --git a/rs/decentralization/src/network.rs b/rs/decentralization/src/network.rs
index 75500471e..dce10a29c 100644
--- a/rs/decentralization/src/network.rs
+++ b/rs/decentralization/src/network.rs
@@ -33,14 +33,20 @@ pub struct DecentralizedSubnet {
     pub run_log: Vec,
 }
 
-#[derive(Clone, Debug)]
-struct ReplacementCandidate {
-    node: Node,
+#[derive(Clone, Debug, Default, PartialEq)]
+pub(crate) struct ReplacementCandidate {
+    pub(crate) node: Node,
     score: NakamotoScore,
     penalty: usize,
     business_rules_log: Vec,
 }
 
+impl ReplacementCandidate {
+    pub fn new_with_node_for_tests(node: Node) -> Self {
+        Self { node, ..Default::default() }
+    }
+}
+
 impl DecentralizedSubnet {
     pub fn new_with_subnet_id_and_nodes(subnet_id: PrincipalId, nodes: Vec) -> Self {
         Self {
@@ -327,7 +333,11 @@
     /// of current nodes. Since the node IDs are unique, we seed a PRNG
     /// with the sorted joined node IDs. We then choose a result
     /// randomly but deterministically using this seed.
-    fn choose_deterministic_random(best_results: &[ReplacementCandidate], current_nodes: &[Node]) -> Option {
+    pub(crate) fn choose_one_result(
+        best_results: &[ReplacementCandidate],
+        current_nodes: &[Node],
+        all_nodes: &[Node],
+    ) -> Option {
         if best_results.is_empty() {
             None
         } else {
@@ -341,6 +351,52 @@
                 }
             }
 
+            // If none of the best_results nodes are already in the subnet,
+            // sort the nodes by percentage of nodes that the node operator has in subnets
+            // and choose the one with the lowest percentage.
+            let num_nodes_per_operator = all_nodes.iter().fold(AHashMap::new(), |mut acc: AHashMap, n| {
+                *acc.entry(n.operator.principal).or_insert(0) += 1;
+                acc
+            });
+            let num_nodes_assigned_to_subnets_per_operator =
+                all_nodes
+                    .iter()
+                    .filter(|n| n.subnet_id.is_some())
+                    .fold(AHashMap::new(), |mut acc: AHashMap, n| {
+                        *acc.entry(n.operator.principal).or_insert(0) += 1;
+                        acc
+                    });
+            let percent_assigned_nodes_per_operator = num_nodes_per_operator
+                .iter()
+                .map(|(operator, num_nodes)| {
+                    let num_nodes_in_subnet = num_nodes_assigned_to_subnets_per_operator.get(operator).copied().unwrap_or_default();
+                    (*operator, 100. * num_nodes_in_subnet as f64 / *num_nodes as f64)
+                })
+                .collect::>();
+            let best_results = best_results
+                .iter()
+                .map(|r| {
+                    let pct_x1000 = (percent_assigned_nodes_per_operator
+                        .get(&r.node.operator.principal)
+                        .copied()
+                        .unwrap_or_default()
+                        * 1000.) as u32;
+                    let op_nodes = num_nodes_per_operator.get(&r.node.operator.principal).copied().unwrap_or_default() as i32;
+                    (pct_x1000, -op_nodes, r)
+                })
+                // sorted_by_key sorts ascending, so we negate the number of nodes
+                // we prefer candidate nodes from operators with:
+                // - the lowest percentage of nodes assigned to subnet
+                // - highest number of nodes total (to prefer operators with more nodes)
+                .sorted_by_key(|(pct_x1000, neg_op_nodes, _res)| (*pct_x1000, *neg_op_nodes))
+                .collect_vec();
+            // filter all the results with the same lowest percentage
+            let best_results = best_results
+                .iter()
+                .take_while(|(pct, neg_op_nodes, _res)| *pct == best_results[0].0 && *neg_op_nodes == best_results[0].1)
+                .map(|(_pct, _neg_op_nodes, res)| (*res).clone())
+                .collect::>();
+
             // We sort the current nodes by alphabetical order on their
             // PrincipalIDs to ensure consistency of the seed with the
             // same machines in the subnet
@@ -366,7 +422,12 @@
     }
 
     /// Pick the best result amongst the list of "suitable" candidates.
-    fn choose_best_candidate(&self, candidates: Vec, run_log: &mut Vec) -> Option {
+    fn choose_best_candidate(
+        &self,
+        candidates: Vec,
+        run_log: &mut Vec,
+        all_nodes: &[Node],
+    ) -> Option {
         // First, sort the candidates by their Nakamoto Coefficients
         let candidates = candidates
             .into_iter()
@@ -432,11 +493,11 @@
            //
            // This approach also has the advantage of not favoring one NP over
            // an other, regardless of the Node PrincipalID
-            DecentralizedSubnet::choose_deterministic_random(&best_results, &self.nodes)
+            DecentralizedSubnet::choose_one_result(&best_results, &self.nodes, all_nodes)
         }
     }
 
     /// Add nodes to a subnet in a way that provides the best decentralization.
-    pub fn subnet_with_more_nodes(self, how_many_nodes: usize, available_nodes: &[Node]) -> anyhow::Result {
+    pub fn subnet_with_more_nodes(self, how_many_nodes: usize, available_nodes: &[Node], all_nodes: &[Node]) -> anyhow::Result {
         let mut run_log = self.run_log.clone();
 
         let mut nodes_initial = self.nodes.clone();
@@ -464,7 +525,7 @@
                 .collect();
 
             let mut candidate_run_log = Vec::new();
-            match self.choose_best_candidate(suitable_candidates, &mut candidate_run_log) {
+            match self.choose_best_candidate(suitable_candidates, &mut candidate_run_log, all_nodes) {
                 Some(best_result) => {
                     // Append the complete run log
                     run_log.extend(
@@ -528,7 +589,7 @@
     /// Remove nodes from a subnet in a way that provides the best
     /// decentralization.
-    pub fn subnet_with_fewer_nodes(mut self, how_many_nodes: usize) -> anyhow::Result {
+    pub fn subnet_with_fewer_nodes(mut self, how_many_nodes: usize, all_nodes: &[Node]) -> anyhow::Result {
         let mut run_log = self.run_log.clone();
         let nodes_initial_len = self.nodes.len();
         let mut comment = None;
@@ -552,7 +613,7 @@
                 .collect();
 
             let mut candidate_run_log = Vec::new();
-            match self.choose_best_candidate(suitable_candidates, &mut candidate_run_log) {
+            match self.choose_best_candidate(suitable_candidates, &mut candidate_run_log, all_nodes) {
                 Some(best_result) => {
                     // Append the complete run log
                     run_log.extend(
@@ -766,6 +827,7 @@
         only_nodes: Vec,
         health_of_nodes: &'a IndexMap,
         cordoned_features: Vec,
+        all_nodes: &'a [Node],
     ) -> BoxFuture<'a, Result> {
         Box::pin(async move {
             SubnetChangeRequest {
@@ -775,7 +837,7 @@
             .including_from_available(include_nodes)
             .excluding_from_available(exclude_nodes)
             .including_from_available(only_nodes)
-            .resize(size, 0, 0, health_of_nodes, cordoned_features)
+            .resize(size, 0, 0, health_of_nodes, cordoned_features, all_nodes)
         })
     }
 }
@@ -921,6 +983,7 @@
         replacements_unhealthy: &[Node],
         health_of_nodes: &IndexMap,
         cordoned_features: Vec,
+        all_nodes: &[Node],
     ) -> Result {
         let old_nodes = self.subnet.nodes.clone();
         self.subnet = self.subnet.without_nodes(replacements_unhealthy)?;
@@ -930,6 +993,7 @@
             replacements_unhealthy.len(),
             health_of_nodes,
             cordoned_features,
+            all_nodes,
         )?;
         Ok(SubnetChange { old_nodes, ..result })
     }
@@ -938,6 +1002,7 @@
         mut self,
         health_of_nodes: &IndexMap,
         cordoned_features: Vec,
+        all_nodes: &[Node],
     ) -> Result {
         let old_nodes = self.subnet.nodes.clone();
         let nodes_to_remove = self
@@ -956,6 +1021,7 @@
             self.subnet.removed_nodes.len(),
             health_of_nodes,
             cordoned_features,
+            all_nodes,
         )?;
         Ok(SubnetChange { old_nodes, ..result })
     }
@@ -968,6 +1034,7 @@
         how_many_nodes_unhealthy: usize,
         health_of_nodes: &IndexMap,
         cordoned_features: Vec,
+        all_nodes: &[Node],
     ) -> Result {
         let old_nodes = self.subnet.nodes.clone();
 
@@ -1009,7 +1076,7 @@
         let resized_subnet = if how_many_nodes_to_remove > 0 {
             self.subnet
                 .clone()
-                .subnet_with_fewer_nodes(how_many_nodes_to_remove)
+                .subnet_with_fewer_nodes(how_many_nodes_to_remove, all_nodes)
                .map_err(|e| NetworkError::ResizeFailed(e.to_string()))?
         } else {
             self.subnet.clone()
         };
 
@@ -1038,7 +1105,7 @@
         let resized_subnet = resized_subnet
             .with_nodes(&self.include_nodes)
             .without_nodes(&self.nodes_to_remove)?
-            .subnet_with_more_nodes(how_many_nodes_to_add, &available_nodes)
+            .subnet_with_more_nodes(how_many_nodes_to_add, &available_nodes, all_nodes)
             .map_err(|e| NetworkError::ResizeFailed(e.to_string()))?
             .without_duplicate_added_removed();
@@ -1072,8 +1139,9 @@
         self,
         health_of_nodes: &IndexMap,
         cordoned_features: Vec,
+        all_nodes: &[Node],
     ) -> Result {
-        self.resize(0, 0, 0, health_of_nodes, cordoned_features)
+        self.resize(0, 0, 0, health_of_nodes, cordoned_features, all_nodes)
     }
 }
 
@@ -1197,6 +1265,7 @@
         mut available_nodes: Vec,
         health_of_nodes: &IndexMap,
         cordoned_features: Vec,
+        all_nodes: &[Node],
     ) -> Result, NetworkError> {
         let mut subnets_changed = Vec::new();
         let subnets_to_heal = unhealthy_with_nodes(&self.subnets, health_of_nodes)
@@ -1261,7 +1330,13 @@
             .filter_map(|num_nodes_to_optimize| {
                 change_req
                     .clone()
-                    .optimize(num_nodes_to_optimize, &unhealthy_nodes, health_of_nodes, cordoned_features.clone())
+                    .optimize(
+                        num_nodes_to_optimize,
+                        &unhealthy_nodes,
+                        health_of_nodes,
+                        cordoned_features.clone(),
+                        all_nodes,
+                    )
                     .map_err(|e| warn!("{}", e))
                     .ok()
             })
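To make the new selection rule in choose_one_result concrete: each candidate is keyed by the share of its operator's nodes that already sit in subnets (scaled to an integer) and, as a secondary key, by the operator's total node count negated so larger operators sort first; only the candidates tied on the lowest key go on to the pre-existing seeded-PRNG pick. A minimal, self-contained sketch of that shortlist step, with stand-in types rather than the real Node/Operator structs:

use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct N {
    operator: u32,
    assigned: bool,
}

fn shortlist(candidates: &[N], all_nodes: &[N]) -> Vec<N> {
    if candidates.is_empty() {
        return vec![];
    }
    // Count, per operator, how many nodes exist and how many are already in subnets.
    let mut total: HashMap<u32, usize> = HashMap::new();
    let mut assigned: HashMap<u32, usize> = HashMap::new();
    for n in all_nodes {
        *total.entry(n.operator).or_insert(0) += 1;
        if n.assigned {
            *assigned.entry(n.operator).or_insert(0) += 1;
        }
    }
    // Sort key: (assigned percentage of the operator, scaled by 1000 to compare as an integer,
    // negated total node count so operators with more nodes win ties on percentage).
    let key = |n: &N| {
        let t = total.get(&n.operator).copied().unwrap_or(1);
        let a = assigned.get(&n.operator).copied().unwrap_or(0);
        let pct_x1000 = (100.0 * a as f64 / t as f64 * 1000.0) as u32;
        (pct_x1000, -(t as i64))
    };
    let mut sorted: Vec<&N> = candidates.iter().collect();
    sorted.sort_by_key(|&n| key(n));
    let best = key(sorted[0]);
    // Keep only the candidates tied on the best key; the caller then applies the
    // pre-existing deterministic-random choice among them.
    sorted.into_iter().take_while(|&n| key(n) == best).cloned().collect()
}

fn main() {
    // Operator 1 already has half of its nodes assigned; operator 2 has none assigned,
    // so only operator 2's candidate survives the shortlist.
    let all_nodes = vec![
        N { operator: 1, assigned: true },
        N { operator: 1, assigned: false },
        N { operator: 2, assigned: false },
    ];
    let candidates = vec![all_nodes[1].clone(), all_nodes[2].clone()];
    println!("{:?}", shortlist(&candidates, &all_nodes));
}

In this sketch, as in the hunk above, the percentage only breaks ties between candidates that the Nakamoto scoring already judged equally good, and any remaining tie falls through to the seeded-PRNG selection that was already there.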
diff --git a/rs/decentralization/src/network_tests.rs b/rs/decentralization/src/network_tests.rs
new file mode 100644
index 000000000..1d8c261a9
--- /dev/null
+++ b/rs/decentralization/src/network_tests.rs
@@ -0,0 +1,142 @@
+#[cfg(test)]
+mod tests {
+    use ic_base_types::PrincipalId;
+    use ic_management_types::{Node, NodeFeatures, Operator};
+
+    use crate::network::{DecentralizedSubnet, ReplacementCandidate};
+
+    #[test]
+    fn test_empty_best_results() {
+        let subnet = DecentralizedSubnet::new_with_subnet_id_and_nodes(PrincipalId::new_subnet_test_id(1), vec![]);
+        let result = DecentralizedSubnet::choose_one_result(&[], &subnet.nodes, &[]);
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn test_best_results_already_in_subnet() {
+        let node1 = Node::new_test_node(1, NodeFeatures::default(), true);
+        let subnet = DecentralizedSubnet::new_with_subnet_id_and_nodes(PrincipalId::new_subnet_test_id(1), vec![node1.clone()]);
+        let best_results = vec![ReplacementCandidate::new_with_node_for_tests(node1.clone())];
+        let all_nodes = vec![node1.clone()];
+
+        let result = DecentralizedSubnet::choose_one_result(&best_results, &subnet.nodes, &all_nodes);
+
+        // Should return the node already in the subnet.
+        assert_eq!(result.unwrap().node.principal, node1.principal);
+    }
+
+    #[test]
+    fn test_none_of_best_results_in_current_nodes() {
+        let node1 = Node::new_test_node(1, NodeFeatures::default(), true);
+        let node2 = Node::new_test_node(2, NodeFeatures::default(), true);
+        let node3 = Node::new_test_node(3, NodeFeatures::default(), true);
+
+        let subnet = DecentralizedSubnet::new_with_subnet_id_and_nodes(PrincipalId::new_subnet_test_id(1), vec![node1.clone()]);
+
+        let best_results = vec![
+            ReplacementCandidate::new_with_node_for_tests(node2.clone()),
+            ReplacementCandidate::new_with_node_for_tests(node3.clone()),
+        ];
+
+        let all_nodes = vec![node1.clone(), node2.clone(), node3.clone()];
+
+        let result = DecentralizedSubnet::choose_one_result(&best_results, &subnet.nodes, &all_nodes);
+        assert!(result.is_some());
+    }
+
+    #[test]
+    fn test_operator_percentage_tie_break() {
+        let op1 = Operator {
+            principal: PrincipalId::new_user_test_id(1),
+            ..Default::default()
+        };
+        let op2 = Operator {
+            principal: PrincipalId::new_user_test_id(2),
+            ..Default::default()
+        };
+        let subnet_1_id = PrincipalId::new_subnet_test_id(1);
+
+        // op1 has 3 nodes with one of them already in a subnet; op2 has a single, unassigned node.
+        // node4 (op2) should be chosen because its operator has the lowest percentage of assigned nodes.
+        let node1 = Node::new_test_node(1, NodeFeatures::default(), true)
+            .with_operator(op1.clone())
+            .with_subnet_id(subnet_1_id);
+        let node2 = Node::new_test_node(2, NodeFeatures::default(), true).with_operator(op1.clone());
+        let node3 = Node::new_test_node(3, NodeFeatures::default(), true).with_operator(op1.clone());
+        let node4 = Node::new_test_node(4, NodeFeatures::default(), true).with_operator(op2.clone());
+
+        let subnet = DecentralizedSubnet::new_with_subnet_id_and_nodes(subnet_1_id, vec![node1.clone()]);
+
+        let best_results = vec![
+            ReplacementCandidate::new_with_node_for_tests(node2.clone()),
+            ReplacementCandidate::new_with_node_for_tests(node3.clone()),
+            ReplacementCandidate::new_with_node_for_tests(node4.clone()),
+        ];
+
+        let all_nodes = vec![node1.clone(), node2.clone(), node3.clone(), node4.clone()];
+
+        let result = DecentralizedSubnet::choose_one_result(&best_results, &subnet.nodes, &all_nodes);
+
+        // op2 has no nodes assigned to subnets, so it should be picked
+        assert_eq!(result.unwrap().node.principal, node4.principal);
+    }
+
+    #[test]
+    fn test_tie_break_with_equal_percentages() {
+        // When operators have the same percentage, the function should fall back to deterministic random selection.
+
+        let op1 = Operator {
+            principal: PrincipalId::new_user_test_id(1),
+            ..Default::default()
+        };
+        let op2 = Operator {
+            principal: PrincipalId::new_user_test_id(2),
+            ..Default::default()
+        };
+
+        let node1 = Node::new_test_node(1, NodeFeatures::default(), true)
+            .with_operator(op1.clone())
+            .with_subnet_id(PrincipalId::new_subnet_test_id(1));
+        let node2 = Node::new_test_node(2, NodeFeatures::default(), true).with_operator(op1.clone());
+        let node3 = Node::new_test_node(3, NodeFeatures::default(), true).with_operator(op2.clone());
+        let node4 = Node::new_test_node(4, NodeFeatures::default(), true).with_operator(op2.clone());
+
+        let subnet = DecentralizedSubnet::new_with_subnet_id_and_nodes(PrincipalId::new_subnet_test_id(1), vec![node1.clone()]);
+
+        let best_results = vec![
+            ReplacementCandidate::new_with_node_for_tests(node2.clone()),
+            ReplacementCandidate::new_with_node_for_tests(node3.clone()),
+            ReplacementCandidate::new_with_node_for_tests(node4.clone()),
+        ];
+
+        let all_nodes = vec![node1.clone(), node2.clone(), node3.clone(), node4.clone()];
+
+        let result1 = DecentralizedSubnet::choose_one_result(&best_results, &subnet.nodes, &all_nodes);
+        let result2 = DecentralizedSubnet::choose_one_result(&best_results, &subnet.nodes, &all_nodes);
+
+        // Ensure deterministic selection with equal percentages.
+        assert_eq!(result1, result2);
+    }
+
+    #[test]
+    fn test_deterministic_random_selection() {
+        let node1 = Node::new_test_node(1, NodeFeatures::default(), true);
+        let node2 = Node::new_test_node(2, NodeFeatures::default(), true);
+        let node3 = Node::new_test_node(3, NodeFeatures::default(), true);
+
+        let subnet = DecentralizedSubnet::new_with_subnet_id_and_nodes(PrincipalId::new_subnet_test_id(1), vec![node1.clone()]);
+
+        let best_results = vec![
+            ReplacementCandidate::new_with_node_for_tests(node2.clone()),
+            ReplacementCandidate::new_with_node_for_tests(node3.clone()),
+        ];
+
+        let all_nodes = vec![node1.clone(), node2.clone(), node3.clone()];
+
+        let result1 = DecentralizedSubnet::choose_one_result(&best_results, &subnet.nodes, &all_nodes);
+        let result2 = DecentralizedSubnet::choose_one_result(&best_results, &subnet.nodes, &all_nodes);
+
+        // Deterministic behavior ensures the same result is chosen for the same input.
+        assert_eq!(result1, result2);
+    }
+}
diff --git a/rs/ic-management-types/src/lib.rs b/rs/ic-management-types/src/lib.rs
index 639f6253b..01543e9a6 100644
--- a/rs/ic-management-types/src/lib.rs
+++ b/rs/ic-management-types/src/lib.rs
@@ -328,6 +328,15 @@ impl Node {
             ..Default::default()
         }
     }
+    pub fn with_operator(self, operator: Operator) -> Self {
+        Node { operator, ..self }
+    }
+    pub fn with_subnet_id(self, subnet_id: PrincipalId) -> Self {
+        Node {
+            subnet_id: Some(subnet_id),
+            ..self
+        }
+    }
     pub fn get_features(&self) -> NodeFeatures {
         let features = if let Some(features) = &self.cached_features.get() {
            // Return a clone of the cached value, if it exists
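The new with_operator and with_subnet_id builders keep test setup terse. A usage sketch mirroring the new network_tests.rs cases; the constructors come straight from the diff, while the assertions here are only illustrative:

use ic_base_types::PrincipalId;
use ic_management_types::{Node, NodeFeatures, Operator};

#[test]
fn build_assigned_and_spare_test_nodes() {
    let op1 = Operator {
        principal: PrincipalId::new_user_test_id(1),
        ..Default::default()
    };
    // An assigned node (carries a subnet id) and a spare node from the same operator.
    let assigned = Node::new_test_node(1, NodeFeatures::default(), true)
        .with_operator(op1.clone())
        .with_subnet_id(PrincipalId::new_subnet_test_id(1));
    let spare = Node::new_test_node(2, NodeFeatures::default(), true).with_operator(op1);
    assert!(assigned.subnet_id.is_some());
    assert!(spare.subnet_id.is_none());
}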