From 6817c435274aed1ad55b21c7f73b0fe2a835be76 Mon Sep 17 00:00:00 2001 From: Adam Marcus Date: Tue, 13 Aug 2024 01:23:07 -0400 Subject: [PATCH] Cleanups/refactors of snapshot execution logic (#432) * Remove not-yet-used InstantiatedSnapshot * Move snapshot execution/scheduling out of root of snapshots module and into a submodule * Standardize an error * Turn TODO into an issue: https://github.com/marcua/ayb/issues/425 * Remove dependence on WalkDir; cargo fmt * Clippy * More Clippy --- Cargo.lock | 20 --- Cargo.toml | 1 - src/server/server_runner.rs | 2 +- src/server/snapshots.rs | 226 +--------------------------- src/server/snapshots/execution.rs | 235 ++++++++++++++++++++++++++++++ src/server/snapshots/models.rs | 12 +- 6 files changed, 242 insertions(+), 254 deletions(-) create mode 100644 src/server/snapshots/execution.rs diff --git a/Cargo.lock b/Cargo.lock index 8f7bc01..2548d33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -880,7 +880,6 @@ dependencies = [ "url", "urlencoding", "uuid", - "walkdir", "zstd", ] @@ -3537,15 +3536,6 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.21" @@ -4493,16 +4483,6 @@ dependencies = [ "libc", ] -[[package]] -name = "walkdir" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" -dependencies = [ - "same-file", - "winapi-util", -] - [[package]] name = "want" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 6561e5c..dd6b7e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,6 @@ prettytable-rs = { version = "0.10.0"} url = { version = "2.5.2", features = ["serde"] } urlencoding = { version = "2.1.3" } uuid = { version = "1.10.0", features = ["v7"] } -walkdir = { version = "2.5.0" } zstd = { version = "0.13.2" } [dev-dependencies] diff --git a/src/server/server_runner.rs b/src/server/server_runner.rs index 5d172d6..5e2bf7c 100644 --- a/src/server/server_runner.rs +++ b/src/server/server_runner.rs @@ -8,7 +8,7 @@ use crate::server::endpoints::{ log_in_endpoint, query_endpoint, register_endpoint, restore_snapshot_endpoint, update_profile_endpoint, }; -use crate::server::snapshots::schedule_periodic_snapshots; +use crate::server::snapshots::execution::schedule_periodic_snapshots; use crate::server::tokens::retrieve_and_validate_api_token; use crate::server::web_frontend::WebFrontendDetails; use actix_cors::Cors; diff --git a/src/server/snapshots.rs b/src/server/snapshots.rs index d0c166b..fec93e9 100644 --- a/src/server/snapshots.rs +++ b/src/server/snapshots.rs @@ -1,228 +1,4 @@ +pub mod execution; pub mod hashes; pub mod models; pub mod storage; - -use crate::ayb_db::db_interfaces::AybDb; -use crate::error::AybError; -use crate::hosted_db::paths::{ - current_database_path, database_parent_path, database_snapshot_path, pathbuf_to_file_name, - pathbuf_to_parent, -}; -use crate::hosted_db::sqlite::query_sqlite; -use crate::server::config::{AybConfig, SqliteSnapshotMethod}; -use crate::server::snapshots::hashes::hash_db_directory; -use crate::server::snapshots::models::{Snapshot, SnapshotType}; -use crate::server::snapshots::storage::SnapshotStorage; -use go_parse_duration::parse_duration; -use std::collections::HashSet; -use std::fs; -use std::path::Path; -use std::time::Duration; -use tokio_cron_scheduler::{Job, JobScheduler}; -use walkdir::WalkDir; - -pub async fn schedule_periodic_snapshots( - config: AybConfig, - ayb_db: Box, -) -> Result<(), AybError> { - if let Some(ref snapshot_config) = config.snapshots { - if let Some(ref automation_config) = snapshot_config.automation { - let scheduler = JobScheduler::new().await?; - // TODO(marcua): Consider something better than - // try_into/unwrap. The problem is that `parse_duration` - // produces an i64 and `from_nanos` expects u64. - let duration = Duration::from_nanos( - parse_duration(&automation_config.interval)? - .try_into() - .unwrap(), - ); - scheduler - .add(Job::new_repeated_async(duration, move |_, _| { - let config = config.clone(); - let ayb_db = ayb_db.clone(); - Box::pin(async move { - if let Some(err) = create_snapshots(&config.clone(), &ayb_db.clone()) - .await - .err() - { - eprintln!("Unable to walk database directory for snapshots: {}", err); - } - }) - })?) - .await?; - scheduler.shutdown_on_ctrl_c(); - - scheduler.start().await?; - } - } - Ok(()) -} - -// TODO(marcua): Figure how how to avoid this Clippy ignore and the -// one on snapshot_database. If I remove the Box, I get an -// unimplemented trait compiler error, but if I keep it, I get a -// Clippy warning. -#[allow(clippy::borrowed_box)] -async fn create_snapshots(config: &AybConfig, ayb_db: &Box) -> Result<(), AybError> { - // Walk the data path for entity slugs, database slugs - println!("Creating snapshots..."); - let mut visited: HashSet = HashSet::new(); - for entry in WalkDir::new(database_parent_path(&config.data_path, true).unwrap()) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| !e.file_type().is_dir() && !e.path_is_symlink()) - { - let path = entry.path(); - if let Some(err) = snapshot_database(config, ayb_db, path, &mut visited) - .await - .err() - { - eprintln!("Unable to snapshot database {}: {}", path.display(), err); - } - } - - Ok(()) -} - -#[allow(clippy::borrowed_box)] -pub async fn snapshot_database( - config: &AybConfig, - ayb_db: &Box, - path: &Path, - visited: &mut HashSet, -) -> Result<(), AybError> { - // TODO(marcua): Replace printlns with some structured logging or - // tracing library. - println!("Trying to back up {}", path.display()); - let entity_slug = pathbuf_to_file_name(&pathbuf_to_parent(&pathbuf_to_parent( - &pathbuf_to_parent(path)?, - )?)?)?; - let database_slug = pathbuf_to_file_name(path)?; - let visited_path = format!("{}/{}", entity_slug, database_slug); - if visited.contains(&visited_path) { - // We only need to snapshot each database once per run, but we - // might encounter multiple versions of the database. Return - // early if we've already taken a backup. - return Ok(()); - } - visited.insert(visited_path); - if config.snapshots.is_none() { - return Err(AybError::SnapshotError { - message: "No snapshot config found".to_string(), - }); - } - let snapshot_config = config.snapshots.as_ref().unwrap(); - - match ayb_db.get_database(&entity_slug, &database_slug).await { - Ok(_db) => { - let db_path = current_database_path(&entity_slug, &database_slug, &config.data_path)?; - let mut snapshot_path = - database_snapshot_path(&entity_slug, &database_slug, &config.data_path)?; - let snapshot_directory = snapshot_path.clone(); - snapshot_path.push(&database_slug); - // Try to remove the file if it already exists, but don't fail if it doesn't. - fs::remove_file(&snapshot_path).ok(); - let backup_query = match snapshot_config.sqlite_method { - // TODO(marcua): Figure out dot commands to make .backup work - SqliteSnapshotMethod::Backup => { - return Err(AybError::SnapshotError { - message: "Backup requires dot commands, which are not yet supported" - .to_string(), - }) - } - SqliteSnapshotMethod::Vacuum => { - format!("VACUUM INTO \"{}\"", snapshot_path.display()) - } - }; - let result = query_sqlite( - &db_path, - &backup_query, - // Run in unsafe mode to allow backup process to - // attach to destination database. - true, - )?; - if !result.rows.is_empty() { - return Err(AybError::SnapshotError { - message: format!("Unexpected snapshot result: {:?}", result), - }); - } - let result = query_sqlite(&snapshot_path, "PRAGMA integrity_check;", false)?; - if result.fields.len() != 1 - || result.rows.len() != 1 - || result.rows[0][0] != Some("ok".to_string()) - { - return Err(AybError::SnapshotError { - message: format!("Snapshot failed integrity check: {:?}", result), - }); - } - - let snapshot_storage = SnapshotStorage::new(snapshot_config).await?; - let existing_snapshots = snapshot_storage - .list_snapshots(&entity_slug, &database_slug) - .await?; - let num_existing_snapshots = existing_snapshots.len(); - let snapshot_hash = hash_db_directory(&snapshot_directory)?; - let mut should_upload_snapshot = true; - for snapshot in &existing_snapshots { - if snapshot.snapshot_id == snapshot_hash { - println!( - "Snapshot with hash {} already exists, not uploading again.", - snapshot_hash - ); - should_upload_snapshot = false; - break; - } - } - if should_upload_snapshot { - println!("Uploading new snapshot with hash {}.", snapshot_hash); - snapshot_storage - .put( - &entity_slug, - &database_slug, - &Snapshot { - snapshot_id: snapshot_hash, - snapshot_type: SnapshotType::Automatic as i16, - }, - &snapshot_path, - ) - .await?; - - // If adding this snapshot resulted in more than the - // maximum snapshots we are allowed, prune old ones. - let max_snapshots: usize = snapshot_config - .automation - .as_ref() - .unwrap() - .max_snapshots - .into(); - let prune_snapshots = (num_existing_snapshots + 1).checked_sub(max_snapshots); - if let Some(prune_snapshots) = prune_snapshots { - println!("Pruning {} oldest snapshots", prune_snapshots); - let mut ids_to_prune: Vec = vec![]; - for snapshot_index in 0..prune_snapshots { - ids_to_prune.push( - existing_snapshots[existing_snapshots.len() - snapshot_index - 1] - .snapshot_id - .clone(), - ) - } - snapshot_storage - .delete_snapshots(&entity_slug, &database_slug, &ids_to_prune) - .await?; - } - } - - // Clean up after uploading snapshot. - fs::remove_dir_all(pathbuf_to_parent(&snapshot_path)?)?; - } - Err(err) => match err { - AybError::RecordNotFound { record_type, .. } if record_type == "database" => { - println!("Not a known database {}/{}", entity_slug, database_slug); - } - _ => { - return Err(err); - } - }, - } - Ok(()) -} diff --git a/src/server/snapshots/execution.rs b/src/server/snapshots/execution.rs new file mode 100644 index 0000000..21c7525 --- /dev/null +++ b/src/server/snapshots/execution.rs @@ -0,0 +1,235 @@ +use crate::ayb_db::db_interfaces::AybDb; +use crate::error::AybError; +use crate::hosted_db::paths::{ + current_database_path, database_parent_path, database_snapshot_path, pathbuf_to_file_name, + pathbuf_to_parent, +}; +use crate::hosted_db::sqlite::query_sqlite; +use crate::server::config::{AybConfig, SqliteSnapshotMethod}; +use crate::server::snapshots::hashes::hash_db_directory; +use crate::server::snapshots::models::{Snapshot, SnapshotType}; +use crate::server::snapshots::storage::SnapshotStorage; +use go_parse_duration::parse_duration; +use std::fs; +use std::time::Duration; +use tokio_cron_scheduler::{Job, JobScheduler}; + +pub async fn schedule_periodic_snapshots( + config: AybConfig, + ayb_db: Box, +) -> Result<(), AybError> { + if let Some(ref snapshot_config) = config.snapshots { + if let Some(ref automation_config) = snapshot_config.automation { + let scheduler = JobScheduler::new().await?; + let duration = Duration::from_nanos( + parse_duration(&automation_config.interval)? + .try_into() + .map_err(|err| AybError::SnapshotError { + message: format!( + "Unable to turn snapshot interval into a duration: {:?}", + err + ), + })?, + ); + scheduler + .add(Job::new_repeated_async(duration, move |_, _| { + let config = config.clone(); + let ayb_db = ayb_db.clone(); + Box::pin(async move { + if let Some(err) = create_snapshots(&config.clone(), &ayb_db.clone()) + .await + .err() + { + eprintln!("Unable to walk database directory for snapshots: {}", err); + } + }) + })?) + .await?; + scheduler.shutdown_on_ctrl_c(); + + scheduler.start().await?; + } + } + Ok(()) +} + +// TODO(marcua): Figure how how to avoid this Clippy ignore and the +// one on snapshot_database. If I remove the Box, I get an +// unimplemented trait compiler error, but if I keep it, I get a +// Clippy warning. +#[allow(clippy::borrowed_box)] +async fn create_snapshots(config: &AybConfig, ayb_db: &Box) -> Result<(), AybError> { + // Walk the data path for entity slugs, database slugs + println!("Creating snapshots..."); + let entity_paths = + fs::read_dir(database_parent_path(&config.data_path, true).unwrap())?.map(|entry| { + let entry_path = entry?.path(); + let entity = pathbuf_to_file_name(&entry_path)?; + if entry_path.is_dir() { + Ok((entity, entry_path)) + } else { + Err(AybError::SnapshotError { + message: format!( + "Unexpected file where entity directory expected: {}", + entry_path.display() + ), + }) + } + }); + for entity_details in entity_paths { + let (entity, entity_path) = entity_details?; + for entry in fs::read_dir(entity_path)? { + let entry_path = entry?.path(); + let database = pathbuf_to_file_name(&entry_path)?; + if entry_path.is_dir() { + if let Some(err) = snapshot_database(config, ayb_db, &entity, &database) + .await + .err() + { + eprintln!( + "Unable to snapshot database {}/{}: {}", + entity, database, err + ); + } + } else { + return Err(AybError::SnapshotError { + message: format!( + "Unexpected file where database directory expected: {}", + entry_path.display() + ), + }); + } + } + } + + Ok(()) +} + +#[allow(clippy::borrowed_box)] +pub async fn snapshot_database( + config: &AybConfig, + ayb_db: &Box, + entity_slug: &str, + database_slug: &str, +) -> Result<(), AybError> { + println!("Trying to back up {}/{}", entity_slug, database_slug); + if config.snapshots.is_none() { + return Err(AybError::SnapshotError { + message: "No snapshot config found".to_string(), + }); + } + let snapshot_config = config.snapshots.as_ref().unwrap(); + + match ayb_db.get_database(entity_slug, database_slug).await { + Ok(_db) => { + let db_path = current_database_path(entity_slug, database_slug, &config.data_path)?; + let mut snapshot_path = + database_snapshot_path(entity_slug, database_slug, &config.data_path)?; + let snapshot_directory = snapshot_path.clone(); + snapshot_path.push(database_slug); + // Try to remove the file if it already exists, but don't fail if it doesn't. + fs::remove_file(&snapshot_path).ok(); + let backup_query = match snapshot_config.sqlite_method { + // TODO(marcua): Figure out dot commands to make .backup work + SqliteSnapshotMethod::Backup => { + return Err(AybError::SnapshotError { + message: "Backup requires dot commands, which are not yet supported" + .to_string(), + }) + } + SqliteSnapshotMethod::Vacuum => { + format!("VACUUM INTO \"{}\"", snapshot_path.display()) + } + }; + let result = query_sqlite( + &db_path, + &backup_query, + // Run in unsafe mode to allow backup process to + // attach to destination database. + true, + )?; + if !result.rows.is_empty() { + return Err(AybError::SnapshotError { + message: format!("Unexpected snapshot result: {:?}", result), + }); + } + let result = query_sqlite(&snapshot_path, "PRAGMA integrity_check;", false)?; + if result.fields.len() != 1 + || result.rows.len() != 1 + || result.rows[0][0] != Some("ok".to_string()) + { + return Err(AybError::SnapshotError { + message: format!("Snapshot failed integrity check: {:?}", result), + }); + } + + let snapshot_storage = SnapshotStorage::new(snapshot_config).await?; + let existing_snapshots = snapshot_storage + .list_snapshots(entity_slug, database_slug) + .await?; + let num_existing_snapshots = existing_snapshots.len(); + let snapshot_hash = hash_db_directory(&snapshot_directory)?; + let mut should_upload_snapshot = true; + for snapshot in &existing_snapshots { + if snapshot.snapshot_id == snapshot_hash { + println!( + "Snapshot with hash {} already exists, not uploading again.", + snapshot_hash + ); + should_upload_snapshot = false; + break; + } + } + if should_upload_snapshot { + println!("Uploading new snapshot with hash {}.", snapshot_hash); + snapshot_storage + .put( + entity_slug, + database_slug, + &Snapshot { + snapshot_id: snapshot_hash, + snapshot_type: SnapshotType::Automatic as i16, + }, + &snapshot_path, + ) + .await?; + + // If adding this snapshot resulted in more than the + // maximum snapshots we are allowed, prune old ones. + let max_snapshots: usize = snapshot_config + .automation + .as_ref() + .unwrap() + .max_snapshots + .into(); + let prune_snapshots = (num_existing_snapshots + 1).checked_sub(max_snapshots); + if let Some(prune_snapshots) = prune_snapshots { + println!("Pruning {} oldest snapshots", prune_snapshots); + let mut ids_to_prune: Vec = vec![]; + for snapshot_index in 0..prune_snapshots { + ids_to_prune.push( + existing_snapshots[existing_snapshots.len() - snapshot_index - 1] + .snapshot_id + .clone(), + ) + } + snapshot_storage + .delete_snapshots(entity_slug, database_slug, &ids_to_prune) + .await?; + } + } + + // Clean up after uploading snapshot. + fs::remove_dir_all(pathbuf_to_parent(&snapshot_path)?)?; + } + Err(err) => match err { + AybError::RecordNotFound { record_type, .. } if record_type == "database" => { + println!("Not a known database {}/{}", entity_slug, database_slug); + } + _ => { + return Err(err); + } + }, + } + Ok(()) +} diff --git a/src/server/snapshots/models.rs b/src/server/snapshots/models.rs index cb6bfc8..5e2af92 100644 --- a/src/server/snapshots/models.rs +++ b/src/server/snapshots/models.rs @@ -38,6 +38,11 @@ impl SnapshotType { #[derive(Debug, Serialize, Deserialize)] pub struct Snapshot { + // TODO(marcua): Eventually we'll want an InstantiatedSnapshot, + // but haven't needed one yet. When we do, it will need this field + // for completeness: + // pub last_modified_at: DateTime, + // A blake3 hash of the snapshot directory before compressing. pub snapshot_id: String, pub snapshot_type: i16, @@ -56,13 +61,6 @@ impl Snapshot { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct InstantiatedSnapshot { - pub last_modified_at: DateTime, - pub snapshot_id: String, - pub snapshot_type: i16, -} - #[derive(Debug, Serialize, Deserialize)] pub struct ListSnapshotResult { pub last_modified_at: DateTime,