Skip to content

Commit

Permalink
Merge branch 'main' into full-table-partition-patches
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Nov 24, 2023
2 parents 64cfe5d + 963b6f1 commit 413010c
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 26 deletions.
18 changes: 10 additions & 8 deletions nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ mod stream;

pub struct BigQueryQueryExecutor {
peer_name: String,
config: BigqueryConfig,
project_id: String,
dataset_id: String,
peer_connections: Arc<PeerConnectionTracker>,
client: Box<Client>,
cursor_manager: BigQueryCursorManager,
}

pub async fn bq_client_from_config(config: BigqueryConfig) -> anyhow::Result<Client> {
pub async fn bq_client_from_config(config: &BigqueryConfig) -> anyhow::Result<Client> {
let sa_key = yup_oauth2::ServiceAccountKey {
key_type: Some(config.auth_type.clone()),
project_id: Some(config.project_id.clone()),
Expand All @@ -52,12 +53,13 @@ impl BigQueryQueryExecutor {
config: &BigqueryConfig,
peer_connections: Arc<PeerConnectionTracker>,
) -> anyhow::Result<Self> {
let client = bq_client_from_config(config.clone()).await?;
let client = bq_client_from_config(config).await?;
let client = Box::new(client);
let cursor_manager = BigQueryCursorManager::new();
Ok(Self {
peer_name,
config: config.clone(),
project_id: config.project_id.clone(),
dataset_id: config.dataset_id.clone(),
peer_connections,
client,
cursor_manager,
Expand All @@ -82,7 +84,7 @@ impl BigQueryQueryExecutor {
let result_set = self
.client
.job()
.query(&self.config.project_id, query_req)
.query(&self.project_id, query_req)
.await
.map_err(|err| {
tracing::error!("error running query: {}", err);
Expand Down Expand Up @@ -112,7 +114,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
let mut query = query.clone();
let bq_ast = ast::BigqueryAst::default();
bq_ast
.rewrite(&self.config.dataset_id, &mut query)
.rewrite(&self.dataset_id, &mut query)
.context("unable to rewrite query")
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
Expand Down Expand Up @@ -222,7 +224,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
let mut query = query.clone();
let bq_ast = ast::BigqueryAst::default();
bq_ast
.rewrite(&self.config.dataset_id, &mut query)
.rewrite(&self.dataset_id, &mut query)
.context("unable to rewrite query")
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
Expand Down Expand Up @@ -260,7 +262,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
let _result_set = self
.client
.job()
.query(&self.config.project_id, QueryRequest::new(sql))
.query(&self.project_id, QueryRequest::new(sql))
.await?;
Ok(true)
}
Expand Down
9 changes: 4 additions & 5 deletions nexus/peer-snowflake/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,20 @@ pub struct SnowflakeAuth {
}

impl SnowflakeAuth {
// When initializing, private_key must not be copied, to improve security of credentials.
#[tracing::instrument(name = "peer_sflake::init_client_auth", skip_all)]
pub fn new(
account_id: String,
username: String,
private_key: String,
password: Option<String>,
private_key: &str,
password: Option<&str>,
refresh_threshold: u64,
expiry_threshold: u64,
) -> anyhow::Result<Self> {
let pkey = match password {
Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(&private_key, pw)
Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(private_key, pw)
.context("Invalid private key or decryption failed")?,
None => {
DecodePrivateKey::from_pkcs8_pem(&private_key).context("Invalid private key")?
DecodePrivateKey::from_pkcs8_pem(private_key).context("Invalid private key")?
}
};
let mut snowflake_auth: SnowflakeAuth = SnowflakeAuth {
Expand Down
4 changes: 2 additions & 2 deletions nexus/peer-snowflake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ impl SnowflakeQueryExecutor {
auth: SnowflakeAuth::new(
config.account_id.clone(),
config.username.clone(),
config.private_key.clone(),
config.password.clone(),
&config.private_key,
config.password.as_deref(),
DEFAULT_REFRESH_THRESHOLD,
DEFAULT_EXPIRY_THRESHOLD,
)?,
Expand Down
20 changes: 9 additions & 11 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ impl NexusBackend {

async fn get_peer_of_mirror(
catalog: &MutexGuard<'_, Catalog>,
peer_name: String,
peer_name: &str,
) -> PgWireResult<Peer> {
let peer = catalog.get_peer(&peer_name).await.map_err(|err| {
let peer = catalog.get_peer(peer_name).await.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer {:?}: {:?}", peer_name, err),
}))
Expand All @@ -192,7 +192,7 @@ impl NexusBackend {

fn handle_mirror_existence(
if_not_exists: bool,
flow_name: String,
flow_name: &str,
) -> PgWireResult<Vec<Response<'static>>> {
if if_not_exists {
let existing_mirror_success = "MIRROR ALREADY EXISTS";
Expand Down Expand Up @@ -389,7 +389,7 @@ impl NexusBackend {
None,
))])
} else {
Self::handle_mirror_existence(*if_not_exists, qrep_flow_job.name.clone())
Self::handle_mirror_existence(*if_not_exists, &qrep_flow_job.name)
}
}
_ => unreachable!(),
Expand Down Expand Up @@ -487,11 +487,9 @@ impl NexusBackend {

// get source and destination peers
let src_peer =
Self::get_peer_of_mirror(&catalog, flow_job.source_peer.clone())
.await?;
Self::get_peer_of_mirror(&catalog, &flow_job.source_peer).await?;
let dst_peer =
Self::get_peer_of_mirror(&catalog, flow_job.target_peer.clone())
.await?;
Self::get_peer_of_mirror(&catalog, &flow_job.target_peer).await?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
Expand Down Expand Up @@ -519,7 +517,7 @@ impl NexusBackend {
None,
))])
} else {
Self::handle_mirror_existence(*if_not_exists, flow_job.name.clone())
Self::handle_mirror_existence(*if_not_exists, &flow_job.name)
}
}
PeerDDL::CreateMirrorForSelect { .. } => {
Expand Down Expand Up @@ -931,9 +929,9 @@ impl NexusBackend {

let executor: Arc<dyn QueryExecutor> = match &peer.config {
Some(Config::BigqueryConfig(ref c)) => {
let peer_name = peer.name.clone();
let executor =
BigQueryQueryExecutor::new(peer_name, c, self.peer_connections.clone()).await?;
BigQueryQueryExecutor::new(peer.name.clone(), c, self.peer_connections.clone())
.await?;
Arc::new(executor)
}
Some(Config::PostgresConfig(ref c)) => {
Expand Down

0 comments on commit 413010c

Please sign in to comment.