Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): add a command to migrate a postgres database #8756

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions implementations/rust/ockam/ockam_command/src/command_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ockam_node::Executor;
use opentelemetry::trace::{TraceContextExt, Tracer};
use opentelemetry::{global, Context};
use std::collections::HashMap;
use tracing::error;
use tracing::{error, warn};

/// This function creates a journey event describing the execution of a command
pub fn add_command_event(
Expand All @@ -24,9 +24,13 @@ pub fn add_command_event(
APPLICATION_EVENT_COMMAND,
sanitize_command_arguments(command_arguments),
);
cli_state
if let Err(e) = cli_state
.add_journey_event(JourneyEvent::ok(command_name), attributes)
.await
{
warn!("cannot save a journey event: {}", e);
}
Ok::<(), ockam_core::Error>(())
})
})
.into_diagnostic()??;
Expand Down Expand Up @@ -56,9 +60,13 @@ pub fn add_command_error_event(
APPLICATION_EVENT_COMMAND,
sanitize_command_arguments(command_arguments),
);
cli_state
if let Err(e) = cli_state
.add_journey_error(&command, message, attributes)
.await
{
warn!("cannot save a journey event: {}", e);
}
Ok::<(), ockam_core::Error>(())
})
})
.into_diagnostic()??;
Expand Down
1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_command/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod lease;
mod manpages;
mod markdown;
mod message;
mod migrate_database;
pub mod node;
mod operation;
mod output;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use crate::docs;
use crate::util::async_cmd;
use crate::CommandGlobalOpts;
use clap::Args;
use miette::miette;
use ockam_node::database::node_migration_set::NodeMigrationSet;
use ockam_node::database::{DatabaseConfiguration, MigrationSet, SqlxDatabase};
use ockam_node::Context;

const LONG_ABOUT: &str = include_str!("./static/long_about.txt");
const AFTER_LONG_HELP: &str = include_str!("./static/after_long_help.txt");

#[derive(Clone, Debug, Args)]
#[command(
hide = true,
long_about = docs::about(LONG_ABOUT),
after_long_help = docs::after_help(AFTER_LONG_HELP)
)]
pub struct MigrateDatabaseCommand {
/// Report which migrations would be applied, but don't apply them
#[arg(short, long, default_value_t = false)]
dry_run: bool,
}

