diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 6893b46457..8a29fd072b 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -98,9 +98,11 @@ pub enum PeerDDL { if_not_exists: bool, }, CreateMirrorForCDC { + if_not_exists: bool, flow_job: FlowJob, }, CreateMirrorForSelect { + if_not_exists: bool, qrep_flow_job: QRepFlowJob, }, ExecuteMirrorForSelect { @@ -136,7 +138,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { if_not_exists: *if_not_exists, })) } - Statement::CreateMirror { create_mirror } => { + Statement::CreateMirror { + if_not_exists, + create_mirror, + } => { match create_mirror { CDC(cdc) => { let mut flow_job_table_mappings = vec![]; @@ -243,7 +248,6 @@ impl StatementAnalyzer for PeerDDLAnalyzer { Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), _ => None, }; - let flow_job = FlowJob { name: cdc.mirror_name.to_string().to_lowercase(), @@ -275,7 +279,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { )); } - Ok(Some(PeerDDL::CreateMirrorForCDC { flow_job })) + Ok(Some(PeerDDL::CreateMirrorForCDC { + if_not_exists: *if_not_exists, + flow_job, + })) } Select(select) => { let mut raw_options = HashMap::new(); @@ -304,7 +311,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { disabled, }; - Ok(Some(PeerDDL::CreateMirrorForSelect { qrep_flow_job })) + Ok(Some(PeerDDL::CreateMirrorForSelect { + if_not_exists: *if_not_exists, + qrep_flow_job, + })) } } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 943c6f6188..18a94562bf 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -199,79 +199,134 @@ impl NexusBackend { "OK", None, ))]) } - PeerDDL::CreateMirrorForCDC { flow_job } => { + PeerDDL::CreateMirrorForCDC { + if_not_exists, + flow_job, + } => { if self.flow_handler.is_none() { return Err(PgWireError::ApiError(Box::new(PgError::Internal { err_msg: "flow service is not configured".to_owned(), }))); } - let catalog = self.catalog.lock().await; - catalog - .create_flow_job_entry(&flow_job) + let workflow_details = catalog + .get_workflow_details_for_flow_job(&flow_job.name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to create mirror job entry: {:?}", err), + err_msg: format!( + "unable to query catalog for job metadata: {:?}", + err + ), })) })?; - - // get source and destination peers - let src_peer = + tracing::info!( + "got workflow id: {:?}", + workflow_details.as_ref().map(|w| &w.workflow_id) + ); + if workflow_details.is_none() { catalog - .get_peer(&flow_job.source_peer) + .create_flow_job_entry(&flow_job) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get source peer: {:?}", err), + err_msg: format!( + "unable to create mirror job entry: {:?}", + err + ), })) })?; - let dst_peer = - catalog - .get_peer(&flow_job.target_peer) + // get source and destination peers + let src_peer = + catalog + .get_peer(&flow_job.source_peer) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to get source peer: {:?}", err), + })) + })?; + + let dst_peer = + catalog + .get_peer(&flow_job.target_peer) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to get destination peer: {:?}", + err + ), + })) + })?; + + // make a request to the flow service to start the job. + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let workflow_id = flow_handler + .start_peer_flow_job(&flow_job, src_peer, dst_peer) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get destination peer: {:?}", err), + err_msg: format!("unable to submit job: {:?}", err), })) })?; - // make a request to the flow service to start the job. - let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; - let workflow_id = flow_handler - .start_peer_flow_job(&flow_job, src_peer, dst_peer) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to submit job: {:?}", err), - })) - })?; - - catalog - .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to save job metadata: {:?}", err), - })) - })?; + catalog + .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to save job metadata: {:?}", err), + })) + })?; - let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); - Ok(vec![Response::Execution(Tag::new_for_execution( - &create_mirror_success, - None, - ))]) + let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); + Ok(vec![Response::Execution(Tag::new_for_execution( + &create_mirror_success, + None, + ))]) + } else if if_not_exists { + let existing_mirror_success = "MIRROR ALREADY EXISTS"; + Ok(vec![Response::Execution(Tag::new_for_execution( + existing_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("mirror already exists: {:?}", flow_job.name), + )))) + } } - PeerDDL::CreateMirrorForSelect { qrep_flow_job } => { + PeerDDL::CreateMirrorForSelect { + if_not_exists, + qrep_flow_job, + } => { if self.flow_handler.is_none() { return Err(PgWireError::ApiError(Box::new(PgError::Internal { err_msg: "flow service is not configured".to_owned(), }))); } - { - let catalog = self.catalog.lock().await; + let catalog = self.catalog.lock().await; + let workflow_details = catalog + .get_workflow_details_for_flow_job(&qrep_flow_job.name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to query catalog for job metadata: {:?}", + err + ), + })) + })?; + tracing::info!( + "got workflow id: {:?}", + workflow_details.as_ref().map(|w| &w.workflow_id) + ); + if workflow_details.is_none() { catalog .create_qrep_flow_job_entry(&qrep_flow_job) .await @@ -283,22 +338,35 @@ impl NexusBackend { ), })) })?; - } - if qrep_flow_job.disabled { + if qrep_flow_job.disabled { + let create_mirror_success = + format!("CREATE MIRROR {}", qrep_flow_job.name); + return Ok(vec![Response::Execution(Tag::new_for_execution( + &create_mirror_success, + None, + ))]); + } + + let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); - return Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new_for_execution( &create_mirror_success, None, - ))]); + ))]) + } else if if_not_exists { + let existing_mirror_success = "MIRROR ALREADY EXISTS"; + Ok(vec![Response::Execution(Tag::new_for_execution( + existing_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("mirror already exists: {:?}", qrep_flow_job.name), + )))) } - - let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; - let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); - Ok(vec![Response::Execution(Tag::new_for_execution( - &create_mirror_success, - None, - ))]) } PeerDDL::ExecuteMirrorForSelect { flow_job_name } => { if self.flow_handler.is_none() { diff --git a/nexus/server/tests/create_peers/create_bq.rs b/nexus/server/tests/create_peers/create_bq.rs index 39006aea48..766a4763c8 100644 --- a/nexus/server/tests/create_peers/create_bq.rs +++ b/nexus/server/tests/create_peers/create_bq.rs @@ -16,7 +16,7 @@ pub fn create(nexus: &mut Client) { let create_stmt = format!( " - CREATE PEER bq_test FROM BIGQUERY WITH + CREATE PEER IF NOT EXISTS bq_test FROM BIGQUERY WITH ( type = '{}', project_id = '{}', @@ -44,18 +44,5 @@ pub fn create(nexus: &mut Client) { "test_dataset" ); - let creation_status = nexus.simple_query(&create_stmt); - match creation_status { - Ok(_) => (), - Err(err) => { - let create_err = err - .as_db_error() - .expect("failed to unwrap create peer error"); - let already_exists_case = create_err.message().contains("(bq_test) already exists"); - if already_exists_case { - return (); - } - panic!("failed to create bigquery peer: {}", err) - } - } + let _ = nexus.simple_query(&create_stmt); } diff --git a/nexus/server/tests/create_peers/create_pg.rs b/nexus/server/tests/create_peers/create_pg.rs index 43f084a2a3..308bf818d9 100644 --- a/nexus/server/tests/create_peers/create_pg.rs +++ b/nexus/server/tests/create_peers/create_pg.rs @@ -34,7 +34,7 @@ pub fn create(nexus: &mut Client) { let create_stmt = format!( " - CREATE PEER pg_test FROM POSTGRES WITH + CREATE PEER IF NOT EXISTS pg_test FROM POSTGRES WITH ( host = '{}', port = '{}', @@ -45,17 +45,5 @@ pub fn create(nexus: &mut Client) { &peer_host, &peer_port, &peer_user, &peer_password, &peer_database ); - let creation_status = nexus.simple_query(&create_stmt); - match creation_status { - Ok(_) => (), - Err(err) => { - let create_err = err - .as_db_error() - .expect("failed to unwrap create peer error"); - let already_exists_case = create_err.message().contains("(pg_test) already exists"); - if !already_exists_case { - panic!("failed to create pg peer: {}", err) - } - } - } + let _ = nexus.simple_query(&create_stmt); } diff --git a/nexus/server/tests/create_peers/create_sf.rs b/nexus/server/tests/create_peers/create_sf.rs index f241ec23e8..0e4c4c5096 100644 --- a/nexus/server/tests/create_peers/create_sf.rs +++ b/nexus/server/tests/create_peers/create_sf.rs @@ -18,7 +18,7 @@ pub fn create(nexus: &mut Client) { let create_stmt = format!( " - CREATE PEER sf_test FROM SNOWFLAKE WITH + CREATE PEER IF NOT EXISTS sf_test FROM SNOWFLAKE WITH ( account_id = '{}', username = '{}', @@ -39,18 +39,5 @@ pub fn create(nexus: &mut Client) { &sf_config.s3_integration ); - let creation_status = nexus.simple_query(&create_stmt); - match creation_status { - Ok(_) => (), - Err(err) => { - let create_err = err - .as_db_error() - .expect("failed to unwrap create peer error"); - let already_exists_case = create_err.message().contains("(sf_test) already exists"); - if already_exists_case { - return (); - } - panic!("failed to create snowflake peer:{}", err) - } - } + let _ = nexus.simple_query(&create_stmt); } diff --git a/nexus/server/tests/server_test.rs b/nexus/server/tests/server_test.rs index 9412199fb9..bd7fceffc3 100644 --- a/nexus/server/tests/server_test.rs +++ b/nexus/server/tests/server_test.rs @@ -223,6 +223,24 @@ fn query_unknown_peer_doesnt_crash_server() { assert!(res.is_ok()); } +#[test] +fn mirror_if_not_exists() { + let server = PeerDBServer::new(); + let mut client = server.connect_dying(); + + create_peers::create_bq::create(&mut client); + create_peers::create_pg::create(&mut client); + // the server should not crash when a query is sent to an unknown peer. + let query = "SELECT * FROM unknown_peer.test_table;"; + let res = client.simple_query(query); + assert!(res.is_err()); + + // assert that server is able to process a valid query after. + let query = "SELECT * FROM peers;"; + let res = client.simple_query(query); + assert!(res.is_ok()); +} + #[test] #[ignore = "requires some work for extended query prepares on bigquery."] fn extended_query_protocol_no_params_bq() { diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs index 883b84ace2..1532c918f7 160000 --- a/nexus/sqlparser-rs +++ b/nexus/sqlparser-rs @@ -1 +1 @@ -Subproject commit 883b84ace2f55a0b4679dc15c4de0f6e7eb5af03 +Subproject commit 1532c918f75cf9b03e3f902bd52298f0391f976a