Skip to content

Commit

Permalink
nexus: update secrecy (#2127)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Oct 8, 2024
1 parent 888446a commit fab4786
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 17 deletions.
4 changes: 2 additions & 2 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ impl FlowGrpcClient {
pub async fn resync_mirror(&mut self, flow_job_name: &str) -> anyhow::Result<()> {
let resync_mirror_req = pt::peerdb_route::ResyncMirrorRequest {
flow_job_name: flow_job_name.to_owned(),
drop_stats: true
drop_stats: true,
};
let response = self.client.resync_mirror(resync_mirror_req).await?;
let resync_mirror_response = response.into_inner();
Expand Down
5 changes: 3 additions & 2 deletions nexus/peer-mysql/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,14 @@ pub fn rewrite_query(peername: &str, query: &mut Query) {
}
}
Expr::Cast {
data_type: DataType::Time(_, ref mut tzinfo), ..
data_type: DataType::Time(_, ref mut tzinfo),
..
} => {
*tzinfo = TimezoneInfo::None;
}
Expr::Cast {
ref mut data_type, ..
} if matches!(data_type, DataType::Timestamp(..)) =>{
} if matches!(data_type, DataType::Timestamp(..)) => {
*data_type = DataType::Datetime(None);
}
_ => {}
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-snowflake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pgwire.workspace = true
pt = { path = "../pt" }
reqwest = { version = "0.12", default-features = false, features = ["json", "gzip", "rustls-tls"] }
rsa = { version = "0.9.2", features = ["pem", "pkcs5"] }
secrecy = { version = "0.8.0" }
secrecy = "0.10"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
Expand Down
13 changes: 5 additions & 8 deletions nexus/peer-snowflake/src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use std::{
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::Context;
use base64::prelude::{Engine as _, BASE64_STANDARD};
use jsonwebtoken::{encode as jwt_encode, Algorithm, EncodingKey, Header};
use rsa::pkcs1::EncodeRsaPrivateKey;
use rsa::pkcs8::{DecodePrivateKey, EncodePublicKey};
use rsa::RsaPrivateKey;
use secrecy::{Secret, SecretString};
use secrecy::SecretString;
use serde::Serialize;
use sha2::{Digest, Sha256};
use tracing::info;
Expand All @@ -32,7 +29,7 @@ pub struct SnowflakeAuth {
refresh_threshold: u64,
expiry_threshold: u64,
last_refreshed: u64,
current_jwt: Option<Secret<String>>,
current_jwt: Option<SecretString>,
}

impl SnowflakeAuth {
Expand Down Expand Up @@ -128,14 +125,14 @@ impl SnowflakeAuth {
let header: Header = Header::new(Algorithm::RS256);

let encoded_jwt = jwt_encode(&header, &jwt_claims, &private_key_jwt)?;
let secret = SecretString::from_str(&encoded_jwt)?;
let secret = SecretString::from(encoded_jwt);

self.current_jwt = Some(secret);

Ok(())
}

pub fn get_jwt(&mut self) -> anyhow::Result<&Secret<String>> {
pub fn get_jwt(&mut self) -> anyhow::Result<&SecretString> {
if SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
>= (self.last_refreshed + self.refresh_threshold)
{
Expand Down
4 changes: 2 additions & 2 deletions nexus/peer-snowflake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl SnowflakeQueryExecutor {
async fn process_query(&self, query_str: &str) -> anyhow::Result<ResultSet> {
let mut auth = self.auth.clone();
let jwt = auth.get_jwt()?;
let secret = jwt.expose_secret().clone();
let secret = jwt.expose_secret();
// TODO: for things other than SELECTs, the robust way to handle retrys is by
// generating a UUID from our end to mark the query as unique and then sending it with the request.
// If we need to retry, send same UUID with retry=true parameter set and Snowflake should prevent duplicate execution.
Expand Down Expand Up @@ -216,7 +216,7 @@ impl SnowflakeQueryExecutor {
) -> anyhow::Result<QueryAttemptResult> {
let mut auth = self.auth.clone();
let jwt = auth.get_jwt()?;
let secret = jwt.expose_secret().clone();
let secret = jwt.expose_secret();
let response = self
.reqwest_client
.get(format!(
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-snowflake/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl SnowflakeRecordStream {
self.partition_number += 1;
self.partition_index = 0;
let partition_number = self.partition_number;
let secret = self.auth.get_jwt()?.expose_secret().clone();
let secret = self.auth.get_jwt()?.expose_secret();
let statement_handle = self.result_set.statementHandle.clone();
let url = self.endpoint_url.clone();
println!("Secret: {:#?}", secret);
Expand Down

0 comments on commit fab4786

Please sign in to comment.