Skip to content

Commit

Permalink
feat(decentralization): from nodes with same dec. prefer those from l…
Browse files Browse the repository at this point in the history
…ess used operators (#1080)
  • Loading branch information
sasa-tomic authored Nov 14, 2024
1 parent b918202 commit 4d305a7
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 68 deletions.
7 changes: 5 additions & 2 deletions rs/cli/src/commands/subnet/replace.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use clap::{error::ErrorKind, Args};
use ic_types::PrincipalId;
use itertools::Itertools;

use crate::{
commands::{AuthRequirement, ExecutableCommand},
Expand Down Expand Up @@ -52,6 +53,9 @@ impl ExecutableCommand for Replace {
_ => SubnetTarget::FromNodesIds(self.nodes.clone()),
};

let runner = ctx.runner().await?;
let all_nodes = ctx.registry().await.nodes().await?.values().cloned().collect_vec();

let subnet_manager = ctx.subnet_manager().await?;
let subnet_change_response = subnet_manager
.with_target(subnet_target)
Expand All @@ -62,11 +66,10 @@ impl ExecutableCommand for Replace {
self.exclude.clone().into(),
self.only.clone(),
self.include.clone().into(),
&all_nodes,
)
.await?;

let runner = ctx.runner().await?;

if let Some(runner_proposal) = runner.propose_subnet_change(subnet_change_response, ctx.forum_post_link()).await? {
let ic_admin = ctx.ic_admin().await?;
ic_admin.propose_run(runner_proposal.cmd, runner_proposal.opts).await?;
Expand Down
31 changes: 23 additions & 8 deletions rs/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use decentralization::network::SubnetChangeRequest;
use decentralization::network::SubnetQueryBy;
use decentralization::subnets::NodesRemover;
use decentralization::SubnetChangeResponse;
use futures::future::try_join3;
use futures::TryFutureExt;
use futures_util::future::try_join;
use ic_management_backend::health::HealthStatusQuerier;
Expand Down Expand Up @@ -141,6 +142,7 @@ impl Runner {
replica_version: Option<String>,
other_args: Vec<String>,
) -> anyhow::Result<Option<RunnerProposal>> {
let all_nodes = self.registry.nodes().await?.values().cloned().collect_vec();
let health_of_nodes = self.health_of_nodes().await?;

let subnet_creation_data = self
Expand All @@ -156,6 +158,7 @@ impl Runner {
warn!("Will continue running as if no features were cordoned");
vec![]
}),
&all_nodes,
)
.await?;
let subnet_creation_data = SubnetChangeResponse::new(&subnet_creation_data, &health_of_nodes, Some(motivation.clone()));
Expand Down Expand Up @@ -523,8 +526,13 @@ impl Runner {
})
.map(|(id, subnet)| (*id, subnet.clone()))
.collect::<IndexMap<_, _>>();
let (available_nodes, health_of_nodes) =
try_join(self.registry.available_nodes().map_err(anyhow::Error::from), self.health_client.nodes()).await?;
let (all_nodes, available_nodes, health_of_nodes) = try_join3(
self.registry.nodes(),
self.registry.available_nodes().map_err(anyhow::Error::from),
self.health_client.nodes(),
)
.await?;
let all_nodes = all_nodes.values().cloned().collect::<Vec<Node>>();

let subnets_change_responses = NetworkHealRequest::new(subnets_without_proposals)
.heal_and_optimize(
Expand All @@ -535,6 +543,7 @@ impl Runner {
warn!("Will continue running as if no features were cordoned");
vec![]
}),
&all_nodes,
)
.await?;

