From 3654e02d66949df54468423fcc0923671d4f303b Mon Sep 17 00:00:00 2001
From: Joerg Herbel <joerg.herbel@checkmk.com>
Date: Tue, 7 Nov 2023 06:44:32 +0100
Subject: [PATCH] Lock results directory before accessing it

CMK-15051
---
 v2/robotmk/src/bin/agent.rs                   | 11 +++++---
 v2/robotmk/src/bin/scheduler/environment.rs   |  1 +
 .../src/bin/scheduler/internal_config.rs      | 12 +++++++--
 v2/robotmk/src/bin/scheduler/main.rs          |  8 ++++--
 v2/robotmk/src/bin/scheduler/results.rs       | 23 +++++++++++-----
 .../src/bin/scheduler/scheduling/suites.rs    | 12 +++++++--
 v2/robotmk/src/bin/scheduler/setup/general.rs |  8 +++---
 v2/robotmk/src/bin/scheduler/setup/rcc.rs     |  2 +-
 v2/robotmk/src/lib.rs                         |  1 +
 v2/robotmk/src/section.rs                     | 27 ++++++++++++-------
 10 files changed, 74 insertions(+), 31 deletions(-)

diff --git a/v2/robotmk/src/bin/agent.rs b/v2/robotmk/src/bin/agent.rs
index 301d68f6..53f5d88d 100644
--- a/v2/robotmk/src/bin/agent.rs
+++ b/v2/robotmk/src/bin/agent.rs
@@ -1,7 +1,10 @@
 use camino::Utf8PathBuf;
 use clap::Parser;
-use robotmk::config::Config;
-use robotmk::section::{read, Host, Section};
+use robotmk::{
+    config::Config,
+    lock::Locker,
+    section::{read, Host, Section},
+};
 use serde::Serialize;
 use std::env::{var, VarError};
 use std::fs::read_to_string;
@@ -75,7 +78,7 @@ fn main() {
             return;
         }
     };
-    let raw = match read_to_string(config_path) {
+    let raw = match read_to_string(&config_path) {
         Ok(raw) => raw,
         Err(e) => {
             report_config_error(e.to_string());
@@ -90,6 +93,6 @@ fn main() {
             return;
         }
     };
-    let sections = read(config.results_directory);
+    let sections = read(config.results_directory, &Locker::new(&config_path, None)).unwrap();
     print_sections(&sections, &mut io::stdout());
 }
diff --git a/v2/robotmk/src/bin/scheduler/environment.rs b/v2/robotmk/src/bin/scheduler/environment.rs
index a350bca2..6255e6ba 100644
--- a/v2/robotmk/src/bin/scheduler/environment.rs
+++ b/v2/robotmk/src/bin/scheduler/environment.rs
@@ -21,6 +21,7 @@ pub fn build_environments(global_config: &GlobalConfig, suites: Vec<Suite>) -> R
         EnvironmentBuildStatesAdministrator::new_with_pending(
             &suites,
             &global_config.results_directory,
+            &global_config.results_directory_locker,
         )?;
     let env_building_stdio_directory =
         environment_building_stdio_directory(&global_config.working_directory);
diff --git a/v2/robotmk/src/bin/scheduler/internal_config.rs b/v2/robotmk/src/bin/scheduler/internal_config.rs
index 1c86d837..627b71c0 100644
--- a/v2/robotmk/src/bin/scheduler/internal_config.rs
+++ b/v2/robotmk/src/bin/scheduler/internal_config.rs
@@ -4,6 +4,7 @@ use crate::rf::robot::Robot;
 use crate::sessions::session::Session;
 use robotmk::{
     config::{Config, WorkingDirectoryCleanupConfig},
+    lock::Locker,
     section::Host,
     termination::TerminationFlag,
 };
@@ -17,6 +18,7 @@ pub struct GlobalConfig {
     pub results_directory: Utf8PathBuf,
     pub rcc_binary_path: Utf8PathBuf,
     pub termination_flag: TerminationFlag,
+    pub results_directory_locker: Locker,
 }
 
 #[derive(Clone)]
@@ -33,11 +35,13 @@ pub struct Suite {
     pub termination_flag: TerminationFlag,
     pub parallelism_protection: Arc<Mutex<usize>>,
     pub host: Host,
+    pub results_directory_locker: Locker,
 }
 
 pub fn from_external_config(
     external_config: Config,
     termination_flag: TerminationFlag,
+    results_directory_locker: Locker,
 ) -> (GlobalConfig, Vec<Suite>) {
     let mut suites: Vec<Suite> = external_config
         .suites
@@ -68,6 +72,7 @@ pub fn from_external_config(
             termination_flag: termination_flag.clone(),
             parallelism_protection: Arc::new(Mutex::new(0)),
             host: suite_config.host,
+            results_directory_locker: results_directory_locker.clone(),
         })
         .collect();
     sort_suites_by_name(&mut suites);
@@ -76,7 +81,8 @@ pub fn from_external_config(
             working_directory: external_config.working_directory,
             results_directory: external_config.results_directory,
             rcc_binary_path: external_config.rcc_binary_path,
-            termination_flag: termination_flag.clone(),
+            termination_flag,
+            results_directory_locker,
         },
         suites,
     )
