Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update cordoned features handling and support for local file #1119

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ exports_files([
"archive_canister.wasm.gz",
"clippy.toml",
"rustfmt.toml",
"cordoned_features.yaml",
"WORKSPACE.bazel",
])

Expand Down
7 changes: 5 additions & 2 deletions rs/cli/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps")
load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test", "rust_library")
load("@rules_rust//cargo:defs.bzl", "cargo_build_script", "cargo_dep_env")
load("@rules_rust//cargo:defs.bzl", "cargo_build_script")

DEPS = [
"//rs/ic-canisters",
Expand Down Expand Up @@ -34,7 +34,10 @@ rust_binary(
),
deps = all_crate_deps(
normal = True,
) + DEPS + ["//rs/cli:dre-lib", ":build_script"]
) + DEPS + ["//rs/cli:dre-lib", ":build_script"],
data = [
"//:cordoned_features.yaml",
]
)

rust_library(
Expand Down
4 changes: 1 addition & 3 deletions rs/cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::path::PathBuf;

use crate::commands::subnet::Subnet;
use api_boundary_nodes::ApiBoundaryNodes;
use clap::Args as ClapArgs;
Expand Down Expand Up @@ -212,7 +210,7 @@ The argument is mandatory for testnets, and is optional for mainnet and staging"

/// Path to file which contains cordoned features
#[clap(long, global = true, visible_aliases = &["cf-file", "cfff"])]
pub cordon_feature_fallback_file: Option<PathBuf>,
pub cordoned_features_file: Option<String>,
}

// Do not use outside of DRE CLI.
Expand Down
42 changes: 24 additions & 18 deletions rs/cli/src/cordoned_feature_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{path::PathBuf, time::Duration};

use decentralization::network::NodeFeaturePair;
use decentralization::network::CordonedFeature;
use futures::future::BoxFuture;
use ic_management_types::NodeFeature;
use itertools::Itertools;
Expand All @@ -11,10 +11,10 @@ use strum::VariantNames;

#[automock]
pub trait CordonedFeatureFetcher: Sync + Send {
fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<NodeFeaturePair>>>;
fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<CordonedFeature>>>;

#[cfg(test)]
fn parse_outer(&self, contents: &[u8]) -> anyhow::Result<Vec<NodeFeaturePair>>;
fn parse_outer(&self, contents: &[u8]) -> anyhow::Result<Vec<CordonedFeature>>;
}

