Skip to content

Commit

Permalink
feat: update cordoned features handling and support for local file
Browse files Browse the repository at this point in the history
Previous implementation was using the local file for cordoned features *only if* --offline was provided. That means that the entire dre tool had to be forced to go offline.
With this change, one can simply provide `--cordoned-features-file <file>` on the CLI, and the local file will override the github version
  • Loading branch information
sasa-tomic committed Nov 26, 2024
1 parent 83230c0 commit d51bbbc
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 53 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ exports_files([
"archive_canister.wasm.gz",
"clippy.toml",
"rustfmt.toml",
"cordoned_features.yaml",
"WORKSPACE.bazel",
])

Expand Down
7 changes: 5 additions & 2 deletions rs/cli/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps")
load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test", "rust_library")
load("@rules_rust//cargo:defs.bzl", "cargo_build_script", "cargo_dep_env")
load("@rules_rust//cargo:defs.bzl", "cargo_build_script")

DEPS = [
"//rs/ic-canisters",
Expand Down Expand Up @@ -34,7 +34,10 @@ rust_binary(
),
deps = all_crate_deps(
normal = True,
) + DEPS + ["//rs/cli:dre-lib", ":build_script"]
) + DEPS + ["//rs/cli:dre-lib", ":build_script"],
data = [
"//:cordoned_features.yaml",
]
)

rust_library(
Expand Down
4 changes: 1 addition & 3 deletions rs/cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::path::PathBuf;

use crate::commands::subnet::Subnet;
use api_boundary_nodes::ApiBoundaryNodes;
use clap::Args as ClapArgs;
Expand Down Expand Up @@ -212,7 +210,7 @@ The argument is mandatory for testnets, and is optional for mainnet and staging"

/// Path to file which contains cordoned features
#[clap(long, global = true, visible_aliases = &["cf-file", "cfff"])]
pub cordon_feature_fallback_file: Option<PathBuf>,
pub cordoned_features_file: Option<String>,
}

// Do not use outside of DRE CLI.
Expand Down
42 changes: 24 additions & 18 deletions rs/cli/src/cordoned_feature_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{path::PathBuf, time::Duration};

use decentralization::network::NodeFeaturePair;
use decentralization::network::CordonedFeature;
use futures::future::BoxFuture;
use ic_management_types::NodeFeature;
use itertools::Itertools;
Expand All @@ -11,10 +11,10 @@ use strum::VariantNames;

#[automock]
pub trait CordonedFeatureFetcher: Sync + Send {
fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<NodeFeaturePair>>>;
fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<CordonedFeature>>>;

#[cfg(test)]
fn parse_outer(&self, contents: &[u8]) -> anyhow::Result<Vec<NodeFeaturePair>>;
fn parse_outer(&self, contents: &[u8]) -> anyhow::Result<Vec<CordonedFeature>>;
}

