diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock
index 9900e59f31..e2a9120933 100644
--- a/nexus/Cargo.lock
+++ b/nexus/Cargo.lock
@@ -55,7 +55,10 @@ dependencies = [
  "anyhow",
  "async-trait",
  "catalog",
+ "clap",
+ "dotenvy",
  "flow-rs",
+ "futures",
  "lazy_static",
  "pem 1.1.1",
  "pt",
diff --git a/nexus/analyzer/Cargo.toml b/nexus/analyzer/Cargo.toml
index 1680d9dcb9..5632e8db58 100644
--- a/nexus/analyzer/Cargo.toml
+++ b/nexus/analyzer/Cargo.toml
@@ -9,7 +9,10 @@ edition = "2021"
 anyhow = "1.0"
 async-trait = "0.1"
 catalog = { path = "../catalog" }
+clap = { version = "4.0", features = ["derive", "env"] }
+dotenvy = "0.15.7"
 flow-rs = { path = "../flow-rs" }
+futures = "0.3"
 lazy_static = "1.4"
 pem = "1.1.0"
 pt = { path = "../pt" }
diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs
index 6893b46457..0b86fb725b 100644
--- a/nexus/analyzer/src/lib.rs
+++ b/nexus/analyzer/src/lib.rs
@@ -1,12 +1,8 @@
 // multipass statement analyzer.
 
-use std::{
-    collections::{HashMap, HashSet},
-    ops::ControlFlow,
-    vec,
-};
-
 use anyhow::Context;
+use catalog::{Catalog, CatalogConfig};
+use clap::Parser;
 use pt::{
     flow_model::{FlowJob, FlowJobTableMapping, FlowSyncMode, QRepFlowJob},
     peerdb_peers::{
@@ -17,9 +13,97 @@ use pt::{
 use qrep::process_options;
 use sqlparser::ast::CreateMirror::{Select, CDC};
 use sqlparser::ast::{visit_relations, visit_statements, FetchDirection, SqlOption, Statement};
+use std::{
+    collections::{HashMap, HashSet},
+    ops::ControlFlow,
+    vec,
+};
 
 mod qrep;
 
+// Connection settings for the PeerDB catalog, read from `--catalog-*` flags or
+// `PEERDB_CATALOG_*` environment variables, with local defaults as fallback.
+#[derive(Parser, Debug)]
+struct CatalogArgs {
+    #[clap(long, default_value = "localhost", env = "PEERDB_CATALOG_HOST")]
+    catalog_host: String,
+    #[clap(long, default_value_t = 5432, env = "PEERDB_CATALOG_PORT")]
+    catalog_port: u16,
+    #[clap(long, default_value = "postgres", env = "PEERDB_CATALOG_USER")]
+    catalog_user: String,
+    #[clap(long, default_value = "postgres", env = "PEERDB_CATALOG_PASSWORD")]
+    catalog_password: String,
+    #[clap(long, default_value = "postgres", env = "PEERDB_CATALOG_DATABASE")]
+    catalog_database: String,
+}
+
+// Rejects a Snowflake staging path that is neither empty (staging inside
+// PeerDB) nor an S3 URL; every other combination is accepted.
+fn stage_check(stage: &str, peer_type: DbType) -> anyhow::Result<()> {
+    let stage_is_usable = stage.is_empty() || stage.starts_with("s3://");
+    if peer_type == DbType::Snowflake && !stage_is_usable {
+        anyhow::bail!(
+            "Staging path for Snowflake must either be an S3 URL or an empty string for staging inside PeerDB."
+        );
+    }
+    Ok(())
+}
+
+// An AVRO sync mode requires a staging path: report it as missing when
+// absent, otherwise validate it against the destination peer type.
+fn staging_path_check(
+    option_name: &str,
+    staging_path: Option<&str>,
+    peer_type: DbType,
+) -> anyhow::Result<()> {
+    match staging_path {
+        Some(stage) => stage_check(stage, peer_type),
+        None => anyhow::bail!("{} missing or invalid for your destination peer", option_name),
+    }
+}
+
+// Checks the staging paths of a CDC mirror against the type of its
+// destination peer, which is looked up in the catalog.
+fn mirror_input_checks(flow_job: &FlowJob) -> anyhow::Result<bool> {
+    dotenvy::dotenv().ok();
+    // `try_parse` surfaces bad settings as an error, whereas `parse` would
+    // print the problem and exit the whole process.
+    let catalog_args = CatalogArgs::try_parse().context("invalid catalog connection settings")?;
+    let catalog_config = CatalogConfig {
+        host: catalog_args.catalog_host,
+        port: catalog_args.catalog_port,
+        user: catalog_args.catalog_user,
+        password: catalog_args.catalog_password,
+        database: catalog_args.catalog_database,
+    };
+
+    // NOTE(review): `block_on` blocks the calling thread until the catalog
+    // responds; confirm this is never reached from a single-threaded async
+    // runtime, where it could stall.
+    let new_catalog = futures::executor::block_on(Catalog::new(&catalog_config))?;
+    let destination_peer_type = futures::executor::block_on(
+        new_catalog.get_peer_type_by_name(flow_job.target_peer.clone()),
+    )?;
+
+    if Some(FlowSyncMode::Avro) == flow_job.snapshot_sync_mode {
+        staging_path_check(
+            "snapshot_staging_path",
+            flow_job.snapshot_staging_path.as_deref(),
+            destination_peer_type,
+        )?;
+    }
+
+    if Some(FlowSyncMode::Avro) == flow_job.cdc_sync_mode {
+        staging_path_check(
+            "cdc_staging_path",
+            flow_job.cdc_staging_path.as_deref(),
+            destination_peer_type,
+        )?;
+    }
+
+    Ok(true)
+}
+
 pub trait StatementAnalyzer {
     type Output;
 
@@ -266,14 +350,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
                             push_parallelism,
                         };
 
-                        // Error reporting
-                        if Some(FlowSyncMode::Avro) == flow_job.snapshot_sync_mode
-                            && flow_job.snapshot_staging_path.is_none()
-                        {
-                            return Err(anyhow::anyhow!(
-                                "snapshot_staging_path must be set for AVRO snapshot mode."
-                            ));
-                        }
+                        mirror_input_checks(&flow_job)?;
 
                         Ok(Some(PeerDDL::CreateMirrorForCDC { flow_job }))
                     }
diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs
index bab49b945c..837bfdb773 100644
--- a/nexus/catalog/src/lib.rs
+++ b/nexus/catalog/src/lib.rs
@@ -197,6 +197,28 @@ impl Catalog {
             .context("Failed to get peer type")
     }
 
+    // get the database type for a given peer name; errors when the peer does
+    // not exist or its stored type is not a known `DbType`
+    pub async fn get_peer_type_by_name(&self, peer_name: String) -> anyhow::Result<DbType> {
+        let stmt = self
+            .pg
+            .prepare_typed(
+                "SELECT type FROM peers WHERE name = $1",
+                &[types::Type::TEXT],
+            )
+            .await?;
+
+        let row = self
+            .pg
+            .query_opt(&stmt, &[&peer_name])
+            .await?
+            .context("Failed to get peer type")?;
+        let raw_type: i32 = row.get(0);
+        // surface an unrecognised type as an error instead of panicking
+        DbType::from_i32(raw_type)
+            .with_context(|| format!("Unknown peer type {} for peer {}", raw_type, peer_name))
+    }
+
     pub async fn get_peers(&self) -> anyhow::Result> {
         let stmt = self
             .pg
diff --git a/nexus/server/tests/server_test.rs b/nexus/server/tests/server_test.rs
index 9412199fb9..2a905d64d0 100644
--- a/nexus/server/tests/server_test.rs
+++ b/nexus/server/tests/server_test.rs
@@ -223,6 +223,64 @@ fn query_unknown_peer_doesnt_crash_server() {
     assert!(res.is_ok());
 }
 
+#[test]
+fn mirror_with_bad_staging_path_should_err() {
+    let server = PeerDBServer::new();
+    let mut client = server.connect_dying();
+    let cdc_query = "CREATE MIRROR fail_cdc
+    FROM pg_test TO bq_test
+    WITH TABLE MAPPING (
+        public.cats:cats
+    )
+    WITH (
+        cdc_sync_mode = 'avro'
+    );";
+    let res = client.simple_query(cdc_query);
+    assert!(res.is_err());
+    if let Err(e) = res {
+        assert!(e
+            .to_string()
+            .contains("cdc_staging_path missing or invalid for your destination peer"));
+    }
+    let snapshot_query = "CREATE MIRROR fail_snapshot
+    FROM pg_test TO bq_test
+    WITH TABLE MAPPING (
+        public.cats:cats
+    )
+    WITH (
+        snapshot_sync_mode = 'avro'
+    );";
+    let res = client.simple_query(snapshot_query);
+    assert!(res.is_err());
+    if let Err(e) = res {
+        assert!(e
+            .to_string()
+            .contains("snapshot_staging_path missing or invalid for your destination peer"));
+    }
+}
+
+#[test]
+fn snowflake_mirror_errs_for_bad_stage() {
+    let server = PeerDBServer::new();
+    let mut client = server.connect_dying();
+    let sf_query = "CREATE MIRROR fail_cdc
+    FROM pg_test TO sf_test
+    WITH TABLE MAPPING (
+        public.cats:cats
+    )
+    WITH (
+        snapshot_sync_mode = 'avro',
+        snapshot_staging_path = 'something'
+    );";
+    let res = client.simple_query(sf_query);
+    assert!(res.is_err());
+    if let Err(e) = res {
+        assert!(e
+            .to_string()
+            .contains("Staging path for Snowflake must either be an S3 URL or an empty string"));
+    }
+}
+
 #[test]
 #[ignore = "requires some work for extended query prepares on bigquery."]
 fn extended_query_protocol_no_params_bq() {