Skip to content

Commit

Permalink
support for mirror if not exists (#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Sep 18, 2023
1 parent 8f2e9a7 commit 2d024df
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 108 deletions.
18 changes: 14 additions & 4 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ pub enum PeerDDL {
if_not_exists: bool,
},
CreateMirrorForCDC {
if_not_exists: bool,
flow_job: FlowJob,
},
CreateMirrorForSelect {
if_not_exists: bool,
qrep_flow_job: QRepFlowJob,
},
ExecuteMirrorForSelect {
Expand Down Expand Up @@ -136,7 +138,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
if_not_exists: *if_not_exists,
}))
}
Statement::CreateMirror { create_mirror } => {
Statement::CreateMirror {
if_not_exists,
create_mirror,
} => {
match create_mirror {
CDC(cdc) => {
let mut flow_job_table_mappings = vec![];
Expand Down Expand Up @@ -243,7 +248,6 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::<i64>()?),
_ => None,
};


let flow_job = FlowJob {
name: cdc.mirror_name.to_string().to_lowercase(),
Expand Down Expand Up @@ -275,7 +279,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
));
}

Ok(Some(PeerDDL::CreateMirrorForCDC { flow_job }))
Ok(Some(PeerDDL::CreateMirrorForCDC {
if_not_exists: *if_not_exists,
flow_job,
}))
}
Select(select) => {
let mut raw_options = HashMap::new();
Expand Down Expand Up @@ -304,7 +311,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
disabled,
};

Ok(Some(PeerDDL::CreateMirrorForSelect { qrep_flow_job }))
Ok(Some(PeerDDL::CreateMirrorForSelect {
if_not_exists: *if_not_exists,
qrep_flow_job,
}))
}
}
}
Expand Down
170 changes: 111 additions & 59 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use analyzer::{PeerDDL, QueryAssocation};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use catalog::{Catalog, CatalogConfig};
use catalog::{Catalog, CatalogConfig, WorkflowDetails};
use clap::Parser;
use cursor::PeerCursors;
use dashmap::DashMap;
Expand Down Expand Up @@ -36,7 +36,7 @@ use pt::{
peerdb_peers::{peer::Config, Peer},
};
use rand::Rng;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{io::AsyncWriteExt, net::TcpListener};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
Expand Down Expand Up @@ -158,6 +158,52 @@ impl NexusBackend {
!unsupported_peer_types.contains(&peer_type)
}

async fn check_for_mirror(
catalog: &MutexGuard<'_, Catalog>,
flow_name: String,
) -> PgWireResult<Option<WorkflowDetails>> {
let workflow_details = catalog
.get_workflow_details_for_flow_job(&flow_name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to query catalog for job metadata: {:?}", err),
}))
})?;
Ok(workflow_details)
}

async fn get_peer_of_mirror(
catalog: &MutexGuard<'_, Catalog>,
peer_name: String,
) -> PgWireResult<Peer> {
let peer = catalog.get_peer(&peer_name).await.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer {:?}: {:?}", peer_name, err),
}))
})?;
Ok(peer)
}

fn handle_mirror_existence(
if_not_exists: bool,
flow_name: String,
) -> PgWireResult<Vec<Response<'static>>> {
if if_not_exists {
let existing_mirror_success = "MIRROR ALREADY EXISTS";
Ok(vec![Response::Execution(Tag::new_for_execution(
existing_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"error".to_owned(),
format!("mirror already exists: {:?}", flow_name),
))))
}
}

async fn handle_query<'a>(
&self,
nexus_stmt: NexusStatement,
Expand Down Expand Up @@ -199,79 +245,82 @@ impl NexusBackend {
"OK", None,
))])
}
PeerDDL::CreateMirrorForCDC { flow_job } => {
PeerDDL::CreateMirrorForCDC {
if_not_exists,
flow_job,
} => {
if self.flow_handler.is_none() {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: "flow service is not configured".to_owned(),
})));
}

let catalog = self.catalog.lock().await;
catalog
.create_flow_job_entry(&flow_job)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to create mirror job entry: {:?}", err),
}))
})?;

// get source and destination peers
let src_peer =
let mirror_details =
Self::check_for_mirror(&catalog, flow_job.name.clone()).await?;
if mirror_details.is_none() {
catalog
.get_peer(&flow_job.source_peer)
.create_flow_job_entry(&flow_job)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get source peer: {:?}", err),
err_msg: format!(
"unable to create mirror job entry: {:?}",
err
),
}))
})?;

