Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace refinery with our own backwards compatible migration code #932

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 25 additions & 94 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion nexus/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ prost = "0.12"
peer-cursor = { path = "../peer-cursor" }
peer-postgres = { path = "../peer-postgres" }
pt = { path = "../pt" }
refinery = { version = "0.8", features = ["tokio-postgres"] }
include_dir = { version = "0.7", default-features = false }
tokio = { version = "1.13.0", features = ["full"] }
tokio-postgres = { version = "0.7.6", features = [
"with-chrono-0_4",
Expand All @@ -21,4 +21,5 @@ tokio-postgres = { version = "0.7.6", features = [
] }
tracing = "0.1.29"
serde_json = "1.0"
siphasher = "1.0"
postgres-connection = { path = "../postgres-connection" }
153 changes: 133 additions & 20 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::{collections::HashMap, sync::Arc};

use anyhow::{anyhow, Context};
use include_dir::{include_dir, Dir, File};
use peer_cursor::QueryExecutor;
use peer_postgres::PostgresQueryExecutor;
use postgres_connection::{connect_postgres, get_pg_connection_string};
Expand All @@ -11,31 +14,67 @@ use pt::{
peerdb_peers::{peer::Config, DbType, Peer},
};
use serde_json::Value;
use tokio_postgres::{types, Client};
use siphasher::sip::SipHasher13;
use tokio_postgres::{error::SqlState, types, Client};

mod embedded {
use refinery::embed_migrations;
embed_migrations!("migrations");
static MIGRATIONS: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/migrations");

#[derive(Eq)]
struct Migration<'a> {
pub file: &'a File<'a>,
pub version: i32,
pub name: &'a str,
}

pub struct Catalog {
pg: Box<Client>,
executor: Arc<dyn QueryExecutor>,
impl<'a> Migration<'a> {
pub fn new(file: &'a File<'a>) -> anyhow::Result<Self> {
let Some(f) = file.path().to_str() else {
return Err(anyhow!("migration filename must be utf8"));
};
let Some(f) = f.strip_prefix('V') else {
return Err(anyhow!("migration filename must start with V"));
};
let Some(f) = f.strip_suffix(".sql") else {
return Err(anyhow!("migration filename must end with .sql"));
};
let Some(__idx) = f.find("__") else {
return Err(anyhow!("migration filename must contain __"));
};
let Ok(version) = f[..__idx].parse() else {
return Err(anyhow!(
"migration filename must have number between V & __"
));
};
let name = &f[__idx + 2..];
Ok(Self {
file,
version,
name,
})
}
}

impl<'a> PartialEq for Migration<'a> {
fn eq(&self, other: &Self) -> bool {
self.version == other.version
}
}

async fn run_migrations(client: &mut Client) -> anyhow::Result<()> {
let migration_report = embedded::migrations::runner()
.run_async(client)
.await
.context("Failed to run migrations")?;
for migration in migration_report.applied_migrations() {
tracing::info!(
"Migration Applied - Name: {}, Version: {}",
migration.name(),
migration.version()
);
impl<'a> Ord for Migration<'a> {
fn cmp(&self, other: &Self) -> Ordering {
self.version.cmp(&other.version)
}
Ok(())
}

impl<'a> PartialOrd for Migration<'a> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.version.cmp(&other.version))
}
}

pub struct Catalog {
pg: Box<Client>,
executor: Arc<dyn QueryExecutor>,
}

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -86,7 +125,81 @@ impl Catalog {
}

pub async fn run_migrations(&mut self) -> anyhow::Result<()> {
run_migrations(&mut self.pg).await
let mut migrations = MIGRATIONS
.files()
.map(Migration::new)
.collect::<anyhow::Result<Vec<_>>>()?;
migrations.sort();
let create = self
.pg
.query(
"create table if not exists refinery_schema_history(\
version int4 primary key, name text, applied_on text, checksum text)",
&[],
)
.await;
if let Err(err) = create {
if err.code() != Some(&SqlState::UNIQUE_VIOLATION) {
return Err(err.into());
}
}

let tx = self.pg.transaction().await?;
tx.execute(
"lock table refinery_schema_history in share update exclusive mode",
&[],
)
.await?;
let rows = tx
.query(
"select version, name from refinery_schema_history order by version",
&[],
)
.await?;
let mut applied = rows
.iter()
.map(|row| (row.get::<usize, i32>(0), row.get::<usize, &str>(1)));

for migration in migrations {
if let Some((applied_version, applied_name)) = applied.next() {
if migration.version != applied_version {
return Err(anyhow!(
"Migration version mismatch: {} & {}",
migration.version,
applied_version
));
}
if migration.name != applied_name {
return Err(anyhow!(
"Migration name mismatch: '{}' & '{}'",
migration.name,
applied_name
));
}
continue;
}
let Some(sql) = migration.file.contents_utf8() else {
return Err(anyhow!("migration sql must be utf8"));
};
let checksum = {
let mut hasher = SipHasher13::new();
migration.name.hash(&mut hasher);
migration.version.hash(&mut hasher);
sql.hash(&mut hasher);
hasher.finish()
};

tx.batch_execute(sql).await?;
tx.execute("insert into refinery_schema_history (version, name, applied_on, checksum) values ($1, $2, NOW(), $3)",
&[&migration.version, &migration.name, &checksum.to_string()]).await?;
tracing::info!(
"Migration Applied: {} {}",
migration.version,
migration.name
);
}

tx.commit().await.map_err(|err| err.into())
}

pub fn get_executor(&self) -> &Arc<dyn QueryExecutor> {
Expand Down
Loading