From a8f1f1f23d3adf782836edca326935f268f358af Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 4 Oct 2023 23:11:56 +0530 Subject: [PATCH] api validation for sqlserver --- flow/cmd/handler.go | 7 +++++ flow/connectors/core.go | 9 ++++-- nexus/flow-rs/src/grpc.rs | 23 ++++++++++++++ nexus/server/src/main.rs | 64 +++++++++++++++++++++++++++++---------- 4 files changed, 85 insertions(+), 18 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 510d92b635..b3c74ab634 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -362,6 +362,13 @@ func (h *FlowRequestHandler) CreatePeer( } sfConfig := sfConfigObject.SnowflakeConfig encodedConfig, encodingErr = proto.Marshal(sfConfig) + case protos.DBType_SQLSERVER: + sqlServerConfigObject, ok := config.(*protos.Peer_SqlserverConfig) + if !ok { + return wrongConfigResponse, nil + } + sqlServerConfig := sqlServerConfigObject.SqlserverConfig + encodedConfig, encodingErr = proto.Marshal(sqlServerConfig) default: return wrongConfigResponse, nil diff --git a/flow/connectors/core.go b/flow/connectors/core.go index dbc040a5b2..c31a0f5304 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -219,12 +219,17 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name) } return connsnowflake.NewSnowflakeConnector(ctx, sfConfig) + + case protos.DBType_SQLSERVER: + sqlServerConfig := peer.GetSqlserverConfig() + if sqlServerConfig == nil { + return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name) + } + return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig) // case protos.DBType_S3: // return conns3.NewS3Connector(ctx, config.GetS3Config()) // case protos.DBType_EVENTHUB: // return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) - // case protos.DBType_SQLSERVER: - // return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig()) default: return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String()) } diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index f15b431066..d2df4c7b7e 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -10,6 +10,11 @@ use pt::{ use serde_json::Value; use tonic_health::pb::health_client; +pub enum PeerValidationResult { + Valid, + Invalid(String), +} + pub struct FlowGrpcClient { client: peerdb_route::flow_service_client::FlowServiceClient, health_client: health_client::HealthClient, @@ -82,6 +87,24 @@ impl FlowGrpcClient { Ok(workflow_id) } + pub async fn validate_peer( + &mut self, + validate_request: &pt::peerdb_route::ValidatePeerRequest, + ) -> anyhow::Result { + let validate_peer_req = pt::peerdb_route::ValidatePeerRequest { + peer: validate_request.peer.clone(), + }; + let response = self.client.validate_peer(validate_peer_req).await?; + let response_body = &response.into_inner(); + let message = response_body.message.clone(); + let status = response_body.status; + if status == pt::peerdb_route::ValidatePeerStatus::Valid as i32 { + Ok(PeerValidationResult::Valid) + } else { + Ok(PeerValidationResult::Invalid(message)) + } + } + async fn start_peer_flow( &mut self, peer_flow_config: pt::peerdb_flow::FlowConnectionConfigs, diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 6a29910db9..681ac78d9a 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -7,7 +7,7 @@ use catalog::{Catalog, CatalogConfig, WorkflowDetails}; use clap::Parser; use cursor::PeerCursors; use dashmap::DashMap; -use flow_rs::grpc::FlowGrpcClient; +use flow_rs::grpc::{FlowGrpcClient, PeerValidationResult}; use peer_bigquery::BigQueryQueryExecutor; use peer_connections::{PeerConnectionTracker, PeerConnections}; use peer_cursor::{ @@ -153,7 +153,6 @@ impl NexusBackend { let unsupported_peer_types = vec![ 4, // EVENTHUB 5, // S3 - 6, // SQLSERVER 7, // EVENTHUBGROUP ]; !unsupported_peer_types.contains(&peer_type) @@ -218,20 +217,53 @@ impl NexusBackend { } => { let peer_type = peer.r#type; if Self::is_peer_validity_supported(peer_type) { - let peer_executor = self.get_peer_executor(&peer).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get peer executor: {:?}", err), - })) - })?; - peer_executor.is_connection_valid().await.map_err(|e| { - self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor - PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "internal_error".to_owned(), - format!("[peer]: invalid configuration: {}", e), - ))) - })?; - self.executors.remove(&peer.name); + if peer_type != 6 { + let peer_executor = + self.get_peer_executor(&peer).await.map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to get peer executor: {:?}", err), + })) + })?; + peer_executor.is_connection_valid().await.map_err(|e| { + self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + format!("[peer]: invalid configuration: {}", e), + ))) + })?; + self.executors.remove(&peer.name); + } else { + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let validate_request = pt::peerdb_route::ValidatePeerRequest { + peer: Some(Peer { + name: peer.name.clone(), + r#type: peer.r#type, + config: peer.config.clone(), + }), + }; + let validity = flow_handler + .validate_peer(&validate_request) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to check peer validity: {:?}", + err + ), + })) + })?; + if let PeerValidationResult::Invalid(validation_err) = validity { + return Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!( + "Peer: {:?} is invalid: {:#?}", + peer.name, validation_err + ), + )))); + } + } } let catalog = self.catalog.lock().await;