let dst_peer =
catalog
.get_peer(&flow_job.target_peer)
// get source and destination peers
let src_peer =
Self::get_peer_of_mirror(&catalog, flow_job.source_peer.clone())
.await?;
let dst_peer =
Self::get_peer_of_mirror(&catalog, flow_job.target_peer.clone())
.await?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let workflow_id = flow_handler
.start_peer_flow_job(&flow_job, src_peer, dst_peer)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get destination peer: {:?}", err),
err_msg: format!("unable to submit job: {:?}", err),
}))
})?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let workflow_id = flow_handler
.start_peer_flow_job(&flow_job, src_peer, dst_peer)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to submit job: {:?}", err),
}))
})?;

catalog
.update_workflow_id_for_flow_job(&flow_job.name, &workflow_id)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to save job metadata: {:?}", err),
}))
})?;
catalog
.update_workflow_id_for_flow_job(&flow_job.name, &workflow_id)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to save job metadata: {:?}", err),
}))
})?;

let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name);
Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))])
let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name);
Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))])
} else {
Self::handle_mirror_existence(if_not_exists, flow_job.name)
}
}
PeerDDL::CreateMirrorForSelect { qrep_flow_job } => {
PeerDDL::CreateMirrorForSelect {
if_not_exists,
qrep_flow_job,
} => {
if self.flow_handler.is_none() {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: "flow service is not configured".to_owned(),
})));
}

{
let catalog = self.catalog.lock().await;
let catalog = self.catalog.lock().await;
let mirror_details =
Self::check_for_mirror(&catalog, qrep_flow_job.name.clone()).await?;
if mirror_details.is_none() {
catalog
.create_qrep_flow_job_entry(&qrep_flow_job)
.await
Expand All @@ -283,22 +332,25 @@ impl NexusBackend {
),
}))
})?;
}

if qrep_flow_job.disabled {
if qrep_flow_job.disabled {
let create_mirror_success =
format!("CREATE MIRROR {}", qrep_flow_job.name);
return Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))]);
}

let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?;
let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name);
return Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))]);
))])
} else {
Self::handle_mirror_existence(if_not_exists, qrep_flow_job.name)
}

let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?;
let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name);
Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))])
}
PeerDDL::ExecuteMirrorForSelect { flow_job_name } => {
if self.flow_handler.is_none() {
Expand Down
17 changes: 2 additions & 15 deletions nexus/server/tests/create_peers/create_bq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn create(nexus: &mut Client) {

let create_stmt = format!(
"
CREATE PEER bq_test FROM BIGQUERY WITH
CREATE PEER IF NOT EXISTS bq_test FROM BIGQUERY WITH
(
type = '{}',
project_id = '{}',
Expand Down Expand Up @@ -44,18 +44,5 @@ pub fn create(nexus: &mut Client) {
"test_dataset"
);

let creation_status = nexus.simple_query(&create_stmt);
match creation_status {
Ok(_) => (),
Err(err) => {
let create_err = err
.as_db_error()
.expect("failed to unwrap create peer error");
let already_exists_case = create_err.message().contains("(bq_test) already exists");
if already_exists_case {
return ();
}
panic!("failed to create bigquery peer: {}", err)
}
}
let _ = nexus.simple_query(&create_stmt);
}
16 changes: 2 additions & 14 deletions nexus/server/tests/create_peers/create_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn create(nexus: &mut Client) {

let create_stmt = format!(
"
CREATE PEER pg_test FROM POSTGRES WITH
CREATE PEER IF NOT EXISTS pg_test FROM POSTGRES WITH
(
host = '{}',
port = '{}',
Expand All @@ -45,17 +45,5 @@ pub fn create(nexus: &mut Client) {
&peer_host, &peer_port, &peer_user, &peer_password, &peer_database
);

let creation_status = nexus.simple_query(&create_stmt);
match creation_status {
Ok(_) => (),
Err(err) => {
let create_err = err
.as_db_error()
.expect("failed to unwrap create peer error");
let already_exists_case = create_err.message().contains("(pg_test) already exists");
if !already_exists_case {
panic!("failed to create pg peer: {}", err)
}
}
}
let _ = nexus.simple_query(&create_stmt);
}
17 changes: 2 additions & 15 deletions nexus/server/tests/create_peers/create_sf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub fn create(nexus: &mut Client) {

let create_stmt = format!(
"
CREATE PEER sf_test FROM SNOWFLAKE WITH
CREATE PEER IF NOT EXISTS sf_test FROM SNOWFLAKE WITH
(
account_id = '{}',
username = '{}',
Expand All @@ -39,18 +39,5 @@ pub fn create(nexus: &mut Client) {
&sf_config.s3_integration
);

let creation_status = nexus.simple_query(&create_stmt);
match creation_status {
Ok(_) => (),
Err(err) => {
let create_err = err
.as_db_error()
.expect("failed to unwrap create peer error");
let already_exists_case = create_err.message().contains("(sf_test) already exists");
if already_exists_case {
return ();
}
panic!("failed to create snowflake peer:{}", err)
}
}
let _ = nexus.simple_query(&create_stmt);
}

0 comments on commit 2d024df

Please sign in to comment.