From 0b9040ec85c30cb38aea5b505a610fb63224b1d1 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 15 Sep 2023 15:32:32 +0530 Subject: [PATCH 1/6] support for mirror if not exists --- nexus/analyzer/src/lib.rs | 18 +- nexus/server/src/main.rs | 172 +++++++++++++------ nexus/server/tests/create_peers/create_bq.rs | 17 +- nexus/server/tests/create_peers/create_pg.rs | 16 +- nexus/server/tests/create_peers/create_sf.rs | 17 +- nexus/server/tests/server_test.rs | 18 ++ nexus/sqlparser-rs | 2 +- 7 files changed, 159 insertions(+), 101 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 6893b46457..8a29fd072b 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -98,9 +98,11 @@ pub enum PeerDDL { if_not_exists: bool, }, CreateMirrorForCDC { + if_not_exists: bool, flow_job: FlowJob, }, CreateMirrorForSelect { + if_not_exists: bool, qrep_flow_job: QRepFlowJob, }, ExecuteMirrorForSelect { @@ -136,7 +138,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { if_not_exists: *if_not_exists, })) } - Statement::CreateMirror { create_mirror } => { + Statement::CreateMirror { + if_not_exists, + create_mirror, + } => { match create_mirror { CDC(cdc) => { let mut flow_job_table_mappings = vec![]; @@ -243,7 +248,6 @@ impl StatementAnalyzer for PeerDDLAnalyzer { Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), _ => None, }; - let flow_job = FlowJob { name: cdc.mirror_name.to_string().to_lowercase(), @@ -275,7 +279,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { )); } - Ok(Some(PeerDDL::CreateMirrorForCDC { flow_job })) + Ok(Some(PeerDDL::CreateMirrorForCDC { + if_not_exists: *if_not_exists, + flow_job, + })) } Select(select) => { let mut raw_options = HashMap::new(); @@ -304,7 +311,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { disabled, }; - Ok(Some(PeerDDL::CreateMirrorForSelect { qrep_flow_job })) + Ok(Some(PeerDDL::CreateMirrorForSelect { + if_not_exists: *if_not_exists, + qrep_flow_job, + })) } } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 943c6f6188..18a94562bf 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -199,79 +199,134 @@ impl NexusBackend { "OK", None, ))]) } - PeerDDL::CreateMirrorForCDC { flow_job } => { + PeerDDL::CreateMirrorForCDC { + if_not_exists, + flow_job, + } => { if self.flow_handler.is_none() { return Err(PgWireError::ApiError(Box::new(PgError::Internal { err_msg: "flow service is not configured".to_owned(), }))); } - let catalog = self.catalog.lock().await; - catalog - .create_flow_job_entry(&flow_job) + let workflow_details = catalog + .get_workflow_details_for_flow_job(&flow_job.name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to create mirror job entry: {:?}", err), + err_msg: format!( + "unable to query catalog for job metadata: {:?}", + err + ), })) })?; - - // get source and destination peers - let src_peer = + tracing::info!( + "got workflow id: {:?}", + workflow_details.as_ref().map(|w| &w.workflow_id) + ); + if workflow_details.is_none() { catalog - .get_peer(&flow_job.source_peer) + .create_flow_job_entry(&flow_job) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get source peer: {:?}", err), + err_msg: format!( + "unable to create mirror job entry: {:?}", + err + ), })) })?; - let dst_peer = - catalog - .get_peer(&flow_job.target_peer) + // get source and destination peers + let src_peer = + catalog + .get_peer(&flow_job.source_peer) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to get source peer: {:?}", err), + })) + })?; + + let dst_peer = + catalog + .get_peer(&flow_job.target_peer) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to get destination peer: {:?}", + err + ), + })) + })?; + + // make a request to the flow service to start the job. + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let workflow_id = flow_handler + .start_peer_flow_job(&flow_job, src_peer, dst_peer) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get destination peer: {:?}", err), + err_msg: format!("unable to submit job: {:?}", err), })) })?; - // make a request to the flow service to start the job. - let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; - let workflow_id = flow_handler - .start_peer_flow_job(&flow_job, src_peer, dst_peer) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to submit job: {:?}", err), - })) - })?; - - catalog - .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to save job metadata: {:?}", err), - })) - })?; + catalog + .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to save job metadata: {:?}", err), + })) + })?; - let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); - Ok(vec![Response::Execution(Tag::new_for_execution( - &create_mirror_success, - None, - ))]) + let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); + Ok(vec![Response::Execution(Tag::new_for_execution( + &create_mirror_success, + None, + ))]) + } else if if_not_exists { + let existing_mirror_success = "MIRROR ALREADY EXISTS"; + Ok(vec![Response::Execution(Tag::new_for_execution( + existing_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("mirror already exists: {:?}", flow_job.name), + )))) + } } - PeerDDL::CreateMirrorForSelect { qrep_flow_job } => { + PeerDDL::CreateMirrorForSelect { + if_not_exists, + qrep_flow_job, + } => { if self.flow_handler.is_none() { return Err(PgWireError::ApiError(Box::new(PgError::Internal { err_msg: "flow service is not configured".to_owned(), }))); } - { - let catalog = self.catalog.lock().await; + let catalog = self.catalog.lock().await; + let workflow_details = catalog + .get_workflow_details_for_flow_job(&qrep_flow_job.name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to query catalog for job metadata: {:?}", + err + ), + })) + })?; + tracing::info!( + "got workflow id: {:?}", + workflow_details.as_ref().map(|w| &w.workflow_id) + ); + if workflow_details.is_none() { catalog .create_qrep_flow_job_entry(&qrep_flow_job) .await @@ -283,22 +338,35 @@ impl NexusBackend { ), })) })?; - } - if qrep_flow_job.disabled { + if qrep_flow_job.disabled { + let create_mirror_success = + format!("CREATE MIRROR {}", qrep_flow_job.name); + return Ok(vec![Response::Execution(Tag::new_for_execution( + &create_mirror_success, + None, + ))]); + } + + let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); - return Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new_for_execution( &create_mirror_success, None, - ))]); + ))]) + } else if if_not_exists { + let existing_mirror_success = "MIRROR ALREADY EXISTS"; + Ok(vec![Response::Execution(Tag::new_for_execution( + existing_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("mirror already exists: {:?}", qrep_flow_job.name), + )))) } - - let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; - let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); - Ok(vec![Response::Execution(Tag::new_for_execution( - &create_mirror_success, - None, - ))]) } PeerDDL::ExecuteMirrorForSelect { flow_job_name } => { if self.flow_handler.is_none() { diff --git a/nexus/server/tests/create_peers/create_bq.rs b/nexus/server/tests/create_peers/create_bq.rs index 39006aea48..766a4763c8 100644 --- a/nexus/server/tests/create_peers/create_bq.rs +++ b/nexus/server/tests/create_peers/create_bq.rs @@ -16,7 +16,7 @@ pub fn create(nexus: &mut Client) { let create_stmt = format!( " - CREATE PEER bq_test FROM BIGQUERY WITH + CREATE PEER IF NOT EXISTS bq_test FROM BIGQUERY WITH ( type = '{}', project_id = '{}', @@ -44,18 +44,5 @@ pub fn create(nexus: &mut Client) { "test_dataset" ); - let creation_status = nexus.simple_query(&create_stmt); - match creation_status { - Ok(_) => (), - Err(err) => { - let create_err = err - .as_db_error() - .expect("failed to unwrap create peer error"); - let already_exists_case = create_err.message().contains("(bq_test) already exists"); - if already_exists_case { - return (); - } - panic!("failed to create bigquery peer: {}", err) - } - } + let _ = nexus.simple_query(&create_stmt); } diff --git a/nexus/server/tests/create_peers/create_pg.rs b/nexus/server/tests/create_peers/create_pg.rs index 43f084a2a3..308bf818d9 100644 --- a/nexus/server/tests/create_peers/create_pg.rs +++ b/nexus/server/tests/create_peers/create_pg.rs @@ -34,7 +34,7 @@ pub fn create(nexus: &mut Client) { let create_stmt = format!( " - CREATE PEER pg_test FROM POSTGRES WITH + CREATE PEER IF NOT EXISTS pg_test FROM POSTGRES WITH ( host = '{}', port = '{}', @@ -45,17 +45,5 @@ pub fn create(nexus: &mut Client) { &peer_host, &peer_port, &peer_user, &peer_password, &peer_database ); - let creation_status = nexus.simple_query(&create_stmt); - match creation_status { - Ok(_) => (), - Err(err) => { - let create_err = err - .as_db_error() - .expect("failed to unwrap create peer error"); - let already_exists_case = create_err.message().contains("(pg_test) already exists"); - if !already_exists_case { - panic!("failed to create pg peer: {}", err) - } - } - } + let _ = nexus.simple_query(&create_stmt); } diff --git a/nexus/server/tests/create_peers/create_sf.rs b/nexus/server/tests/create_peers/create_sf.rs index f241ec23e8..0e4c4c5096 100644 --- a/nexus/server/tests/create_peers/create_sf.rs +++ b/nexus/server/tests/create_peers/create_sf.rs @@ -18,7 +18,7 @@ pub fn create(nexus: &mut Client) { let create_stmt = format!( " - CREATE PEER sf_test FROM SNOWFLAKE WITH + CREATE PEER IF NOT EXISTS sf_test FROM SNOWFLAKE WITH ( account_id = '{}', username = '{}', @@ -39,18 +39,5 @@ pub fn create(nexus: &mut Client) { &sf_config.s3_integration ); - let creation_status = nexus.simple_query(&create_stmt); - match creation_status { - Ok(_) => (), - Err(err) => { - let create_err = err - .as_db_error() - .expect("failed to unwrap create peer error"); - let already_exists_case = create_err.message().contains("(sf_test) already exists"); - if already_exists_case { - return (); - } - panic!("failed to create snowflake peer:{}", err) - } - } + let _ = nexus.simple_query(&create_stmt); } diff --git a/nexus/server/tests/server_test.rs b/nexus/server/tests/server_test.rs index 9412199fb9..bd7fceffc3 100644 --- a/nexus/server/tests/server_test.rs +++ b/nexus/server/tests/server_test.rs @@ -223,6 +223,24 @@ fn query_unknown_peer_doesnt_crash_server() { assert!(res.is_ok()); } +#[test] +fn mirror_if_not_exists() { + let server = PeerDBServer::new(); + let mut client = server.connect_dying(); + + create_peers::create_bq::create(&mut client); + create_peers::create_pg::create(&mut client); + // the server should not crash when a query is sent to an unknown peer. + let query = "SELECT * FROM unknown_peer.test_table;"; + let res = client.simple_query(query); + assert!(res.is_err()); + + // assert that server is able to process a valid query after. + let query = "SELECT * FROM peers;"; + let res = client.simple_query(query); + assert!(res.is_ok()); +} + #[test] #[ignore = "requires some work for extended query prepares on bigquery."] fn extended_query_protocol_no_params_bq() { diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs index 883b84ace2..1532c918f7 160000 --- a/nexus/sqlparser-rs +++ b/nexus/sqlparser-rs @@ -1 +1 @@ -Subproject commit 883b84ace2f55a0b4679dc15c4de0f6e7eb5af03 +Subproject commit 1532c918f75cf9b03e3f902bd52298f0391f976a From 08fd2c81f9b0e19e1a5e6f5b846dce8d0c68701b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 15 Sep 2023 16:14:05 +0530 Subject: [PATCH 2/6] add test --- nexus/server/tests/server_test.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/nexus/server/tests/server_test.rs b/nexus/server/tests/server_test.rs index bd7fceffc3..0b262b087f 100644 --- a/nexus/server/tests/server_test.rs +++ b/nexus/server/tests/server_test.rs @@ -230,15 +230,21 @@ fn mirror_if_not_exists() { create_peers::create_bq::create(&mut client); create_peers::create_pg::create(&mut client); - // the server should not crash when a query is sent to an unknown peer. - let query = "SELECT * FROM unknown_peer.test_table;"; - let res = client.simple_query(query); - assert!(res.is_err()); + // create a mirror + let create_query = "CREATE MIRROR test1 FROM pg_test TO bq_test WITH TABLE MAPPING(public.a:a);"; + let res = client.simple_query(create_query); + assert!(res.is_ok()); - // assert that server is able to process a valid query after. - let query = "SELECT * FROM peers;"; - let res = client.simple_query(query); + // test if not exists clause + let create_query_again_with_clause = "CREATE MIRROR IF NOT EXISTS test1 + FROM pg_test TO bq_test WITH TABLE MAPPING(public.a:a);"; + let res = client.simple_query(create_query_again_with_clause); assert!(res.is_ok()); + + let create_query_again_but_without_clause = "CREATE MIRROR test1 + FROM pg_test TO bq_test WITH TABLE MAPPING(public.a:a);"; + let res = client.simple_query(create_query_again_but_without_clause); + assert!(res.is_err()); } #[test] From f138eaeaea1ce74779f01f5d6df1434ebc5ca8a4 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 15 Sep 2023 20:53:18 +0530 Subject: [PATCH 3/6] removes test as its in sqlparser --- nexus/server/tests/server_test.rs | 24 ------------------------ nexus/sqlparser-rs | 2 +- 2 files changed, 1 insertion(+), 25 deletions(-) diff --git a/nexus/server/tests/server_test.rs b/nexus/server/tests/server_test.rs index 0b262b087f..9412199fb9 100644 --- a/nexus/server/tests/server_test.rs +++ b/nexus/server/tests/server_test.rs @@ -223,30 +223,6 @@ fn query_unknown_peer_doesnt_crash_server() { assert!(res.is_ok()); } -#[test] -fn mirror_if_not_exists() { - let server = PeerDBServer::new(); - let mut client = server.connect_dying(); - - create_peers::create_bq::create(&mut client); - create_peers::create_pg::create(&mut client); - // create a mirror - let create_query = "CREATE MIRROR test1 FROM pg_test TO bq_test WITH TABLE MAPPING(public.a:a);"; - let res = client.simple_query(create_query); - assert!(res.is_ok()); - - // test if not exists clause - let create_query_again_with_clause = "CREATE MIRROR IF NOT EXISTS test1 - FROM pg_test TO bq_test WITH TABLE MAPPING(public.a:a);"; - let res = client.simple_query(create_query_again_with_clause); - assert!(res.is_ok()); - - let create_query_again_but_without_clause = "CREATE MIRROR test1 - FROM pg_test TO bq_test WITH TABLE MAPPING(public.a:a);"; - let res = client.simple_query(create_query_again_but_without_clause); - assert!(res.is_err()); -} - #[test] #[ignore = "requires some work for extended query prepares on bigquery."] fn extended_query_protocol_no_params_bq() { diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs index 1532c918f7..92bc0e62d8 160000 --- a/nexus/sqlparser-rs +++ b/nexus/sqlparser-rs @@ -1 +1 @@ -Subproject commit 1532c918f75cf9b03e3f902bd52298f0391f976a +Subproject commit 92bc0e62d83a957911a3b22a869208fa822a840b From 4336f860a65c7f1fd681586b99a969a274192a04 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 18 Sep 2023 17:58:26 +0530 Subject: [PATCH 4/6] refactor: create mirror --- nexus/server/src/main.rs | 138 +++++++++++++++++---------------------- 1 file changed, 61 insertions(+), 77 deletions(-) diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 18a94562bf..3d5c8cc591 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use analyzer::{PeerDDL, QueryAssocation}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; -use catalog::{Catalog, CatalogConfig}; +use catalog::{Catalog, CatalogConfig, WorkflowDetails}; use clap::Parser; use cursor::PeerCursors; use dashmap::DashMap; @@ -32,11 +32,11 @@ use pgwire::{ tokio::process_socket, }; use pt::{ - flow_model::QRepFlowJob, + flow_model::{FlowJob, QRepFlowJob}, peerdb_peers::{peer::Config, Peer}, }; use rand::Rng; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, MutexGuard}; use tokio::{io::AsyncWriteExt, net::TcpListener}; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -158,6 +158,52 @@ impl NexusBackend { !unsupported_peer_types.contains(&peer_type) } + async fn check_for_mirror( + catalog: &MutexGuard<'_, Catalog>, + flow_name: String, + ) -> PgWireResult> { + let workflow_details = catalog + .get_workflow_details_for_flow_job(&flow_name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to query catalog for job metadata: {:?}", err), + })) + })?; + Ok(workflow_details) + } + + async fn get_peer_of_mirror( + catalog: &MutexGuard<'_, Catalog>, + peer_name: String, + ) -> PgWireResult { + let peer = catalog.get_peer(&peer_name).await.map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to get peer {:?}: {:?}", peer_name, err), + })) + })?; + Ok(peer) + } + + fn handle_mirror_existence( + if_not_exists: bool, + flow_name: String, + ) -> PgWireResult>> { + if if_not_exists { + let existing_mirror_success = "MIRROR ALREADY EXISTS"; + Ok(vec![Response::Execution(Tag::new_for_execution( + existing_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("mirror already exists: {:?}", flow_name), + )))) + } + } + async fn handle_query<'a>( &self, nexus_stmt: NexusStatement, @@ -209,22 +255,9 @@ impl NexusBackend { }))); } let catalog = self.catalog.lock().await; - let workflow_details = catalog - .get_workflow_details_for_flow_job(&flow_job.name) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to query catalog for job metadata: {:?}", - err - ), - })) - })?; - tracing::info!( - "got workflow id: {:?}", - workflow_details.as_ref().map(|w| &w.workflow_id) - ); - if workflow_details.is_none() { + let mirror_details = + Self::check_for_mirror(&catalog, flow_job.name.clone()).await?; + if mirror_details.is_none() { catalog .create_flow_job_entry(&flow_job) .await @@ -239,27 +272,11 @@ impl NexusBackend { // get source and destination peers let src_peer = - catalog - .get_peer(&flow_job.source_peer) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get source peer: {:?}", err), - })) - })?; - + Self::get_peer_of_mirror(&catalog, flow_job.source_peer.clone()) + .await?; let dst_peer = - catalog - .get_peer(&flow_job.target_peer) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to get destination peer: {:?}", - err - ), - })) - })?; + Self::get_peer_of_mirror(&catalog, flow_job.target_peer.clone()) + .await?; // make a request to the flow service to start the job. let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; @@ -286,18 +303,8 @@ impl NexusBackend { &create_mirror_success, None, ))]) - } else if if_not_exists { - let existing_mirror_success = "MIRROR ALREADY EXISTS"; - Ok(vec![Response::Execution(Tag::new_for_execution( - existing_mirror_success, - None, - ))]) } else { - Err(PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "error".to_owned(), - format!("mirror already exists: {:?}", flow_job.name), - )))) + Self::handle_mirror_existence(if_not_exists, flow_job.name) } } PeerDDL::CreateMirrorForSelect { @@ -311,22 +318,9 @@ impl NexusBackend { } let catalog = self.catalog.lock().await; - let workflow_details = catalog - .get_workflow_details_for_flow_job(&qrep_flow_job.name) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to query catalog for job metadata: {:?}", - err - ), - })) - })?; - tracing::info!( - "got workflow id: {:?}", - workflow_details.as_ref().map(|w| &w.workflow_id) - ); - if workflow_details.is_none() { + let mirror_details = + Self::check_for_mirror(&catalog, qrep_flow_job.name.clone()).await?; + if mirror_details.is_none() { catalog .create_qrep_flow_job_entry(&qrep_flow_job) .await @@ -354,18 +348,8 @@ impl NexusBackend { &create_mirror_success, None, ))]) - } else if if_not_exists { - let existing_mirror_success = "MIRROR ALREADY EXISTS"; - Ok(vec![Response::Execution(Tag::new_for_execution( - existing_mirror_success, - None, - ))]) } else { - Err(PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "error".to_owned(), - format!("mirror already exists: {:?}", qrep_flow_job.name), - )))) + Self::handle_mirror_existence(if_not_exists, qrep_flow_job.name) } } PeerDDL::ExecuteMirrorForSelect { flow_job_name } => { From f09e85a307d88946ab3181720c239feb5944b091 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 18 Sep 2023 18:12:30 +0530 Subject: [PATCH 5/6] unused import and sqlparser update --- nexus/server/src/main.rs | 2 +- nexus/sqlparser-rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 3d5c8cc591..7089f93ee2 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -32,7 +32,7 @@ use pgwire::{ tokio::process_socket, }; use pt::{ - flow_model::{FlowJob, QRepFlowJob}, + flow_model::QRepFlowJob, peerdb_peers::{peer::Config, Peer}, }; use rand::Rng; diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs index 92bc0e62d8..883b84ace2 160000 --- a/nexus/sqlparser-rs +++ b/nexus/sqlparser-rs @@ -1 +1 @@ -Subproject commit 92bc0e62d83a957911a3b22a869208fa822a840b +Subproject commit 883b84ace2f55a0b4679dc15c4de0f6e7eb5af03 From b5a4badf6c0c69cd11d75874f37e3bcf715d863b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 18 Sep 2023 18:16:10 +0530 Subject: [PATCH 6/6] fix: sqlparser --- nexus/sqlparser-rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs index 883b84ace2..92bc0e62d8 160000 --- a/nexus/sqlparser-rs +++ b/nexus/sqlparser-rs @@ -1 +1 @@ -Subproject commit 883b84ace2f55a0b4679dc15c4de0f6e7eb5af03 +Subproject commit 92bc0e62d83a957911a3b22a869208fa822a840b