diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 6893b46457..8a29fd072b 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -98,9 +98,11 @@ pub enum PeerDDL { if_not_exists: bool, }, CreateMirrorForCDC { + if_not_exists: bool, flow_job: FlowJob, }, CreateMirrorForSelect { + if_not_exists: bool, qrep_flow_job: QRepFlowJob, }, ExecuteMirrorForSelect { @@ -136,7 +138,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { if_not_exists: *if_not_exists, })) } - Statement::CreateMirror { create_mirror } => { + Statement::CreateMirror { + if_not_exists, + create_mirror, + } => { match create_mirror { CDC(cdc) => { let mut flow_job_table_mappings = vec![]; @@ -243,7 +248,6 @@ impl StatementAnalyzer for PeerDDLAnalyzer { Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), _ => None, }; - let flow_job = FlowJob { name: cdc.mirror_name.to_string().to_lowercase(), @@ -275,7 +279,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { )); } - Ok(Some(PeerDDL::CreateMirrorForCDC { flow_job })) + Ok(Some(PeerDDL::CreateMirrorForCDC { + if_not_exists: *if_not_exists, + flow_job, + })) } Select(select) => { let mut raw_options = HashMap::new(); @@ -304,7 +311,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { disabled, }; - Ok(Some(PeerDDL::CreateMirrorForSelect { qrep_flow_job })) + Ok(Some(PeerDDL::CreateMirrorForSelect { + if_not_exists: *if_not_exists, + qrep_flow_job, + })) } } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 943c6f6188..7089f93ee2 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use analyzer::{PeerDDL, QueryAssocation}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; -use catalog::{Catalog, CatalogConfig}; +use catalog::{Catalog, CatalogConfig, WorkflowDetails}; use clap::Parser; use cursor::PeerCursors; use dashmap::DashMap; @@ -36,7 +36,7 @@ use pt::{ peerdb_peers::{peer::Config, Peer}, }; use rand::Rng; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, MutexGuard}; use tokio::{io::AsyncWriteExt, net::TcpListener}; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -158,6 +158,52 @@ impl NexusBackend { !unsupported_peer_types.contains(&peer_type) } + async fn check_for_mirror( + catalog: &MutexGuard<'_, Catalog>, + flow_name: String, + ) -> PgWireResult> { + let workflow_details = catalog + .get_workflow_details_for_flow_job(&flow_name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to query catalog for job metadata: {:?}", err), + })) + })?; + Ok(workflow_details) + } + + async fn get_peer_of_mirror( + catalog: &MutexGuard<'_, Catalog>, + peer_name: String, + ) -> PgWireResult { + let peer = catalog.get_peer(&peer_name).await.map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to get peer {:?}: {:?}", peer_name, err), + })) + })?; + Ok(peer) + } + + fn handle_mirror_existence( + if_not_exists: bool, + flow_name: String, + ) -> PgWireResult>> { + if if_not_exists { + let existing_mirror_success = "MIRROR ALREADY EXISTS"; + Ok(vec![Response::Execution(Tag::new_for_execution( + existing_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("mirror already exists: {:?}", flow_name), + )))) + } + } + async fn handle_query<'a>( &self, nexus_stmt: NexusStatement, @@ -199,79 +245,82 @@ impl NexusBackend { "OK", None, ))]) } - PeerDDL::CreateMirrorForCDC { flow_job } => { + PeerDDL::CreateMirrorForCDC { + if_not_exists, + flow_job, + } => { if self.flow_handler.is_none() { return Err(PgWireError::ApiError(Box::new(PgError::Internal { err_msg: "flow service is not configured".to_owned(), }))); } - let catalog = self.catalog.lock().await; - catalog - .create_flow_job_entry(&flow_job) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to create mirror job entry: {:?}", err), - })) - })?; - - // get source and destination peers - let src_peer = + let mirror_details = + Self::check_for_mirror(&catalog, flow_job.name.clone()).await?; + if mirror_details.is_none() { catalog - .get_peer(&flow_job.source_peer) + .create_flow_job_entry(&flow_job) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get source peer: {:?}", err), + err_msg: format!( + "unable to create mirror job entry: {:?}", + err + ), })) })?; - let dst_peer = - catalog - .get_peer(&flow_job.target_peer) + // get source and destination peers + let src_peer = + Self::get_peer_of_mirror(&catalog, flow_job.source_peer.clone()) + .await?; + let dst_peer = + Self::get_peer_of_mirror(&catalog, flow_job.target_peer.clone()) + .await?; + + // make a request to the flow service to start the job. + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let workflow_id = flow_handler + .start_peer_flow_job(&flow_job, src_peer, dst_peer) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get destination peer: {:?}", err), + err_msg: format!("unable to submit job: {:?}", err), })) })?; - // make a request to the flow service to start the job. - let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; - let workflow_id = flow_handler - .start_peer_flow_job(&flow_job, src_peer, dst_peer) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to submit job: {:?}", err), - })) - })?; - - catalog - .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to save job metadata: {:?}", err), - })) - })?; + catalog + .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to save job metadata: {:?}", err), + })) + })?; - let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); - Ok(vec![Response::Execution(Tag::new_for_execution( - &create_mirror_success, - None, - ))]) + let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); + Ok(vec![Response::Execution(Tag::new_for_execution( + &create_mirror_success, + None, + ))]) + } else { + Self::handle_mirror_existence(if_not_exists, flow_job.name) + } } - PeerDDL::CreateMirrorForSelect { qrep_flow_job } => { + PeerDDL::CreateMirrorForSelect { + if_not_exists, + qrep_flow_job, + } => { if self.flow_handler.is_none() { return Err(PgWireError::ApiError(Box::new(PgError::Internal { err_msg: "flow service is not configured".to_owned(), }))); } - { - let catalog = self.catalog.lock().await; + let catalog = self.catalog.lock().await; + let mirror_details = + Self::check_for_mirror(&catalog, qrep_flow_job.name.clone()).await?; + if mirror_details.is_none() { catalog .create_qrep_flow_job_entry(&qrep_flow_job) .await @@ -283,22 +332,25 @@ impl NexusBackend { ), })) })?; - } - if qrep_flow_job.disabled { + if qrep_flow_job.disabled { + let create_mirror_success = + format!("CREATE MIRROR {}", qrep_flow_job.name); + return Ok(vec![Response::Execution(Tag::new_for_execution( + &create_mirror_success, + None, + ))]); + } + + let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); - return Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new_for_execution( &create_mirror_success, None, - ))]); + ))]) + } else { + Self::handle_mirror_existence(if_not_exists, qrep_flow_job.name) } - - let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; - let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); - Ok(vec![Response::Execution(Tag::new_for_execution( - &create_mirror_success, - None, - ))]) } PeerDDL::ExecuteMirrorForSelect { flow_job_name } => { if self.flow_handler.is_none() { diff --git a/nexus/server/tests/create_peers/create_bq.rs b/nexus/server/tests/create_peers/create_bq.rs index 39006aea48..766a4763c8 100644 --- a/nexus/server/tests/create_peers/create_bq.rs +++ b/nexus/server/tests/create_peers/create_bq.rs @@ -16,7 +16,7 @@ pub fn create(nexus: &mut Client) { let create_stmt = format!( " - CREATE PEER bq_test FROM BIGQUERY WITH + CREATE PEER IF NOT EXISTS bq_test FROM BIGQUERY WITH ( type = '{}', project_id = '{}', @@ -44,18 +44,5 @@ pub fn create(nexus: &mut Client) { "test_dataset" ); - let creation_status = nexus.simple_query(&create_stmt); - match creation_status { - Ok(_) => (), - Err(err) => { - let create_err = err - .as_db_error() - .expect("failed to unwrap create peer error"); - let already_exists_case = create_err.message().contains("(bq_test) already exists"); - if already_exists_case { - return (); - } - panic!("failed to create bigquery peer: {}", err) - } - } + let _ = nexus.simple_query(&create_stmt); } diff --git a/nexus/server/tests/create_peers/create_pg.rs b/nexus/server/tests/create_peers/create_pg.rs index 43f084a2a3..308bf818d9 100644 --- a/nexus/server/tests/create_peers/create_pg.rs +++ b/nexus/server/tests/create_peers/create_pg.rs @@ -34,7 +34,7 @@ pub fn create(nexus: &mut Client) { let create_stmt = format!( " - CREATE PEER pg_test FROM POSTGRES WITH + CREATE PEER IF NOT EXISTS pg_test FROM POSTGRES WITH ( host = '{}', port = '{}', @@ -45,17 +45,5 @@ pub fn create(nexus: &mut Client) { &peer_host, &peer_port, &peer_user, &peer_password, &peer_database ); - let creation_status = nexus.simple_query(&create_stmt); - match creation_status { - Ok(_) => (), - Err(err) => { - let create_err = err - .as_db_error() - .expect("failed to unwrap create peer error"); - let already_exists_case = create_err.message().contains("(pg_test) already exists"); - if !already_exists_case { - panic!("failed to create pg peer: {}", err) - } - } - } + let _ = nexus.simple_query(&create_stmt); } diff --git a/nexus/server/tests/create_peers/create_sf.rs b/nexus/server/tests/create_peers/create_sf.rs index f241ec23e8..0e4c4c5096 100644 --- a/nexus/server/tests/create_peers/create_sf.rs +++ b/nexus/server/tests/create_peers/create_sf.rs @@ -18,7 +18,7 @@ pub fn create(nexus: &mut Client) { let create_stmt = format!( " - CREATE PEER sf_test FROM SNOWFLAKE WITH + CREATE PEER IF NOT EXISTS sf_test FROM SNOWFLAKE WITH ( account_id = '{}', username = '{}', @@ -39,18 +39,5 @@ pub fn create(nexus: &mut Client) { &sf_config.s3_integration ); - let creation_status = nexus.simple_query(&create_stmt); - match creation_status { - Ok(_) => (), - Err(err) => { - let create_err = err - .as_db_error() - .expect("failed to unwrap create peer error"); - let already_exists_case = create_err.message().contains("(sf_test) already exists"); - if already_exists_case { - return (); - } - panic!("failed to create snowflake peer:{}", err) - } - } + let _ = nexus.simple_query(&create_stmt); } diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs index 883b84ace2..92bc0e62d8 160000 --- a/nexus/sqlparser-rs +++ b/nexus/sqlparser-rs @@ -1 +1 @@ -Subproject commit 883b84ace2f55a0b4679dc15c4de0f6e7eb5af03 +Subproject commit 92bc0e62d83a957911a3b22a869208fa822a840b