Skip to content

Commit

Permalink
support for mirror if not exists
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 15, 2023
1 parent 2da793b commit 5899d4c
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 101 deletions.
18 changes: 14 additions & 4 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ pub enum PeerDDL {
if_not_exists: bool,
},
CreateMirrorForCDC {
if_not_exists: bool,
flow_job: FlowJob,
},
CreateMirrorForSelect {
if_not_exists: bool,
qrep_flow_job: QRepFlowJob,
},
ExecuteMirrorForSelect {
Expand Down Expand Up @@ -136,7 +138,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
if_not_exists: *if_not_exists,
}))
}
Statement::CreateMirror { create_mirror } => {
Statement::CreateMirror {
if_not_exists,
create_mirror,
} => {
match create_mirror {
CDC(cdc) => {
let mut flow_job_table_mappings = vec![];
Expand Down Expand Up @@ -243,7 +248,6 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::<i64>()?),
_ => None,
};


let flow_job = FlowJob {
name: cdc.mirror_name.to_string().to_lowercase(),
Expand Down Expand Up @@ -275,7 +279,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
));
}

Ok(Some(PeerDDL::CreateMirrorForCDC { flow_job }))
Ok(Some(PeerDDL::CreateMirrorForCDC {
if_not_exists: *if_not_exists,
flow_job,
}))
}
Select(select) => {
let mut raw_options = HashMap::new();
Expand Down Expand Up @@ -304,7 +311,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
disabled,
};

Ok(Some(PeerDDL::CreateMirrorForSelect { qrep_flow_job }))
Ok(Some(PeerDDL::CreateMirrorForSelect {
if_not_exists: *if_not_exists,
qrep_flow_job,
}))
}
}
}
Expand Down
172 changes: 120 additions & 52 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,79 +199,134 @@ impl NexusBackend {
"OK", None,
))])
}
PeerDDL::CreateMirrorForCDC { flow_job } => {
PeerDDL::CreateMirrorForCDC {
if_not_exists,
flow_job,
} => {
if self.flow_handler.is_none() {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: "flow service is not configured".to_owned(),
})));
}

let catalog = self.catalog.lock().await;
catalog
.create_flow_job_entry(&flow_job)
let workflow_details = catalog
.get_workflow_details_for_flow_job(&flow_job.name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to create mirror job entry: {:?}", err),
err_msg: format!(
"unable to query catalog for job metadata: {:?}",
err
),
}))
})?;

// get source and destination peers
let src_peer =
tracing::info!(
"got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);
if workflow_details.is_none() {
catalog
.get_peer(&flow_job.source_peer)
.create_flow_job_entry(&flow_job)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get source peer: {:?}", err),
err_msg: format!(
"unable to create mirror job entry: {:?}",
err
),
}))
})?;

let dst_peer =
catalog
.get_peer(&flow_job.target_peer)
// get source and destination peers
let src_peer =
catalog
.get_peer(&flow_job.source_peer)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get source peer: {:?}", err),
}))
})?;

let dst_peer =
catalog
.get_peer(&flow_job.target_peer)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"unable to get destination peer: {:?}",
err
),
}))
})?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let workflow_id = flow_handler
.start_peer_flow_job(&flow_job, src_peer, dst_peer)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get destination peer: {:?}", err),
err_msg: format!("unable to submit job: {:?}", err),
}))
})?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let workflow_id = flow_handler
.start_peer_flow_job(&flow_job, src_peer, dst_peer)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to submit job: {:?}", err),
}))
})?;

catalog
.update_workflow_id_for_flow_job(&flow_job.name, &workflow_id)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to save job metadata: {:?}", err),
}))
})?;
catalog
.update_workflow_id_for_flow_job(&flow_job.name, &workflow_id)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to save job metadata: {:?}", err),
}))
})?;

let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name);
Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))])
let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name);
Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))])
} else if if_not_exists {
let existing_mirror_success = "MIRROR ALREADY EXISTS";
Ok(vec![Response::Execution(Tag::new_for_execution(
existing_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"error".to_owned(),
format!("mirror already exists: {:?}", flow_job.name),
))))
}
}
PeerDDL::CreateMirrorForSelect { qrep_flow_job } => {
PeerDDL::CreateMirrorForSelect {
if_not_exists,
qrep_flow_job,
} => {
if self.flow_handler.is_none() {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: "flow service is not configured".to_owned(),
})));
}

{
let catalog = self.catalog.lock().await;
let catalog = self.catalog.lock().await;
let workflow_details = catalog
.get_workflow_details_for_flow_job(&qrep_flow_job.name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"unable to query catalog for job metadata: {:?}",
err
),
}))
})?;
tracing::info!(
"got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);
if workflow_details.is_none() {
catalog
.create_qrep_flow_job_entry(&qrep_flow_job)
.await
Expand All @@ -283,22 +338,35 @@ impl NexusBackend {
),
}))
})?;
}

