Skip to content

Commit

Permalink
Lock results directory before accessing it
Browse files: browse the repository at this point in the history
CMK-15051
  • Loading branch information
jherbel committed Nov 7, 2023
1 parent b4c0048 commit 3654e02
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 31 deletions.
11 changes: 7 additions & 4 deletions v2/robotmk/src/bin/agent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use camino::Utf8PathBuf;
use clap::Parser;
use robotmk::config::Config;
use robotmk::section::{read, Host, Section};
use robotmk::{
config::Config,
lock::Locker,
section::{read, Host, Section},
};
use serde::Serialize;
use std::env::{var, VarError};
use std::fs::read_to_string;
Expand Down Expand Up @@ -75,7 +78,7 @@ fn main() {
return;
}
};
let raw = match read_to_string(config_path) {
let raw = match read_to_string(&config_path) {
Ok(raw) => raw,
Err(e) => {
report_config_error(e.to_string());
Expand All @@ -90,6 +93,6 @@ fn main() {
return;
}
};
let sections = read(config.results_directory);
let sections = read(config.results_directory, &Locker::new(&config_path, None)).unwrap();
print_sections(&sections, &mut io::stdout());
}
1 change: 1 addition & 0 deletions v2/robotmk/src/bin/scheduler/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub fn build_environments(global_config: &GlobalConfig, suites: Vec<Suite>) -> R
EnvironmentBuildStatesAdministrator::new_with_pending(
&suites,
&global_config.results_directory,
&global_config.results_directory_locker,
)?;
let env_building_stdio_directory =
environment_building_stdio_directory(&global_config.working_directory);
Expand Down
12 changes: 10 additions & 2 deletions v2/robotmk/src/bin/scheduler/internal_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::rf::robot::Robot;
use crate::sessions::session::Session;
use robotmk::{
config::{Config, WorkingDirectoryCleanupConfig},
lock::Locker,
section::Host,
termination::TerminationFlag,
};
Expand All @@ -17,6 +18,7 @@ pub struct GlobalConfig {
pub results_directory: Utf8PathBuf,
pub rcc_binary_path: Utf8PathBuf,
pub termination_flag: TerminationFlag,
pub results_directory_locker: Locker,
}

#[derive(Clone)]
Expand All @@ -33,11 +35,13 @@ pub struct Suite {
pub termination_flag: TerminationFlag,
pub parallelism_protection: Arc<Mutex<usize>>,
pub host: Host,
pub results_directory_locker: Locker,
}

pub fn from_external_config(
external_config: Config,
termination_flag: TerminationFlag,
results_directory_locker: Locker,
) -> (GlobalConfig, Vec<Suite>) {
let mut suites: Vec<Suite> = external_config
.suites
Expand Down Expand Up @@ -68,6 +72,7 @@ pub fn from_external_config(
termination_flag: termination_flag.clone(),
parallelism_protection: Arc::new(Mutex::new(0)),
host: suite_config.host,
results_directory_locker: results_directory_locker.clone(),
})
.collect();
sort_suites_by_name(&mut suites);
Expand All @@ -76,7 +81,8 @@ pub fn from_external_config(
working_directory: external_config.working_directory,
results_directory: external_config.results_directory,
rcc_binary_path: external_config.rcc_binary_path,
termination_flag: termination_flag.clone(),
termination_flag,
results_directory_locker,
},
suites,
)
Expand Down Expand Up @@ -145,6 +151,7 @@ mod tests {

#[test]
fn test_from_external_config() {
let termination_flag = TerminationFlag::new(Arc::new(AtomicBool::new(false)));
let (global_config, suites) = from_external_config(
Config {
working_directory: Utf8PathBuf::from("/working"),
Expand All @@ -155,7 +162,8 @@ mod tests {
(String::from("rcc"), rcc_suite_config()),
]),
},
TerminationFlag::new(Arc::new(AtomicBool::new(false))),
termination_flag.clone(),
Locker::new("/config.json", Some(&termination_flag)),
);
assert_eq!(global_config.working_directory, "/working");
assert_eq!(global_config.results_directory, "/results");
Expand Down
8 changes: 6 additions & 2 deletions v2/robotmk/src/bin/scheduler/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use anyhow::{bail, Context, Result};
use clap::Parser;
use log::{debug, info};
use logging::log_and_return_error;
use robotmk::lock::Locker;

fn main() -> Result<()> {
run().map_err(log_and_return_error)?;
Expand All @@ -34,8 +35,11 @@ fn run() -> Result<()> {
.context("Failed to set up termination control")?;
debug!("Termination control set up");

let (global_config, suites) =
internal_config::from_external_config(external_config, termination_flag);
let (global_config, suites) = internal_config::from_external_config(
external_config,
termination_flag.clone(),
Locker::new(&args.config_path, Some(&termination_flag)),
);

if global_config.termination_flag.should_terminate() {
bail!("Terminated")
Expand Down
23 changes: 16 additions & 7 deletions v2/robotmk/src/bin/scheduler/results.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use super::internal_config::Suite;
use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
use robotmk::section::{WritePiggybackSection, WriteSection};
use robotmk::{
lock::Locker,
section::{WritePiggybackSection, WriteSection},
};
use serde::Serialize;
use std::collections::HashMap;

Expand All @@ -22,9 +25,10 @@ impl WriteSection for RCCSetupFailures {
}
}

/// Holds the environment build status of each suite (keyed by suite name), the
/// path of the file these states are written to, and the `Locker` that is
/// handed to every write of that file.
pub struct EnvironmentBuildStatesAdministrator {
pub struct EnvironmentBuildStatesAdministrator<'a> {
build_states: HashMap<String, EnvironmentBuildStatus>,
path: Utf8PathBuf,
// Borrowed, not owned: the administrator cannot outlive the locker it was built with.
locker: &'a Locker,
}

#[derive(Serialize)]
Expand All @@ -36,23 +40,28 @@ impl WriteSection for BuildStates<'_> {
}
}

impl EnvironmentBuildStatesAdministrator {
impl<'a> EnvironmentBuildStatesAdministrator<'a> {
/// Marks every suite in `suites` as `Pending`, writes that initial state to
/// `environment_build_states.json` inside `results_directory`, and returns
/// the administrator.
///
/// Returns an error if the initial write fails.
pub fn new_with_pending(
suites: &[Suite],
results_directory: &Utf8Path,
) -> Result<EnvironmentBuildStatesAdministrator> {
locker: &'a Locker,
) -> Result<EnvironmentBuildStatesAdministrator<'a>> {
// Every suite starts out as `Pending`, keyed by its name.
let build_states: HashMap<_, _> = suites
.iter()
.map(|suite| (suite.name.to_string(), EnvironmentBuildStatus::Pending))
.collect();
let path = results_directory.join("environment_build_states.json");
// Write the initial state right away; the struct is only built if this succeeds.
BuildStates(&build_states).write(&path)?;
BuildStates(&build_states).write(&path, locker)?;
Ok(Self { build_states, path })
Ok(Self {
build_states,
path,
locker,
})
}

/// Records `build_status` for `suite_name` (inserting or overwriting the
/// entry) and rewrites the complete set of build states to the state file.
///
/// The in-memory map is updated before the write, so it keeps the new
/// status even if the write returns an error.
pub fn update(&mut self, suite_name: &str, build_status: EnvironmentBuildStatus) -> Result<()> {
self.build_states.insert(suite_name.into(), build_status);
BuildStates(&self.build_states).write(&self.path)
BuildStates(&self.build_states).write(&self.path, self.locker)
}
}

Expand Down
12 changes: 10 additions & 2 deletions v2/robotmk/src/bin/scheduler/scheduling/suites.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ pub fn run_suite(suite: &Suite) -> Result<()> {
outcome: ExecutionReport::AlreadyRunning,
};
report
.write(&suite.results_file, suite.host.clone())
.write(
&suite.results_file,
suite.host.clone(),
&suite.results_directory_locker,
)
.context("Reporting failure to acquire suite lock failed")
.err()
.unwrap_or(err)
Expand All @@ -32,7 +36,11 @@ pub fn run_suite(suite: &Suite) -> Result<()> {
outcome: ExecutionReport::Executed(produce_suite_results(suite)?),
};
report
.write(&suite.results_file, suite.host.clone())
.write(
&suite.results_file,
suite.host.clone(),
&suite.results_directory_locker,
)
.context("Reporting suite results failed")?;
debug!("Suite {} finished", &suite.name);

Expand Down
8 changes: 5 additions & 3 deletions v2/robotmk/src/bin/scheduler/setup/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,19 @@ fn setup_results_directories(global_config: &GlobalConfig, suites: &[Suite]) ->
}

