-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support for mirror if not exists #387
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
0b9040e
support for mirror if not exists
Amogh-Bharadwaj 08fd2c8
add test
Amogh-Bharadwaj f138eae
removes test as its in sqlparser
Amogh-Bharadwaj 4336f86
refactor: create mirror
Amogh-Bharadwaj f09e85a
unused import and sqlparser update
Amogh-Bharadwaj b5a4bad
fix: sqlparser
Amogh-Bharadwaj File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; | |
use analyzer::{PeerDDL, QueryAssocation}; | ||
use async_trait::async_trait; | ||
use bytes::{BufMut, BytesMut}; | ||
use catalog::{Catalog, CatalogConfig}; | ||
use catalog::{Catalog, CatalogConfig, WorkflowDetails}; | ||
use clap::Parser; | ||
use cursor::PeerCursors; | ||
use dashmap::DashMap; | ||
|
@@ -36,7 +36,7 @@ use pt::{ | |
peerdb_peers::{peer::Config, Peer}, | ||
}; | ||
use rand::Rng; | ||
use tokio::sync::Mutex; | ||
use tokio::sync::{Mutex, MutexGuard}; | ||
use tokio::{io::AsyncWriteExt, net::TcpListener}; | ||
use tracing_appender::non_blocking::WorkerGuard; | ||
use tracing_subscriber::{fmt, prelude::*, EnvFilter}; | ||
|
@@ -158,6 +158,52 @@ impl NexusBackend { | |
!unsupported_peer_types.contains(&peer_type) | ||
} | ||
|
||
async fn check_for_mirror( | ||
catalog: &MutexGuard<'_, Catalog>, | ||
flow_name: String, | ||
) -> PgWireResult<Option<WorkflowDetails>> { | ||
let workflow_details = catalog | ||
.get_workflow_details_for_flow_job(&flow_name) | ||
.await | ||
.map_err(|err| { | ||
PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: format!("unable to query catalog for job metadata: {:?}", err), | ||
})) | ||
})?; | ||
Ok(workflow_details) | ||
} | ||
|
||
async fn get_peer_of_mirror( | ||
catalog: &MutexGuard<'_, Catalog>, | ||
peer_name: String, | ||
) -> PgWireResult<Peer> { | ||
let peer = catalog.get_peer(&peer_name).await.map_err(|err| { | ||
PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: format!("unable to get peer {:?}: {:?}", peer_name, err), | ||
})) | ||
})?; | ||
Ok(peer) | ||
} | ||
|
||
fn handle_mirror_existence( | ||
if_not_exists: bool, | ||
flow_name: String, | ||
) -> PgWireResult<Vec<Response<'static>>> { | ||
if if_not_exists { | ||
let existing_mirror_success = "MIRROR ALREADY EXISTS"; | ||
Ok(vec![Response::Execution(Tag::new_for_execution( | ||
existing_mirror_success, | ||
None, | ||
))]) | ||
} else { | ||
Err(PgWireError::UserError(Box::new(ErrorInfo::new( | ||
"ERROR".to_owned(), | ||
"error".to_owned(), | ||
format!("mirror already exists: {:?}", flow_name), | ||
)))) | ||
} | ||
} | ||
|
||
async fn handle_query<'a>( | ||
&self, | ||
nexus_stmt: NexusStatement, | ||
|
@@ -199,79 +245,82 @@ impl NexusBackend { | |
"OK", None, | ||
))]) | ||
} | ||
PeerDDL::CreateMirrorForCDC { flow_job } => { | ||
PeerDDL::CreateMirrorForCDC { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we refactor handling this branch to its own function? It seems pretty complex now, can you refactor to make it readable. |
||
if_not_exists, | ||
flow_job, | ||
} => { | ||
if self.flow_handler.is_none() { | ||
return Err(PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: "flow service is not configured".to_owned(), | ||
}))); | ||
} | ||
|
||
let catalog = self.catalog.lock().await; | ||
catalog | ||
.create_flow_job_entry(&flow_job) | ||
.await | ||
.map_err(|err| { | ||
PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: format!("unable to create mirror job entry: {:?}", err), | ||
})) | ||
})?; | ||
|
||
// get source and destination peers | ||
let src_peer = | ||
let mirror_details = | ||
Self::check_for_mirror(&catalog, flow_job.name.clone()).await?; | ||
if mirror_details.is_none() { | ||
catalog | ||
.get_peer(&flow_job.source_peer) | ||
.create_flow_job_entry(&flow_job) | ||
.await | ||
.map_err(|err| { | ||
PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: format!("unable to get source peer: {:?}", err), | ||
err_msg: format!( | ||
"unable to create mirror job entry: {:?}", | ||
err | ||
), | ||
})) | ||
})?; | ||
|
||
let dst_peer = | ||
catalog | ||
.get_peer(&flow_job.target_peer) | ||
// get source and destination peers | ||
let src_peer = | ||
Self::get_peer_of_mirror(&catalog, flow_job.source_peer.clone()) | ||
.await?; | ||
let dst_peer = | ||
Self::get_peer_of_mirror(&catalog, flow_job.target_peer.clone()) | ||
.await?; | ||
|
||
// make a request to the flow service to start the job. | ||
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; | ||
let workflow_id = flow_handler | ||
.start_peer_flow_job(&flow_job, src_peer, dst_peer) | ||
.await | ||
.map_err(|err| { | ||
PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: format!("unable to get destination peer: {:?}", err), | ||
err_msg: format!("unable to submit job: {:?}", err), | ||
})) | ||
})?; | ||
|
||
// make a request to the flow service to start the job. | ||
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; | ||
let workflow_id = flow_handler | ||
.start_peer_flow_job(&flow_job, src_peer, dst_peer) | ||
.await | ||
.map_err(|err| { | ||
PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: format!("unable to submit job: {:?}", err), | ||
})) | ||
})?; | ||
|
||
catalog | ||
.update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) | ||
.await | ||
.map_err(|err| { | ||
PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: format!("unable to save job metadata: {:?}", err), | ||
})) | ||
})?; | ||
catalog | ||
.update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) | ||
.await | ||
.map_err(|err| { | ||
PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: format!("unable to save job metadata: {:?}", err), | ||
})) | ||
})?; | ||
|
||
let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); | ||
Ok(vec![Response::Execution(Tag::new_for_execution( | ||
&create_mirror_success, | ||
None, | ||
))]) | ||
let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); | ||
Ok(vec![Response::Execution(Tag::new_for_execution( | ||
&create_mirror_success, | ||
None, | ||
))]) | ||
} else { | ||
Self::handle_mirror_existence(if_not_exists, flow_job.name) | ||
} | ||
} | ||
PeerDDL::CreateMirrorForSelect { qrep_flow_job } => { | ||
PeerDDL::CreateMirrorForSelect { | ||
if_not_exists, | ||
qrep_flow_job, | ||
} => { | ||
if self.flow_handler.is_none() { | ||
return Err(PgWireError::ApiError(Box::new(PgError::Internal { | ||
err_msg: "flow service is not configured".to_owned(), | ||
}))); | ||
} | ||
|
||
{ | ||
let catalog = self.catalog.lock().await; | ||
let catalog = self.catalog.lock().await; | ||
let mirror_details = | ||
Self::check_for_mirror(&catalog, qrep_flow_job.name.clone()).await?; | ||
if mirror_details.is_none() { | ||
catalog | ||
.create_qrep_flow_job_entry(&qrep_flow_job) | ||
.await | ||
|
@@ -283,22 +332,25 @@ impl NexusBackend { | |
), | ||
})) | ||
})?; | ||
} | ||
|
||
if qrep_flow_job.disabled { | ||
if qrep_flow_job.disabled { | ||
let create_mirror_success = | ||
format!("CREATE MIRROR {}", qrep_flow_job.name); | ||
return Ok(vec![Response::Execution(Tag::new_for_execution( | ||
&create_mirror_success, | ||
None, | ||
))]); | ||
} | ||
|
||
let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; | ||
let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); | ||
return Ok(vec![Response::Execution(Tag::new_for_execution( | ||
Ok(vec![Response::Execution(Tag::new_for_execution( | ||
&create_mirror_success, | ||
None, | ||
))]); | ||
))]) | ||
} else { | ||
Self::handle_mirror_existence(if_not_exists, qrep_flow_job.name) | ||
} | ||
|
||
let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; | ||
let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); | ||
Ok(vec![Response::Execution(Tag::new_for_execution( | ||
&create_mirror_success, | ||
None, | ||
))]) | ||
} | ||
PeerDDL::ExecuteMirrorForSelect { flow_job_name } => { | ||
if self.flow_handler.is_none() { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Submodule sqlparser-rs
updated
7 files
+10 −3 | src/ast/mod.rs | |
+4 −1 | src/parser.rs | |
+3 −3 | tests/sqlparser_bigquery.rs | |
+2 −2 | tests/sqlparser_common.rs | |
+5 −5 | tests/sqlparser_mysql.rs | |
+26 −9 | tests/sqlparser_postgres.rs | |
+1 −1 | tests/sqlparser_snowflake.rs |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [clippy] reported by reviewdog 🐶