Skip to content

Commit

Permalink
feat(rust): add a command to migrate a postgres database
Browse files Browse the repository at this point in the history
| Conflicts:
|	implementations/rust/ockam/ockam_command/src/lib.rs
|	implementations/rust/ockam/ockam_command/src/subcommand.rs

| Conflicts:
|	implementations/rust/ockam/ockam_command/src/lib.rs
|	implementations/rust/ockam/ockam_command/src/subcommand.rs
  • Loading branch information
etorreborre committed Jan 20, 2025
1 parent 20a9025 commit 618a491
Show file tree
Hide file tree
Showing 20 changed files with 394 additions and 99 deletions.
14 changes: 11 additions & 3 deletions implementations/rust/ockam/ockam_command/src/command_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ockam_node::Executor;
use opentelemetry::trace::{TraceContextExt, Tracer};
use opentelemetry::{global, Context};
use std::collections::HashMap;
use tracing::error;
use tracing::{error, warn};

/// This function creates a journey event describing the execution of a command
pub fn add_command_event(
Expand All @@ -24,9 +24,13 @@ pub fn add_command_event(
APPLICATION_EVENT_COMMAND,
sanitize_command_arguments(command_arguments),
);
cli_state
if let Err(e) = cli_state
.add_journey_event(JourneyEvent::ok(command_name), attributes)
.await
{
warn!("cannot save a journey event: {}", e);
}
Ok::<(), ockam_core::Error>(())
})
})
.into_diagnostic()??;
Expand Down Expand Up @@ -56,9 +60,13 @@ pub fn add_command_error_event(
APPLICATION_EVENT_COMMAND,
sanitize_command_arguments(command_arguments),
);
cli_state
if let Err(e) = cli_state
.add_journey_error(&command, message, attributes)
.await
{
warn!("cannot save a journey event: {}", e);
}
Ok::<(), ockam_core::Error>(())
})
})
.into_diagnostic()??;
Expand Down
1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_command/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod lease;
mod manpages;
mod markdown;
mod message;
mod migrate_database;
pub mod node;
mod operation;
mod output;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use crate::docs;
use crate::util::async_cmd;
use crate::CommandGlobalOpts;
use clap::Args;
use miette::IntoDiagnostic;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::Error;
use ockam_node::database::node_migration_set::NodeMigrationSet;
use ockam_node::database::{DatabaseConfiguration, MigrationSet, SqlxDatabase};
use ockam_node::Context;

const LONG_ABOUT: &str = include_str!("./static/long_about.txt");
const AFTER_LONG_HELP: &str = include_str!("./static/after_long_help.txt");

#[derive(Clone, Debug, Args)]
#[command(
long_about = docs::about(LONG_ABOUT),
after_long_help = docs::after_help(AFTER_LONG_HELP)
)]
pub struct MigrateDatabaseCommand {
/// Report which migrations would be applied, but don't apply them
#[arg(short, long, default_value_t = false)]
dry_run: bool,
}

