Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 1, 2023
1 parent f780cfc commit 92b1830
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 95 deletions.
6 changes: 3 additions & 3 deletions nexus/analyzer/src/qrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ const QREP_OPTIONS: &[QRepOptionType] = &[
QRepOptionType::Boolean {
name: "setup_watermark_table_on_destination",
default_value: false,
required: false
required: false,
},
QRepOptionType::Boolean {
name: "dst_table_full_resync",
default_value: false,
required: false
}
required: false,
},
];

pub fn process_options(
Expand Down
3 changes: 2 additions & 1 deletion nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ impl Catalog {
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
let flow_opts: HashMap<String, Value> = row.get::<&str, Option<Value>>("flow_metadata")
let flow_opts: HashMap<String, Value> = row
.get::<&str, Option<Value>>("flow_metadata")
.and_then(|flow_opts| serde_json::from_value(flow_opts).ok())
.unwrap_or_default();

Expand Down
1 change: 0 additions & 1 deletion nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

use catalog::WorkflowDetails;
use pt::{
flow_model::{FlowJob, QRepFlowJob},
Expand Down
89 changes: 45 additions & 44 deletions nexus/peer-bigquery/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use std::ops::ControlFlow;
use sqlparser::ast::Value::Number;

use sqlparser::ast::{
visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut,
Array, ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr,
Function, FunctionArg, FunctionArgExpr, Ident,
ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo,
visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut, Array,
ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg,
FunctionArgExpr, Ident, ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo,
};

#[derive(Default)]
Expand Down Expand Up @@ -99,11 +98,7 @@ impl BigqueryAst {

visit_expressions_mut(query, |node| {
// CAST AS Text to CAST AS String
if let Expr::Cast {
data_type: dt,
..
} = node
{
if let Expr::Cast { data_type: dt, .. } = node {
if let DataType::Text = dt {
*dt = DataType::String(None);
}
Expand Down Expand Up @@ -177,7 +172,6 @@ impl BigqueryAst {
distinct: false,
special: false,
order_by: vec![],

})
} else if let BinaryOperator::Plus = op {
*node = Expr::Function(Function {
Expand Down Expand Up @@ -241,7 +235,12 @@ impl BigqueryAst {

// flatten ANY to IN operation overall.
visit_expressions_mut(query, |node| {
if let Expr::AnyOp { left, compare_op, right } = node {
if let Expr::AnyOp {
left,
compare_op,
right,
} = node
{
if matches!(compare_op, BinaryOperator::Eq | BinaryOperator::NotEq) {
let list = self
.flatten_expr_to_in_list(right)
Expand All @@ -257,8 +256,6 @@ impl BigqueryAst {
ControlFlow::<()>::Continue(())
});



Ok(())
}

Expand Down Expand Up @@ -300,7 +297,10 @@ impl BigqueryAst {
fn flatten_expr_to_in_list(&self, expr: &Expr) -> anyhow::Result<Vec<Expr>> {
let mut list = vec![];
// check if expr is of type Cast
if let Expr::Cast { expr, data_type, .. } = expr {
if let Expr::Cast {
expr, data_type, ..
} = expr
{
// assert that expr is of type SingleQuotedString
if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() {
// trim the starting and ending curly braces
Expand All @@ -309,39 +309,40 @@ impl BigqueryAst {
let split = s.split(',');
// match on data type, and create a vector of Expr::Value
match data_type {
DataType::Array(ArrayElemTypeDef::AngleBracket(inner)) |
DataType::Array(ArrayElemTypeDef::SquareBracket(inner))
=> match inner.as_ref() {
DataType::Text | DataType::Char(_) | DataType::Varchar(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::SingleQuotedString(
s.to_string(),
)));
DataType::Array(ArrayElemTypeDef::AngleBracket(inner))
| DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => {
match inner.as_ref() {
DataType::Text | DataType::Char(_) | DataType::Varchar(_) => {
for s in split {
list.push(Expr::Value(
sqlparser::ast::Value::SingleQuotedString(s.to_string()),
));
}
}
}
DataType::Integer(_)
| DataType::Float(_)
| DataType::BigInt(_)
| DataType::UnsignedBigInt(_)
| DataType::UnsignedInteger(_)
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedTinyInt(_)
| DataType::TinyInt(_)
| DataType::UnsignedInt(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::Number(
s.to_string(),
false,
)));
DataType::Integer(_)
| DataType::Float(_)
| DataType::BigInt(_)
| DataType::UnsignedBigInt(_)
| DataType::UnsignedInteger(_)
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedTinyInt(_)
| DataType::TinyInt(_)
| DataType::UnsignedInt(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::Number(
s.to_string(),
false,
)));
}
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported inner data type for IN list: {:?}",
data_type
))
}
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported inner data type for IN list: {:?}",
data_type
))
}
},
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported data type for IN list: {:?}",
Expand Down
10 changes: 8 additions & 2 deletions nexus/peer-bigquery/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ impl BqSchema {
.schema
.as_ref()
.expect("Schema is not present");
let fields = bq_schema.fields.as_ref().expect("Schema fields are not present");
let fields = bq_schema
.fields
.as_ref()
.expect("Schema fields are not present");

let schema = SchemaRef::new(Schema {
fields: fields
Expand All @@ -76,7 +79,10 @@ impl BqSchema {
.collect(),
});

Self { schema, fields: fields.clone() }
Self {
schema,
fields: fields.clone(),
}
}

pub fn schema(&self) -> SchemaRef {
Expand Down
4 changes: 1 addition & 3 deletions nexus/peer-snowflake/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ impl SnowflakeAuth {
let pkey = match password {
Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(private_key, pw)
.context("Invalid private key or decryption failed")?,
None => {
DecodePrivateKey::from_pkcs8_pem(private_key).context("Invalid private key")?
}
None => DecodePrivateKey::from_pkcs8_pem(private_key).context("Invalid private key")?,
};
let mut snowflake_auth: SnowflakeAuth = SnowflakeAuth {
// moved normalized_account_id above account_id to satisfy the borrow checker.
Expand Down
68 changes: 27 additions & 41 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,50 +208,36 @@ impl NexusBackend {
}
}

async fn validate_peer<'a>(&self, peer_type: i32, peer: &Peer) -> anyhow::Result<()> {
if peer_type != 6 {
let peer_executor = self.get_peer_executor(peer).await.map_err(|err| {
async fn validate_peer<'a>(&self, peer: &Peer) -> anyhow::Result<()> {
//if flow handler does not exist, skip validation
if self.flow_handler.is_none() {
return Ok(());
}
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let validate_request = pt::peerdb_route::ValidatePeerRequest {
peer: Some(Peer {
name: peer.name.clone(),
r#type: peer.r#type,
config: peer.config.clone(),
}),
};
let validity = flow_handler
.validate_peer(&validate_request)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer executor: {:?}", err),
err_msg: format!("unable to check peer validity: {:?}", err),
}))
})?;
peer_executor.is_connection_valid().await.map_err(|e| {
self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", e),
)))
})?;
self.executors.remove(&peer.name);
Ok(())
if let PeerValidationResult::Invalid(validation_err) = validity {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", validation_err),
)))
.into())
} else {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let validate_request = pt::peerdb_route::ValidatePeerRequest {
peer: Some(Peer {
name: peer.name.clone(),
r#type: peer.r#type,
config: peer.config.clone(),
}),
};
let validity = flow_handler
.validate_peer(&validate_request)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to check peer validity: {:?}", err),
}))
})?;
if let PeerValidationResult::Invalid(validation_err) = validity {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", validation_err),
)))
.into())
} else {
Ok(())
}
Ok(())
}
}

Expand Down Expand Up @@ -410,7 +396,7 @@ impl NexusBackend {
} => {
let peer_type = peer.r#type;
if Self::is_peer_validity_supported(peer_type) {
self.validate_peer(peer_type,peer).await.map_err(|e| {
self.validate_peer(peer).await.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
Expand Down

0 comments on commit 92b1830

Please sign in to comment.