Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds error reporting for CDC Mirror #335

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions nexus/analyzer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ edition = "2021"
anyhow = "1.0"
async-trait = "0.1"
catalog = { path = "../catalog" }
clap = { version = "4.0", features = ["derive", "env"] }
dotenvy = "0.15.7"
flow-rs = { path = "../flow-rs" }
futures = "0.3"
lazy_static = "1.4"
pem = "1.1.0"
pt = { path = "../pt" }
Expand Down
89 changes: 75 additions & 14 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
// multipass statement analyzer.

use std::{
collections::{HashMap, HashSet},
ops::ControlFlow,
vec,
};

use anyhow::Context;
use catalog::{Catalog, CatalogConfig};
use clap::Parser;
use pt::{
flow_model::{FlowJob, FlowJobTableMapping, FlowSyncMode, QRepFlowJob},
peerdb_peers::{
Expand All @@ -17,9 +13,81 @@ use pt::{
use qrep::process_options;
use sqlparser::ast::CreateMirror::{Select, CDC};
use sqlparser::ast::{visit_relations, visit_statements, FetchDirection, SqlOption, Statement};
use std::{
collections::{HashMap, HashSet},
ops::ControlFlow,
vec,
};

mod qrep;

/// Connection settings for the PeerDB catalog database.
///
/// Each field can be supplied as a CLI flag or via the corresponding
/// PEERDB_CATALOG_* environment variable; defaults target a local
/// stock Postgres instance.
#[derive(Parser, Debug)]
struct CatalogArgs {
    // Hostname of the catalog Postgres server.
    #[clap(long, default_value = "localhost", env = "PEERDB_CATALOG_HOST")]
    catalog_host: String,
    // TCP port of the catalog Postgres server.
    #[clap(long, default_value_t = 5432, env = "PEERDB_CATALOG_PORT")]
    catalog_port: u16,
    // Role used to authenticate against the catalog.
    #[clap(long, default_value = "postgres", env = "PEERDB_CATALOG_USER")]
    catalog_user: String,
    // Password for the catalog role.
    #[clap(long, default_value = "postgres", env = "PEERDB_CATALOG_PASSWORD")]
    catalog_password: String,
    // Database name holding the PeerDB catalog tables (e.g. `peers`).
    #[clap(long, default_value = "postgres", env = "PEERDB_CATALOG_DATABASE")]
    catalog_database: String,
}

/// Validates a staging path against the destination peer type.
///
/// A Snowflake destination accepts either an `s3://` URL or an empty
/// string (meaning: stage inside PeerDB); any other non-empty path is
/// rejected. All paths are accepted for non-Snowflake peers.
fn stage_check(stage: String, peer_type: DbType) -> Result<(), anyhow::Error> {
    let invalid_snowflake_stage =
        peer_type == DbType::Snowflake && !stage.is_empty() && !stage.starts_with("s3://");
    if invalid_snowflake_stage {
        anyhow::bail!(
            "Staging path for Snowflake must either be an S3 URL or an empty string for staging inside PeerDB."
        );
    }
    Ok(())
}

/// Validates the staging-path options of a CDC mirror definition against
/// the destination peer's type, looked up from the PeerDB catalog.
///
/// Connects to the catalog using settings from the environment / process
/// arguments. Returns `Ok(true)` when all checks pass; any violation is
/// returned as an error so the caller can surface it to the user.
fn mirror_input_checks(flow_job: &FlowJob) -> anyhow::Result<bool> {
    // Best-effort .env loading; a missing .env file is not an error.
    dotenvy::dotenv().ok();

    // NOTE(review): `Parser::parse` reads the process argv and exits the
    // process on a parse failure — confirm this analyzer only runs where
    // that is acceptable (consider `try_parse` or passing config in).
    let catalog_args = CatalogArgs::parse();

    // Move the parsed values into the config; `catalog_args` is not used
    // afterwards, so the clones in the original were unnecessary.
    let catalog_config = CatalogConfig {
        host: catalog_args.catalog_host,
        port: catalog_args.catalog_port,
        user: catalog_args.catalog_user,
        password: catalog_args.catalog_password,
        database: catalog_args.catalog_database,
    };

    // NOTE(review): `block_on` panics if invoked from within an async
    // runtime thread — verify all callers are synchronous.
    let new_catalog = futures::executor::block_on(Catalog::new(&catalog_config))?;
    let destination_peer_type = futures::executor::block_on(
        new_catalog.get_peer_type_by_name(flow_job.target_peer.clone()),
    )?;

    let path_missing_err = "missing or invalid for your destination peer";

    // AVRO snapshot mode requires a staging path valid for the destination.
    if flow_job.snapshot_sync_mode == Some(FlowSyncMode::Avro) {
        match &flow_job.snapshot_staging_path {
            None => {
                return Err(anyhow::anyhow!(
                    "snapshot_staging_path {}",
                    path_missing_err
                ))
            }
            Some(stage) => stage_check(stage.clone(), destination_peer_type)?,
        }
    }

    // Same requirement for the ongoing CDC sync staging path.
    if flow_job.cdc_sync_mode == Some(FlowSyncMode::Avro) {
        match &flow_job.cdc_staging_path {
            None => return Err(anyhow::anyhow!("cdc_staging_path {}", path_missing_err)),
            Some(stage) => stage_check(stage.clone(), destination_peer_type)?,
        }
    }

    Ok(true)
}

pub trait StatementAnalyzer {
type Output;

Expand Down Expand Up @@ -266,14 +334,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
push_parallelism,
};

// Error reporting
if Some(FlowSyncMode::Avro) == flow_job.snapshot_sync_mode
&& flow_job.snapshot_staging_path.is_none()
{
return Err(anyhow::anyhow!(
"snapshot_staging_path must be set for AVRO snapshot mode."
));
}
mirror_input_checks(&flow_job)?;

Ok(Some(PeerDDL::CreateMirrorForCDC { flow_job }))
}
Expand Down
18 changes: 18 additions & 0 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,24 @@ impl Catalog {
.context("Failed to get peer type")
}

// get the database type for a given peer name
pub async fn get_peer_type_by_name(&self, peer_name: String) -> anyhow::Result<DbType> {
let stmt = self
.pg
.prepare_typed(
"SELECT type FROM peers WHERE name = $1",
&[types::Type::TEXT],
)
.await?;

self.pg
.query_opt(&stmt, &[&peer_name])
.await?
.map(|row| row.get(0))
.map(|r#type| DbType::from_i32(r#type).unwrap()) // if row was inserted properly, this should never fail
.context("Failed to get peer type")
}

pub async fn get_peers(&self) -> anyhow::Result<HashMap<String, Peer>> {
let stmt = self
.pg
Expand Down
58 changes: 58 additions & 0 deletions nexus/server/tests/server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,64 @@ fn query_unknown_peer_doesnt_crash_server() {
assert!(res.is_ok());
}

#[test]
fn mirror_with_bad_staging_path_should_err() {
    let server = PeerDBServer::new();
    let mut client = server.connect_dying();

    // AVRO cdc_sync_mode without a cdc_staging_path must be rejected.
    let cdc_query = "CREATE MIRROR fail_cdc
FROM pg_test TO bq_test
WITH TABLE MAPPING (
public.cats:cats
)
WITH (
cdc_sync_mode = 'avro'
);";
    match client.simple_query(cdc_query) {
        Ok(_) => panic!("expected avro cdc mode without a staging path to fail"),
        Err(e) => assert!(e
            .to_string()
            .contains("cdc_staging_path missing or invalid for your destination peer")),
    }

    // AVRO snapshot_sync_mode without a snapshot_staging_path must also fail.
    let snapshot_query = "CREATE MIRROR fail_snapshot
FROM pg_test TO bq_test
WITH TABLE MAPPING (
public.cats:cats
)
WITH (
snapshot_sync_mode = 'avro'
);";
    match client.simple_query(snapshot_query) {
        Ok(_) => panic!("expected avro snapshot mode without a staging path to fail"),
        Err(e) => assert!(e
            .to_string()
            .contains("snapshot_staging_path missing or invalid for your destination peer")),
    }
}

#[test]
fn snowflake_mirror_errs_for_bad_stage() {
    let server = PeerDBServer::new();
    let mut client = server.connect_dying();

    // A Snowflake destination only accepts s3:// URLs (or an empty string)
    // as a staging path, so 'something' must be rejected.
    let sf_query = "CREATE MIRROR fail_cdc
FROM pg_test TO sf_test
WITH TABLE MAPPING (
public.cats:cats
)
WITH (
snapshot_sync_mode = 'avro',
snapshot_staging_path = 'something'
);";
    match client.simple_query(sf_query) {
        Ok(_) => panic!("expected a non-S3 Snowflake staging path to be rejected"),
        Err(e) => assert!(e
            .to_string()
            .contains("Staging path for Snowflake must either be an S3 URL or an empty string")),
    }
}

#[test]
#[ignore = "requires some work for extended query prepares on bigquery."]
fn extended_query_protocol_no_params_bq() {
Expand Down
Loading