@@ -145,6 +151,7 @@ mod tests {
 
     #[test]
     fn test_from_external_config() {
+        let termination_flag = TerminationFlag::new(Arc::new(AtomicBool::new(false)));
         let (global_config, suites) = from_external_config(
             Config {
                 working_directory: Utf8PathBuf::from("/working"),
@@ -155,7 +162,8 @@ mod tests {
                     (String::from("rcc"), rcc_suite_config()),
                 ]),
             },
-            TerminationFlag::new(Arc::new(AtomicBool::new(false))),
+            termination_flag.clone(),
+            Locker::new("/config.json", Some(&termination_flag)),
         );
         assert_eq!(global_config.working_directory, "/working");
         assert_eq!(global_config.results_directory, "/results");
diff --git a/v2/robotmk/src/bin/scheduler/main.rs b/v2/robotmk/src/bin/scheduler/main.rs
index 0ca74614..835edafc 100644
--- a/v2/robotmk/src/bin/scheduler/main.rs
+++ b/v2/robotmk/src/bin/scheduler/main.rs
@@ -15,6 +15,7 @@ use anyhow::{bail, Context, Result};
 use clap::Parser;
 use log::{debug, info};
 use logging::log_and_return_error;
+use robotmk::lock::Locker;
 
 fn main() -> Result<()> {
     run().map_err(log_and_return_error)?;
@@ -34,8 +35,11 @@ fn run() -> Result<()> {
         .context("Failed to set up termination control")?;
     debug!("Termination control set up");
 
-    let (global_config, suites) =
-        internal_config::from_external_config(external_config, termination_flag);
+    let (global_config, suites) = internal_config::from_external_config(
+        external_config,
+        termination_flag.clone(),
+        Locker::new(&args.config_path, Some(&termination_flag)),
+    );
 
     if global_config.termination_flag.should_terminate() {
         bail!("Terminated")
diff --git a/v2/robotmk/src/bin/scheduler/results.rs b/v2/robotmk/src/bin/scheduler/results.rs
index 22d8faaa..25143579 100644
--- a/v2/robotmk/src/bin/scheduler/results.rs
+++ b/v2/robotmk/src/bin/scheduler/results.rs
@@ -1,7 +1,10 @@
 use super::internal_config::Suite;
 use anyhow::Result;
 use camino::{Utf8Path, Utf8PathBuf};
-use robotmk::section::{WritePiggybackSection, WriteSection};
+use robotmk::{
+    lock::Locker,
+    section::{WritePiggybackSection, WriteSection},
+};
 use serde::Serialize;
 use std::collections::HashMap;
 
@@ -22,9 +25,10 @@ impl WriteSection for RCCSetupFailures {
     }
 }
 
-pub struct EnvironmentBuildStatesAdministrator {
+pub struct EnvironmentBuildStatesAdministrator<'a> {
     build_states: HashMap<String, EnvironmentBuildStatus>,
     path: Utf8PathBuf,
+    locker: &'a Locker,
 }
 
 #[derive(Serialize)]
@@ -36,23 +40,28 @@ impl WriteSection for BuildStates<'_> {
     }
 }
 
-impl EnvironmentBuildStatesAdministrator {
+impl<'a> EnvironmentBuildStatesAdministrator<'a> {
     pub fn new_with_pending(
         suites: &[Suite],
         results_directory: &Utf8Path,
-    ) -> Result<EnvironmentBuildStatesAdministrator> {
+        locker: &'a Locker,
+    ) -> Result<EnvironmentBuildStatesAdministrator<'a>> {
         let build_states: HashMap<_, _> = suites
             .iter()
             .map(|suite| (suite.name.to_string(), EnvironmentBuildStatus::Pending))
             .collect();
         let path = results_directory.join("environment_build_states.json");
-        BuildStates(&build_states).write(&path)?;
-        Ok(Self { build_states, path })
+        BuildStates(&build_states).write(&path, locker)?;
+        Ok(Self {
+            build_states,
+            path,
+            locker,
+        })
     }
 
     pub fn update(&mut self, suite_name: &str, build_status: EnvironmentBuildStatus) -> Result<()> {
         self.build_states.insert(suite_name.into(), build_status);
-        BuildStates(&self.build_states).write(&self.path)
+        BuildStates(&self.build_states).write(&self.path, self.locker)
     }
 }
 
diff --git a/v2/robotmk/src/bin/scheduler/scheduling/suites.rs b/v2/robotmk/src/bin/scheduler/scheduling/suites.rs
index 03163f71..2c263a32 100644
--- a/v2/robotmk/src/bin/scheduler/scheduling/suites.rs
+++ b/v2/robotmk/src/bin/scheduler/scheduling/suites.rs
@@ -20,7 +20,11 @@ pub fn run_suite(suite: &Suite) -> Result<()> {
             outcome: ExecutionReport::AlreadyRunning,
         };
         report
-            .write(&suite.results_file, suite.host.clone())
+            .write(
+                &suite.results_file,
+                suite.host.clone(),
+                &suite.results_directory_locker,
+            )
             .context("Reporting failure to acquire suite lock failed")
             .err()
             .unwrap_or(err)
@@ -32,7 +36,11 @@ pub fn run_suite(suite: &Suite) -> Result<()> {
         outcome: ExecutionReport::Executed(produce_suite_results(suite)?),
     };
     report
-        .write(&suite.results_file, suite.host.clone())
+        .write(
+            &suite.results_file,
+            suite.host.clone(),
+            &suite.results_directory_locker,
+        )
         .context("Reporting suite results failed")?;
     debug!("Suite {} finished", &suite.name);
 
diff --git a/v2/robotmk/src/bin/scheduler/setup/general.rs b/v2/robotmk/src/bin/scheduler/setup/general.rs
index a54b90ed..d2836adc 100644
--- a/v2/robotmk/src/bin/scheduler/setup/general.rs
+++ b/v2/robotmk/src/bin/scheduler/setup/general.rs
@@ -37,6 +37,9 @@ fn setup_results_directories(global_config: &GlobalConfig, suites: &[Suite]) ->
 }
 
 fn clean_up_results_directory_atomic(global_config: &GlobalConfig, suites: &[Suite]) -> Result<()> {
+    let results_directory_lock = global_config
+        .results_directory_locker
+        .wait_for_write_lock()?;
     let suite_results_directory = suite_results_directory(&global_config.results_directory);
     let result_files_to_keep =
         HashSet::<Utf8PathBuf>::from_iter(suites.iter().map(|suite| suite.results_file.clone()));
@@ -44,10 +47,9 @@ fn clean_up_results_directory_atomic(global_config: &GlobalConfig, suites: &[Sui
         currently_present_result_files(&suite_results_directory)?,
     );
     for path in currently_present_result_files.difference(&result_files_to_keep) {
-        remove_file(path)?; // TODO: This fails, if the agent plugin is currently reading the file
-                            // (a non-critical and recoverable) error. How to handle it?
+        remove_file(path)?;
     }
-    Ok(())
+    results_directory_lock.release()
 }
 
 fn currently_present_result_files(suite_results_directory: &Utf8Path) -> Result<Vec<Utf8PathBuf>> {
diff --git a/v2/robotmk/src/bin/scheduler/setup/rcc.rs b/v2/robotmk/src/bin/scheduler/setup/rcc.rs
index b9f09d5a..7a3d6f1e 100644
--- a/v2/robotmk/src/bin/scheduler/setup/rcc.rs
+++ b/v2/robotmk/src/bin/scheduler/setup/rcc.rs
@@ -84,7 +84,7 @@ fn rcc_setup(global_config: &GlobalConfig, rcc_suites: Vec<Suite>) -> Result<Vec
     let path = global_config
         .results_directory
         .join("rcc_setup_failures.json");
-    rcc_setup_failures.write(path)?;
+    rcc_setup_failures.write(path, &global_config.results_directory_locker)?;
 
     Ok(sucessful_suites)
 }
diff --git a/v2/robotmk/src/lib.rs b/v2/robotmk/src/lib.rs
index 06580d5d..2b96875a 100644
--- a/v2/robotmk/src/lib.rs
+++ b/v2/robotmk/src/lib.rs
@@ -1,3 +1,4 @@
 pub mod config;
+pub mod lock;
 pub mod section;
 pub mod termination;
diff --git a/v2/robotmk/src/section.rs b/v2/robotmk/src/section.rs
index 0e5c5937..eda21208 100644
--- a/v2/robotmk/src/section.rs
+++ b/v2/robotmk/src/section.rs
@@ -1,3 +1,5 @@
+use super::lock::Locker;
+
 use anyhow::{Context, Result};
 use camino::Utf8Path;
 use serde::{Deserialize, Serialize};
@@ -20,7 +22,7 @@ pub struct Section {
     pub content: String,
 }
 
-fn write(section: &Section, path: impl AsRef<Utf8Path>) -> Result<()> {
+fn write(section: &Section, path: impl AsRef<Utf8Path>, locker: &Locker) -> Result<()> {
     let path = path.as_ref();
     let section = serde_json::to_string(&section).unwrap();
     let mut file = NamedTempFile::new().context("Opening tempfile failed")?;
@@ -28,15 +30,17 @@ fn write(section: &Section, path: impl AsRef<Utf8Path>) -> Result<()> {
         "Writing tempfile failed, {}",
         file.path().display()
     ))?;
+
+    let lock = locker.wait_for_write_lock()?;
     file.persist(path)
-        .context(format!("Persisting tempfile failed, final_path: {path}"))
-        .map(|_| ())
+        .context(format!("Persisting tempfile failed, final_path: {path}"))?;
+    lock.release()
 }
 
 pub trait WriteSection {
     fn name() -> &'static str;
 
-    fn write(&self, path: impl AsRef<Utf8Path>) -> Result<()>
+    fn write(&self, path: impl AsRef<Utf8Path>, locker: &Locker) -> Result<()>
     where
         Self: Serialize,
     {
@@ -45,14 +49,14 @@ pub trait WriteSection {
             content: serde_json::to_string(&self).unwrap(),
             host: Host::Source,
         };
-        write(&section, path)
+        write(&section, path, locker)
     }
 }
 
 pub trait WritePiggybackSection {
     fn name() -> &'static str;
 
-    fn write(&self, path: impl AsRef<Utf8Path>, host: Host) -> Result<()>
+    fn write(&self, path: impl AsRef<Utf8Path>, host: Host, locker: &Locker) -> Result<()>
     where
         Self: Serialize,
     {
@@ -61,7 +65,7 @@ pub trait WritePiggybackSection {
             content: serde_json::to_string(&self).unwrap(),
             host,
         };
-        write(&section, path)
+        write(&section, path, locker)
     }
 }
 
@@ -71,11 +75,14 @@ fn read_entry(entry: Result<DirEntry, Error>) -> Result<Section> {
     Ok(serde_json::from_str(&raw)?)
 }
 
-pub fn read(directory: impl AsRef<Path>) -> Vec<Section> {
+pub fn read(directory: impl AsRef<Path>, locker: &Locker) -> Result<Vec<Section>> {
     // TODO: Test this function.
-    WalkDir::new(directory)
+    let lock = locker.wait_for_read_lock()?;
+    let sections = WalkDir::new(directory)
         .sort_by_file_name()
         .into_iter()
         .filter_map(|entry| read_entry(entry).ok())
-        .collect()
+        .collect();
+    lock.release()?;
+    Ok(sections)
 }