diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 510d92b63..b3c74ab63 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -362,6 +362,13 @@ func (h *FlowRequestHandler) CreatePeer( } sfConfig := sfConfigObject.SnowflakeConfig encodedConfig, encodingErr = proto.Marshal(sfConfig) + case protos.DBType_SQLSERVER: + sqlServerConfigObject, ok := config.(*protos.Peer_SqlserverConfig) + if !ok { + return wrongConfigResponse, nil + } + sqlServerConfig := sqlServerConfigObject.SqlserverConfig + encodedConfig, encodingErr = proto.Marshal(sqlServerConfig) default: return wrongConfigResponse, nil diff --git a/flow/connectors/core.go b/flow/connectors/core.go index dbc040a5b..c31a0f530 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -219,12 +219,17 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name) } return connsnowflake.NewSnowflakeConnector(ctx, sfConfig) + + case protos.DBType_SQLSERVER: + sqlServerConfig := peer.GetSqlserverConfig() + if sqlServerConfig == nil { + return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name) + } + return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig) // case protos.DBType_S3: // return conns3.NewS3Connector(ctx, config.GetS3Config()) // case protos.DBType_EVENTHUB: // return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) - // case protos.DBType_SQLSERVER: - // return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig()) default: return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String()) } diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index f15b43106..d2df4c7b7 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -10,6 +10,11 @@ use pt::{ use serde_json::Value; use tonic_health::pb::health_client; +pub enum PeerValidationResult { + Valid, + Invalid(String), +} + pub struct FlowGrpcClient { client: peerdb_route::flow_service_client::FlowServiceClient, health_client: health_client::HealthClient, @@ -82,6 +87,24 @@ impl FlowGrpcClient { Ok(workflow_id) } + pub async fn validate_peer( + &mut self, + validate_request: &pt::peerdb_route::ValidatePeerRequest, + ) -> anyhow::Result { + let validate_peer_req = pt::peerdb_route::ValidatePeerRequest { + peer: validate_request.peer.clone(), + }; + let response = self.client.validate_peer(validate_peer_req).await?; + let response_body = &response.into_inner(); + let message = response_body.message.clone(); + let status = response_body.status; + if status == pt::peerdb_route::ValidatePeerStatus::Valid as i32 { + Ok(PeerValidationResult::Valid) + } else { + Ok(PeerValidationResult::Invalid(message)) + } + } + async fn start_peer_flow( &mut self, peer_flow_config: pt::peerdb_flow::FlowConnectionConfigs, diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 6a29910db..bb7ea02f7 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -7,7 +7,7 @@ use catalog::{Catalog, CatalogConfig, WorkflowDetails}; use clap::Parser; use cursor::PeerCursors; use dashmap::DashMap; -use flow_rs::grpc::FlowGrpcClient; +use flow_rs::grpc::{FlowGrpcClient, PeerValidationResult}; use peer_bigquery::BigQueryQueryExecutor; use peer_connections::{PeerConnectionTracker, PeerConnections}; use peer_cursor::{ @@ -153,7 +153,6 @@ impl NexusBackend { let unsupported_peer_types = vec![ 4, // EVENTHUB 5, // S3 - 6, // SQLSERVER 7, // EVENTHUBGROUP ]; !unsupported_peer_types.contains(&peer_type) @@ -205,6 +204,53 @@ impl NexusBackend { } } + async fn validate_peer<'a>(&self, peer_type: i32, peer: &Peer) -> anyhow::Result<()> { + if peer_type != 6 { + let peer_executor = self.get_peer_executor(peer).await.map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to get peer executor: {:?}", err), + })) + })?; + peer_executor.is_connection_valid().await.map_err(|e| { + self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + format!("[peer]: invalid configuration: {}", e), + ))) + })?; + self.executors.remove(&peer.name); + Ok(()) + } else { + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let validate_request = pt::peerdb_route::ValidatePeerRequest { + peer: Some(Peer { + name: peer.name.clone(), + r#type: peer.r#type, + config: peer.config.clone(), + }), + }; + let validity = flow_handler + .validate_peer(&validate_request) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to check peer validity: {:?}", err), + })) + })?; + if let PeerValidationResult::Invalid(validation_err) = validity { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + format!("[peer]: invalid configuration: {}", validation_err), + ))) + .into()) + } else { + Ok(()) + } + } + } + async fn handle_query<'a>( &self, nexus_stmt: NexusStatement, @@ -218,20 +264,13 @@ impl NexusBackend { } => { let peer_type = peer.r#type; if Self::is_peer_validity_supported(peer_type) { - let peer_executor = self.get_peer_executor(&peer).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get peer executor: {:?}", err), - })) - })?; - peer_executor.is_connection_valid().await.map_err(|e| { - self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor + self.validate_peer(peer_type, &peer).await.map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_owned(), "internal_error".to_owned(), - format!("[peer]: invalid configuration: {}", e), + e.to_string(), ))) })?; - self.executors.remove(&peer.name); } let catalog = self.catalog.lock().await;