Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better peer checks #572

Merged
merged 5 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lint
  • Loading branch information
Amogh-Bharadwaj committed Dec 1, 2023
commit c23c9a7fa2fd8adb7da7e70ec72d8b123dde0ad7
6 changes: 3 additions & 3 deletions nexus/analyzer/src/qrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ const QREP_OPTIONS: &[QRepOptionType] = &[
QRepOptionType::Boolean {
name: "setup_watermark_table_on_destination",
default_value: false,
required: false
required: false,
},
QRepOptionType::Boolean {
name: "dst_table_full_resync",
default_value: false,
required: false
}
required: false,
},
];

pub fn process_options(
Expand Down
3 changes: 2 additions & 1 deletion nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ impl Catalog {
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
let flow_opts: HashMap<String, Value> = row.get::<&str, Option<Value>>("flow_metadata")
let flow_opts: HashMap<String, Value> = row
.get::<&str, Option<Value>>("flow_metadata")
.and_then(|flow_opts| serde_json::from_value(flow_opts).ok())
.unwrap_or_default();

Expand Down
1 change: 0 additions & 1 deletion nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

use catalog::WorkflowDetails;
use pt::{
flow_model::{FlowJob, QRepFlowJob},
Expand Down
89 changes: 45 additions & 44 deletions nexus/peer-bigquery/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use std::ops::ControlFlow;
use sqlparser::ast::Value::Number;

use sqlparser::ast::{
visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut,
Array, ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr,
Function, FunctionArg, FunctionArgExpr, Ident,
ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo,
visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut, Array,
ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg,
FunctionArgExpr, Ident, ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo,
};

#[derive(Default)]
Expand Down Expand Up @@ -99,11 +98,7 @@ impl BigqueryAst {

visit_expressions_mut(query, |node| {
// CAST AS Text to CAST AS String
if let Expr::Cast {
data_type: dt,
..
} = node
{
if let Expr::Cast { data_type: dt, .. } = node {
if let DataType::Text = dt {
*dt = DataType::String(None);
}
Expand Down Expand Up @@ -177,7 +172,6 @@ impl BigqueryAst {
distinct: false,
special: false,
order_by: vec![],

})
} else if let BinaryOperator::Plus = op {
*node = Expr::Function(Function {
Expand Down Expand Up @@ -241,7 +235,12 @@ impl BigqueryAst {

// flatten ANY to IN operation overall.
visit_expressions_mut(query, |node| {
if let Expr::AnyOp { left, compare_op, right } = node {
if let Expr::AnyOp {
left,
compare_op,
right,
} = node
{
if matches!(compare_op, BinaryOperator::Eq | BinaryOperator::NotEq) {
let list = self
.flatten_expr_to_in_list(right)
Expand All @@ -257,8 +256,6 @@ impl BigqueryAst {
ControlFlow::<()>::Continue(())
});



Ok(())
}

Expand Down Expand Up @@ -300,7 +297,10 @@ impl BigqueryAst {
fn flatten_expr_to_in_list(&self, expr: &Expr) -> anyhow::Result<Vec<Expr>> {
let mut list = vec![];
// check if expr is of type Cast
if let Expr::Cast { expr, data_type, .. } = expr {
if let Expr::Cast {
expr, data_type, ..
} = expr
{
// assert that expr is of type SingleQuotedString
if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() {
// trim the starting and ending curly braces
Expand All @@ -309,39 +309,40 @@ impl BigqueryAst {
let split = s.split(',');
// match on data type, and create a vector of Expr::Value
match data_type {
DataType::Array(ArrayElemTypeDef::AngleBracket(inner)) |
DataType::Array(ArrayElemTypeDef::SquareBracket(inner))
=> match inner.as_ref() {
DataType::Text | DataType::Char(_) | DataType::Varchar(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::SingleQuotedString(
s.to_string(),
)));
DataType::Array(ArrayElemTypeDef::AngleBracket(inner))
| DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => {
match inner.as_ref() {
DataType::Text | DataType::Char(_) | DataType::Varchar(_) => {
for s in split {
list.push(Expr::Value(
sqlparser::ast::Value::SingleQuotedString(s.to_string()),
));
}
}
}
DataType::Integer(_)
| DataType::Float(_)
| DataType::BigInt(_)
| DataType::UnsignedBigInt(_)
| DataType::UnsignedInteger(_)
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedTinyInt(_)
| DataType::TinyInt(_)
| DataType::UnsignedInt(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::Number(
s.to_string(),
false,
)));
DataType::Integer(_)
| DataType::Float(_)
| DataType::BigInt(_)
| DataType::UnsignedBigInt(_)
| DataType::UnsignedInteger(_)
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedTinyInt(_)
| DataType::TinyInt(_)
| DataType::UnsignedInt(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::Number(
s.to_string(),
false,
)));
}
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported inner data type for IN list: {:?}",
data_type
))
}
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported inner data type for IN list: {:?}",
data_type
))
}
},
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported data type for IN list: {:?}",
Expand Down
10 changes: 8 additions & 2 deletions nexus/peer-bigquery/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ impl BqSchema {
.schema
.as_ref()
.expect("Schema is not present");
let fields = bq_schema.fields.as_ref().expect("Schema fields are not present");
let fields = bq_schema
.fields
.as_ref()
.expect("Schema fields are not present");

let schema = SchemaRef::new(Schema {
fields: fields
Expand All @@ -76,7 +79,10 @@ impl BqSchema {
.collect(),
});

Self { schema, fields: fields.clone() }
Self {
schema,
fields: fields.clone(),
}
}

pub fn schema(&self) -> SchemaRef {
Expand Down
4 changes: 1 addition & 3 deletions nexus/peer-snowflake/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ impl SnowflakeAuth {
let pkey = match password {
Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(private_key, pw)
.context("Invalid private key or decryption failed")?,
None => {
DecodePrivateKey::from_pkcs8_pem(private_key).context("Invalid private key")?
}
None => DecodePrivateKey::from_pkcs8_pem(private_key).context("Invalid private key")?,
};
let mut snowflake_auth: SnowflakeAuth = SnowflakeAuth {
// moved normalized_account_id above account_id to satisfy the borrow checker.
Expand Down
68 changes: 27 additions & 41 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,50 +208,36 @@ impl NexusBackend {
}
}

async fn validate_peer<'a>(&self, peer_type: i32, peer: &Peer) -> anyhow::Result<()> {
if peer_type != 6 {
let peer_executor = self.get_peer_executor(peer).await.map_err(|err| {
async fn validate_peer<'a>(&self, peer: &Peer) -> anyhow::Result<()> {
//if flow handler does not exist, skip validation
if self.flow_handler.is_none() {
return Ok(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this always happen? A lot of our peer types are not usable from the query layer and having flow not validate them doesn't seem strong enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only Eventhub and Eventhub Group are not covered by our validation API

}
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let validate_request = pt::peerdb_route::ValidatePeerRequest {
peer: Some(Peer {
name: peer.name.clone(),
r#type: peer.r#type,
config: peer.config.clone(),
}),
};
let validity = flow_handler
.validate_peer(&validate_request)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer executor: {:?}", err),
err_msg: format!("unable to check peer validity: {:?}", err),
}))
})?;
peer_executor.is_connection_valid().await.map_err(|e| {
self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", e),
)))
})?;
self.executors.remove(&peer.name);
Ok(())
if let PeerValidationResult::Invalid(validation_err) = validity {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", validation_err),
)))
.into())
} else {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let validate_request = pt::peerdb_route::ValidatePeerRequest {
peer: Some(Peer {
name: peer.name.clone(),
r#type: peer.r#type,
config: peer.config.clone(),
}),
};
let validity = flow_handler
.validate_peer(&validate_request)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to check peer validity: {:?}", err),
}))
})?;
if let PeerValidationResult::Invalid(validation_err) = validity {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", validation_err),
)))
.into())
} else {
Ok(())
}
Ok(())
}
}

Expand Down Expand Up @@ -410,7 +396,7 @@ impl NexusBackend {
} => {
let peer_type = peer.r#type;
if Self::is_peer_validity_supported(peer_type) {
self.validate_peer(peer_type,peer).await.map_err(|e| {
self.validate_peer(peer).await.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
Expand Down