if qrep_flow_job.disabled {
if qrep_flow_job.disabled {
let create_mirror_success =
format!("CREATE MIRROR {}", qrep_flow_job.name);
return Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))]);
}

let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?;
let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name);
return Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))]);
))])
} else if if_not_exists {
let existing_mirror_success = "MIRROR ALREADY EXISTS";
Ok(vec![Response::Execution(Tag::new_for_execution(
existing_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"error".to_owned(),
format!("mirror already exists: {:?}", qrep_flow_job.name),
))))
}

let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?;
let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name);
Ok(vec![Response::Execution(Tag::new_for_execution(
&create_mirror_success,
None,
))])
}
PeerDDL::ExecuteMirrorForSelect { flow_job_name } => {
if self.flow_handler.is_none() {
Expand Down
17 changes: 2 additions & 15 deletions nexus/server/tests/create_peers/create_bq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn create(nexus: &mut Client) {

let create_stmt = format!(
"
CREATE PEER bq_test FROM BIGQUERY WITH
CREATE PEER IF NOT EXISTS bq_test FROM BIGQUERY WITH
(
type = '{}',
project_id = '{}',
Expand Down Expand Up @@ -44,18 +44,5 @@ pub fn create(nexus: &mut Client) {
"test_dataset"
);

let creation_status = nexus.simple_query(&create_stmt);
match creation_status {
Ok(_) => (),
Err(err) => {
let create_err = err
.as_db_error()
.expect("failed to unwrap create peer error");
let already_exists_case = create_err.message().contains("(bq_test) already exists");
if already_exists_case {
return ();
}
panic!("failed to create bigquery peer: {}", err)
}
}
let _ = nexus.simple_query(&create_stmt);
}
16 changes: 2 additions & 14 deletions nexus/server/tests/create_peers/create_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn create(nexus: &mut Client) {

let create_stmt = format!(
"
CREATE PEER pg_test FROM POSTGRES WITH
CREATE PEER IF NOT EXISTS pg_test FROM POSTGRES WITH
(
host = '{}',
port = '{}',
Expand All @@ -45,17 +45,5 @@ pub fn create(nexus: &mut Client) {
&peer_host, &peer_port, &peer_user, &peer_password, &peer_database
);

let creation_status = nexus.simple_query(&create_stmt);
match creation_status {
Ok(_) => (),
Err(err) => {
let create_err = err
.as_db_error()
.expect("failed to unwrap create peer error");
let already_exists_case = create_err.message().contains("(pg_test) already exists");
if !already_exists_case {
panic!("failed to create pg peer: {}", err)
}
}
}
let _ = nexus.simple_query(&create_stmt);
}
17 changes: 2 additions & 15 deletions nexus/server/tests/create_peers/create_sf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub fn create(nexus: &mut Client) {

let create_stmt = format!(
"
CREATE PEER sf_test FROM SNOWFLAKE WITH
CREATE PEER IF NOT EXISTS sf_test FROM SNOWFLAKE WITH
(
account_id = '{}',
username = '{}',
Expand All @@ -39,18 +39,5 @@ pub fn create(nexus: &mut Client) {
&sf_config.s3_integration
);

let creation_status = nexus.simple_query(&create_stmt);
match creation_status {
Ok(_) => (),
Err(err) => {
let create_err = err
.as_db_error()
.expect("failed to unwrap create peer error");
let already_exists_case = create_err.message().contains("(sf_test) already exists");
if already_exists_case {
return ();
}
panic!("failed to create snowflake peer:{}", err)
}
}
let _ = nexus.simple_query(&create_stmt);
}
18 changes: 18 additions & 0 deletions nexus/server/tests/server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,24 @@ fn query_unknown_peer_doesnt_crash_server() {
assert!(res.is_ok());
}

#[test]
fn mirror_if_not_exists() {
let server = PeerDBServer::new();
let mut client = server.connect_dying();

create_peers::create_bq::create(&mut client);
create_peers::create_pg::create(&mut client);
// the server should not crash when a query is sent to an unknown peer.
let query = "SELECT * FROM unknown_peer.test_table;";
let res = client.simple_query(query);
assert!(res.is_err());

// assert that server is able to process a valid query after.
let query = "SELECT * FROM peers;";
let res = client.simple_query(query);
assert!(res.is_ok());
}

#[test]
#[ignore = "requires some work for extended query prepares on bigquery."]
fn extended_query_protocol_no_params_bq() {
Expand Down
2 changes: 1 addition & 1 deletion nexus/sqlparser-rs

0 comments on commit 5899d4c

Please sign in to comment.