From a2bd85dbab1e92944be92df3a6f321351a4be6c7 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 4 Oct 2023 23:11:56 +0530 Subject: [PATCH 1/3] api validation for sqlserver --- flow/cmd/handler.go | 7 +++++ flow/connectors/core.go | 9 ++++-- nexus/flow-rs/src/grpc.rs | 23 ++++++++++++++ nexus/server/src/main.rs | 64 +++++++++++++++++++++++++++++---------- 4 files changed, 85 insertions(+), 18 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 510d92b63..b3c74ab63 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -362,6 +362,13 @@ func (h *FlowRequestHandler) CreatePeer( } sfConfig := sfConfigObject.SnowflakeConfig encodedConfig, encodingErr = proto.Marshal(sfConfig) + case protos.DBType_SQLSERVER: + sqlServerConfigObject, ok := config.(*protos.Peer_SqlserverConfig) + if !ok { + return wrongConfigResponse, nil + } + sqlServerConfig := sqlServerConfigObject.SqlserverConfig + encodedConfig, encodingErr = proto.Marshal(sqlServerConfig) default: return wrongConfigResponse, nil diff --git a/flow/connectors/core.go b/flow/connectors/core.go index dbc040a5b..c31a0f530 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -219,12 +219,17 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name) } return connsnowflake.NewSnowflakeConnector(ctx, sfConfig) + + case protos.DBType_SQLSERVER: + sqlServerConfig := peer.GetSqlserverConfig() + if sqlServerConfig == nil { + return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name) + } + return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig) // case protos.DBType_S3: // return conns3.NewS3Connector(ctx, config.GetS3Config()) // case protos.DBType_EVENTHUB: // return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) - // case protos.DBType_SQLSERVER: - // return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig()) default: return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String()) } diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index f15b43106..d2df4c7b7 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -10,6 +10,11 @@ use pt::{ use serde_json::Value; use tonic_health::pb::health_client; +pub enum PeerValidationResult { + Valid, + Invalid(String), +} + pub struct FlowGrpcClient { client: peerdb_route::flow_service_client::FlowServiceClient, health_client: health_client::HealthClient, @@ -82,6 +87,24 @@ impl FlowGrpcClient { Ok(workflow_id) } + pub async fn validate_peer( + &mut self, + validate_request: &pt::peerdb_route::ValidatePeerRequest, + ) -> anyhow::Result { + let validate_peer_req = pt::peerdb_route::ValidatePeerRequest { + peer: validate_request.peer.clone(), + }; + let response = self.client.validate_peer(validate_peer_req).await?; + let response_body = &response.into_inner(); + let message = response_body.message.clone(); + let status = response_body.status; + if status == pt::peerdb_route::ValidatePeerStatus::Valid as i32 { + Ok(PeerValidationResult::Valid) + } else { + Ok(PeerValidationResult::Invalid(message)) + } + } + async fn start_peer_flow( &mut self, peer_flow_config: pt::peerdb_flow::FlowConnectionConfigs, diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 6a29910db..681ac78d9 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -7,7 +7,7 @@ use catalog::{Catalog, CatalogConfig, WorkflowDetails}; use clap::Parser; use cursor::PeerCursors; use dashmap::DashMap; -use flow_rs::grpc::FlowGrpcClient; +use flow_rs::grpc::{FlowGrpcClient, PeerValidationResult}; use peer_bigquery::BigQueryQueryExecutor; use peer_connections::{PeerConnectionTracker, PeerConnections}; use peer_cursor::{ @@ -153,7 +153,6 @@ impl NexusBackend { let unsupported_peer_types = vec![ 4, // EVENTHUB 5, // S3 - 6, // SQLSERVER 7, // EVENTHUBGROUP ]; !unsupported_peer_types.contains(&peer_type) @@ -218,20 +217,53 @@ impl NexusBackend { } => { let peer_type = peer.r#type; if Self::is_peer_validity_supported(peer_type) { - let peer_executor = self.get_peer_executor(&peer).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get peer executor: {:?}", err), - })) - })?; - peer_executor.is_connection_valid().await.map_err(|e| { - self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor - PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "internal_error".to_owned(), - format!("[peer]: invalid configuration: {}", e), - ))) - })?; - self.executors.remove(&peer.name); + if peer_type != 6 { + let peer_executor = + self.get_peer_executor(&peer).await.map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to get peer executor: {:?}", err), + })) + })?; + peer_executor.is_connection_valid().await.map_err(|e| { + self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + format!("[peer]: invalid configuration: {}", e), + ))) + })?; + self.executors.remove(&peer.name); + } else { + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let validate_request = pt::peerdb_route::ValidatePeerRequest { + peer: Some(Peer { + name: peer.name.clone(), + r#type: peer.r#type, + config: peer.config.clone(), + }), + }; + let validity = flow_handler + .validate_peer(&validate_request) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to check peer validity: {:?}", + err + ), + })) + })?; + if let PeerValidationResult::Invalid(validation_err) = validity { + return Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!( + "Peer: {:?} is invalid: {:#?}", + peer.name, validation_err + ), + )))); + } + } } let catalog = self.catalog.lock().await; From f9f56893c41b93b3564dd7ae1c645ffd23679ce6 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 4 Oct 2023 23:42:06 +0530 Subject: [PATCH 2/3] moves validate peer to method --- nexus/server/src/main.rs | 101 +++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 47 deletions(-) diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 681ac78d9..60c13f9fb 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -204,6 +204,53 @@ impl NexusBackend { } } + async fn validate_peer<'a>(&self, peer_type: i32, peer: &Peer) -> anyhow::Result<()> { + if peer_type != 6 { + let peer_executor = self.get_peer_executor(&peer).await.map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to get peer executor: {:?}", err), + })) + })?; + peer_executor.is_connection_valid().await.map_err(|e| { + self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + format!("[peer]: invalid configuration: {}", e), + ))) + })?; + self.executors.remove(&peer.name); + Ok(()) + } else { + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let validate_request = pt::peerdb_route::ValidatePeerRequest { + peer: Some(Peer { + name: peer.name.clone(), + r#type: peer.r#type, + config: peer.config.clone(), + }), + }; + let validity = flow_handler + .validate_peer(&validate_request) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to check peer validity: {:?}", err), + })) + })?; + if let PeerValidationResult::Invalid(validation_err) = validity { + return Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + format!("[peer]: invalid configuration: {}", validation_err), + ))) + .into()); + } else { + return Ok(()); + } + } + } + async fn handle_query<'a>( &self, nexus_stmt: NexusStatement, @@ -217,53 +264,13 @@ impl NexusBackend { } => { let peer_type = peer.r#type; if Self::is_peer_validity_supported(peer_type) { - if peer_type != 6 { - let peer_executor = - self.get_peer_executor(&peer).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get peer executor: {:?}", err), - })) - })?; - peer_executor.is_connection_valid().await.map_err(|e| { - self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor - PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "internal_error".to_owned(), - format!("[peer]: invalid configuration: {}", e), - ))) - })?; - self.executors.remove(&peer.name); - } else { - let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; - let validate_request = pt::peerdb_route::ValidatePeerRequest { - peer: Some(Peer { - name: peer.name.clone(), - r#type: peer.r#type, - config: peer.config.clone(), - }), - }; - let validity = flow_handler - .validate_peer(&validate_request) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to check peer validity: {:?}", - err - ), - })) - })?; - if let PeerValidationResult::Invalid(validation_err) = validity { - return Err(PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "error".to_owned(), - format!( - "Peer: {:?} is invalid: {:#?}", - peer.name, validation_err - ), - )))); - } - } + self.validate_peer(peer_type, &peer).await.map_err(|e| { + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + e.to_string(), + ))) + })?; } let catalog = self.catalog.lock().await; From 0a81878957b083992bdd39768d6b395b619b39cb Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 4 Oct 2023 23:51:38 +0530 Subject: [PATCH 3/3] minor changes --- nexus/server/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 60c13f9fb..bb7ea02f7 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -206,7 +206,7 @@ impl NexusBackend { async fn validate_peer<'a>(&self, peer_type: i32, peer: &Peer) -> anyhow::Result<()> { if peer_type != 6 { - let peer_executor = self.get_peer_executor(&peer).await.map_err(|err| { + let peer_executor = self.get_peer_executor(peer).await.map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { err_msg: format!("unable to get peer executor: {:?}", err), })) @@ -239,14 +239,14 @@ impl NexusBackend { })) })?; if let PeerValidationResult::Invalid(validation_err) = validity { - return Err(PgWireError::UserError(Box::new(ErrorInfo::new( + Err(PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_owned(), "internal_error".to_owned(), format!("[peer]: invalid configuration: {}", validation_err), ))) - .into()); + .into()) } else { - return Ok(()); + Ok(()) } } }