Skip to content

Commit

Permalink
refactor(dre): replace "decentralized" node with backend node (#1079)
Browse files Browse the repository at this point in the history
  • Loading branch information
sasa-tomic authored Nov 13, 2024
1 parent 7834f29 commit 2069c13
Show file tree
Hide file tree
Showing 20 changed files with 458 additions and 505 deletions.
18 changes: 7 additions & 11 deletions Cargo.Bazel.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "5808defd7f9c6be1d143a5ea8a53f6c33f0b47a6b1b95f16e0ade249061332c8",
"checksum": "93b8abc5567d6cead03998cd02aa972b598bce15ac572479e28cd8e10af65ad3",
"crates": {
"actix-codec 0.5.2": {
"name": "actix-codec",
Expand Down Expand Up @@ -19811,6 +19811,10 @@
"id": "actix-web 4.9.0",
"target": "actix_web"
},
{
"id": "ahash 0.8.11",
"target": "ahash"
},
{
"id": "anyhow 1.0.93",
"target": "anyhow"
Expand Down Expand Up @@ -28166,35 +28170,29 @@
],
"crate_features": {
"common": [
"elf",
"errno",
"general",
"ioctl",
"no_std"
],
"selects": {
"aarch64-unknown-linux-gnu": [
"elf",
"errno",
"prctl",
"std",
"system"
],
"arm-unknown-linux-gnueabi": [
"elf",
"errno",
"prctl",
"std",
"system"
],
"armv7-unknown-linux-gnueabi": [
"elf",
"errno",
"prctl",
"std",
"system"
],
"i686-unknown-linux-gnu": [
"elf",
"errno",
"prctl",
"std",
"system"
Expand All @@ -28210,8 +28208,6 @@
"system"
],
"x86_64-unknown-linux-gnu": [
"elf",
"errno",
"prctl",
"std",
"system"
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions rs/cli/src/commands/registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, HashMap},
net::Ipv6Addr,
path::PathBuf,
str::FromStr,
sync::Arc,
Expand Down Expand Up @@ -314,11 +315,11 @@ async fn _get_nodes(
NodeDetails {
node_id: *k,
xnet: Some(ConnectionEndpoint {
ip_addr: record.ip_addr.to_string(),
ip_addr: record.ip_addr.unwrap_or(Ipv6Addr::LOCALHOST).to_string(),
port: 2497,
}),
http: Some(ConnectionEndpoint {
ip_addr: record.ip_addr.to_string(),
ip_addr: record.ip_addr.unwrap_or(Ipv6Addr::LOCALHOST).to_string(),
port: 8080,
}),
node_operator_id,
Expand Down
92 changes: 44 additions & 48 deletions rs/cli/src/operations/hostos_rollout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ impl HostosRollout {
}
nodes
.iter()
.filter(|n| {
let node = decentralization::network::Node::from(n.to_owned());
.filter(|node| {
for filt in self.only_filter.iter() {
if node.matches_feature_value(filt) {
return true;
Expand All @@ -311,8 +310,7 @@ impl HostosRollout {
}
nodes
.iter()
.filter(|n| {
let node = decentralization::network::Node::from(n.to_owned());
.filter(|node| {
for filt in self.exclude_filter.iter() {
if node.matches_feature_value(filt) {
return false;
Expand Down Expand Up @@ -440,50 +438,47 @@ impl HostosRollout {
}
}
}
NodeAssignment::All => {
use HostosRolloutResponse::{None, Ok};
try_join(
self.with_nodes_health_and_open_proposals(
nodes_health.clone(),
nodes_with_open_proposals.clone(),
update_group.clone().with_assignment(NodeAssignment::Assigned),
),
self.with_nodes_health_and_open_proposals(
nodes_health.clone(),
nodes_with_open_proposals.clone(),
update_group.clone().with_assignment(NodeAssignment::Unassigned),
),
)
.await
.map(|response| match response {
(Ok(assigned_nodes, subnet_affected), None(reason)) => {
info!("No unassigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason);
Ok(assigned_nodes, subnet_affected)
}
(None(reason), Ok(unassigned_nodes, _)) => {
info!("No assigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason);
Ok(unassigned_nodes, Option::None)
}
NodeAssignment::All => try_join(
self.with_nodes_health_and_open_proposals(
nodes_health.clone(),
nodes_with_open_proposals.clone(),
update_group.clone().with_assignment(NodeAssignment::Assigned),
),
self.with_nodes_health_and_open_proposals(
nodes_health.clone(),
nodes_with_open_proposals.clone(),
update_group.clone().with_assignment(NodeAssignment::Unassigned),
),
)
.await
.map(|response| match response {
(HostosRolloutResponse::Ok(assigned_nodes, subnet_affected), HostosRolloutResponse::None(reason)) => {
info!("No unassigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason);
HostosRolloutResponse::Ok(assigned_nodes, subnet_affected)
}
(HostosRolloutResponse::None(reason), HostosRolloutResponse::Ok(unassigned_nodes, _)) => {
info!("No assigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason);
HostosRolloutResponse::Ok(unassigned_nodes, Option::None)
}

(Ok(assigned_nodes, subnet_affected), Ok(unassigned_nodes, _)) => {
info!(
"{} assigned nodes and {} unassigned nodes selected for: {}",
assigned_nodes.len(),
unassigned_nodes.len(),
update_group.node_group
);
Ok(assigned_nodes.into_iter().chain(unassigned_nodes).collect(), subnet_affected.clone())
}
(HostosRolloutResponse::Ok(assigned_nodes, subnet_affected), HostosRolloutResponse::Ok(unassigned_nodes, _)) => {
info!(
"{} assigned nodes and {} unassigned nodes selected for: {}",
assigned_nodes.len(),
unassigned_nodes.len(),
update_group.node_group
);
HostosRolloutResponse::Ok(assigned_nodes.into_iter().chain(unassigned_nodes).collect(), subnet_affected.clone())
}

(None(assigned_reason), None(unassigned_reason)) => {
info!(
"No candidate nodes selected for: {:?} ==> {:?} {:?}",
update_group.node_group, assigned_reason, unassigned_reason
);
None(assigned_reason.into_iter().chain(unassigned_reason).collect())
}
})
}
(HostosRolloutResponse::None(assigned_reason), HostosRolloutResponse::None(unassigned_reason)) => {
info!(
"No candidate nodes selected for: {:?} ==> {:?} {:?}",
update_group.node_group, assigned_reason, unassigned_reason
);
HostosRolloutResponse::None(assigned_reason.into_iter().chain(unassigned_reason).collect())
}
}),
}
}

Expand Down Expand Up @@ -538,7 +533,7 @@ pub mod test {
use ic_management_backend::proposal::ProposalAgentImpl;
use ic_management_types::{Network, Node, Operator, Provider, Subnet};
use std::collections::BTreeMap;
use std::net::Ipv6Addr;
use std::sync::OnceLock;

use super::*;

Expand Down Expand Up @@ -693,7 +688,7 @@ pub mod test {
for i in start_at_number..start_at_number + num_nodes {
let node = Node {
principal: PrincipalId::new_node_test_id(i),
ip_addr: Ipv6Addr::LOCALHOST,
ip_addr: None,
operator: Operator {
principal: PrincipalId::new_node_test_id(i),
provider: Provider {
Expand All @@ -706,6 +701,7 @@ pub mod test {
rewardable_nodes: BTreeMap::new(),
ipv6: "".to_string(),
},
cached_features: OnceLock::new(),
hostname: None,
hostos_release: None,
proposal: None,
Expand Down
2 changes: 1 addition & 1 deletion rs/cli/src/qualification/run_workload_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Step for Workload {
.find(|s| s.subnet_type.eq(&SubnetType::Application))
.ok_or(anyhow::anyhow!("Application subnet required for step `{}`", self.name()))?;

let all_ipv6 = subnet.nodes.iter().map(|n| n.ip_addr).collect_vec();
let all_ipv6 = subnet.nodes.iter().map(|n| n.ip_addr.unwrap()).collect_vec();
let args = &[
all_ipv6.iter().map(|ip| format!("http://[{}]:8080/", ip)).join(","),
"-m=UpdateCounter".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion rs/cli/src/qualification/run_xnet_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Step for RunXnetTest {

let args = &[
"--nns_url".to_string(),
format!("http://[{}]:8080/", nns_node.ip_addr),
format!("http://[{}]:8080/", nns_node.ip_addr.unwrap()),
"--subnets".to_string(),
NUM_SUBNETS.to_string(),
"--principal_key".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion rs/cli/src/qualification/upgrade_subnets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async fn wait_for_subnet_revision(ctx: &StepCtx, subnet: Option<PrincipalId>, re
// Fetch the metrics of each node and check if it
// contains the revision somewhere
for node in nodes {
let url = format!("http://[{}]:9090/metrics", node.ip_addr);
let url = format!("http://[{}]:9090/metrics", node.ip_addr.unwrap());

let response = match client.get(&url).send().await {
Ok(r) => match r.error_for_status() {
Expand Down
38 changes: 18 additions & 20 deletions rs/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,9 @@ impl Runner {
// Values
for nr in &node_removals {
let mut row = tabular::Row::new();
let decentralization_node = decentralization::network::Node::from(&nr.node);
row.add_cell(nr.node.principal);
for nf in NodeFeature::variants() {
row.add_cell(decentralization_node.get_feature(&nf));
row.add_cell(nr.node.get_feature(&nf).expect("Feature should exist"));
}
row.add_cell(nr.node.hostname.clone().unwrap_or_else(|| "N/A".to_string()));
row.add_cell(nr.reason.message());
Expand Down Expand Up @@ -567,15 +566,15 @@ impl Runner {
.collect::<IndexMap<_, _>>())
}

async fn get_available_and_healthy_nodes(&self) -> anyhow::Result<(Vec<decentralization::network::Node>, IndexMap<PrincipalId, HealthStatus>)> {
async fn get_available_and_healthy_nodes(&self) -> anyhow::Result<(Vec<Node>, IndexMap<PrincipalId, HealthStatus>)> {
try_join(self.registry.available_nodes().map_err(anyhow::Error::from), self.health_client.nodes()).await
}

fn get_operators_to_optimize(
&self,
node_operators_all: &IndexMap<PrincipalId, Operator>,
all_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<decentralization::network::Node>>,
available_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<decentralization::network::Node>>,
all_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<Node>>,
available_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<Node>>,
nodes_all: &IndexMap<PrincipalId, Node>,
subnets: &IndexMap<PrincipalId, Subnet>,
ensure_assigned: bool,
Expand All @@ -590,9 +589,9 @@ impl Runner {
.filter_map(|(operator_id, operator)| {
all_nodes_grouped_by_operator.get(operator_id).and_then(|operator_nodes| {
let condition = if ensure_assigned {
operator_nodes.iter().all(|node| !nodes_in_subnets.contains_key(&node.id))
operator_nodes.iter().all(|node| !nodes_in_subnets.contains_key(&node.principal))
} else {
operator_nodes.iter().all(|node| nodes_in_subnets.contains_key(&node.id))
operator_nodes.iter().all(|node| nodes_in_subnets.contains_key(&node.principal))
};

if condition {
Expand All @@ -601,12 +600,12 @@ impl Runner {
.get(operator_id)
.unwrap_or(&vec![])
.iter()
.map(|node| nodes_all.get(&node.id).expect("Node should exist").clone())
.map(|node| nodes_all.get(&node.principal).expect("Node should exist").clone())
.collect::<Vec<_>>()
} else {
operator_nodes
.iter()
.filter_map(|node| nodes_in_subnets.get(&node.id))
.filter_map(|node| nodes_in_subnets.get(&node.principal))
.map(|n| (*n).clone())
.collect::<Vec<_>>()
};
Expand All @@ -631,28 +630,27 @@ impl Runner {
async fn get_best_change_for_operator(
&self,
subnets: &IndexMap<PrincipalId, Subnet>,
available_nodes: &[decentralization::network::Node],
available_nodes: &[Node],
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
node: &Node,
ensure_assigned: bool,
cordoned_features: Vec<NodeFeaturePair>,
) -> Option<SubnetChangeResponse> {
let decentr_node = decentralization::network::Node::from(node);
let mut best_change: Option<SubnetChangeResponse> = None;

for subnet in subnets.values() {
let subnet = DecentralizedSubnet::from(subnet);
let subnet_id_short = subnet.id.to_string().split_once('-').unwrap().0.to_string();
let change_request = if ensure_assigned {
SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![decentr_node.clone()], vec![], vec![]).resize(
SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![node.clone()], vec![], vec![]).resize(
0,
1,
0,
health_of_nodes,
cordoned_features.clone(),
)
} else {
SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![], vec![decentr_node.clone()], vec![]).resize(
SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![], vec![node.clone()], vec![]).resize(
1,
0,
0,
Expand Down Expand Up @@ -711,12 +709,12 @@ impl Runner {
let all_nodes = self.registry.nodes().await?;
let all_nodes_grouped_by_operator = all_nodes
.values()
.map(decentralization::network::Node::from)
.into_group_map_by(|node| all_nodes.get(&node.id).expect("Node should exist").operator.principal);
.cloned()
.into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal);
let available_nodes_grouped_by_operator = available_nodes
.iter()
.map(|n| (*n).clone())
.into_group_map_by(|node| all_nodes.get(&node.id).expect("Node should exist").operator.principal);
.into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal);
let cordoned_features = self.cordoned_features_fetcher.fetch().await.unwrap_or_else(|e| {
warn!("Failed to fetch cordoned features with error: {:?}", e);
warn!("Will continue running as if no features were cordoned");
Expand Down Expand Up @@ -786,7 +784,7 @@ impl Runner {
.await?,
);
subnets.shift_remove(&change.subnet_id.expect("Subnet ID should be present"));
available_nodes.retain(|n| n.id != node.principal);
available_nodes.retain(|n| n.principal != node.principal);
} else {
warn!(
"{} node {} of the operator {} in DC {} would worsen decentralization in all subnets!",
Expand Down Expand Up @@ -824,7 +822,7 @@ impl Runner {
) -> anyhow::Result<()> {
let subnet_before = match override_subnet_nodes {
Some(nodes) => {
let nodes = self.registry.get_decentralized_nodes(&nodes).await?;
let nodes = self.registry.get_nodes_from_ids(&nodes).await?;
DecentralizedSubnet::new_with_subnet_id_and_nodes(change.subnet_id, nodes)
}
None => self
Expand All @@ -837,11 +835,11 @@ impl Runner {
let health_of_nodes = self.health_of_nodes().await?;

// Simulate node removal
let removed_nodes = self.registry.get_decentralized_nodes(&change.get_removed_node_ids()).await?;
let removed_nodes = self.registry.get_nodes_from_ids(&change.get_removed_node_ids()).await?;
let subnet_mid = subnet_before.without_nodes(&removed_nodes).map_err(|e| anyhow::anyhow!(e))?;

// Now simulate node addition
let added_nodes = self.registry.get_decentralized_nodes(&change.get_added_node_ids()).await?;
let added_nodes = self.registry.get_nodes_from_ids(&change.get_added_node_ids()).await?;

let subnet_after = subnet_mid.with_nodes(&added_nodes);

Expand Down
Loading

0 comments on commit 2069c13

Please sign in to comment.