
refactor(dre): replace "decentralized" node with backend node #1079

Merged 2 commits on Nov 13, 2024
18 changes: 7 additions & 11 deletions Cargo.Bazel.lock
@@ -1,5 +1,5 @@
{
"checksum": "5808defd7f9c6be1d143a5ea8a53f6c33f0b47a6b1b95f16e0ade249061332c8",
"checksum": "93b8abc5567d6cead03998cd02aa972b598bce15ac572479e28cd8e10af65ad3",
"crates": {
"actix-codec 0.5.2": {
"name": "actix-codec",
@@ -19811,6 +19811,10 @@
"id": "actix-web 4.9.0",
"target": "actix_web"
},
{
"id": "ahash 0.8.11",
"target": "ahash"
},
{
"id": "anyhow 1.0.93",
"target": "anyhow"
@@ -28166,35 +28170,29 @@
],
"crate_features": {
"common": [
"elf",
"errno",
"general",
"ioctl",
"no_std"
],
"selects": {
"aarch64-unknown-linux-gnu": [
"elf",
"errno",
"prctl",
"std",
"system"
],
"arm-unknown-linux-gnueabi": [
"elf",
"errno",
"prctl",
"std",
"system"
],
"armv7-unknown-linux-gnueabi": [
"elf",
"errno",
"prctl",
"std",
"system"
],
"i686-unknown-linux-gnu": [
"elf",
"errno",
"prctl",
"std",
"system"
@@ -28210,8 +28208,6 @@
"system"
],
"x86_64-unknown-linux-gnu": [
"elf",
"errno",
"prctl",
"std",
"system"
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions rs/cli/src/commands/registry.rs
@@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, HashMap},
net::Ipv6Addr,
path::PathBuf,
str::FromStr,
sync::Arc,
@@ -314,11 +315,11 @@ async fn _get_nodes(
NodeDetails {
node_id: *k,
xnet: Some(ConnectionEndpoint {
ip_addr: record.ip_addr.to_string(),
ip_addr: record.ip_addr.unwrap_or(Ipv6Addr::LOCALHOST).to_string(),
port: 2497,
}),
http: Some(ConnectionEndpoint {
ip_addr: record.ip_addr.to_string(),
ip_addr: record.ip_addr.unwrap_or(Ipv6Addr::LOCALHOST).to_string(),
port: 8080,
}),
node_operator_id,
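
For context on the change above: record.ip_addr is now optional, and the endpoint builder falls back to the loopback address when it is missing. A minimal sketch of that pattern, using a hypothetical stripped-down record type (only the unwrap_or(Ipv6Addr::LOCALHOST) fallback is taken from the diff):

use std::net::Ipv6Addr;

// Hypothetical, stripped-down record; only the unwrap_or fallback mirrors the diff.
struct NodeRecord {
    ip_addr: Option<Ipv6Addr>,
}

fn endpoint_ip(record: &NodeRecord) -> String {
    // Fall back to ::1 when the registry has no address for the node.
    record.ip_addr.unwrap_or(Ipv6Addr::LOCALHOST).to_string()
}

fn main() {
    let known = NodeRecord { ip_addr: Some(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)) };
    let unknown = NodeRecord { ip_addr: None };
    println!("{}", endpoint_ip(&known));   // 2001:db8::1
    println!("{}", endpoint_ip(&unknown)); // ::1
}
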
92 changes: 44 additions & 48 deletions rs/cli/src/operations/hostos_rollout.rs
@@ -292,8 +292,7 @@ impl HostosRollout {
}
nodes
.iter()
.filter(|n| {
let node = decentralization::network::Node::from(n.to_owned());
.filter(|node| {
for filt in self.only_filter.iter() {
if node.matches_feature_value(filt) {
return true;
@@ -311,8 +310,7 @@
}
nodes
.iter()
.filter(|n| {
let node = decentralization::network::Node::from(n.to_owned());
.filter(|node| {
for filt in self.exclude_filter.iter() {
if node.matches_feature_value(filt) {
return false;
@@ -440,50 +438,47 @@
}
}
}
NodeAssignment::All => {
use HostosRolloutResponse::{None, Ok};
try_join(
self.with_nodes_health_and_open_proposals(
nodes_health.clone(),
nodes_with_open_proposals.clone(),
update_group.clone().with_assignment(NodeAssignment::Assigned),
),
self.with_nodes_health_and_open_proposals(
nodes_health.clone(),
nodes_with_open_proposals.clone(),
update_group.clone().with_assignment(NodeAssignment::Unassigned),
),
)
.await
.map(|response| match response {
(Ok(assigned_nodes, subnet_affected), None(reason)) => {
info!("No unassigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason);
Ok(assigned_nodes, subnet_affected)
}
(None(reason), Ok(unassigned_nodes, _)) => {
info!("No assigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason);
Ok(unassigned_nodes, Option::None)
}
NodeAssignment::All => try_join(
self.with_nodes_health_and_open_proposals(
nodes_health.clone(),
nodes_with_open_proposals.clone(),
update_group.clone().with_assignment(NodeAssignment::Assigned),
),
self.with_nodes_health_and_open_proposals(
nodes_health.clone(),
nodes_with_open_proposals.clone(),
update_group.clone().with_assignment(NodeAssignment::Unassigned),
),
)
.await
.map(|response| match response {
(HostosRolloutResponse::Ok(assigned_nodes, subnet_affected), HostosRolloutResponse::None(reason)) => {
info!("No unassigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason);
HostosRolloutResponse::Ok(assigned_nodes, subnet_affected)
}
(HostosRolloutResponse::None(reason), HostosRolloutResponse::Ok(unassigned_nodes, _)) => {
info!("No assigned nodes selected for: {:?} ==> {:?}", update_group.node_group, reason);
HostosRolloutResponse::Ok(unassigned_nodes, Option::None)
}

(Ok(assigned_nodes, subnet_affected), Ok(unassigned_nodes, _)) => {
info!(
"{} assigned nodes and {} unassigned nodes selected for: {}",
assigned_nodes.len(),
unassigned_nodes.len(),
update_group.node_group
);
Ok(assigned_nodes.into_iter().chain(unassigned_nodes).collect(), subnet_affected.clone())
}
(HostosRolloutResponse::Ok(assigned_nodes, subnet_affected), HostosRolloutResponse::Ok(unassigned_nodes, _)) => {
info!(
"{} assigned nodes and {} unassigned nodes selected for: {}",
assigned_nodes.len(),
unassigned_nodes.len(),
update_group.node_group
);
HostosRolloutResponse::Ok(assigned_nodes.into_iter().chain(unassigned_nodes).collect(), subnet_affected.clone())
}

(None(assigned_reason), None(unassigned_reason)) => {
info!(
"No candidate nodes selected for: {:?} ==> {:?} {:?}",
update_group.node_group, assigned_reason, unassigned_reason
);
None(assigned_reason.into_iter().chain(unassigned_reason).collect())
}
})
}
(HostosRolloutResponse::None(assigned_reason), HostosRolloutResponse::None(unassigned_reason)) => {
info!(
"No candidate nodes selected for: {:?} ==> {:?} {:?}",
update_group.node_group, assigned_reason, unassigned_reason
);
HostosRolloutResponse::None(assigned_reason.into_iter().chain(unassigned_reason).collect())
}
}),
}
}

@@ -538,7 +533,7 @@ pub mod test {
use ic_management_backend::proposal::ProposalAgentImpl;
use ic_management_types::{Network, Node, Operator, Provider, Subnet};
use std::collections::BTreeMap;
use std::net::Ipv6Addr;
use std::sync::OnceLock;

use super::*;

@@ -693,7 +688,7 @@ pub mod test {
for i in start_at_number..start_at_number + num_nodes {
let node = Node {
principal: PrincipalId::new_node_test_id(i),
ip_addr: Ipv6Addr::LOCALHOST,
ip_addr: None,
operator: Operator {
principal: PrincipalId::new_node_test_id(i),
provider: Provider {
@@ -706,6 +701,7 @@
rewardable_nodes: BTreeMap::new(),
ipv6: "".to_string(),
},
cached_features: OnceLock::new(),
hostname: None,
hostos_release: None,
proposal: None,
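
For context: the rollout filters above no longer convert each node into a decentralization::network::Node before matching; the backend node is filtered directly. A minimal sketch of that shape, with illustrative types rather than the crate's real ones:

// Illustrative backend-style Node exposing matches_feature_value directly,
// so filter closures operate on the node itself without a conversion step.
#[derive(Clone)]
struct Node {
    features: Vec<(String, String)>, // (feature name, value), illustrative only
}

impl Node {
    fn matches_feature_value(&self, filter: &str) -> bool {
        self.features.iter().any(|(_, value)| value.as_str() == filter)
    }
}

fn apply_only_filter<'a>(nodes: &'a [Node], only_filter: &[String]) -> Vec<&'a Node> {
    nodes
        .iter()
        // Same shape as the new closure in the diff: filter on the node as-is.
        .filter(|node| only_filter.iter().any(|f| node.matches_feature_value(f)))
        .collect()
}

fn main() {
    let nodes = vec![Node { features: vec![("dc".to_string(), "fr1".to_string())] }];
    let filters = vec!["fr1".to_string()];
    println!("{} node(s) kept", apply_only_filter(&nodes, &filters).len());
}
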
2 changes: 1 addition & 1 deletion rs/cli/src/qualification/run_workload_test.rs
@@ -43,7 +43,7 @@ impl Step for Workload {
.find(|s| s.subnet_type.eq(&SubnetType::Application))
.ok_or(anyhow::anyhow!("Application subnet required for step `{}`", self.name()))?;

let all_ipv6 = subnet.nodes.iter().map(|n| n.ip_addr).collect_vec();
let all_ipv6 = subnet.nodes.iter().map(|n| n.ip_addr.unwrap()).collect_vec();
let args = &[
all_ipv6.iter().map(|ip| format!("http://[{}]:8080/", ip)).join(","),
"-m=UpdateCounter".to_string(),
2 changes: 1 addition & 1 deletion rs/cli/src/qualification/run_xnet_test.rs
@@ -58,7 +58,7 @@ impl Step for RunXnetTest {

let args = &[
"--nns_url".to_string(),
format!("http://[{}]:8080/", nns_node.ip_addr),
format!("http://[{}]:8080/", nns_node.ip_addr.unwrap()),
"--subnets".to_string(),
NUM_SUBNETS.to_string(),
"--principal_key".to_string(),
2 changes: 1 addition & 1 deletion rs/cli/src/qualification/upgrade_subnets.rs
@@ -195,7 +195,7 @@ async fn wait_for_subnet_revision(ctx: &StepCtx, subnet: Option<PrincipalId>, re
// Fetch the metrics of each node and check if it
// contains the revision somewhere
for node in nodes {
let url = format!("http://[{}]:9090/metrics", node.ip_addr);
let url = format!("http://[{}]:9090/metrics", node.ip_addr.unwrap());

let response = match client.get(&url).send().await {
Ok(r) => match r.error_for_status() {
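
For context on the three qualification steps above: with ip_addr now optional, the URL builders unwrap it and would panic for a node without an address. A small sketch of that construction, with an illustrative function name:

use std::net::Ipv6Addr;

// Sketch of the URL construction used in the qualification steps; the
// unwrap() mirrors the diff and panics if a node has no ip_addr.
fn metrics_url(ip_addr: Option<Ipv6Addr>) -> String {
    format!("http://[{}]:9090/metrics", ip_addr.unwrap())
}

fn main() {
    println!("{}", metrics_url(Some(Ipv6Addr::LOCALHOST))); // http://[::1]:9090/metrics
}
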
38 changes: 18 additions & 20 deletions rs/cli/src/runner.rs
@@ -479,10 +479,9 @@ impl Runner {
// Values
for nr in &node_removals {
let mut row = tabular::Row::new();
let decentralization_node = decentralization::network::Node::from(&nr.node);
row.add_cell(nr.node.principal);
for nf in NodeFeature::variants() {
row.add_cell(decentralization_node.get_feature(&nf));
row.add_cell(nr.node.get_feature(&nf).expect("Feature should exist"));
}
row.add_cell(nr.node.hostname.clone().unwrap_or_else(|| "N/A".to_string()));
row.add_cell(nr.reason.message());
@@ -567,15 +566,15 @@ impl Runner {
.collect::<IndexMap<_, _>>())
}

async fn get_available_and_healthy_nodes(&self) -> anyhow::Result<(Vec<decentralization::network::Node>, IndexMap<PrincipalId, HealthStatus>)> {
async fn get_available_and_healthy_nodes(&self) -> anyhow::Result<(Vec<Node>, IndexMap<PrincipalId, HealthStatus>)> {
try_join(self.registry.available_nodes().map_err(anyhow::Error::from), self.health_client.nodes()).await
}

fn get_operators_to_optimize(
&self,
node_operators_all: &IndexMap<PrincipalId, Operator>,
all_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<decentralization::network::Node>>,
available_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<decentralization::network::Node>>,
all_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<Node>>,
available_nodes_grouped_by_operator: &HashMap<PrincipalId, Vec<Node>>,
nodes_all: &IndexMap<PrincipalId, Node>,
subnets: &IndexMap<PrincipalId, Subnet>,
ensure_assigned: bool,
@@ -590,9 +589,9 @@
.filter_map(|(operator_id, operator)| {
all_nodes_grouped_by_operator.get(operator_id).and_then(|operator_nodes| {
let condition = if ensure_assigned {
operator_nodes.iter().all(|node| !nodes_in_subnets.contains_key(&node.id))
operator_nodes.iter().all(|node| !nodes_in_subnets.contains_key(&node.principal))
} else {
operator_nodes.iter().all(|node| nodes_in_subnets.contains_key(&node.id))
operator_nodes.iter().all(|node| nodes_in_subnets.contains_key(&node.principal))
};

if condition {
@@ -601,12 +600,12 @@
.get(operator_id)
.unwrap_or(&vec![])
.iter()
.map(|node| nodes_all.get(&node.id).expect("Node should exist").clone())
.map(|node| nodes_all.get(&node.principal).expect("Node should exist").clone())
.collect::<Vec<_>>()
} else {
operator_nodes
.iter()
.filter_map(|node| nodes_in_subnets.get(&node.id))
.filter_map(|node| nodes_in_subnets.get(&node.principal))
.map(|n| (*n).clone())
.collect::<Vec<_>>()
};
@@ -631,28 +630,27 @@
async fn get_best_change_for_operator(
&self,
subnets: &IndexMap<PrincipalId, Subnet>,
available_nodes: &[decentralization::network::Node],
available_nodes: &[Node],
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
node: &Node,
ensure_assigned: bool,
cordoned_features: Vec<NodeFeaturePair>,
) -> Option<SubnetChangeResponse> {
let decentr_node = decentralization::network::Node::from(node);
let mut best_change: Option<SubnetChangeResponse> = None;

for subnet in subnets.values() {
let subnet = DecentralizedSubnet::from(subnet);
let subnet_id_short = subnet.id.to_string().split_once('-').unwrap().0.to_string();
let change_request = if ensure_assigned {
SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![decentr_node.clone()], vec![], vec![]).resize(
SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![node.clone()], vec![], vec![]).resize(
0,
1,
0,
health_of_nodes,
cordoned_features.clone(),
)
} else {
SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![], vec![decentr_node.clone()], vec![]).resize(
SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![], vec![node.clone()], vec![]).resize(
1,
0,
0,
@@ -711,12 +709,12 @@ impl Runner {
let all_nodes = self.registry.nodes().await?;
let all_nodes_grouped_by_operator = all_nodes
.values()
.map(decentralization::network::Node::from)
.into_group_map_by(|node| all_nodes.get(&node.id).expect("Node should exist").operator.principal);
.cloned()
.into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal);
let available_nodes_grouped_by_operator = available_nodes
.iter()
.map(|n| (*n).clone())
.into_group_map_by(|node| all_nodes.get(&node.id).expect("Node should exist").operator.principal);
.into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal);
let cordoned_features = self.cordoned_features_fetcher.fetch().await.unwrap_or_else(|e| {
warn!("Failed to fetch cordoned features with error: {:?}", e);
warn!("Will continue running as if no features were cordoned");
@@ -786,7 +784,7 @@ impl Runner {
.await?,
);
subnets.shift_remove(&change.subnet_id.expect("Subnet ID should be present"));
available_nodes.retain(|n| n.id != node.principal);
available_nodes.retain(|n| n.principal != node.principal);
} else {
warn!(
"{} node {} of the operator {} in DC {} would worsen decentralization in all subnets!",
@@ -824,7 +822,7 @@
) -> anyhow::Result<()> {
let subnet_before = match override_subnet_nodes {
Some(nodes) => {
let nodes = self.registry.get_decentralized_nodes(&nodes).await?;
let nodes = self.registry.get_nodes_from_ids(&nodes).await?;
DecentralizedSubnet::new_with_subnet_id_and_nodes(change.subnet_id, nodes)
}
None => self
@@ -837,11 +835,11 @@
let health_of_nodes = self.health_of_nodes().await?;

// Simulate node removal
let removed_nodes = self.registry.get_decentralized_nodes(&change.get_removed_node_ids()).await?;
let removed_nodes = self.registry.get_nodes_from_ids(&change.get_removed_node_ids()).await?;
let subnet_mid = subnet_before.without_nodes(&removed_nodes).map_err(|e| anyhow::anyhow!(e))?;

// Now simulate node addition
let added_nodes = self.registry.get_decentralized_nodes(&change.get_added_node_ids()).await?;
let added_nodes = self.registry.get_nodes_from_ids(&change.get_added_node_ids()).await?;

let subnet_after = subnet_mid.with_nodes(&added_nodes);

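
For context on the runner changes above: with the decentralization node removed, the code identifies nodes by the backend Node's principal field (formerly id on the decentralization type) and fetches them via get_nodes_from_ids. A minimal sketch of the renamed accessor, using illustrative stand-in types:

// Illustrative stand-ins, not the real ic types.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct PrincipalId(u64);

#[derive(Clone, Debug)]
struct Node {
    principal: PrincipalId,
}

fn remove_assigned(available_nodes: &mut Vec<Node>, assigned: &Node) {
    // Formerly `n.id != node.principal`; now both sides are the same field.
    available_nodes.retain(|n| n.principal != assigned.principal);
}

fn main() {
    let mut pool = vec![Node { principal: PrincipalId(1) }, Node { principal: PrincipalId(2) }];
    remove_assigned(&mut pool, &Node { principal: PrincipalId(1) });
    println!("{} node(s) left in the available pool", pool.len()); // 1
}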