Expand Down Expand Up @@ -635,6 +644,7 @@ impl Runner {
node: &Node,
ensure_assigned: bool,
cordoned_features: Vec<NodeFeaturePair>,
all_nodes: &[Node],
) -> Option<SubnetChangeResponse> {
let mut best_change: Option<SubnetChangeResponse> = None;

Expand All @@ -648,6 +658,7 @@ impl Runner {
0,
health_of_nodes,
cordoned_features.clone(),
all_nodes,
)
} else {
SubnetChangeRequest::new(subnet, available_nodes.to_vec(), vec![], vec![node.clone()], vec![]).resize(
Expand All @@ -656,6 +667,7 @@ impl Runner {
0,
health_of_nodes,
cordoned_features.clone(),
all_nodes,
)
};

Expand Down Expand Up @@ -706,15 +718,16 @@ impl Runner {
let mut subnets = self.get_subnets(skip_subnets).await?;
let (mut available_nodes, health_of_nodes) = self.get_available_and_healthy_nodes().await?;
let all_node_operators = self.registry.operators().await?;
let all_nodes = self.registry.nodes().await?;
let all_nodes_map = self.registry.nodes().await?;
let all_nodes = all_nodes_map.values().cloned().collect_vec();
let all_nodes_grouped_by_operator = all_nodes
.values()
.iter()
.cloned()
.into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal);
.into_group_map_by(|node| all_nodes_map.get(&node.principal).expect("Node should exist").operator.principal);
let available_nodes_grouped_by_operator = available_nodes
.iter()
.map(|n| (*n).clone())
.into_group_map_by(|node| all_nodes.get(&node.principal).expect("Node should exist").operator.principal);
.into_group_map_by(|node| all_nodes_map.get(&node.principal).expect("Node should exist").operator.principal);
let cordoned_features = self.cordoned_features_fetcher.fetch().await.unwrap_or_else(|e| {
warn!("Failed to fetch cordoned features with error: {:?}", e);
warn!("Will continue running as if no features were cordoned");
Expand All @@ -725,7 +738,7 @@ impl Runner {
&all_node_operators,
&all_nodes_grouped_by_operator,
&available_nodes_grouped_by_operator,
&all_nodes,
&all_nodes_map,
&subnets,
ensure_assigned,
);
Expand Down Expand Up @@ -760,6 +773,7 @@ impl Runner {
node,
ensure_assigned,
cordoned_features.clone(),
&all_nodes,
)
.await;

Expand Down Expand Up @@ -872,9 +886,10 @@ impl Runner {
None => change_request,
};

let all_nodes = self.registry.nodes().await?.values().cloned().collect_vec();
let health_of_nodes = self.health_of_nodes().await?;

let change = &change_request.rescue(&health_of_nodes, self.cordoned_features_fetcher.fetch().await?)?;
let change = &change_request.rescue(&health_of_nodes, self.cordoned_features_fetcher.fetch().await?, &all_nodes)?;
let change = SubnetChangeResponse::new(change, &health_of_nodes, Some("Recovering subnet".to_string()));

if change.node_ids_added.is_empty() && change.node_ids_removed.is_empty() {
Expand Down
5 changes: 5 additions & 0 deletions rs/cli/src/subnet_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use ic_management_types::HealthStatus;
use ic_management_types::Node;
use ic_types::PrincipalId;
use indexmap::IndexMap;
use itertools::Itertools;
use log::{info, warn};

use crate::cordoned_feature_fetcher::CordonedFeatureFetcher;
Expand Down Expand Up @@ -123,6 +124,7 @@ impl SubnetManager {
exclude: Option<Vec<String>>,
only: Vec<String>,
include: Option<Vec<PrincipalId>>,
all_nodes: &[Node],
) -> anyhow::Result<SubnetChangeResponse> {
let subnet_query_by = self.get_subnet_query_by(self.target()?).await?;
let mut motivations = vec![];
Expand Down Expand Up @@ -156,6 +158,7 @@ impl SubnetManager {
&to_be_replaced,
&health_of_nodes,
self.cordoned_features_fetcher.fetch().await?,
all_nodes,
)?;

for n in change.removed().iter().filter(|n| !node_ids_unhealthy.contains(&n.principal)) {
Expand Down Expand Up @@ -186,6 +189,7 @@ impl SubnetManager {
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
) -> anyhow::Result<SubnetChangeResponse> {
let registry = self.registry_instance.clone();
let all_nodes = registry.nodes().await?.values().cloned().collect_vec();
let mut motivations = vec![];

let change = registry
Expand All @@ -200,6 +204,7 @@ impl SubnetManager {
0,
health_of_nodes,
self.cordoned_features_fetcher.fetch().await?,
&all_nodes,
)?;

for n in change.removed().iter() {
Expand Down
3 changes: 2 additions & 1 deletion rs/cli/src/unit_tests/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ fn should_skip_cordoned_nodes() {
let mut failed_scenarios = vec![];

let registry = Arc::new(registry);
let all_nodes = available_nodes.iter().chain(subnet.nodes.iter()).cloned().collect_vec();
let health_client = Arc::new(health_client);
for (cordoned_features, should_succeed) in scenarios {
let cordoned_features_clone = cordoned_features.clone();
Expand All @@ -260,7 +261,7 @@ fn should_skip_cordoned_nodes() {
.with_target(crate::subnet_manager::SubnetTarget::FromNodesIds(vec![
subnet.nodes.first().unwrap().principal,
]))
.membership_replace(false, None, None, None, vec![], None),
.membership_replace(false, None, None, None, vec![], None, &all_nodes),
);

// Assert
Expand Down
2 changes: 2 additions & 0 deletions rs/decentralization/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod nakamoto;
pub mod network;
#[cfg(test)]
mod network_tests;
pub mod subnets;
use indexmap::IndexMap;
use itertools::Itertools;
Expand Down
Loading

0 comments on commit 4d305a7

Please sign in to comment.