/// Removes every result file reported by `currently_present_result_files` for
/// the suite results directory that is not the results file of one of `suites`.
///
/// The results directory write lock is requested before the directory is
/// inspected and is released after the last removal.
// NOTE(review): on an early return via `?` (listing or removing a file fails)
// `release()` is never called. Whether the lock is then freed on drop depends
// on the lock type, which is not visible here — confirm.
fn clean_up_results_directory_atomic(global_config: &GlobalConfig, suites: &[Suite]) -> Result<()> {
let results_directory_lock = global_config
.results_directory_locker
.wait_for_write_lock()?;
let suite_results_directory = suite_results_directory(&global_config.results_directory);
// Files that belong to a configured suite and must survive the clean-up.
let result_files_to_keep =
HashSet::<Utf8PathBuf>::from_iter(suites.iter().map(|suite| suite.results_file.clone()));
let currently_present_result_files = HashSet::<Utf8PathBuf>::from_iter(
currently_present_result_files(&suite_results_directory)?,
);
// Set difference: present on disk but not expected for any suite.
for path in currently_present_result_files.difference(&result_files_to_keep) {
remove_file(path)?; // TODO: This fails if the agent plugin is currently reading the file
// (a non-critical and recoverable error). How to handle it?
remove_file(path)?;
}
Ok(())
results_directory_lock.release()
}