impl MigrateDatabaseCommand {
pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> {
async_cmd(&self.name(), opts.clone(), |ctx| async move {
self.async_run(&ctx, opts).await
})
}

pub fn name(&self) -> String {
"migrate_database".into()
}

/// If a Postgres database is accessible, either:
/// - Get the current migration status with the `--dry-run` option,
/// - Or execute all the possible migrations and return the status after migration.
///
/// This command returns true when used in scripts if the command successfully executed.
async fn async_run(&self, _ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> {
match DatabaseConfiguration::postgres()? {
Some(configuration) => {
let db = SqlxDatabase::create_no_migration(&configuration).await?;
let migration_set = NodeMigrationSet::new(configuration.database_type());
let migrator = migration_set.create_migrator()?;
if !self.dry_run {
migrator.migrate(&db.pool).await?;
};

let status = migrator.migration_status(&db.pool).await?;
opts.terminal.stdout().plain(&status).json_obj(&status)?.machine(status.up_to_date()).write_line()?;

Ok(())
},
None => Err(miette!("There is no postgres database configuration, or it is incomplete. Please run ockam environment to check the database environment variables")),
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
```sh
# Migrate a Postgres database shared by multiple nodes.
$ ockam migrate-postgres
$ ockam migrate-postgres --dry-run
```
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Migrate a Postgres database to the latest schema version.
4 changes: 4 additions & 0 deletions implementations/rust/ockam/ockam_command/src/subcommand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::lease::LeaseCommand;
use crate::manpages::ManpagesCommand;
use crate::markdown::MarkdownCommand;
use crate::message::MessageCommand;
use crate::migrate_database::MigrateDatabaseCommand;
use crate::node::{NodeCommand, NodeSubcommand};
use crate::policy::PolicyCommand;
use crate::project::ProjectCommand;
Expand Down Expand Up @@ -105,6 +106,7 @@ pub enum OckamSubcommand {
Worker(WorkerCommand),
Service(ServiceCommand),
Message(MessageCommand),
MigrateDatabase(MigrateDatabaseCommand),
SecureChannelListener(SecureChannelListenerCommand),
SecureChannel(SecureChannelCommand),
TcpListener(TcpListenerCommand),
Expand Down Expand Up @@ -151,6 +153,7 @@ impl OckamSubcommand {
OckamSubcommand::Lease(c) => c.run(opts),
OckamSubcommand::Authority(c) => c.run(opts),
OckamSubcommand::Markdown(c) => c.run(),
OckamSubcommand::MigrateDatabase(c) => c.run(opts),
OckamSubcommand::Worker(c) => c.run(opts),
OckamSubcommand::Service(c) => c.run(opts),
OckamSubcommand::Message(c) => c.run(opts),
Expand Down Expand Up @@ -295,6 +298,7 @@ impl OckamSubcommand {
OckamSubcommand::Lease(c) => c.name(),
OckamSubcommand::Authority(c) => c.name(),
OckamSubcommand::Markdown(c) => c.name(),
OckamSubcommand::MigrateDatabase(c) => c.name(),
OckamSubcommand::Worker(c) => c.name(),
OckamSubcommand::Service(c) => c.name(),
OckamSubcommand::Message(c) => c.name(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use core::fmt::{Display, Formatter};
use serde::Serialize;

/// This enum models the result of executing one migration.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MigrationResult {
/// The migration was successful
MigrationSuccess,
/// The migration had a failure
MigrationFailure(MigrationFailure),
}

impl MigrationResult {
/// Create a success result
pub fn success() -> Self {
MigrationResult::MigrationSuccess
}

/// Create a failure for an incorrect checksum
pub fn incorrect_checksum(
description: String,
sql: String,
actual_checksum: String,
expected_checksum: String,
) -> Self {
Self::MigrationFailure(MigrationFailure::IncorrectChecksum(
description,
sql,
actual_checksum,
expected_checksum,
))
}

/// Create a failure when a down migration was attempted
pub fn down_migration() -> Self {
Self::MigrationFailure(MigrationFailure::DownMigration)
}

/// Create a failure when a migration failed for a given version
pub fn dirty_version() -> Self {
Self::MigrationFailure(MigrationFailure::DirtyVersion)
}
}

/// This enum models possible causes for migration failures. Either for a simple migration or several.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub enum MigrationFailure {
/// Description, sql, actual checksum, expected checksum
IncorrectChecksum(String, String, String, String),
/// A down migration was attempted
DownMigration,
/// The previous migration version failed to execute
DirtyVersion,
}

impl Display for MigrationFailure {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
MigrationFailure::IncorrectChecksum(
description,
sql,
actual_checksum,
expected_checksum,
) => write!(f, "❌ Incorrect checksum for the migration: {description}. Actual checksum: {actual_checksum}, expected checksum: {expected_checksum}.\nSQL statements\n{sql}")?,
MigrationFailure::DownMigration => write!(f, "A down migration was attempted")?,
MigrationFailure::DirtyVersion => write!(f, "This migration has already been executed and it failed")?,
}
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::database::MigrationFailure;
use crate::storage::database::migrations::migration_support::migrator::Version;
use core::fmt::{Display, Formatter};
use serde::Serialize;

/// This enum models the state of a database with respect to migrations
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub enum MigrationStatus {
/// The database is up to date with the latest version.
UpToDate(Version),
/// The database needs to be updated.
Todo(Option<Version>, Version),
/// A migration was attempted but failed
Failed(Version, MigrationFailure),
}

impl Display for MigrationStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
MigrationStatus::UpToDate(version) => {
write!(f, "✅ The database is up to date (version: {})", version)?
}
MigrationStatus::Todo(current_version, next_version) => write!(
f,
"⚙️ The database needs to be updated ({}next version: {})",
current_version
.map(|v| format!("current version: {v}, "))
.unwrap_or("".to_string()),
next_version
)?,
MigrationStatus::Failed(version, reason) => write!(
f,
"❌ The database failed to be updated at version: {}.\nReason: {reason}",
version
)?,
};
Ok(())
}
}

impl MigrationStatus {
/// Return true if the database is up to date
pub fn up_to_date(&self) -> bool {
matches!(self, MigrationStatus::UpToDate(_))
}
}
Loading
Loading