From 2069c1337238e282eccbeadab94f3f390a63f6fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sa=C5=A1a=20Tomi=C4=87?= Date: Wed, 13 Nov 2024 17:09:47 +0100 Subject: [PATCH] refactor(dre): replace "decentralized" node with backend node (#1079) --- Cargo.Bazel.lock | 18 +- Cargo.lock | 1 + rs/cli/src/commands/registry.rs | 5 +- rs/cli/src/operations/hostos_rollout.rs | 92 ++++--- rs/cli/src/qualification/run_workload_test.rs | 2 +- rs/cli/src/qualification/run_xnet_test.rs | 2 +- rs/cli/src/qualification/upgrade_subnets.rs | 2 +- rs/cli/src/runner.rs | 38 ++- rs/cli/src/subnet_manager.rs | 29 +-- rs/cli/src/unit_tests/replace.rs | 68 +++--- rs/decentralization/src/lib.rs | 12 +- rs/decentralization/src/nakamoto/mod.rs | 101 +++----- rs/decentralization/src/network.rs | 227 ++++-------------- rs/decentralization/src/subnets.rs | 7 +- .../src/endpoints/query_decentralization.rs | 9 +- rs/ic-management-backend/src/lazy_registry.rs | 65 ++--- rs/ic-management-backend/src/registry.rs | 54 ++--- rs/ic-management-backend/src/subnets.rs | 18 +- rs/ic-management-types/Cargo.toml | 1 + rs/ic-management-types/src/lib.rs | 212 +++++++++++++++- 20 files changed, 458 insertions(+), 505 deletions(-) diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index d52c0826c..692ccbefa 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "5808defd7f9c6be1d143a5ea8a53f6c33f0b47a6b1b95f16e0ade249061332c8", + "checksum": "93b8abc5567d6cead03998cd02aa972b598bce15ac572479e28cd8e10af65ad3", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -19811,6 +19811,10 @@ "id": "actix-web 4.9.0", "target": "actix_web" }, + { + "id": "ahash 0.8.11", + "target": "ahash" + }, { "id": "anyhow 1.0.93", "target": "anyhow" @@ -28166,35 +28170,29 @@ ], "crate_features": { "common": [ + "elf", + "errno", "general", "ioctl", "no_std" ], "selects": { "aarch64-unknown-linux-gnu": [ - "elf", - "errno", "prctl", "std", "system" ], "arm-unknown-linux-gnueabi": [ - "elf", - "errno", "prctl", "std", "system" ], "armv7-unknown-linux-gnueabi": [ - "elf", - "errno", "prctl", "std", "system" ], "i686-unknown-linux-gnu": [ - "elf", - "errno", "prctl", "std", "system" @@ -28210,8 +28208,6 @@ "system" ], "x86_64-unknown-linux-gnu": [ - "elf", - "errno", "prctl", "std", "system" diff --git a/Cargo.lock b/Cargo.lock index d4e920fb4..00ee05e4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3995,6 +3995,7 @@ name = "ic-management-types" version = "0.5.6" dependencies = [ "actix-web", + "ahash 0.8.11", "anyhow", "candid", "chrono", diff --git a/rs/cli/src/commands/registry.rs b/rs/cli/src/commands/registry.rs index fc0c2b4ad..35f56c16c 100644 --- a/rs/cli/src/commands/registry.rs +++ b/rs/cli/src/commands/registry.rs @@ -1,5 +1,6 @@ use std::{ collections::{BTreeMap, HashMap}, + net::Ipv6Addr, path::PathBuf, str::FromStr, sync::Arc, @@ -314,11 +315,11 @@ async fn _get_nodes( NodeDetails { node_id: *k, xnet: Some(ConnectionEndpoint { - ip_addr: record.ip_addr.to_string(), + ip_addr: record.ip_addr.unwrap_or(Ipv6Addr::LOCALHOST).to_string(), port: 2497, }), http: Some(ConnectionEndpoint { - ip_addr: record.ip_addr.to_string(), + ip_addr: record.ip_addr.unwrap_or(Ipv6Addr::LOCALHOST).to_string(), port: 8080, }), node_operator_id, diff --git a/rs/cli/src/operations/hostos_rollout.rs b/rs/cli/src/operations/hostos_rollout.rs index be31a8a1c..2414d307a 100644 --- a/rs/cli/src/operations/hostos_rollout.rs +++ b/rs/cli/src/operations/hostos_rollout.rs @@ -292,8 +292,7 @@ impl HostosRollout { } nodes .iter() - .filter(|n| { - let node = decentralization::network::Node::from(n.to_owned()); + .filter(|node| { for filt in self.only_filter.iter() { if node.matches_feature_value(filt) { return true; @@ -311,8 +310,7 @@ impl HostosRollout { } nodes .iter() - .filter(|n| { - let node = decentralization::network::Node::from(n.to_owned()); + .filter(|node| { for filt in self.exclude_filter.iter() { if node.matches_feature_value(filt) { return false; @@ -440,50 +438,47 @@ impl HostosRollout { } } } - NodeAssignment::All => { - use HostosRolloutResponse::{None, Ok}; - try_join( - self.with_nodes_health_and_open_proposals( - nodes_health.clone(), - nodes_with_open_proposals.clone(), - update_group.clone().with_assignment(NodeAssignment::Assigned), - ), - self.with_nodes_health_and_open_proposals( - nodes_health.clone(), - nodes_with_open_proposals.clone(), - update_group.clone().with_assignment(NodeAssignment::Unassigned), - ), - ) - .await - .map(|response| match response { - (Ok(assigned_nodes, subnet_affected), None(reason)) => { - info!("No unassigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason); - Ok(assigned_nodes, subnet_affected) - } - (None(reason), Ok(unassigned_nodes, _)) => { - info!("No assigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason); - Ok(unassigned_nodes, Option::None) - } + NodeAssignment::All => try_join( + self.with_nodes_health_and_open_proposals( + nodes_health.clone(), + nodes_with_open_proposals.clone(), + update_group.clone().with_assignment(NodeAssignment::Assigned), + ), + self.with_nodes_health_and_open_proposals( + nodes_health.clone(), + nodes_with_open_proposals.clone(), + update_group.clone().with_assignment(NodeAssignment::Unassigned), + ), + ) + .await + .map(|response| match response { + (HostosRolloutResponse::Ok(assigned_nodes, subnet_affected), HostosRolloutResponse::None(reason)) => { + info!("No unassigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason); + HostosRolloutResponse::Ok(assigned_nodes, subnet_affected) + } + (HostosRolloutResponse::None(reason), HostosRolloutResponse::Ok(unassigned_nodes, _)) => { + info!("No assigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason); + HostosRolloutResponse::Ok(unassigned_nodes, Option::None) + } - (Ok(assigned_nodes, subnet_affected), Ok(unassigned_nodes, _)) => { - info!( - "{} assigned nodes and {} unassigned nodes selected for: {}", - assigned_nodes.len(), - unassigned_nodes.len(), - update_group.node_group - ); - Ok(assigned_nodes.into_iter().chain(unassigned_nodes).collect(), subnet_affected.clone()) - } + (HostosRolloutResponse::Ok(assigned_nodes, subnet_affected), HostosRolloutResponse::Ok(unassigned_nodes, _)) => { + info!( + "{} assigned nodes and {} unassigned nodes selected for: {}", + assigned_nodes.len(), + unassigned_nodes.len(), + update_group.node_group + ); + HostosRolloutResponse::Ok(assigned_nodes.into_iter().chain(unassigned_nodes).collect(), subnet_affected.clone()) + } - (None(assigned_reason), None(unassigned_reason)) => { - info!( - "No candidate nodes selected for: {:?} ==> {:?} {:?}", - update_group.node_group, assigned_reason, unassigned_reason - ); - None(assigned_reason.into_iter().chain(unassigned_reason).collect()) - } - }) - } + (HostosRolloutResponse::None(assigned_reason), HostosRolloutResponse::None(unassigned_reason)) => { + info!( + "No candidate nodes selected for: {:?} ==> {:?} {:?}", + update_group.node_group, assigned_reason, unassigned_reason + ); + HostosRolloutResponse::None(assigned_reason.into_iter().chain(unassigned_reason).collect()) + } + }), } } @@ -538,7 +533,7 @@ pub mod test { use ic_management_backend::proposal::ProposalAgentImpl; use ic_management_types::{Network, Node, Operator, Provider, Subnet}; use std::collections::BTreeMap; - use std::net::Ipv6Addr; + use std::sync::OnceLock; use super::*; @@ -693,7 +688,7 @@ pub mod test { for i in start_at_number..start_at_number + num_nodes { let node = Node { principal: PrincipalId::new_node_test_id(i), - ip_addr: Ipv6Addr::LOCALHOST, + ip_addr: None, operator: Operator { principal: PrincipalId::new_node_test_id(i), provider: Provider { @@ -706,6 +701,7 @@ pub mod test { rewardable_nodes: BTreeMap::new(), ipv6: "".to_string(), }, + cached_features: OnceLock::new(), hostname: None, hostos_release: None, proposal: None, diff --git a/rs/cli/src/qualification/run_workload_test.rs b/rs/cli/src/qualification/run_workload_test.rs index e1c35e47e..69b783848 100644 --- a/rs/cli/src/qualification/run_workload_test.rs +++ b/rs/cli/src/qualification/run_workload_test.rs @@ -43,7 +43,7 @@ impl Step for Workload { .find(|s| s.subnet_type.eq(&SubnetType::Application)) .ok_or(anyhow::anyhow!("Application subnet required for step `{}`", self.name()))?; - let all_ipv6 = subnet.nodes.iter().map(|n| n.ip_addr).collect_vec(); + let all_ipv6 = subnet.nodes.iter().map(|n| n.ip_addr.unwrap()).collect_vec(); let args = &[ all_ipv6.iter().map(|ip| format!("http://[{}]:8080/", ip)).join(","), "-m=UpdateCounter".to_string(), diff --git a/rs/cli/src/qualification/run_xnet_test.rs b/rs/cli/src/qualification/run_xnet_test.rs index b6233de16..bc857e383 100644 --- a/rs/cli/src/qualification/run_xnet_test.rs +++ b/rs/cli/src/qualification/run_xnet_test.rs @@ -58,7 +58,7 @@ impl Step for RunXnetTest { let args = &[ "--nns_url".to_string(), - format!("http://[{}]:8080/", nns_node.ip_addr), + format!("http://[{}]:8080/", nns_node.ip_addr.unwrap()), "--subnets".to_string(), NUM_SUBNETS.to_string(), "--principal_key".to_string(), diff --git a/rs/cli/src/qualification/upgrade_subnets.rs b/rs/cli/src/qualification/upgrade_subnets.rs index d797200d5..f2595c30d 100644 --- a/rs/cli/src/qualification/upgrade_subnets.rs +++ b/rs/cli/src/qualification/upgrade_subnets.rs @@ -195,7 +195,7 @@ async fn wait_for_subnet_revision(ctx: &StepCtx, subnet: Option, re // Fetch the metrics of each node and check if it // contains the revision somewhere for node in nodes { - let url = format!("http://[{}]:9090/metrics", node.ip_addr); + let url = format!("http://[{}]:9090/metrics", node.ip_addr.unwrap()); let response = match client.get(&url).send().await { Ok(r) => match r.error_for_status() { diff --git a/rs/cli/src/runner.rs b/rs/cli/src/runner.rs index 0dc119175..1ce4fad4e 100644 --- a/rs/cli/src/runner.rs +++ b/rs/cli/src/runner.rs @@ -479,10 +479,9 @@ impl Runner { // Values for nr in &node_removals { let mut row = tabular::Row::new(); - let decentralization_node = decentralization::network::Node::from(&nr.node); row.add_cell(nr.node.principal); for nf in NodeFeature::variants() { - row.add_cell(decentralization_node.get_feature(&nf)); + row.add_cell(nr.node.get_feature(&nf).expect("Feature should exist")); } row.add_cell(nr.node.hostname.clone().unwrap_or_else(|| "N/A".to_string())); row.add_cell(nr.reason.message()); @@ -567,15 +566,15 @@ impl Runner { .collect::>()) } - async fn get_available_and_healthy_nodes(&self) -> anyhow::Result<(Vec, IndexMap)> { + async fn get_available_and_healthy_nodes(&self) -> anyhow::Result<(Vec, IndexMap)> { try_join(self.registry.available_nodes().map_err(anyhow::Error::from), self.health_client.nodes()).await } fn get_operators_to_optimize( &self, node_operators_all: &IndexMap, - all_nodes_grouped_by_operator: &HashMap>, - available_nodes_grouped_by_operator: &HashMap>, + all_nodes_grouped_by_operator: &HashMap>, + available_nodes_grouped_by_operator: &HashMap>, nodes_all: &IndexMap, subnets: &IndexMap, ensure_assigned: bool, @@ -590,9 +589,9 @@ impl Runner { .filter_map(|(operator_id, operator)| { all_nodes_grouped_by_operator.get(operator_id).and_then(|operator_nodes| { let condition = if ensure_assigned { - operator_nodes.iter().all(|node| !nodes_in_subnets.contains_key(&node.id)) + operator_nodes.iter().all(|node| !nodes_in_subnets.contains_key(&node.principal)) } else { - operator_nodes.iter().all(|node| nodes_in_subnets.contains_key(&node.id)) + operator_nodes.iter().all(|node| nodes_in_subnets.contains_key(&node.principal)) }; if condition { @@ -601,12 +600,12 @@ impl Runner { .get(operator_id) .unwrap_or(&vec![]) .iter() - .map(|node| nodes_all.get(&node.id).expect("Node should exist").clone()) + .map(|node| nodes_all.get(&node.principal).expect("Node should exist").clone()) .collect::>() } else { operator_nodes .iter() - .filter_map(|node| nodes_in_subnets.get(&node.id)) + .filter_map(|node| nodes_in_subnets.get(&node.principal)) .map(|n| (*n).clone()) .collect::>() }; @@ -631,20 +630,19 @@ impl Runner { async fn get_best_change_for_operator( &self, subnets: &IndexMap, - available_nodes: &[decentralization::network::Node], + available_nodes: &[Node], health_of_nodes: &IndexMap, node: &Node, ensure_assigned: bool, cordoned_features: Vec, ) -> Option { - let decentr_node = decentralization::network::Node::from(node); let mut best_change: Option = None; for subnet in subnets.values() { let subnet = DecentralizedSubnet::from(subnet); let subnet_id_short = subnet.id.to_string().split_once('-').unwrap().0.to_string(); let change_request = if ensure_assigned { - SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![decentr_node.clone()], vec![], vec![]).resize( + SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![node.clone()], vec![], vec![]).resize( 0, 1, 0, @@ -652,7 +650,7 @@ impl Runner { cordoned_features.clone(), ) } else { - SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![], vec![decentr_node.clone()], vec![]).resize( + SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![], vec![node.clone()], vec![]).resize( 1, 0, 0, @@ -711,12 +709,12 @@ impl Runner { let all_nodes = self.registry.nodes().await?; let all_nodes_grouped_by_operator = all_nodes .values() - .map(decentralization::network::Node::from) - .into_group_map_by(|node| all_nodes.get(&node.id).expect("Node should exist").operator.principal); + .cloned() + .into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal); let available_nodes_grouped_by_operator = available_nodes .iter() .map(|n| (*n).clone()) - .into_group_map_by(|node| all_nodes.get(&node.id).expect("Node should exist").operator.principal); + .into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal); let cordoned_features = self.cordoned_features_fetcher.fetch().await.unwrap_or_else(|e| { warn!("Failed to fetch cordoned features with error: {:?}", e); warn!("Will continue running as if no features were cordoned"); @@ -786,7 +784,7 @@ impl Runner { .await?, ); subnets.shift_remove(&change.subnet_id.expect("Subnet ID should be present")); - available_nodes.retain(|n| n.id != node.principal); + available_nodes.retain(|n| n.principal != node.principal); } else { warn!( "{} node {} of the operator {} in DC {} would worsen decentralization in all subnets!", @@ -824,7 +822,7 @@ impl Runner { ) -> anyhow::Result<()> { let subnet_before = match override_subnet_nodes { Some(nodes) => { - let nodes = self.registry.get_decentralized_nodes(&nodes).await?; + let nodes = self.registry.get_nodes_from_ids(&nodes).await?; DecentralizedSubnet::new_with_subnet_id_and_nodes(change.subnet_id, nodes) } None => self @@ -837,11 +835,11 @@ impl Runner { let health_of_nodes = self.health_of_nodes().await?; // Simulate node removal - let removed_nodes = self.registry.get_decentralized_nodes(&change.get_removed_node_ids()).await?; + let removed_nodes = self.registry.get_nodes_from_ids(&change.get_removed_node_ids()).await?; let subnet_mid = subnet_before.without_nodes(&removed_nodes).map_err(|e| anyhow::anyhow!(e))?; // Now simulate node addition - let added_nodes = self.registry.get_decentralized_nodes(&change.get_added_node_ids()).await?; + let added_nodes = self.registry.get_nodes_from_ids(&change.get_added_node_ids()).await?; let subnet_after = subnet_mid.with_nodes(&added_nodes); diff --git a/rs/cli/src/subnet_manager.rs b/rs/cli/src/subnet_manager.rs index 6e7bfe3ee..19e9fc6f6 100644 --- a/rs/cli/src/subnet_manager.rs +++ b/rs/cli/src/subnet_manager.rs @@ -5,12 +5,13 @@ use std::sync::Arc; use anyhow::anyhow; use anyhow::Ok; use decentralization::{ - network::{DecentralizedSubnet, Node as DecentralizedNode, SubnetQueryBy}, + network::{DecentralizedSubnet, SubnetQueryBy}, SubnetChangeResponse, }; use ic_management_backend::health::HealthStatusQuerier; use ic_management_backend::lazy_registry::LazyRegistry; use ic_management_types::HealthStatus; +use ic_management_types::Node; use ic_types::PrincipalId; use indexmap::IndexMap; use log::{info, warn}; @@ -71,24 +72,24 @@ impl SubnetManager { .ok_or_else(|| anyhow!(SubnetManagerError::SubnetTargetNotProvided)) } - async fn unhealthy_nodes(&self, subnet: DecentralizedSubnet) -> anyhow::Result> { + async fn unhealthy_nodes(&self, subnet: DecentralizedSubnet) -> anyhow::Result> { let subnet_health = self.health_client.subnet(subnet.id).await?; let unhealthy = subnet .nodes .into_iter() - .filter_map(|n| match subnet_health.get(&n.id) { + .filter_map(|n| match subnet_health.get(&n.principal) { Some(health) => { - if *health == ic_management_types::HealthStatus::Healthy { + if *health == HealthStatus::Healthy { None } else { - info!("Node {} is {:?}", n.id, health); + info!("Node {} is {:?}", n.id_short(), health); Some((n, health.clone())) } } None => { - warn!("Node {} has no known health, assuming unhealthy", n.id); - Some((n, ic_management_types::HealthStatus::Unknown)) + warn!("Node {} has no known health, assuming unhealthy", n.id_short()); + Some((n, HealthStatus::Unknown)) } }) .collect::>(); @@ -125,7 +126,7 @@ impl SubnetManager { ) -> anyhow::Result { let subnet_query_by = self.get_subnet_query_by(self.target()?).await?; let mut motivations = vec![]; - let mut to_be_replaced: Vec = if let SubnetQueryBy::NodeList(nodes) = &subnet_query_by { + let mut to_be_replaced: Vec = if let SubnetQueryBy::NodeList(nodes) = &subnet_query_by { nodes.clone() } else { vec![] @@ -142,8 +143,8 @@ impl SubnetManager { let mut node_ids_unhealthy = HashSet::new(); if heal { for (node, health_status) in self.unhealthy_nodes(subnet_change_request.subnet()).await? { - node_ids_unhealthy.insert(node.id); - motivations.push(format!("replacing {} as it is unhealthy: {:?}", node.id, health_status)); + node_ids_unhealthy.insert(node.principal); + motivations.push(format!("replacing {} as it is unhealthy: {:?}", node.principal, health_status)); to_be_replaced.push(node); } } @@ -157,10 +158,10 @@ impl SubnetManager { self.cordoned_features_fetcher.fetch().await?, )?; - for n in change.removed().iter().filter(|n| !node_ids_unhealthy.contains(&n.id)) { + for n in change.removed().iter().filter(|n| !node_ids_unhealthy.contains(&n.principal)) { motivations.push(format!( "replacing {} as per user request{}", - n.id, + n.id_short(), match motivation { Some(ref m) => format!(": {}", m), None => "".to_string(), @@ -202,11 +203,11 @@ impl SubnetManager { )?; for n in change.removed().iter() { - motivations.push(format!("removing {} for subnet resize", n.id)); + motivations.push(format!("removing node {} for subnet resize", n.principal)); } for n in change.added().iter() { - motivations.push(format!("adding {} for subnet resize", n.id)); + motivations.push(format!("adding node {} for subnet resize", n.principal)); } let motivation = format!( diff --git a/rs/cli/src/unit_tests/replace.rs b/rs/cli/src/unit_tests/replace.rs index c11d93c60..ad5e19e47 100644 --- a/rs/cli/src/unit_tests/replace.rs +++ b/rs/cli/src/unit_tests/replace.rs @@ -1,54 +1,46 @@ use std::sync::Arc; use decentralization::{ - nakamoto::NodeFeatures, - network::{DecentralizedSubnet, Node, NodeFeaturePair}, + network::{DecentralizedSubnet, NodeFeaturePair}, SubnetChangeResponse, }; use ic_management_backend::{health::MockHealthStatusQuerier, lazy_registry::MockLazyRegistry}; -use ic_management_types::NodeFeature; +use ic_management_types::{Node, NodeFeature, NodeFeatures}; use ic_types::PrincipalId; use indexmap::{Equivalent, IndexMap}; use itertools::Itertools; use crate::{cordoned_feature_fetcher::MockCordonedFeatureFetcher, subnet_manager::SubnetManager}; -fn node_principal(id: u64) -> PrincipalId { - PrincipalId::new_node_test_id(id) -} - fn user_principal(id: u64) -> String { PrincipalId::new_user_test_id(id).to_string() } fn node(id: u64, dfinity_owned: bool, features: &[(NodeFeature, &str)]) -> Node { - Node { - id: node_principal(id), - dfinity_owned, - features: NodeFeatures { - feature_map: { - let mut map = IndexMap::new(); - - features.iter().for_each(|(feature, value)| { - map.insert(feature.clone(), value.to_string()); - }); - - // Insert mandatory features - for feature in &[ - NodeFeature::NodeProvider, - NodeFeature::DataCenter, - NodeFeature::DataCenterOwner, - NodeFeature::Country, - ] { - if !map.contains_key(feature) { - map.insert(feature.clone(), "Some value".to_string()); - } + let features = NodeFeatures { + feature_map: { + let mut map = IndexMap::new(); + + features.iter().for_each(|(feature, value)| { + map.insert(feature.clone(), value.to_string()); + }); + + // Insert mandatory features + for feature in &[ + NodeFeature::NodeProvider, + NodeFeature::DataCenter, + NodeFeature::DataCenterOwner, + NodeFeature::Country, + ] { + if !map.contains_key(feature) { + map.insert(feature.clone(), "Some value".to_string()); } + } - map - }, + map }, - } + }; + Node::new_test_node(id, features, dfinity_owned) } fn subnet(id: u64, nodes: &[Node]) -> DecentralizedSubnet { @@ -101,11 +93,11 @@ fn pretty_print_node(node: &Node, num_ident: usize) -> String { format!( "{}- principal: {}\n{} dfinity_owned: {}\n{} features: [{}]", "\t".repeat(num_ident), - node.id, + node.principal, "\t".repeat(num_ident), - node.dfinity_owned, + node.dfinity_owned.unwrap_or_default(), "\t".repeat(num_ident), - node.features + node.get_features() .feature_map .iter() .map(|(feature, value)| format!("({}, {})", feature, value)) @@ -198,8 +190,8 @@ fn should_skip_cordoned_nodes() { // All nodes in the world are healthy for this test let nodes_health = available_nodes .iter() - .map(|n| n.id) - .chain(subnet.nodes.iter().map(|n| n.id)) + .map(|n| n.principal) + .chain(subnet.nodes.iter().map(|n| n.principal)) .map(|node_id| (node_id, ic_management_types::HealthStatus::Healthy)) .collect::>(); health_client.expect_nodes().returning(move || { @@ -265,7 +257,9 @@ fn should_skip_cordoned_nodes() { // Act let response = runtime.block_on( subnet_manager - .with_target(crate::subnet_manager::SubnetTarget::FromNodesIds(vec![subnet.nodes.first().unwrap().id])) + .with_target(crate::subnet_manager::SubnetTarget::FromNodesIds(vec![ + subnet.nodes.first().unwrap().principal, + ])) .membership_replace(false, None, None, None, vec![], None), ); diff --git a/rs/decentralization/src/lib.rs b/rs/decentralization/src/lib.rs index 5255557dc..eed0eb5bf 100644 --- a/rs/decentralization/src/lib.rs +++ b/rs/decentralization/src/lib.rs @@ -3,11 +3,11 @@ pub mod network; pub mod subnets; use indexmap::IndexMap; use itertools::Itertools; -use network::{Node, SubnetChange}; +use network::SubnetChange; use std::fmt::{Display, Formatter}; use ic_base_types::PrincipalId; -use ic_management_types::{HealthStatus, NodeFeature}; +use ic_management_types::{HealthStatus, Node, NodeFeature}; use serde::{self, Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize, Default)] @@ -36,8 +36,8 @@ impl SubnetChangeResponse { pub fn new(change: &SubnetChange, node_health: &IndexMap, motivation: Option) -> Self { Self { nodes_old: change.old_nodes.clone(), - node_ids_added: change.added().iter().map(|n| n.id).collect(), - node_ids_removed: change.removed().iter().map(|n| n.id).collect(), + node_ids_added: change.added().iter().map(|n| n.principal).collect(), + node_ids_removed: change.removed().iter().map(|n| n.principal).collect(), subnet_id: if change.subnet_id == Default::default() { None } else { @@ -60,14 +60,14 @@ impl SubnetChangeResponse { .collect::>(), |mut acc, n| { for f in NodeFeature::variants() { - acc.get_mut(&f).unwrap().entry(n.get_feature(&f)).or_insert((0, 0)).0 += 1; + acc.get_mut(&f).unwrap().entry(n.get_feature(&f).unwrap_or_default()).or_insert((0, 0)).0 += 1; } acc }, ), |mut acc, n| { for f in NodeFeature::variants() { - acc.get_mut(&f).unwrap().entry(n.get_feature(&f)).or_insert((0, 0)).1 += 1; + acc.get_mut(&f).unwrap().entry(n.get_feature(&f).unwrap_or_default()).or_insert((0, 0)).1 += 1; } acc }, diff --git a/rs/decentralization/src/nakamoto/mod.rs b/rs/decentralization/src/nakamoto/mod.rs index ac8023ca5..21dd611e9 100644 --- a/rs/decentralization/src/nakamoto/mod.rs +++ b/rs/decentralization/src/nakamoto/mod.rs @@ -1,4 +1,3 @@ -use crate::network::Node; use ahash::{AHashMap, AHasher}; use indexmap::IndexMap; use itertools::Itertools; @@ -6,63 +5,11 @@ use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::cmp::Ordering; use std::collections::VecDeque; -use std::fmt::{self, Debug, Display, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::hash::Hasher; -use std::iter::{FromIterator, IntoIterator}; +use std::iter::IntoIterator; -use ic_management_types::NodeFeature; - -#[derive(Eq, PartialEq, Clone, Serialize, Deserialize, Debug)] -pub struct NodeFeatures { - pub feature_map: IndexMap, -} - -impl fmt::Display for NodeFeatures { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for (feature, value) in &self.feature_map { - writeln!(f, "{}: {}", feature, value)?; - } - Ok(()) - } -} - -impl NodeFeatures { - pub fn get(&self, feature: &NodeFeature) -> Option { - self.feature_map.get(feature).cloned() - } - - #[cfg(test)] - fn new_test_feature_set(value: &str) -> Self { - let mut result = IndexMap::new(); - for feature in NodeFeature::variants() { - result.insert(feature, value.to_string()); - } - NodeFeatures { feature_map: result } - } - - #[cfg(test)] - fn with_feature_value(&self, feature: &NodeFeature, value: &str) -> Self { - let mut feature_map = self.feature_map.clone(); - feature_map.insert(feature.clone(), value.to_string()); - NodeFeatures { feature_map } - } -} - -impl FromIterator<(NodeFeature, &'static str)> for NodeFeatures { - fn from_iter>(iter: I) -> Self { - Self { - feature_map: IndexMap::from_iter(iter.into_iter().map(|x| (x.0, String::from(x.1)))), - } - } -} - -impl FromIterator<(NodeFeature, std::string::String)> for NodeFeatures { - fn from_iter>(iter: I) -> Self { - Self { - feature_map: IndexMap::from_iter(iter), - } - } -} +use ic_management_types::{Node, NodeFeature, NodeFeatures}; // A thread-local memoization cache of NakamotoScores thread_local! { @@ -191,8 +138,8 @@ impl NakamotoScore { let mut memoize_key = AHasher::default(); let nodes_iter = nodes.clone().into_iter(); - for node in nodes_iter.sorted_by_cached_key(|n| n.id) { - for byte in node.id.0.as_slice() { + for node in nodes_iter.sorted_by_cached_key(|n| n.principal) { + for byte in node.principal.0.as_slice() { memoize_key.write_u8(*byte); } } @@ -220,7 +167,7 @@ impl NakamotoScore { score.clone() } None => { - let score = Self::new_from_slice_node_features(&nodes.into_iter().map(|n| n.features.clone()).collect::>()); + let score = Self::new_from_slice_node_features(&nodes.into_iter().map(|n| n.get_features()).collect::>()); memoize_cache.insert(memoize_key, score.clone()); score } @@ -743,7 +690,7 @@ mod tests { let health_of_nodes = nodes_available .iter() .chain(subnet_initial.nodes.iter()) - .map(|n| (n.id, HealthStatus::Healthy)) + .map(|n| (n.principal, HealthStatus::Healthy)) .collect::>(); println!( @@ -766,7 +713,7 @@ mod tests { let countries_after = optimized_subnet .nodes .iter() - .map(|n| n.get_feature(&NodeFeature::Country)) + .map(|n| n.get_feature(&NodeFeature::Country).unwrap()) .sorted() .collect::>(); @@ -800,7 +747,10 @@ mod tests { ) ); let nodes_available = new_test_nodes_with_overrides("spare", 7, 2, 0, (&NodeFeature::NodeProvider, &["NP6", "NP7"])); - let health_of_nodes = nodes_available.iter().map(|n| (n.id, HealthStatus::Healthy)).collect::>(); + let health_of_nodes = nodes_available + .iter() + .map(|n| (n.principal, HealthStatus::Healthy)) + .collect::>(); println!( "initial {} NPs {:?}", @@ -823,7 +773,7 @@ mod tests { let nps_after = optimized_subnet .nodes .iter() - .map(|n| n.get_feature(&NodeFeature::NodeProvider)) + .map(|n| n.get_feature(&NodeFeature::NodeProvider).unwrap()) .sorted() .collect::>(); @@ -856,7 +806,7 @@ mod tests { let health_of_nodes = nodes_available .iter() .chain(subnet_initial.nodes.iter()) - .map(|n| (n.id, HealthStatus::Healthy)) + .map(|n| (n.principal, HealthStatus::Healthy)) .collect::>(); println!( @@ -889,7 +839,14 @@ mod tests { println!("optimized {} NPs {:?}", optimized_subnet, nps_after); // There is still only one DFINITY-owned node in the subnet - assert_eq!(1, optimized_subnet.nodes.iter().map(|n| n.dfinity_owned as u32).sum::()); + assert_eq!( + 1, + optimized_subnet + .nodes + .iter() + .map(|n| n.dfinity_owned.unwrap_or_default() as u32) + .sum::() + ); } #[test] @@ -905,7 +862,7 @@ mod tests { nodes: subnet_all .nodes .iter() - .filter(|n| !re_unhealthy_nodes.is_match(&n.id.to_string())) + .filter(|n| !re_unhealthy_nodes.is_match(&n.principal.to_string())) .cloned() .collect(), added_nodes: Vec::new(), @@ -921,7 +878,7 @@ mod tests { .iter() .sorted_by(|a, b| a.principal.cmp(&b.principal)) .filter(|n| n.subnet_id.is_none() && n.proposal.is_none()) - .map(Node::from) + .cloned() .collect::>(); subnet_healthy.check_business_rules().expect("Check business rules failed"); @@ -1033,7 +990,7 @@ mod tests { .nodes .iter() .map(|n| n.principal) - .chain(nodes_available.iter().map(|n| n.id)) + .chain(nodes_available.iter().map(|n| n.principal)) .map(|n| { if unhealthy_principals.contains(&n) { (n, HealthStatus::Dead) @@ -1064,7 +1021,7 @@ mod tests { let health_of_nodes = nodes_available .iter() .chain(subnet_initial.nodes.iter()) - .map(|n| (n.id, HealthStatus::Healthy)) + .map(|n| (n.principal, HealthStatus::Healthy)) .collect::>(); let change_initial = SubnetChangeRequest::new(subnet_initial.clone(), nodes_available, Vec::new(), Vec::new(), Vec::new()); @@ -1080,7 +1037,7 @@ mod tests { with_keeping_features .new_nodes .iter() - .filter(|n| n.features.get(&NodeFeature::Country).unwrap() == *"CH") + .filter(|n| n.get_feature(&NodeFeature::Country).unwrap() == *"CH") .collect_vec() .len(), 1 @@ -1098,7 +1055,7 @@ mod tests { with_keeping_principals .new_nodes .iter() - .filter(|n| n.id == node_to_keep.id) + .filter(|n| n.principal == node_to_keep.principal) .collect_vec() .len(), 1 @@ -1116,7 +1073,7 @@ mod tests { let health_of_nodes = subnet_initial .nodes .iter() - .map(|n| (n.id, HealthStatus::Healthy)) + .map(|n| (n.principal, HealthStatus::Healthy)) .collect::>(); let change_initial = SubnetChangeRequest::new(subnet_initial.clone(), Vec::new(), Vec::new(), Vec::new(), Vec::new()); diff --git a/rs/decentralization/src/network.rs b/rs/decentralization/src/network.rs index dfb1f7093..75500471e 100644 --- a/rs/decentralization/src/network.rs +++ b/rs/decentralization/src/network.rs @@ -1,13 +1,13 @@ -use crate::nakamoto::{self, NakamotoScore}; +use crate::nakamoto::NakamotoScore; use crate::subnets::unhealthy_with_nodes; use crate::SubnetChangeResponse; use actix_web::http::StatusCode; use actix_web::{HttpResponse, ResponseError}; -use ahash::{AHashMap, AHashSet, HashMap, HashSet}; +use ahash::{AHashMap, AHashSet, HashSet}; use anyhow::anyhow; use futures::future::BoxFuture; use ic_base_types::PrincipalId; -use ic_management_types::{HealthStatus, NetworkError, NodeFeature}; +use ic_management_types::{HealthStatus, NetworkError, Node, NodeFeature}; use indexmap::IndexMap; use itertools::Itertools; use log::{debug, info, warn}; @@ -15,7 +15,6 @@ use rand::{seq::SliceRandom, SeedableRng}; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::fmt::{Debug, Display, Formatter}; -use std::hash::Hash; #[derive(Clone, Serialize, Deserialize, Default)] pub struct DataCenterInfo { @@ -24,147 +23,6 @@ pub struct DataCenterInfo { continent: String, } -#[derive(Clone, Serialize, Deserialize, Debug, Eq)] -pub struct Node { - pub id: PrincipalId, - pub features: nakamoto::NodeFeatures, - pub dfinity_owned: bool, -} - -impl std::fmt::Display for Node { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Node ID: {}\nFeatures:\n{}\nDfinity Owned: {}", - self.id, self.features, self.dfinity_owned - ) - } -} - -impl Node { - pub fn new_test_node(node_number: u64, features: nakamoto::NodeFeatures, dfinity_owned: bool) -> Self { - Node { - id: PrincipalId::new_node_test_id(node_number), - features, - dfinity_owned, - } - } - - pub fn get_features(&self) -> nakamoto::NodeFeatures { - self.features.clone() - } - - pub fn get_feature(&self, feature: &NodeFeature) -> String { - self.features.get(feature).unwrap_or_default() - } - - pub fn matches_feature_value(&self, value: &str) -> bool { - self.id.to_string() == *value.to_lowercase() - || self - .get_features() - .feature_map - .values() - .any(|v| *v.to_lowercase() == *value.to_lowercase()) - } - - pub fn is_country_from_eu(country: &str) -> bool { - // (As of 2024) the EU countries are not properly marked in the registry, so we check membership separately. - let eu_countries: HashMap<&str, &str> = HashMap::from_iter([ - ("AT", "Austria"), - ("BE", "Belgium"), - ("BG", "Bulgaria"), - ("CY", "Cyprus"), - ("CZ", "Czechia"), - ("DE", "Germany"), - ("DK", "Denmark"), - ("EE", "Estonia"), - ("ES", "Spain"), - ("FI", "Finland"), - ("FR", "France"), - ("GR", "Greece"), - ("HR", "Croatia"), - ("HU", "Hungary"), - ("IE", "Ireland"), - ("IT", "Italy"), - ("LT", "Lithuania"), - ("LU", "Luxembourg"), - ("LV", "Latvia"), - ("MT", "Malta"), - ("NL", "Netherlands"), - ("PL", "Poland"), - ("PT", "Portugal"), - ("RO", "Romania"), - ("SE", "Sweden"), - ("SI", "Slovenia"), - ("SK", "Slovakia"), - ]); - eu_countries.contains_key(country) - } -} - -impl Hash for Node { - fn hash(&self, state: &mut H) { - self.id.hash(state); - } -} - -impl PartialEq for Node { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl From<&ic_management_types::Node> for Node { - fn from(n: &ic_management_types::Node) -> Self { - let country = n - .operator - .datacenter - .as_ref() - .map(|d| d.country.clone()) - .unwrap_or_else(|| "unknown".to_string()); - let area = n - .operator - .datacenter - .as_ref() - .map(|d| d.area.clone()) - .unwrap_or_else(|| "unknown".to_string()); - - Self { - id: n.principal, - features: nakamoto::NodeFeatures::from_iter([ - (NodeFeature::Area, area), - (NodeFeature::Country, country), - ( - NodeFeature::Continent, - n.operator - .datacenter - .as_ref() - .map(|d| d.continent.clone()) - .unwrap_or_else(|| "unknown".to_string()), - ), - ( - NodeFeature::DataCenterOwner, - n.operator - .datacenter - .as_ref() - .map(|d| d.owner.name.clone()) - .unwrap_or_else(|| "unknown".to_string()), - ), - ( - NodeFeature::DataCenter, - n.operator - .datacenter - .as_ref() - .map(|d| d.name.clone()) - .unwrap_or_else(|| "unknown".to_string()), - ), - (NodeFeature::NodeProvider, n.operator.provider.principal.to_string()), - ]), - dfinity_owned: n.dfinity_owned.unwrap_or_default(), - } - } -} - #[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct DecentralizedSubnet { pub id: PrincipalId, @@ -205,14 +63,14 @@ impl DecentralizedSubnet { let mut new_subnet_nodes = self.nodes.clone(); let mut removed_nodes = self.removed_nodes.clone(); for node in nodes_to_remove { - if let Some(index) = new_subnet_nodes.iter().position(|n| n.id == node.id) { + if let Some(index) = new_subnet_nodes.iter().position(|n| n.principal == node.principal) { removed_nodes.push(new_subnet_nodes.remove(index)); } else { - return Err(NetworkError::NodeNotFound(node.id)); + return Err(NetworkError::NodeNotFound(node.principal)); } } let removed_is_empty = removed_nodes.is_empty(); - let removed_node_ids = removed_nodes.iter().map(|n| n.id).collect::>(); + let removed_node_ids = removed_nodes.iter().map(|n| n.principal).collect::>(); if !removed_is_empty { assert!(new_subnet_nodes.len() <= self.nodes.len()); } @@ -279,7 +137,7 @@ impl DecentralizedSubnet { self.nodes .iter() - .filter(|n| n.get_feature(node_feature) == dominant_feature) + .filter(|n| n.get_feature(node_feature).unwrap() == dominant_feature) .cloned() .collect() } @@ -303,7 +161,7 @@ impl DecentralizedSubnet { let subnet_id_str = subnet_id.to_string(); let is_european_subnet = subnet_id_str == *"bkfrj-6k62g-dycql-7h53p-atvkj-zg4to-gaogh-netha-ptybj-ntsgw-rqe"; - let dfinity_owned_nodes_count: usize = nodes.iter().map(|n| n.dfinity_owned as usize).sum(); + let dfinity_owned_nodes_count: usize = nodes.iter().map(|n| n.dfinity_owned.unwrap_or_default() as usize).sum(); let target_dfinity_owned_nodes_count = if subnet_id_str == *"tdb26-jop6k-aogll-7ltgs-eruif-6kk7m-qpktf-gdiqx-mxtrf-vb5e6-eqe" { 3 } else { @@ -487,11 +345,11 @@ impl DecentralizedSubnet { // PrincipalIDs to ensure consistency of the seed with the // same machines in the subnet let mut id_sorted_current_nodes = current_nodes.to_owned(); - id_sorted_current_nodes.sort_by(|n1, n2| std::cmp::Ord::cmp(&n1.id.to_string(), &n2.id.to_string())); + id_sorted_current_nodes.sort_by(|n1, n2| std::cmp::Ord::cmp(&n1.principal.to_string(), &n2.principal.to_string())); let seed = rand_seeder::Seeder::from( id_sorted_current_nodes .iter() - .map(|n| n.id.to_string()) + .map(|n| n.principal.to_string()) .collect::>() .join("_"), ) @@ -502,7 +360,7 @@ impl DecentralizedSubnet { // the same set of machines with the best score, we always // get the same one. let mut id_sorted_best_results = best_results.to_owned(); - id_sorted_best_results.sort_by(|r1, r2| std::cmp::Ord::cmp(&r1.node.id.to_string(), &r2.node.id.to_string())); + id_sorted_best_results.sort_by(|r1, r2| std::cmp::Ord::cmp(&r1.node.principal.to_string(), &r2.node.principal.to_string())); id_sorted_best_results.choose(&mut rng).cloned() } } @@ -523,9 +381,9 @@ impl DecentralizedSubnet { cmp = a.score.cmp(&b.score); } if cmp == Ordering::Less { - debug!("Better node is {}", a.node.id); + debug!("Better node is {}", a.node.principal); } else { - debug!("Better node is {}", b.node.id); + debug!("Better node is {}", b.node.principal); } cmp }) @@ -534,7 +392,7 @@ impl DecentralizedSubnet { run_log.push("Sorted candidate nodes, with the best candidate at the end:".to_string()); run_log.push(" ".to_string()); for s in &candidates { - run_log.push(format!(" -=> {} {} {}", s.node.id, s.penalty, s.score)); + run_log.push(format!(" -=> {} {} {}", s.node.principal, s.penalty, s.score)); } // Then, pick the candidates with the best (highest) Nakamoto Coefficients. @@ -617,7 +475,7 @@ impl DecentralizedSubnet { ); run_log.push(format!("Nakamoto score after extension {}", best_result.score)); added_nodes.push(best_result.node.clone()); - available_nodes.retain(|n| n.id != best_result.node.id); + available_nodes.retain(|n| n.principal != best_result.node.principal); nodes_after_extension.push(best_result.node.clone()); nodes_initial.push(best_result.node.clone()); total_penalty += best_result.penalty; @@ -630,7 +488,7 @@ impl DecentralizedSubnet { "- adding node {} of {} ({}): {}", i + 1, how_many_nodes, - best_result.node.id.to_string().split('-').next().unwrap_or_default(), + best_result.node.principal.to_string().split('-').next().unwrap_or_default(), s ) }) @@ -688,7 +546,7 @@ impl DecentralizedSubnet { .nodes .iter() .filter_map(|node| { - let candidate_subnet_nodes: Vec = self.nodes.iter().filter(|n| n.id != node.id).cloned().collect(); + let candidate_subnet_nodes: Vec = self.nodes.iter().filter(|n| n.principal != node.principal).cloned().collect(); self._node_to_replacement_candidate(&candidate_subnet_nodes, node, &mut run_log) }) .collect(); @@ -705,7 +563,7 @@ impl DecentralizedSubnet { ); run_log.push(format!("Nakamoto score after removal {}", best_result.score)); self.removed_nodes.push(best_result.node.clone()); - self.nodes.retain(|n| n.id != best_result.node.id); + self.nodes.retain(|n| n.principal != best_result.node.principal); total_penalty += best_result.penalty; business_rules_log.extend( best_result @@ -716,7 +574,7 @@ impl DecentralizedSubnet { "- removing node {} of {} ({}): {}", i + 1, how_many_nodes, - best_result.node.id.to_string().split('-').next().unwrap_or_default(), + best_result.node.principal.to_string().split('-').next().unwrap_or_default(), s ) }) @@ -760,8 +618,8 @@ impl DecentralizedSubnet { .removed_nodes .iter() .filter_map(|node_removed| { - if self.added_nodes.iter().any(|node_added| node_removed.id == node_added.id) { - Some(node_removed.id) + if self.added_nodes.iter().any(|node_added| node_removed.principal == node_added.principal) { + Some(node_removed.principal) } else { None } @@ -774,13 +632,13 @@ impl DecentralizedSubnet { let added_nodes_desc = self .added_nodes .into_iter() - .filter(|node_added| !common_nodes.iter().any(|common_node| common_node == &node_added.id)) + .filter(|node_added| !common_nodes.iter().any(|common_node| common_node == &node_added.principal)) .collect(); let removed_nodes_desc = self .removed_nodes .into_iter() - .filter(|node_removed| !common_nodes.iter().any(|common_node| common_node == &node_removed.id)) + .filter(|node_removed| !common_nodes.iter().any(|common_node| common_node == &node_removed.principal)) .collect(); Self { @@ -808,7 +666,7 @@ impl DecentralizedSubnet { }) } Err(err) => { - err_log.push(format!("Node {} failed business rule {}", touched_node.id, err)); + err_log.push(format!("Node {} failed business rule {}", touched_node.principal, err)); None } } @@ -822,7 +680,7 @@ impl Display for DecentralizedSubnet { "Subnet id {} with {} nodes [{}]", self.id, self.nodes.len(), - self.nodes.iter().map(|n| n.id.to_string()).join(", ") + self.nodes.iter().map(|n| n.principal.to_string()).join(", ") ) } } @@ -837,7 +695,7 @@ impl From<&ic_management_types::Subnet> for DecentralizedSubnet { fn from(s: &ic_management_types::Subnet) -> Self { Self { id: s.principal, - nodes: s.nodes.iter().map(Node::from).collect(), + nodes: s.nodes.clone(), added_nodes: Vec::new(), removed_nodes: Vec::new(), comment: None, @@ -929,7 +787,7 @@ pub trait Identifies { impl Identifies for PrincipalId { fn eq(&self, other: &Node) -> bool { - &other.id == self + &other.principal == self } fn partial_eq(&self, other: &Node) -> bool { Identifies::eq(self, other) @@ -1118,14 +976,14 @@ impl SubnetChangeRequest { .clone() .into_iter() .filter(|n| !self.include_nodes.contains(n)) - .filter(|n| health_of_nodes.get(&n.id).unwrap_or(&HealthStatus::Unknown) == &HealthStatus::Healthy) + .filter(|n| health_of_nodes.get(&n.principal).unwrap_or(&HealthStatus::Unknown) == &HealthStatus::Healthy) .collect::>(); let available_nodes = all_healthy_nodes .into_iter() .filter(|n| { for cordoned_feature in &cordoned_features { - if let Some(node_feature) = n.features.get(&cordoned_feature.feature) { + if let Some(node_feature) = n.get_feature(&cordoned_feature.feature) { if PartialEq::eq(&node_feature, &cordoned_feature.value) { // Node contains cordoned feature // exclude it from available pool @@ -1161,10 +1019,10 @@ impl SubnetChangeRequest { .iter() .cloned() .chain(resized_subnet.removed_nodes.clone()) - .filter(|n| health_of_nodes.get(&n.id).unwrap_or(&HealthStatus::Unknown) == &HealthStatus::Healthy) + .filter(|n| health_of_nodes.get(&n.principal).unwrap_or(&HealthStatus::Unknown) == &HealthStatus::Healthy) .filter(|n| { for cordoned_feature in &cordoned_features { - if let Some(node_feature) = n.features.get(&cordoned_feature.feature) { + if let Some(node_feature) = n.get_feature(&cordoned_feature.feature) { if PartialEq::eq(&node_feature, &cordoned_feature.value) { // Node contains cordoned feature // exclude it from available pool @@ -1344,13 +1202,12 @@ impl NetworkHealRequest { let subnets_to_heal = unhealthy_with_nodes(&self.subnets, health_of_nodes) .iter() .flat_map(|(subnet_id, unhealthy_nodes)| { - let unhealthy_nodes = unhealthy_nodes.iter().map(Node::from).collect::>(); let unhealthy_subnet = self.subnets.get(subnet_id).ok_or(NetworkError::SubnetNotFound(*subnet_id))?; Ok::(NetworkHealSubnets { name: unhealthy_subnet.metadata.name.clone(), decentralized_subnet: DecentralizedSubnet::from(unhealthy_subnet), - unhealthy_nodes, + unhealthy_nodes: unhealthy_nodes.clone(), }) }) .sorted_by(|a, b| a.cmp(b).reverse()) @@ -1372,7 +1229,7 @@ impl NetworkHealRequest { subnet.decentralized_subnet.id, max_replaceable_nodes, unhealthy_nodes.len(), - unhealthy_nodes.iter().map(|node| node.id).collect_vec() + unhealthy_nodes.iter().map(|node| node.principal).collect_vec() ); unhealthy_nodes } else { @@ -1383,7 +1240,7 @@ impl NetworkHealRequest { subnet .unhealthy_nodes .iter() - .map(|node| node.id.to_string().split('-').next().unwrap().to_string()) + .map(|node| node.principal.to_string().split('-').next().unwrap().to_string()) .collect_vec(), max_replaceable_nodes - subnet.unhealthy_nodes.len(), max_replaceable_nodes @@ -1527,19 +1384,19 @@ https://github.com/dfinity/dre/blob/79066127f58c852eaf4adda11610e815a426878c/rs/ motivations.push(format!( "replacing {} node {}", health_of_nodes - .get(&node.id) + .get(&node.principal) .map(|s| s.to_string().to_lowercase()) .unwrap_or("unhealthy".to_string()), - node.id + node.principal )); } - let unhealthy_nodes_ids = unhealthy_nodes.iter().map(|node| node.id).collect::>(); + let unhealthy_nodes_ids = unhealthy_nodes.iter().map(|node| node.principal).collect::>(); for node_id in change.node_ids_removed.iter().filter(|n| !unhealthy_nodes_ids.contains(n)) { motivations.push(format!("replacing node {} to optimize network topology", node_id)); } - available_nodes.retain(|node| !change.node_ids_added.contains(&node.id)); + available_nodes.retain(|node| !change.node_ids_added.contains(&node.principal)); let motivation = format!( "\n{}{}\nNote: the information below is provided for your convenience. Please independently verify the decentralization changes rather than relying solely on this summary.\nHere is [an explaination of how decentralization is currently calculated](https://dfinity.github.io/dre/decentralization.html), \nand there are also [instructions for performing what-if analysis](https://dfinity.github.io/dre/subnet-decentralization-whatif.html) if you are wondering if another node would have improved decentralization more.\n\n", @@ -1554,11 +1411,11 @@ https://github.com/dfinity/dre/blob/79066127f58c852eaf4adda11610e815a426878c/rs/ } pub fn generate_removed_nodes_description(subnet_nodes: &[Node], remove_nodes: &[Node]) -> Vec<(Node, String)> { - let mut subnet_nodes: AHashMap = AHashMap::from_iter(subnet_nodes.iter().map(|n| (n.id, n.clone()))); + let mut subnet_nodes: AHashMap = AHashMap::from_iter(subnet_nodes.iter().map(|n| (n.principal, n.clone()))); let mut result = Vec::new(); for node in remove_nodes { let nakamoto_before = NakamotoScore::new_from_nodes(subnet_nodes.values()); - subnet_nodes.remove(&node.id); + subnet_nodes.remove(&node.principal); let nakamoto_after = NakamotoScore::new_from_nodes(subnet_nodes.values()); let nakamoto_diff = nakamoto_after.describe_difference_from(&nakamoto_before).1; @@ -1568,11 +1425,11 @@ pub fn generate_removed_nodes_description(subnet_nodes: &[Node], remove_nodes: & } pub fn generate_added_node_description(subnet_nodes: &[Node], add_nodes: &[Node]) -> Vec<(Node, String)> { - let mut subnet_nodes: AHashMap = AHashMap::from_iter(subnet_nodes.iter().map(|n| (n.id, n.clone()))); + let mut subnet_nodes: AHashMap = AHashMap::from_iter(subnet_nodes.iter().map(|n| (n.principal, n.clone()))); let mut result = Vec::new(); for node in add_nodes { let nakamoto_before = NakamotoScore::new_from_nodes(subnet_nodes.values()); - subnet_nodes.insert(node.id, node.clone()); + subnet_nodes.insert(node.principal, node.clone()); let nakamoto_after = NakamotoScore::new_from_nodes(subnet_nodes.values()); let nakamoto_diff = nakamoto_after.describe_difference_from(&nakamoto_before).1; diff --git a/rs/decentralization/src/subnets.rs b/rs/decentralization/src/subnets.rs index 0c546d3f9..cd6a5a3f0 100644 --- a/rs/decentralization/src/subnets.rs +++ b/rs/decentralization/src/subnets.rs @@ -1,4 +1,3 @@ -use crate::network::Node; use ic_base_types::PrincipalId; use ic_management_types::{ requests::{NodeRemoval, NodeRemovalReason}, @@ -61,17 +60,15 @@ impl NodesRemover { return None; } - let decentralization_node = Node::from(&n); - if let Some(exclude) = self.exclude.as_ref() { for exclude_feature in exclude { - if decentralization_node.matches_feature_value(exclude_feature) { + if n.matches_feature_value(exclude_feature) { return None; } } } - if let Some(filter) = self.extra_nodes_filter.iter().find(|f| decentralization_node.matches_feature_value(f)) { + if let Some(filter) = self.extra_nodes_filter.iter().find(|f| n.matches_feature_value(f)) { return Some(NodeRemoval { node: n, reason: NodeRemovalReason::MatchedFilter(filter.clone()), diff --git a/rs/ic-management-backend/src/endpoints/query_decentralization.rs b/rs/ic-management-backend/src/endpoints/query_decentralization.rs index 8d0998cdf..f7f30b063 100644 --- a/rs/ic-management-backend/src/endpoints/query_decentralization.rs +++ b/rs/ic-management-backend/src/endpoints/query_decentralization.rs @@ -55,7 +55,7 @@ async fn get_decentralization_analysis( .map(|subnet_id| match subnets.get(&subnet_id) { Some(subnet) => DecentralizedSubnet { id: subnet_id, - nodes: subnet.nodes.iter().map(decentralization::network::Node::from).collect(), + nodes: subnet.nodes.clone(), added_nodes: Vec::new(), removed_nodes: Vec::new(), comment: None, @@ -83,7 +83,7 @@ async fn get_decentralization_analysis( node_ids_to_remove .iter() .filter_map(|n| registry_nodes.get(n)) - .map(decentralization::network::Node::from) + .cloned() .collect::>() }); let updated_subnet = match &nodes_to_remove { @@ -93,10 +93,7 @@ async fn get_decentralization_analysis( let updated_subnet = match &node_ids_to_add { Some(node_ids_to_add) => { - let nodes_to_add = node_ids_to_add - .iter() - .map(|n| decentralization::network::Node::from(®istry_nodes[n])) - .collect::>(); + let nodes_to_add = node_ids_to_add.iter().map(|n| registry_nodes[n].clone()).collect::>(); updated_subnet.with_nodes(&nodes_to_add) } None => updated_subnet, diff --git a/rs/ic-management-backend/src/lazy_registry.rs b/rs/ic-management-backend/src/lazy_registry.rs index 41133a850..c776bb67d 100644 --- a/rs/ic-management-backend/src/lazy_registry.rs +++ b/rs/ic-management-backend/src/lazy_registry.rs @@ -2,7 +2,7 @@ use indexmap::{IndexMap, IndexSet}; use std::net::Ipv6Addr; use std::path::PathBuf; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use decentralization::network::{AvailableNodesQuerier, DecentralizedSubnet, NodesConverter, SubnetQuerier, SubnetQueryBy}; use futures::future::BoxFuture; @@ -102,15 +102,10 @@ pub trait LazyRegistry: }) } - fn get_decentralized_nodes<'a>(&'a self, principals: &'a [PrincipalId]) -> BoxFuture<'a, anyhow::Result>> { + fn get_nodes_from_ids<'a>(&'a self, principals: &'a [PrincipalId]) -> BoxFuture<'a, anyhow::Result>> { Box::pin(async { - Ok(self - .nodes() - .await? - .values() - .filter(|n| principals.contains(&n.principal)) - .map(decentralization::network::Node::from) - .collect_vec()) + let all_nodes = self.nodes().await?; + Ok(principals.iter().filter_map(|p| all_nodes.get(p).cloned()).collect()) }) } @@ -146,22 +141,15 @@ pub trait LazyRegistry: } impl NodesConverter for Box { - fn get_nodes<'a>( - &'a self, - from: &'a [PrincipalId], - ) -> BoxFuture<'a, Result, ic_management_types::NetworkError>> { + fn get_nodes<'a>(&'a self, node_ids: &'a [PrincipalId]) -> BoxFuture<'a, Result, ic_management_types::NetworkError>> { Box::pin(async { let nodes = self .nodes() .await .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))?; - from.iter() - .map(|n| { - nodes - .get(n) - .ok_or(ic_management_types::NetworkError::NodeNotFound(*n)) - .map(decentralization::network::Node::from) - }) + node_ids + .iter() + .map(|n| nodes.get(n).cloned().ok_or(ic_management_types::NetworkError::NodeNotFound(*n))) .collect() }) } @@ -524,7 +512,7 @@ impl LazyRegistry for LazyRegistryImpl { Node { principal, dfinity_owned: Some(dfinity_dcs.contains(&dc_name) || guest.as_ref().map(|g| g.dfinity_owned).unwrap_or_default()), - ip_addr, + ip_addr: Some(ip_addr), hostname: guest .as_ref() .map(|g| g.name.clone()) @@ -548,6 +536,7 @@ impl LazyRegistry for LazyRegistryImpl { // TODO: map hostos release hostos_release: None, operator: operator.clone().unwrap_or_default(), + cached_features: OnceLock::new(), proposal: None, label: guest.map(|g| g.name), duplicates: versioned_node_entries @@ -759,14 +748,14 @@ impl LazyRegistry for LazyRegistryImpl { }) } - fn get_decentralized_nodes<'a>(&'a self, principals: &'a [PrincipalId]) -> BoxFuture<'a, anyhow::Result>> { + fn get_nodes_from_ids<'a>(&'a self, principals: &'a [PrincipalId]) -> BoxFuture<'a, anyhow::Result>> { Box::pin(async { Ok(self .nodes() .await? .values() .filter(|n| principals.contains(&n.principal)) - .map(decentralization::network::Node::from) + .cloned() .collect_vec()) }) } @@ -818,22 +807,14 @@ impl LazyRegistry for LazyRegistryImpl { } impl NodesConverter for LazyRegistryImpl { - fn get_nodes<'a>( - &'a self, - from: &'a [PrincipalId], - ) -> BoxFuture<'a, Result, ic_management_types::NetworkError>> { + fn get_nodes<'a>(&'a self, from: &'a [PrincipalId]) -> BoxFuture<'a, Result, ic_management_types::NetworkError>> { Box::pin(async { let nodes = self .nodes() .await .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))?; from.iter() - .map(|n| { - nodes - .get(n) - .ok_or(ic_management_types::NetworkError::NodeNotFound(*n)) - .map(decentralization::network::Node::from) - }) + .map(|n| nodes.get(n).cloned().ok_or(ic_management_types::NetworkError::NodeNotFound(*n))) .collect() }) } @@ -850,7 +831,7 @@ impl SubnetQuerier for LazyRegistryImpl { .get(&id) .map(|s| DecentralizedSubnet { id: s.principal, - nodes: s.nodes.iter().map(decentralization::network::Node::from).collect(), + nodes: s.nodes.clone(), added_nodes: vec![], removed_nodes: vec![], comment: None, @@ -861,7 +842,7 @@ impl SubnetQuerier for LazyRegistryImpl { let reg_nodes = self.nodes().await.map_err(|e| NetworkError::DataRequestError(e.to_string()))?; let subnets = nodes .iter() - .map(|n| reg_nodes.get(&n.id).and_then(|n| n.subnet_id)) + .map(|n| reg_nodes.get(&n.principal).and_then(|n| n.subnet_id)) .collect::>(); if subnets.len() > 1 { return Err(NetworkError::IllegalRequest("Nodes don't belong to the same subnet".to_owned())); @@ -876,9 +857,7 @@ impl SubnetQuerier for LazyRegistryImpl { .get(subnet) .ok_or(NetworkError::SubnetNotFound(*subnet))? .nodes - .iter() - .map(decentralization::network::Node::from) - .collect(), + .to_vec(), added_nodes: vec![], removed_nodes: vec![], comment: None, @@ -895,7 +874,7 @@ impl SubnetQuerier for LazyRegistryImpl { impl decentralization::network::TopologyManager for LazyRegistryImpl {} impl AvailableNodesQuerier for LazyRegistryImpl { - fn available_nodes(&self) -> BoxFuture<'_, Result, ic_management_types::NetworkError>> { + fn available_nodes(&self) -> BoxFuture<'_, Result, ic_management_types::NetworkError>> { Box::pin(async { let (nodes_and_proposals, healths) = try_join!(self.nodes_and_proposals(), self.health_client.nodes()) .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))?; @@ -914,8 +893,8 @@ impl AvailableNodesQuerier for LazyRegistryImpl { .map(|s| matches!(*s, ic_management_types::HealthStatus::Healthy)) .unwrap_or(false) }) - .map(decentralization::network::Node::from) - .sorted_by(|n1, n2| n1.id.cmp(&n2.id)) + .cloned() + .sorted_by(|n1, n2| n1.principal.cmp(&n2.principal)) .collect()) }) } @@ -968,7 +947,7 @@ mock! { } impl NodesConverter for LazyRegistry { - fn get_nodes<'a>(&'a self, from: &'a [PrincipalId]) -> BoxFuture<'_, Result, NetworkError>>; + fn get_nodes<'a>(&'a self, from: &'a [PrincipalId]) -> BoxFuture<'_, Result, NetworkError>>; } impl SubnetQuerier for LazyRegistry { @@ -978,6 +957,6 @@ mock! { impl decentralization::network::TopologyManager for LazyRegistry {} impl AvailableNodesQuerier for LazyRegistry { - fn available_nodes(&self) -> BoxFuture<'_, Result, NetworkError>>; + fn available_nodes(&self) -> BoxFuture<'_, Result, NetworkError>>; } } diff --git a/rs/ic-management-backend/src/registry.rs b/rs/ic-management-backend/src/registry.rs index 8f1b46286..f4f7d52dd 100644 --- a/rs/ic-management-backend/src/registry.rs +++ b/rs/ic-management-backend/src/registry.rs @@ -48,7 +48,7 @@ use std::net::Ipv6Addr; use std::ops::Add; use std::path::{Path, PathBuf}; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use tokio::sync::RwLock; use tokio::time::{sleep, Duration}; use url::Url; @@ -472,7 +472,7 @@ impl RegistryState { Node { principal, dfinity_owned: Some(dfinity_dcs.contains(&dc_name) || guest.as_ref().map(|g| g.dfinity_owned).unwrap_or_default()), - ip_addr, + ip_addr: Some(ip_addr), hostname: guest .as_ref() .map(|g| g.name.clone()) @@ -497,6 +497,7 @@ impl RegistryState { .find(|r| r.commit_hash == nr.hostos_version_id.clone().unwrap_or_default()) .cloned(), operator, + cached_features: OnceLock::new(), proposal: None, label: guest.map(|g| g.name), duplicates: node_entries @@ -786,12 +787,9 @@ impl RegistryState { self.network.get_nns_urls() } - pub fn get_decentralized_nodes(&self, principals: &[PrincipalId]) -> Vec { - self.nodes() - .values() - .filter(|node| principals.contains(&node.principal)) - .map(decentralization::network::Node::from) - .collect_vec() + pub fn get_nodes_from_ids(&self, principals: &[PrincipalId]) -> Vec { + let all_nodes = self.nodes(); + principals.iter().filter_map(|p| all_nodes.get(p).cloned()).collect() } pub async fn get_unassigned_nodes_replica_version(&self) -> Result { @@ -809,32 +807,15 @@ impl RegistryState { _ => Err(anyhow::anyhow!("No GuestOS version for unassigned nodes found".to_string(),)), } } - - #[allow(dead_code)] - pub async fn node(&self, node_id: PrincipalId) -> Node { - self.nodes - .iter() - .filter(|(&id, _)| id == node_id) - .collect::>() - .first() - .unwrap() - .1 - .clone() - } } impl decentralization::network::TopologyManager for RegistryState {} impl NodesConverter for RegistryState { - fn get_nodes<'a>(&'a self, from: &'a [PrincipalId]) -> BoxFuture<'a, std::result::Result, NetworkError>> { + fn get_nodes<'a>(&'a self, from: &'a [PrincipalId]) -> BoxFuture<'a, std::result::Result, NetworkError>> { Box::pin(async { from.iter() - .map(|n| { - self.nodes() - .get(n) - .ok_or(NetworkError::NodeNotFound(*n)) - .map(decentralization::network::Node::from) - }) + .map(|n| self.nodes().get(n).cloned().ok_or(NetworkError::NodeNotFound(*n))) .collect() }) } @@ -849,7 +830,7 @@ impl SubnetQuerier for RegistryState { .get(&id) .map(|s| decentralization::network::DecentralizedSubnet { id: s.principal, - nodes: s.nodes.iter().map(decentralization::network::Node::from).collect(), + nodes: s.nodes.clone(), added_nodes: Vec::new(), removed_nodes: Vec::new(), comment: None, @@ -860,7 +841,7 @@ impl SubnetQuerier for RegistryState { let subnets = nodes .to_vec() .iter() - .map(|n| self.nodes.get(&n.id).and_then(|n| n.subnet_id)) + .map(|n| self.nodes.get(&n.principal).and_then(|n| n.subnet_id)) .collect::>(); if subnets.len() > 1 { return Err(NetworkError::IllegalRequest("nodes don't belong to the same subnet".to_string())); @@ -868,14 +849,7 @@ impl SubnetQuerier for RegistryState { if let Some(Some(subnet)) = subnets.into_iter().next() { Ok(decentralization::network::DecentralizedSubnet { id: subnet, - nodes: self - .subnets - .get(&subnet) - .ok_or(NetworkError::SubnetNotFound(subnet))? - .nodes - .iter() - .map(decentralization::network::Node::from) - .collect(), + nodes: self.subnets.get(&subnet).ok_or(NetworkError::SubnetNotFound(subnet))?.nodes.to_vec(), added_nodes: Vec::new(), removed_nodes: Vec::new(), comment: None, @@ -891,7 +865,7 @@ impl SubnetQuerier for RegistryState { } impl AvailableNodesQuerier for RegistryState { - fn available_nodes(&self) -> BoxFuture<'_, Result, NetworkError>> { + fn available_nodes(&self) -> BoxFuture<'_, Result, NetworkError>> { Box::pin(async { let nodes = self .nodes_with_proposals() @@ -915,8 +889,8 @@ impl AvailableNodesQuerier for RegistryState { .map(|s| matches!(*s, ic_management_types::HealthStatus::Healthy)) .unwrap_or(false) }) - .map(decentralization::network::Node::from) - .sorted_by(|n1, n2| n1.id.cmp(&n2.id)) + .cloned() + .sorted_by(|n1, n2| n1.principal.cmp(&n2.principal)) .collect()) }) } diff --git a/rs/ic-management-backend/src/subnets.rs b/rs/ic-management-backend/src/subnets.rs index 822b55fa5..ad80b1d3d 100644 --- a/rs/ic-management-backend/src/subnets.rs +++ b/rs/ic-management-backend/src/subnets.rs @@ -13,18 +13,17 @@ pub fn get_proposed_subnet_changes( ) -> Result { if let Some(proposal) = &subnet.proposal { let proposal: &TopologyChangeProposal = proposal; - let subnet_nodes: Vec<_> = subnet.nodes.iter().map(decentralization::network::Node::from).collect(); - let penalties_before_change = DecentralizedSubnet::check_business_rules_for_subnet_with_nodes(&subnet.principal, &subnet_nodes) + let penalties_before_change = DecentralizedSubnet::check_business_rules_for_subnet_with_nodes(&subnet.principal, &subnet.nodes) .expect("Business rules check should succeed") .0; - let business_rules_check_after_change = DecentralizedSubnet::check_business_rules_for_subnet_with_nodes(&subnet.principal, &subnet_nodes) + let business_rules_check_after_change = DecentralizedSubnet::check_business_rules_for_subnet_with_nodes(&subnet.principal, &subnet.nodes) .expect("Business rules check should succeed"); let change = SubnetChange { subnet_id: subnet.principal, - old_nodes: subnet_nodes.clone(), - new_nodes: subnet_nodes, + old_nodes: subnet.nodes.clone(), + new_nodes: subnet.nodes.clone(), removed_nodes: vec![], added_nodes: vec![], penalties_before_change, @@ -37,14 +36,14 @@ pub fn get_proposed_subnet_changes( &proposal .node_ids_added .iter() - .map(|p| (decentralization::network::Node::from(all_nodes.get(p).unwrap()))) + .map(|p| all_nodes.get(p).unwrap().clone()) .collect::>(), ) .without_nodes( &proposal .node_ids_removed .iter() - .map(|p| (decentralization::network::Node::from(all_nodes.get(p).unwrap()))) + .map(|p| all_nodes.get(p).unwrap().clone()) .collect::>(), ); let mut response = SubnetChangeResponse::new(&change, health_of_nodes, None); @@ -61,7 +60,7 @@ pub fn get_proposed_subnet_changes( // Adding some tests to the above function #[cfg(test)] mod tests { - use std::net::Ipv6Addr; + use std::sync::OnceLock; use ic_management_types::{Datacenter, DatacenterOwner, Operator, Provider}; @@ -157,7 +156,8 @@ mod tests { }, subnet_id: Some(subnet_id), hostos_release: None, - ip_addr: Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), + ip_addr: None, + cached_features: OnceLock::new(), hostname: None, dfinity_owned: None, proposal: None, diff --git a/rs/ic-management-types/Cargo.toml b/rs/ic-management-types/Cargo.toml index 799a8f05d..3e02ce404 100644 --- a/rs/ic-management-types/Cargo.toml +++ b/rs/ic-management-types/Cargo.toml @@ -8,6 +8,7 @@ documentation.workspace = true license.workspace = true [dependencies] +ahash = { workspace = true } actix-web = { workspace = true } chrono = { workspace = true } futures = { workspace = true } diff --git a/rs/ic-management-types/src/lib.rs b/rs/ic-management-types/src/lib.rs index 331541239..639f6253b 100644 --- a/rs/ic-management-types/src/lib.rs +++ b/rs/ic-management-types/src/lib.rs @@ -2,6 +2,7 @@ pub mod errors; pub mod requests; pub use crate::errors::*; +use ahash::AHashMap; use candid::{CandidType, Decode}; use core::hash::Hash; use ic_base_types::NodeId; @@ -15,6 +16,7 @@ use ic_protobuf::registry::subnet::v1::EcdsaConfig; use ic_protobuf::registry::subnet::v1::SubnetFeatures; use ic_registry_subnet_type::SubnetType; use ic_types::PrincipalId; +use indexmap::IndexMap; use registry_canister::mutations::do_add_nodes_to_subnet::AddNodesToSubnetPayload; use registry_canister::mutations::do_change_subnet_membership::ChangeSubnetMembershipPayload; use registry_canister::mutations::do_create_subnet::CreateSubnetPayload; @@ -33,6 +35,7 @@ use std::fmt::Debug; use std::net::Ipv6Addr; use std::ops::Deref; use std::str::FromStr; +use std::sync::OnceLock; use strum::VariantNames; use strum_macros::EnumString; use url::Url; @@ -193,7 +196,7 @@ impl TopologyChangePayload for RemoveNodesPayload { } } -#[derive(CandidType, Serialize, Deserialize, Clone, Debug)] +#[derive(CandidType, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct TopologyChangeProposal { pub node_ids_added: Vec, pub node_ids_removed: Vec, @@ -289,10 +292,10 @@ pub struct SubnetMetadata { pub applications: Option>, } -#[derive(Clone, Serialize, Debug, Deserialize)] +#[derive(Clone, Serialize, Debug, Deserialize, Default)] pub struct Node { pub principal: PrincipalId, - pub ip_addr: Ipv6Addr, + pub ip_addr: Option, pub operator: Operator, #[serde(skip_serializing_if = "Option::is_none")] pub hostname: Option, @@ -300,6 +303,8 @@ pub struct Node { pub subnet_id: Option, pub hostos_release: Option, pub hostos_version: String, + #[serde(skip)] + pub cached_features: OnceLock, pub dfinity_owned: Option, #[serde(skip_serializing_if = "Option::is_none")] pub proposal: Option, @@ -311,6 +316,147 @@ pub struct Node { pub public_ipv4_config: Option, } +impl Node { + pub fn id_short(&self) -> String { + self.principal.to_string().split_once('-').expect("invalid principal").0.to_string() + } + pub fn new_test_node(node_number: u64, features: NodeFeatures, dfinity_owned: bool) -> Self { + Node { + principal: PrincipalId::new_node_test_id(node_number), + cached_features: features.into(), + dfinity_owned: Some(dfinity_owned), + ..Default::default() + } + } + pub fn get_features(&self) -> NodeFeatures { + let features = if let Some(features) = &self.cached_features.get() { + // Return a clone of the cached value, if it exists + (*features).clone() + } else { + let country = self + .operator + .datacenter + .as_ref() + .map(|d| d.country.clone()) + .unwrap_or_else(|| "unknown".to_string()); + let area = self + .operator + .datacenter + .as_ref() + .map(|d| d.area.clone()) + .unwrap_or_else(|| "unknown".to_string()); + + NodeFeatures::from_iter([ + (NodeFeature::Area, area), + (NodeFeature::Country, country), + ( + NodeFeature::Continent, + self.operator + .datacenter + .as_ref() + .map(|d| d.continent.clone()) + .unwrap_or_else(|| "unknown".to_string()), + ), + ( + NodeFeature::DataCenterOwner, + self.operator + .datacenter + .as_ref() + .map(|d| d.owner.name.clone()) + .unwrap_or_else(|| "unknown".to_string()), + ), + ( + NodeFeature::DataCenter, + self.operator + .datacenter + .as_ref() + .map(|d| d.name.clone()) + .unwrap_or_else(|| "unknown".to_string()), + ), + (NodeFeature::NodeProvider, self.operator.provider.principal.to_string()), + ]) + }; + + // Cache the calculated value + self.cached_features.get_or_init(|| features.clone()); + + features + } + + pub fn get_feature(&self, feature: &NodeFeature) -> Option { + self.get_features().get(feature) + } + + pub fn matches_feature_value(&self, value: &str) -> bool { + self.principal.to_string() == *value.to_lowercase() + || self + .get_features() + .feature_map + .values() + .any(|v| *v.to_lowercase() == *value.to_lowercase()) + } + + pub fn is_country_from_eu(country: &str) -> bool { + // (As of 2024) the EU countries are not properly marked in the registry, so we check membership separately. + let eu_countries: AHashMap<&str, &str> = AHashMap::from_iter([ + ("AT", "Austria"), + ("BE", "Belgium"), + ("BG", "Bulgaria"), + ("CY", "Cyprus"), + ("CZ", "Czechia"), + ("DE", "Germany"), + ("DK", "Denmark"), + ("EE", "Estonia"), + ("ES", "Spain"), + ("FI", "Finland"), + ("FR", "France"), + ("GR", "Greece"), + ("HR", "Croatia"), + ("HU", "Hungary"), + ("IE", "Ireland"), + ("IT", "Italy"), + ("LT", "Lithuania"), + ("LU", "Luxembourg"), + ("LV", "Latvia"), + ("MT", "Malta"), + ("NL", "Netherlands"), + ("PL", "Poland"), + ("PT", "Portugal"), + ("RO", "Romania"), + ("SE", "Sweden"), + ("SI", "Slovenia"), + ("SK", "Slovakia"), + ]); + eu_countries.contains_key(country) + } +} + +impl std::fmt::Display for Node { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Node ID: {}\nFeatures:\n{}\nDfinity Owned: {}", + self.principal, + self.get_features(), + self.dfinity_owned.unwrap_or_default() + ) + } +} + +impl Hash for Node { + fn hash(&self, state: &mut H) { + self.principal.hash(state); + } +} + +impl PartialEq for Node { + fn eq(&self, other: &Self) -> bool { + self.principal == other.principal + } +} + +impl Eq for Node {} + #[derive(strum_macros::Display, EnumString, VariantNames, Hash, Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize, Debug)] #[strum(serialize_all = "snake_case")] #[serde(rename_all = "snake_case")] @@ -337,6 +483,56 @@ impl NodeFeature { } } +#[derive(Eq, PartialEq, Clone, Serialize, Deserialize, Debug, Default)] +pub struct NodeFeatures { + pub feature_map: IndexMap, +} + +impl std::fmt::Display for NodeFeatures { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for (feature, value) in &self.feature_map { + writeln!(f, "{}: {}", feature, value)?; + } + Ok(()) + } +} + +impl NodeFeatures { + pub fn get(&self, feature: &NodeFeature) -> Option { + self.feature_map.get(feature).cloned() + } + + pub fn new_test_feature_set(value: &str) -> Self { + let mut result = IndexMap::new(); + for feature in NodeFeature::variants() { + result.insert(feature, value.to_string()); + } + NodeFeatures { feature_map: result } + } + + pub fn with_feature_value(&self, feature: &NodeFeature, value: &str) -> Self { + let mut feature_map = self.feature_map.clone(); + feature_map.insert(feature.clone(), value.to_string()); + NodeFeatures { feature_map } + } +} + +impl FromIterator<(NodeFeature, &'static str)> for NodeFeatures { + fn from_iter>(iter: I) -> Self { + Self { + feature_map: IndexMap::from_iter(iter.into_iter().map(|x| (x.0, String::from(x.1)))), + } + } +} + +impl FromIterator<(NodeFeature, std::string::String)> for NodeFeatures { + fn from_iter>(iter: I) -> Self { + Self { + feature_map: IndexMap::from_iter(iter), + } + } +} + #[derive(Clone, Serialize, Debug, Deserialize)] pub struct TopologyProposal { pub id: u64, @@ -381,7 +577,7 @@ pub struct CreateSubnetProposalInfo { pub nodes: Vec, } -#[derive(Clone, Serialize, Default, Debug, Deserialize)] +#[derive(Clone, Serialize, Default, Debug, Deserialize, PartialEq, Eq)] pub struct Operator { pub principal: PrincipalId, pub provider: Provider, @@ -416,6 +612,14 @@ pub struct Datacenter { pub longitude: Option, } +impl PartialEq for Datacenter { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + } +} + +impl Eq for Datacenter {} + #[derive(Clone, Serialize, Default, Debug, Deserialize)] pub struct DatacenterOwner { pub name: String,