From 488c2f3e0957904426f28ff6aba4ee17897c276f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 29 Dec 2023 15:49:14 +0000 Subject: [PATCH 1/4] Replace refinery with our own backwards compatible migration code Currently server_tests must be run with --test-threads=1 because of two things: 1. port collision 2. concurrent migrations failing. I explored fixing these in #926. Ports are easily solved, but migrations are a pain. This migration code works around concurrency with two changes from refinery: 1. ignore unique constraint violation from CREATE TABLE IF NOT EXISTS 2. lock migration table so concurrent migrations are serialized. Considered submitting a PR to refinery with these two fixes, but this simple change was non-trivial since they support multiple async/sync database drivers. --- nexus/Cargo.lock | 119 +++++++------------------------ nexus/catalog/Cargo.toml | 3 +- nexus/catalog/src/lib.rs | 150 +++++++++++++++++++++++++++++++++------ 3 files changed, 157 insertions(+), 115 deletions(-) diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 09159e4d29..150f14ec69 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -429,7 +429,7 @@ dependencies = [ "serde_json", "tar", "tempfile", - "toml 0.8.2", + "toml", "xz2", "zopfli", ] @@ -441,7 +441,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a969e13a7589e9e3e4207e153bae624ade2b5622fb4684a4923b23ec3d57719" dependencies = [ "serde", - "toml 0.8.2", + "toml", ] [[package]] @@ -450,13 +450,14 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "include_dir", "peer-cursor", "peer-postgres", "postgres-connection", "prost", "pt", - "refinery", "serde_json", + "siphasher 1.0.0", "tokio", "tokio-postgres", "tracing", @@ -1308,6 +1309,25 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "include_dir" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"18762faeff7122e89e0857b02f7ce6fcc0d101d5e9ad2ad7846cc01d61b7f19e" +dependencies = [ + "include_dir_macros", +] + +[[package]] +name = "include_dir_macros" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b139284b5cf57ecfa712bcc66950bb635b31aff41c188e8a4cfc758eca374a3f" +dependencies = [ + "proc-macro2", + "quote", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -2209,7 +2229,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97dc5fea232fc28d2f597b37c4876b348a40e33f3b02cc975c8d006d78d94b1a" dependencies = [ "toml_datetime", - "toml_edit 0.20.2", + "toml_edit", ] [[package]] @@ -2415,51 +2435,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "refinery" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "529664dbccc0a296947615c997a857912d72d1c44be1fafb7bae54ecfa7a8c24" -dependencies = [ - "refinery-core", - "refinery-macros", -] - -[[package]] -name = "refinery-core" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e895cb870cf06e92318cbbeb701f274d022d5ca87a16fa8244e291cd035ef954" -dependencies = [ - "async-trait", - "cfg-if", - "lazy_static", - "log", - "regex", - "serde", - "siphasher 1.0.0", - "thiserror", - "time", - "tokio", - "tokio-postgres", - "toml 0.7.8", - "url", - "walkdir", -] - -[[package]] -name = "refinery-macros" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "123e8b80f8010c3ae38330c81e76938fc7adf6cdbfbaad20295bb8c22718b4f1" -dependencies = [ - "proc-macro2", - "quote", - "refinery-core", - "regex", - "syn 2.0.43", -] - [[package]] name = "regex" version = "1.10.2" @@ -2750,15 +2725,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.22" @@ -3372,18 +3338,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "toml" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.19.15", -] - [[package]] name = "toml" version = "0.8.2" @@ -3393,7 +3347,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.20.2", + "toml_edit", ] [[package]] @@ -3405,19 +3359,6 @@ dependencies = [ "serde", ] -[[package]] -name = "toml_edit" -version = "0.19.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" -dependencies = [ - "indexmap 2.1.0", - "serde", - "serde_spanned", - "toml_datetime", - "winnow", -] - [[package]] name = "toml_edit" version = "0.20.2" @@ -3728,16 +3669,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" -[[package]] -name = "walkdir" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" -dependencies = [ - "same-file", - "winapi-util", -] - [[package]] name = "want" version = "0.3.1" diff --git a/nexus/catalog/Cargo.toml b/nexus/catalog/Cargo.toml index 4065833565..280582a010 100644 --- a/nexus/catalog/Cargo.toml +++ b/nexus/catalog/Cargo.toml @@ -12,7 +12,7 @@ prost = "0.12" peer-cursor = { path = "../peer-cursor" } peer-postgres = { path = "../peer-postgres" } pt = { path = "../pt" } -refinery = { version = "0.8", features = ["tokio-postgres"] } +include_dir = { version = "0.7", default-features = false } tokio = { 
version = "1.13.0", features = ["full"] } tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", @@ -21,4 +21,5 @@ tokio-postgres = { version = "0.7.6", features = [ ] } tracing = "0.1.29" serde_json = "1.0" +siphasher = "1.0" postgres-connection = { path = "../postgres-connection" } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 32ee57c034..69bf8986be 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -1,6 +1,9 @@ +use std::cmp::Ordering; +use std::hash::{Hash, Hasher}; use std::{collections::HashMap, sync::Arc}; use anyhow::{anyhow, Context}; +use include_dir::{include_dir, Dir, File}; use peer_cursor::QueryExecutor; use peer_postgres::PostgresQueryExecutor; use postgres_connection::{connect_postgres, get_pg_connection_string}; @@ -11,31 +14,65 @@ use pt::{ peerdb_peers::{peer::Config, DbType, Peer}, }; use serde_json::Value; -use tokio_postgres::{types, Client}; +use siphasher::sip::SipHasher13; +use tokio_postgres::{error::SqlState, types, Client}; -mod embedded { - use refinery::embed_migrations; - embed_migrations!("migrations"); +static MIGRATIONS: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/migrations"); + +#[derive(Eq)] +struct Migration<'a> { + pub file: &'a File<'a>, + pub version: i32, + pub name: &'a str, } -pub struct Catalog { - pg: Box, - executor: Arc, +impl<'a> Migration<'a> { + pub fn new(file: &'a File<'a>) -> anyhow::Result { + let Some(f) = file.path().to_str() else { + return Err(anyhow!("migration filename must be utf8")); + }; + let Some(f) = f.strip_prefix('V') else { + return Err(anyhow!("migration filename must start with V")); + }; + let Some(f) = f.strip_suffix(".sql") else { + return Err(anyhow!("migration filename must end with .sql")); + }; + let Some(__idx) = f.find("__") else { + return Err(anyhow!("migration filename must contain __")); + }; + let Ok(version) = f[..__idx].parse() else { + return Err(anyhow!("migration filename must have number between V & __")); + }; + 
let name = &f[__idx + 2..]; + Ok(Self { + file, + version, + name, + }) + } +} + +impl<'a> PartialEq for Migration<'a> { + fn eq(&self, other: &Self) -> bool { + self.version == other.version + } +} + +impl<'a> Ord for Migration<'a> { + fn cmp(&self, other: &Self) -> Ordering { + self.version.cmp(&other.version) + } } -async fn run_migrations(client: &mut Client) -> anyhow::Result<()> { - let migration_report = embedded::migrations::runner() - .run_async(client) - .await - .context("Failed to run migrations")?; - for migration in migration_report.applied_migrations() { - tracing::info!( - "Migration Applied - Name: {}, Version: {}", - migration.name(), - migration.version() - ); +impl<'a> PartialOrd for Migration<'a> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.version.cmp(&other.version)) } - Ok(()) +} + +pub struct Catalog { + pg: Box, + executor: Arc, } #[derive(Debug, Copy, Clone)] @@ -86,7 +123,80 @@ impl Catalog { } pub async fn run_migrations(&mut self) -> anyhow::Result<()> { - run_migrations(&mut self.pg).await + let mut migrations = MIGRATIONS + .files() + .map(Migration::new) + .collect::>>()?; + migrations.sort(); + let tx = self.pg.transaction().await?; + let create = tx + .query( + "create table if not exists refinery_schema_history(\ + version int4 primary key, name text, applied_on text, checksum text)", + &[], + ) + .await; + if let Err(err) = create { + if err.code() != Some(&SqlState::UNIQUE_VIOLATION) { + return Err(err.into()); + } + } + + tx.execute( + "lock table refinery_schema_history in share update exclusive mode", + &[], + ) + .await?; + let rows = tx + .query( + "select version, name from refinery_schema_history order by version", + &[], + ) + .await?; + let mut applied = rows + .iter() + .map(|row| (row.get::(0), row.get::(1))); + + for migration in migrations { + if let Some((applied_version, applied_name)) = applied.next() { + if migration.version != applied_version { + return Err(anyhow!( + "Migration version 
mismatch: {} & {}", + migration.version, + applied_version + )); + } + if migration.name != applied_name { + return Err(anyhow!( + "Migration name mismatch: '{}' & '{}'", + migration.name, + applied_name + )); + } + continue; + } + let Some(sql) = migration.file.contents_utf8() else { + return Err(anyhow!("migration sql must be utf8")); + }; + let checksum = { + let mut hasher = SipHasher13::new(); + migration.name.hash(&mut hasher); + migration.version.hash(&mut hasher); + sql.hash(&mut hasher); + hasher.finish() + }; + + tx.batch_execute(sql).await?; + tx.execute("insert into refinery_schema_history (version, name, applied_on, checksum) values ($1, $2, NOW(), $3)", + &[&migration.version, &migration.name, &checksum.to_string()]).await?; + tracing::info!( + "Migration Applied: {} {}", + migration.version, + migration.name + ); + } + + tx.commit().await.map_err(|err| err.into()) } pub fn get_executor(&self) -> &Arc { From 5dcda6c9e18f1b627701a3c5cc848179f7c78779 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 30 Dec 2023 20:30:38 +0000 Subject: [PATCH 2/4] Create table outside transaction so ignored unique violation doesn't block rest of transaction --- nexus/catalog/src/lib.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 69bf8986be..df5d51bd2b 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -41,7 +41,9 @@ impl<'a> Migration<'a> { return Err(anyhow!("migration filename must contain __")); }; let Ok(version) = f[..__idx].parse() else { - return Err(anyhow!("migration filename must have number between V & __")); + return Err(anyhow!( + "migration filename must have number between V & __" + )); }; let name = &f[__idx + 2..]; Ok(Self { @@ -128,8 +130,8 @@ impl Catalog { .map(Migration::new) .collect::>>()?; migrations.sort(); - let tx = self.pg.transaction().await?; - let create = tx + let create = self + .pg .query( "create table if 
not exists refinery_schema_history(\ version int4 primary key, name text, applied_on text, checksum text)", @@ -142,6 +144,7 @@ impl Catalog { } } + let tx = self.pg.transaction().await?; tx.execute( "lock table refinery_schema_history in share update exclusive mode", &[], From ef4862dd9fcdcb2d18818fe9249463b3d19a1e0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 28 Dec 2023 19:15:55 +0000 Subject: [PATCH 3/4] nexus: fix tests breaking unless single threaded --- .github/workflows/ci.yml | 2 +- nexus/server/tests/server_test.rs | 20 ++++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8452cea078..9dac618abf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,7 +64,7 @@ jobs: working-directory: ./nexus - name: cargo test - run: cargo test -- --test-threads=1 + run: cargo test working-directory: ./nexus env: RUST_BACKTRACE: 1 diff --git a/nexus/server/tests/server_test.rs b/nexus/server/tests/server_test.rs index 9412199fb9..5b0938fb60 100644 --- a/nexus/server/tests/server_test.rs +++ b/nexus/server/tests/server_test.rs @@ -4,6 +4,7 @@ use std::{ io::{prelude::*, BufReader, Write}, path::Path, process::Command, + sync::atomic::{AtomicU16, Ordering}, thread, time::Duration, }; @@ -38,13 +39,20 @@ fn read_queries(filename: impl AsRef) -> Vec { struct PeerDBServer { server: std::process::Child, + port: u16, } impl PeerDBServer { fn new() -> Self { + static PORT_OFFSET_COUNTER: AtomicU16 = AtomicU16::new(0); + + let port_offset = PORT_OFFSET_COUNTER.fetch_add(1, Ordering::Relaxed); + let server_port = 9900 + port_offset; + let server_port_str = server_port.to_string(); + let console_bind = format!("127.0.0.1:{}", 6669 + port_offset); let mut server_start = Command::new("cargo"); - server_start.envs(std::env::vars()); - server_start.args(["run"]); + server_start.envs(std::env::vars().into_iter().chain([("TOKIO_CONSOLE_BIND".into(), 
console_bind)].iter().cloned())); + server_start.args(["run", "--", "--port", &server_port_str]); tracing::info!("Starting server..."); let f = File::create("server.log").expect("unable to open server.log"); @@ -55,12 +63,12 @@ impl PeerDBServer { thread::sleep(Duration::from_millis(5000)); tracing::info!("peerdb-server Server started"); - Self { server: child } + Self { server: child, port: server_port } } fn connect_dying(&self) -> Client { - let connection_string = "host=localhost port=9900 password=peerdb user=peerdb"; - let mut client_result = Client::connect(connection_string, NoTls); + let connection_string = format!("host=localhost port={} password=peerdb user=peerdb", self.port); + let mut client_result = Client::connect(&connection_string, NoTls); let mut client_established = false; let max_attempts = 10; @@ -73,7 +81,7 @@ impl PeerDBServer { Err(_) => { attempts += 1; thread::sleep(Duration::from_millis(2000 * attempts)); - client_result = Client::connect(connection_string, NoTls); + client_result = Client::connect(&connection_string, NoTls); } } } From 2b981315e765e7a4722cfd95777ee560d575fa06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 28 Dec 2023 20:03:20 +0000 Subject: [PATCH 4/4] rustfmt --- nexus/server/tests/server_test.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/nexus/server/tests/server_test.rs b/nexus/server/tests/server_test.rs index 5b0938fb60..0c20130a2b 100644 --- a/nexus/server/tests/server_test.rs +++ b/nexus/server/tests/server_test.rs @@ -51,7 +51,13 @@ impl PeerDBServer { let server_port_str = server_port.to_string(); let console_bind = format!("127.0.0.1:{}", 6669 + port_offset); let mut server_start = Command::new("cargo"); - server_start.envs(std::env::vars().into_iter().chain([("TOKIO_CONSOLE_BIND".into(), console_bind)].iter().cloned())); + server_start.envs( + std::env::vars().into_iter().chain( + [("TOKIO_CONSOLE_BIND".into(), console_bind)] + .iter() + 
.cloned(), + ), + ); server_start.args(["run", "--", "--port", &server_port_str]); tracing::info!("Starting server..."); @@ -63,11 +69,17 @@ impl PeerDBServer { thread::sleep(Duration::from_millis(5000)); tracing::info!("peerdb-server Server started"); - Self { server: child, port: server_port } + Self { + server: child, + port: server_port, + } } fn connect_dying(&self) -> Client { - let connection_string = format!("host=localhost port={} password=peerdb user=peerdb", self.port); + let connection_string = format!( + "host=localhost port={} password=peerdb user=peerdb", + self.port + ); let mut client_result = Client::connect(&connection_string, NoTls); let mut client_established = false;