fn currently_present_result_files(suite_results_directory: &Utf8Path) -> Result<Vec<Utf8PathBuf>> {
Expand Down
2 changes: 1 addition & 1 deletion v2/robotmk/src/bin/scheduler/setup/rcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn rcc_setup(global_config: &GlobalConfig, rcc_suites: Vec<Suite>) -> Result<Vec
let path = global_config
.results_directory
.join("rcc_setup_failures.json");
rcc_setup_failures.write(path)?;
rcc_setup_failures.write(path, &global_config.results_directory_locker)?;

Ok(sucessful_suites)
}
Expand Down
1 change: 1 addition & 0 deletions v2/robotmk/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod config;
pub mod lock;
pub mod section;
pub mod termination;
27 changes: 17 additions & 10 deletions v2/robotmk/src/section.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use super::lock::Locker;

use anyhow::{Context, Result};
use camino::Utf8Path;
use serde::{Deserialize, Serialize};
Expand All @@ -20,23 +22,25 @@ pub struct Section {
pub content: String,
}

/// Serializes `section` to JSON, writes it to a temporary file and then
/// persists that temporary file at `path`.
///
/// The write lock is requested only after the temporary file has been
/// written, so it is held just for the persist step.
///
/// Returns an error if creating, writing or persisting the temporary file
/// fails, or if acquiring or releasing the lock fails. Panics if `section`
/// cannot be serialized (`unwrap` on `serde_json::to_string`).
// NOTE(review): if `persist` fails, `?` returns before `lock.release()` runs.
// Whether the lock is then freed on drop depends on the `lock` module, which
// is not visible here — confirm.
fn write(section: &Section, path: impl AsRef<Utf8Path>) -> Result<()> {
fn write(section: &Section, path: impl AsRef<Utf8Path>, locker: &Locker) -> Result<()> {
let path = path.as_ref();
let section = serde_json::to_string(&section).unwrap();
let mut file = NamedTempFile::new().context("Opening tempfile failed")?;
file.write_all(section.as_bytes()).context(format!(
"Writing tempfile failed, {}",
file.path().display()
))?;

// Only the final move into place happens under the write lock.
let lock = locker.wait_for_write_lock()?;
file.persist(path)
.context(format!("Persisting tempfile failed, final_path: {path}"))
.map(|_| ())
.context(format!("Persisting tempfile failed, final_path: {path}"))?;
lock.release()
}

pub trait WriteSection {
fn name() -> &'static str;

fn write(&self, path: impl AsRef<Utf8Path>) -> Result<()>
fn write(&self, path: impl AsRef<Utf8Path>, locker: &Locker) -> Result<()>
where
Self: Serialize,
{
Expand All @@ -45,14 +49,14 @@ pub trait WriteSection {
content: serde_json::to_string(&self).unwrap(),
host: Host::Source,
};
write(&section, path)
write(&section, path, locker)
}
}

pub trait WritePiggybackSection {
fn name() -> &'static str;

fn write(&self, path: impl AsRef<Utf8Path>, host: Host) -> Result<()>
fn write(&self, path: impl AsRef<Utf8Path>, host: Host, locker: &Locker) -> Result<()>
where
Self: Serialize,
{
Expand All @@ -61,7 +65,7 @@ pub trait WritePiggybackSection {
content: serde_json::to_string(&self).unwrap(),
host,
};
write(&section, path)
write(&section, path, locker)
}
}

Expand All @@ -71,11 +75,14 @@ fn read_entry(entry: Result<DirEntry, Error>) -> Result<Section> {
Ok(serde_json::from_str(&raw)?)
}

/// Collects the sections found by walking `directory` with `WalkDir`, visiting
/// entries sorted by file name.
///
/// Entries for which `read_entry` returns an error are dropped silently
/// (`.ok()` inside `filter_map`), so only entries that were read successfully
/// are returned.
///
/// The read lock is requested before the walk and released after all sections
/// have been collected. An error is returned if acquiring or releasing the
/// lock fails.
pub fn read(directory: impl AsRef<Path>) -> Vec<Section> {
pub fn read(directory: impl AsRef<Path>, locker: &Locker) -> Result<Vec<Section>> {
// TODO: Test this function.
WalkDir::new(directory)
let lock = locker.wait_for_read_lock()?;
let sections = WalkDir::new(directory)
.sort_by_file_name()
.into_iter()
.filter_map(|entry| read_entry(entry).ok())
.collect()
.collect();
lock.release()?;
Ok(sections)
}

0 comments on commit 3654e02

Please sign in to comment.