pub struct CordonedFeatureFetcherImpl {
Expand All @@ -24,19 +24,23 @@ pub struct CordonedFeatureFetcherImpl {
// fetch from github. If github is
// unreachable, this cache will be used
local_copy: PathBuf,
offline: bool,
use_local_file: bool, // By default, cordoned features are fetched from github
}

const CORDONED_FEATURES_FILE_URL: &str = "https://raw.githubusercontent.com/dfinity/dre/refs/heads/main/cordoned_features.yaml";

impl CordonedFeatureFetcherImpl {
pub fn new(local_copy: PathBuf, offline: bool) -> anyhow::Result<Self> {
pub fn new(local_copy: PathBuf, use_local_file: bool) -> anyhow::Result<Self> {
let client = ClientBuilder::new().timeout(Duration::from_secs(10)).build()?;

Ok(Self { client, local_copy, offline })
Ok(Self {
client,
local_copy,
use_local_file,
})
}

async fn fetch_from_git(&self) -> anyhow::Result<Vec<NodeFeaturePair>> {
async fn fetch_from_git(&self) -> anyhow::Result<Vec<CordonedFeature>> {
let bytes = self
.client
.get(CORDONED_FEATURES_FILE_URL)
Expand All @@ -58,13 +62,13 @@ impl CordonedFeatureFetcherImpl {
self.parse(&bytes)
}

fn fetch_from_file(&self) -> anyhow::Result<Vec<NodeFeaturePair>> {
fn fetch_from_file(&self) -> anyhow::Result<Vec<CordonedFeature>> {
let contents = std::fs::read(&self.local_copy)?;

self.parse(&contents)
}

fn parse(&self, contents: &[u8]) -> anyhow::Result<Vec<NodeFeaturePair>> {
fn parse(&self, contents: &[u8]) -> anyhow::Result<Vec<CordonedFeature>> {
let valid_yaml = serde_yaml::from_slice::<serde_yaml::Value>(contents)?;

let features = match valid_yaml.get("features") {
Expand All @@ -78,7 +82,7 @@ impl CordonedFeatureFetcherImpl {

let mut valid_features = vec![];
for feature in features {
valid_features.push(NodeFeaturePair {
valid_features.push(CordonedFeature {
feature: feature
.get("feature")
.map(|value| {
Expand All @@ -90,7 +94,7 @@ impl CordonedFeatureFetcherImpl {
)
})
})
.ok_or(anyhow::anyhow!("Expected `feature` key to be present. Got: \n{:?}", feature))??,
.ok_or(anyhow::anyhow!("Expected `feature` field to be present. Got: \n{:?}", feature))??,
value: feature
.get("value")
.map(|value| {
Expand All @@ -102,7 +106,8 @@ impl CordonedFeatureFetcherImpl {
))
.map(|s| s.to_string())
})
.ok_or(anyhow::anyhow!("Expected `value` key to be present. Got: \n{:?}", feature))??,
.ok_or(anyhow::anyhow!("Expected `value` field to be present. Got: \n{:?}", feature))??,
explanation: feature.get("explanation").and_then(|value| value.as_str().map(|s| s.to_string())),
});
}

Expand All @@ -111,12 +116,13 @@ impl CordonedFeatureFetcherImpl {
}

impl CordonedFeatureFetcher for CordonedFeatureFetcherImpl {
fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<NodeFeaturePair>>> {
fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<CordonedFeature>>> {
Box::pin(async {
if self.offline {
// Offline mode specified, use cache
info!("In offline mode, cordoned features will be loaded from cache");
info!("Cache path for cordoned features: {}", self.local_copy.display());
if self.use_local_file {
info!(
"Received request to load cordoned features from local cache path: {}",
self.local_copy.display()
);
self.fetch_from_file()
} else {
self.fetch_from_git().await
Expand All @@ -125,7 +131,7 @@ impl CordonedFeatureFetcher for CordonedFeatureFetcherImpl {
}

#[cfg(test)]
fn parse_outer(&self, contents: &[u8]) -> anyhow::Result<Vec<NodeFeaturePair>> {
fn parse_outer(&self, contents: &[u8]) -> anyhow::Result<Vec<CordonedFeature>> {
self.parse(contents)
}
}
Expand Down
2 changes: 1 addition & 1 deletion rs/cli/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl DreContext {
args.subcommands.require_auth(),
args.forum_post_link.clone(),
args.ic_admin_version.clone(),
store.cordoned_features_fetcher()?,
store.cordoned_features_fetcher(args.cordoned_features_file.clone())?,
store.health_client(&network)?,
store,
args.discourse_opts.clone(),
Expand Down
4 changes: 2 additions & 2 deletions rs/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::collections::HashMap;
use std::sync::Arc;

use ahash::AHashMap;
use decentralization::network::CordonedFeature;
use decentralization::network::DecentralizedSubnet;
use decentralization::network::NetworkHealRequest;
use decentralization::network::NodeFeaturePair;
use decentralization::network::SubnetChange;
use decentralization::network::SubnetChangeRequest;
use decentralization::network::SubnetQueryBy;
Expand Down Expand Up @@ -648,7 +648,7 @@ impl Runner {
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
node: &Node,
ensure_assigned: bool,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Option<SubnetChangeResponse> {
let mut best_change: Option<SubnetChangeResponse> = None;
Expand Down
30 changes: 20 additions & 10 deletions rs/cli/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,23 +284,33 @@ impl Store {

#[cfg(test)]
pub fn cordoned_features_file_outer(&self) -> anyhow::Result<PathBuf> {
self.cordoned_features_file()
self.cordoned_features_file(None)
}

fn cordoned_features_file(&self) -> anyhow::Result<PathBuf> {
let file = self.path().join("cordoned_features.yaml");
fn cordoned_features_file(&self, file_path: Option<String>) -> anyhow::Result<PathBuf> {
let file = match file_path {
Some(path) => std::path::PathBuf::from(path).canonicalize()?,
None => {
let file = self.path().join("cordoned_features.yaml");

if !file.exists() {
info!("Cordoned features file was missing. Creating on path `{}`...", file.display());
fs_err::write(&file, "")?;
}
if !file.exists() {
info!("Cordoned features file was missing. Creating on path `{}`...", file.display());
fs_err::write(&file, "")?;
}

file
}
};

Ok(file)
}

pub fn cordoned_features_fetcher(&self) -> anyhow::Result<Arc<dyn CordonedFeatureFetcher>> {
let file = self.cordoned_features_file()?;
Ok(Arc::new(CordonedFeatureFetcherImpl::new(file, self.is_offline())?))
pub fn cordoned_features_fetcher(&self, local_file_path: Option<String>) -> anyhow::Result<Arc<dyn CordonedFeatureFetcher>> {
let file = self.cordoned_features_file(local_file_path.clone())?;
Ok(Arc::new(CordonedFeatureFetcherImpl::new(
file,
self.is_offline() || local_file_path.is_some(),
)?))
}

#[cfg(test)]
Expand Down
16 changes: 9 additions & 7 deletions rs/cli/src/unit_tests/cordoned_feature_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::PathBuf;

use decentralization::network::NodeFeaturePair;
use decentralization::network::CordonedFeature;
use ic_management_types::NodeFeature;
use itertools::Itertools;
use serde_yaml::Mapping;
Expand All @@ -11,7 +11,7 @@ fn ensure_empty(file: PathBuf) {
std::fs::write(file, "").unwrap();
}

fn write_to_cache(contents: &[NodeFeaturePair], path: PathBuf) {
fn write_to_cache(contents: &[CordonedFeature], path: PathBuf) {
let mut mapping = Mapping::new();
mapping.insert("features".into(), serde_yaml::to_value(contents).unwrap());
let root = serde_yaml::Value::Mapping(mapping);
Expand All @@ -23,7 +23,7 @@ fn write_to_cache(contents: &[NodeFeaturePair], path: PathBuf) {
struct TestScenario {
name: String,
offline: bool,
cache_contents: Option<Vec<NodeFeaturePair>>,
cache_contents: Option<Vec<CordonedFeature>>,
should_succeed: bool,
}

Expand Down Expand Up @@ -52,7 +52,7 @@ impl TestScenario {
}
}

fn with_cache(self, pairs: &[NodeFeaturePair]) -> Self {
fn with_cache(self, pairs: &[CordonedFeature]) -> Self {
Self {
cache_contents: Some(pairs.to_vec()),
..self
Expand Down Expand Up @@ -82,16 +82,18 @@ fn cordoned_feature_fetcher_tests() {
TestScenario::new("[Offline] No cache").no_cache().offline().should_fail(),
TestScenario::new("[Offline] Fetch from cache")
.offline()
.with_cache(&[NodeFeaturePair {
.with_cache(&[CordonedFeature {
feature: NodeFeature::NodeProvider,
value: "some-np".to_string(),
explanation: None,
}])
.should_succeed(),
TestScenario::new("[Online] Stale cache")
.online()
.with_cache(&[NodeFeaturePair {
.with_cache(&[CordonedFeature {
feature: NodeFeature::NodeProvider,
value: "some-np".to_string(),
explanation: None,
}])
.should_succeed(),
];
Expand All @@ -107,7 +109,7 @@ fn cordoned_feature_fetcher_tests() {
None => ensure_empty(store.cordoned_features_file_outer().unwrap()),
}

let cordoned_feature_fetcher = store.cordoned_features_fetcher().unwrap();
let cordoned_feature_fetcher = store.cordoned_features_fetcher(None).unwrap();

let maybe_cordoned_features = runtime.block_on(cordoned_feature_fetcher.fetch());

Expand Down
7 changes: 4 additions & 3 deletions rs/cli/src/unit_tests/replace.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use decentralization::{
network::{DecentralizedSubnet, NodeFeaturePair},
network::{CordonedFeature, DecentralizedSubnet},
SubnetChangeResponse,
};
use ic_management_backend::{health::MockHealthStatusQuerier, lazy_registry::MockLazyRegistry};
Expand Down Expand Up @@ -54,10 +54,11 @@ fn subnet(id: u64, nodes: &[Node]) -> DecentralizedSubnet {
}
}

fn cordoned_feature(feature: NodeFeature, value: &str) -> NodeFeaturePair {
NodeFeaturePair {
fn cordoned_feature(feature: NodeFeature, value: &str) -> CordonedFeature {
CordonedFeature {
feature,
value: value.to_string(),
explanation: None,
}
}

Expand Down
15 changes: 8 additions & 7 deletions rs/decentralization/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ pub trait TopologyManager: SubnetQuerier + AvailableNodesQuerier + Sync {
exclude_nodes: Vec<String>,
only_nodes: Vec<String>,
health_of_nodes: &'a IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &'a [Node],
) -> BoxFuture<'a, Result<SubnetChange, NetworkError>> {
Box::pin(async move {
Expand Down Expand Up @@ -883,9 +883,10 @@ impl<T: Identifies<Node>> MatchAnyNode<T> for std::slice::Iter<'_, T> {
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct NodeFeaturePair {
pub struct CordonedFeature {
pub feature: NodeFeature,
pub value: String,
pub explanation: Option<String>,
}

#[derive(Default, Clone, Debug)]
Expand Down Expand Up @@ -980,7 +981,7 @@ impl SubnetChangeRequest {
optimize_count: usize,
replacements_unhealthy: &[Node],
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Result<SubnetChange, NetworkError> {
let old_nodes = self.subnet.nodes.clone();
Expand All @@ -999,7 +1000,7 @@ impl SubnetChangeRequest {
pub fn rescue(
mut self,
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Result<SubnetChange, NetworkError> {
let old_nodes = self.subnet.nodes.clone();
Expand Down Expand Up @@ -1031,7 +1032,7 @@ impl SubnetChangeRequest {
how_many_nodes_to_remove: usize,
how_many_nodes_unhealthy: usize,
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Result<SubnetChange, NetworkError> {
let old_nodes = self.subnet.nodes.clone();
Expand Down Expand Up @@ -1133,7 +1134,7 @@ impl SubnetChangeRequest {
pub fn evaluate(
self,
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Result<SubnetChange, NetworkError> {
self.resize(0, 0, 0, health_of_nodes, cordoned_features, all_nodes)
Expand Down Expand Up @@ -1254,7 +1255,7 @@ impl NetworkHealRequest {
&self,
mut available_nodes: Vec<Node>,
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
optimize_for_business_rules_compliance: bool,
) -> Result<Vec<SubnetChangeResponse>, NetworkError> {
Expand Down
Loading