pub struct CordonedFeatureFetcherImpl {
Expand All @@ -24,19 +24,23 @@ pub struct CordonedFeatureFetcherImpl {
// fetch from github. If github is
// unreachable, this cache will be used
local_copy: PathBuf,
offline: bool,
use_local_file: bool, // By default, cordoned features are fetched from github
}

const CORDONED_FEATURES_FILE_URL: &str = "https://raw.githubusercontent.com/dfinity/dre/refs/heads/main/cordoned_features.yaml";

impl CordonedFeatureFetcherImpl {
pub fn new(local_copy: PathBuf, offline: bool) -> anyhow::Result<Self> {
pub fn new(local_copy: PathBuf, use_local_file: bool) -> anyhow::Result<Self> {
let client = ClientBuilder::new().timeout(Duration::from_secs(10)).build()?;

Ok(Self { client, local_copy, offline })
Ok(Self {
client,
local_copy,
use_local_file,
})
}

async fn fetch_from_git(&self) -> anyhow::Result<Vec<NodeFeaturePair>> {
async fn fetch_from_git(&self) -> anyhow::Result<Vec<CordonedFeature>> {
let bytes = self
.client
.get(CORDONED_FEATURES_FILE_URL)
Expand All @@ -58,13 +62,13 @@ impl CordonedFeatureFetcherImpl {
self.parse(&bytes)
}

fn fetch_from_file(&self) -> anyhow::Result<Vec<NodeFeaturePair>> {
fn fetch_from_file(&self) -> anyhow::Result<Vec<CordonedFeature>> {
let contents = std::fs::read(&self.local_copy)?;

self.parse(&contents)
}

fn parse(&self, contents: &[u8]) -> anyhow::Result<Vec<NodeFeaturePair>> {
fn parse(&self, contents: &[u8]) -> anyhow::Result<Vec<CordonedFeature>> {
let valid_yaml = serde_yaml::from_slice::<serde_yaml::Value>(contents)?;

let features = match valid_yaml.get("features") {
Expand All @@ -78,7 +82,7 @@ impl CordonedFeatureFetcherImpl {

let mut valid_features = vec![];
for feature in features {
valid_features.push(NodeFeaturePair {
valid_features.push(CordonedFeature {
feature: feature
.get("feature")
.map(|value| {
Expand All @@ -90,7 +94,7 @@ impl CordonedFeatureFetcherImpl {
)
})
})
.ok_or(anyhow::anyhow!("Expected `feature` key to be present. Got: \n{:?}", feature))??,
.ok_or(anyhow::anyhow!("Expected `feature` field to be present. Got: \n{:?}", feature))??,
value: feature
.get("value")
.map(|value| {
Expand All @@ -102,7 +106,8 @@ impl CordonedFeatureFetcherImpl {
))
.map(|s| s.to_string())
})
.ok_or(anyhow::anyhow!("Expected `value` key to be present. Got: \n{:?}", feature))??,
.ok_or(anyhow::anyhow!("Expected `value` field to be present. Got: \n{:?}", feature))??,
explanation: feature.get("explanation").and_then(|value| value.as_str().map(|s| s.to_string())),
});
}

Expand All @@ -111,12 +116,13 @@ impl CordonedFeatureFetcherImpl {
}

impl CordonedFeatureFetcher for CordonedFeatureFetcherImpl {
fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<NodeFeaturePair>>> {
fn fetch(&self) -> BoxFuture<'_, anyhow::Result<Vec<CordonedFeature>>> {
Box::pin(async {
if self.offline {
// Offline mode specified, use cache
info!("In offline mode, cordoned features will be loaded from cache");
info!("Cache path for cordoned features: {}", self.local_copy.display());
if self.use_local_file {
info!(
"Received request to load cordoned features from local cache path: {}",
self.local_copy.display()
);
self.fetch_from_file()
} else {
self.fetch_from_git().await
Expand All @@ -125,7 +131,7 @@ impl CordonedFeatureFetcher for CordonedFeatureFetcherImpl {
}

#[cfg(test)]
fn parse_outer(&self, contents: &[u8]) -> anyhow::Result<Vec<NodeFeaturePair>> {
fn parse_outer(&self, contents: &[u8]) -> anyhow::Result<Vec<CordonedFeature>> {
self.parse(contents)
}
}
Expand Down
2 changes: 1 addition & 1 deletion rs/cli/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl DreContext {
args.subcommands.require_auth(),
args.forum_post_link.clone(),
args.ic_admin_version.clone(),
store.cordoned_features_fetcher()?,
store.cordoned_features_fetcher(args.cordoned_features_file.clone())?,
store.health_client(&network)?,
store,
args.discourse_opts.clone(),
Expand Down
4 changes: 2 additions & 2 deletions rs/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::collections::HashMap;
use std::sync::Arc;

use ahash::AHashMap;
use decentralization::network::CordonedFeature;
use decentralization::network::DecentralizedSubnet;
use decentralization::network::NetworkHealRequest;
use decentralization::network::NodeFeaturePair;
use decentralization::network::SubnetChange;
use decentralization::network::SubnetChangeRequest;
use decentralization::network::SubnetQueryBy;
Expand Down Expand Up @@ -648,7 +648,7 @@ impl Runner {
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
node: &Node,
ensure_assigned: bool,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Option<SubnetChangeResponse> {
let mut best_change: Option<SubnetChangeResponse> = None;
Expand Down
30 changes: 20 additions & 10 deletions rs/cli/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,23 +284,33 @@ impl Store {

#[cfg(test)]
pub fn cordoned_features_file_outer(&self) -> anyhow::Result<PathBuf> {
self.cordoned_features_file()
self.cordoned_features_file(None)
}

fn cordoned_features_file(&self) -> anyhow::Result<PathBuf> {
let file = self.path().join("cordoned_features.yaml");
fn cordoned_features_file(&self, file_path: Option<String>) -> anyhow::Result<PathBuf> {
let file = match file_path {
Some(path) => std::path::PathBuf::from(path).canonicalize()?,
None => {
let file = self.path().join("cordoned_features.yaml");

if !file.exists() {
info!("Cordoned features file was missing. Creating on path `{}`...", file.display());
fs_err::write(&file, "")?;
}
if !file.exists() {
info!("Cordoned features file was missing. Creating on path `{}`...", file.display());
fs_err::write(&file, "")?;
}

file
}
};

Ok(file)
}

pub fn cordoned_features_fetcher(&self) -> anyhow::Result<Arc<dyn CordonedFeatureFetcher>> {
let file = self.cordoned_features_file()?;
Ok(Arc::new(CordonedFeatureFetcherImpl::new(file, self.is_offline())?))
pub fn cordoned_features_fetcher(&self, local_file_path: Option<String>) -> anyhow::Result<Arc<dyn CordonedFeatureFetcher>> {
let file = self.cordoned_features_file(local_file_path.clone())?;
Ok(Arc::new(CordonedFeatureFetcherImpl::new(
file,
self.is_offline() || local_file_path.is_some(),
)?))
}

#[cfg(test)]
Expand Down
16 changes: 9 additions & 7 deletions rs/cli/src/unit_tests/cordoned_feature_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::PathBuf;

use decentralization::network::NodeFeaturePair;
use decentralization::network::CordonedFeature;
use ic_management_types::NodeFeature;
use itertools::Itertools;
use serde_yaml::Mapping;
Expand All @@ -11,7 +11,7 @@ fn ensure_empty(file: PathBuf) {
std::fs::write(file, "").unwrap();
}

fn write_to_cache(contents: &[NodeFeaturePair], path: PathBuf) {
fn write_to_cache(contents: &[CordonedFeature], path: PathBuf) {
let mut mapping = Mapping::new();
mapping.insert("features".into(), serde_yaml::to_value(contents).unwrap());
let root = serde_yaml::Value::Mapping(mapping);
Expand All @@ -23,7 +23,7 @@ fn write_to_cache(contents: &[NodeFeaturePair], path: PathBuf) {
struct TestScenario {
name: String,
offline: bool,
cache_contents: Option<Vec<NodeFeaturePair>>,
cache_contents: Option<Vec<CordonedFeature>>,
should_succeed: bool,
}

Expand Down Expand Up @@ -52,7 +52,7 @@ impl TestScenario {
}
}

fn with_cache(self, pairs: &[NodeFeaturePair]) -> Self {
fn with_cache(self, pairs: &[CordonedFeature]) -> Self {
Self {
cache_contents: Some(pairs.to_vec()),
..self
Expand Down Expand Up @@ -82,16 +82,18 @@ fn cordoned_feature_fetcher_tests() {
TestScenario::new("[Offline] No cache").no_cache().offline().should_fail(),
TestScenario::new("[Offline] Fetch from cache")
.offline()
.with_cache(&[NodeFeaturePair {
.with_cache(&[CordonedFeature {
feature: NodeFeature::NodeProvider,
value: "some-np".to_string(),
explanation: None,
}])
.should_succeed(),
TestScenario::new("[Online] Stale cache")
.online()
.with_cache(&[NodeFeaturePair {
.with_cache(&[CordonedFeature {
feature: NodeFeature::NodeProvider,
value: "some-np".to_string(),
explanation: None,
}])
.should_succeed(),
];
Expand All @@ -107,7 +109,7 @@ fn cordoned_feature_fetcher_tests() {
None => ensure_empty(store.cordoned_features_file_outer().unwrap()),
}

let cordoned_feature_fetcher = store.cordoned_features_fetcher().unwrap();
let cordoned_feature_fetcher = store.cordoned_features_fetcher(None).unwrap();

let maybe_cordoned_features = runtime.block_on(cordoned_feature_fetcher.fetch());

Expand Down
7 changes: 4 additions & 3 deletions rs/cli/src/unit_tests/replace.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use decentralization::{
network::{DecentralizedSubnet, NodeFeaturePair},
network::{CordonedFeature, DecentralizedSubnet},
SubnetChangeResponse,
};
use ic_management_backend::{health::MockHealthStatusQuerier, lazy_registry::MockLazyRegistry};
Expand Down Expand Up @@ -54,10 +54,11 @@ fn subnet(id: u64, nodes: &[Node]) -> DecentralizedSubnet {
}
}

fn cordoned_feature(feature: NodeFeature, value: &str) -> NodeFeaturePair {
NodeFeaturePair {
fn cordoned_feature(feature: NodeFeature, value: &str) -> CordonedFeature {
CordonedFeature {
feature,
value: value.to_string(),
explanation: None,
}
}

Expand Down
15 changes: 8 additions & 7 deletions rs/decentralization/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ pub trait TopologyManager: SubnetQuerier + AvailableNodesQuerier + Sync {
exclude_nodes: Vec<String>,
only_nodes: Vec<String>,
health_of_nodes: &'a IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &'a [Node],
) -> BoxFuture<'a, Result<SubnetChange, NetworkError>> {
Box::pin(async move {
Expand Down Expand Up @@ -883,9 +883,10 @@ impl<T: Identifies<Node>> MatchAnyNode<T> for std::slice::Iter<'_, T> {
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct NodeFeaturePair {
pub struct CordonedFeature {
pub feature: NodeFeature,
pub value: String,
pub explanation: Option<String>,
}

#[derive(Default, Clone, Debug)]
Expand Down Expand Up @@ -980,7 +981,7 @@ impl SubnetChangeRequest {
optimize_count: usize,
replacements_unhealthy: &[Node],
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Result<SubnetChange, NetworkError> {
let old_nodes = self.subnet.nodes.clone();
Expand All @@ -999,7 +1000,7 @@ impl SubnetChangeRequest {
pub fn rescue(
mut self,
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Result<SubnetChange, NetworkError> {
let old_nodes = self.subnet.nodes.clone();
Expand Down Expand Up @@ -1031,7 +1032,7 @@ impl SubnetChangeRequest {
how_many_nodes_to_remove: usize,
how_many_nodes_unhealthy: usize,
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Result<SubnetChange, NetworkError> {
let old_nodes = self.subnet.nodes.clone();
Expand Down Expand Up @@ -1133,7 +1134,7 @@ impl SubnetChangeRequest {
pub fn evaluate(
self,
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
) -> Result<SubnetChange, NetworkError> {
self.resize(0, 0, 0, health_of_nodes, cordoned_features, all_nodes)
Expand Down Expand Up @@ -1254,7 +1255,7 @@ impl NetworkHealRequest {
&self,
mut available_nodes: Vec<Node>,
health_of_nodes: &IndexMap<PrincipalId, HealthStatus>,
cordoned_features: Vec<NodeFeaturePair>,
cordoned_features: Vec<CordonedFeature>,
all_nodes: &[Node],
optimize_for_business_rules_compliance: bool,
) -> Result<Vec<SubnetChangeResponse>, NetworkError> {
Expand Down

0 comments on commit d51bbbc

Please sign in to comment.