impl MigrateDatabaseCommand {
pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> {
async_cmd(&self.name(), opts.clone(), |ctx| async move {
self.async_run(&ctx, opts).await
})
}

pub fn name(&self) -> String {
"migrate_database".into()
}

/// If a Postgres database is accessible, either:
/// - Get the current migration status with the `--dry-run` option,
/// - Or execute all the possible migrations and return the status after migration.
///
/// This command returns true when used in scripts if the command successfully executed.
async fn async_run(&self, _ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> {
match DatabaseConfiguration::postgres()? {
Some(configuration) => {
let db = SqlxDatabase::create_no_migration(&configuration).await?;
let migration_set = NodeMigrationSet::new(configuration.database_type());
let migrator = migration_set.create_migrator()?;
if !self.dry_run {
migrator.migrate(&db.pool).await?;
};

let status = migrator.migration_status(&db.pool).await?;
opts.terminal.stdout().plain(&status).json_obj(&status)?.machine(status.up_to_date()).write_line()?;

Ok(())
},
None => Err(Error::new(Origin::Core, Kind::NotFound, "There is no postgres database configuration, or it is incomplete. Please run ockam environment to check the database environment variables".to_string())).into_diagnostic()?,
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
```sh
# Migrate a Postgres database shared by multiple nodes.
$ ockam migrate-postgres
$ ockam migrate-postgres --dry-run
```
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Migrate a Postgres database to the latest schema version.
3 changes: 3 additions & 0 deletions implementations/rust/ockam/ockam_command/src/subcommand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub enum OckamSubcommand {
Worker(WorkerCommand),
Service(ServiceCommand),
Message(MessageCommand),
MigrateDatabase(MigrateDatabaseCommand),
SecureChannelListener(SecureChannelListenerCommand),
SecureChannel(SecureChannelCommand),
TcpListener(TcpListenerCommand),
Expand Down Expand Up @@ -151,6 +152,7 @@ impl OckamSubcommand {
OckamSubcommand::Lease(c) => c.run(opts),
OckamSubcommand::Authority(c) => c.run(opts),
OckamSubcommand::Markdown(c) => c.run(),
OckamSubcommand::MigrateDatabase(c) => c.run(),
OckamSubcommand::Worker(c) => c.run(opts),
OckamSubcommand::Service(c) => c.run(opts),
OckamSubcommand::Message(c) => c.run(opts),
Expand Down Expand Up @@ -295,6 +297,7 @@ impl OckamSubcommand {
OckamSubcommand::Lease(c) => c.name(),
OckamSubcommand::Authority(c) => c.name(),
OckamSubcommand::Markdown(c) => c.name(),
OckamSubcommand::MigrateDatabase(c) => c.name(),
OckamSubcommand::Worker(c) => c.name(),
OckamSubcommand::Service(c) => c.name(),
OckamSubcommand::Message(c) => c.name(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use core::fmt::{Display, Formatter};
use serde::Serialize;

/// This enum models the result of executing one migration.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MigrationResult {
/// The migration was successful
MigrationSuccess,
/// The migration had a failure
SomeMigrationError(MigrationFailure),
}

impl MigrationResult {
/// Create a success result
pub fn success() -> Self {
MigrationResult::MigrationSuccess
}

/// Create a failure for an incorrect checksum
pub fn incorrect_checksum(
description: String,
sql: String,
actual_checksum: String,
expected_checksum: String,
) -> Self {
Self::SomeMigrationError(MigrationFailure::IncorrectChecksum(
description,
sql,
actual_checksum,
expected_checksum,
))
}

/// Create a failure when a down migration was attempted
pub fn down_migration() -> Self {
Self::SomeMigrationError(MigrationFailure::DownMigration)
}

/// Create a failure when a migration failed for a given version
pub fn dirty_version() -> Self {
Self::SomeMigrationError(MigrationFailure::DirtyVersion)
}
}

/// This enum models possible causes for migration failures. Either for a simple migration or several.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub enum MigrationFailure {
/// Description, sql, actual checksum, expected checksum
IncorrectChecksum(String, String, String, String),
/// A down migration was attempted
DownMigration,
/// The previous migration version failed to execute
DirtyVersion,
}

impl Display for MigrationFailure {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
MigrationFailure::IncorrectChecksum(
description,
sql,
actual_checksum,
expected_checksum,
) => write!(f, "❌ Incorrect checksum for the migration: {description}. Actual checksum: {actual_checksum}, expected checksum: {expected_checksum}.\nSQL statements\n{sql}")?,
MigrationFailure::DownMigration => write!(f, "A down migration was attempted")?,
MigrationFailure::DirtyVersion => write!(f, "This migration has already been executed and it failed")?,
}
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::database::MigrationFailure;
use crate::storage::database::migrations::migration_support::migrator::Version;
use core::fmt::{Display, Formatter};
use serde::Serialize;

/// This enum models the state of a database with respect to migrations
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub enum MigrationStatus {
/// The database is up to date with the latest version.
UpToDate(Version),
/// The database needs to be updated.
Todo(Option<Version>, Version),
/// A migration was attempted but failed
Failed(Version, MigrationFailure),
}

impl Display for MigrationStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
MigrationStatus::UpToDate(version) => {
write!(f, "✅ The database is up to date (version: {})", version)?
}
MigrationStatus::Todo(current_version, next_version) => write!(
f,
"⚙️ The database needs to be updated ({}next version: {})",
current_version
.map(|v| format!("current version: {v}, "))
.unwrap_or("".to_string()),
next_version
)?,
MigrationStatus::Failed(version, reason) => write!(
f,
"❌ The database failed to be updated at version: {}.\nReason: {reason}",
version
)?,
};
Ok(())
}
}

impl MigrationStatus {
/// Create a new MigrationStatus::UpToDate
pub fn create_up_to_date(current_version: Version) -> Self {
MigrationStatus::UpToDate(current_version)
}

/// Create a new MigrationStatus::Todo
pub fn create_to_do(current_version: Option<Version>, next_version: Version) -> Self {
MigrationStatus::Todo(current_version, next_version)
}

/// Create a new MigrationStatus::Failed
pub fn create_failed(version: Version, failure: MigrationFailure) -> Self {
MigrationStatus::Failed(version, failure)
}

/// Return true if the database is up to date
pub fn up_to_date(&self) -> bool {
matches!(self, MigrationStatus::UpToDate(_))
}
}
Loading

0 comments on commit 618a491

Please sign in to comment.