Skip to content

Commit

Permalink
api validation for sqlserver
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 4, 2023
1 parent af126b5 commit a8f1f1f
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 18 deletions.
7 changes: 7 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,13 @@ func (h *FlowRequestHandler) CreatePeer(
}
sfConfig := sfConfigObject.SnowflakeConfig
encodedConfig, encodingErr = proto.Marshal(sfConfig)
case protos.DBType_SQLSERVER:
sqlServerConfigObject, ok := config.(*protos.Peer_SqlserverConfig)
if !ok {
return wrongConfigResponse, nil
}
sqlServerConfig := sqlServerConfigObject.SqlserverConfig
encodedConfig, encodingErr = proto.Marshal(sqlServerConfig)

default:
return wrongConfigResponse, nil
Expand Down
9 changes: 7 additions & 2 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,17 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)

case protos.DBType_SQLSERVER:
sqlServerConfig := peer.GetSqlserverConfig()
if sqlServerConfig == nil {
return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig)
// case protos.DBType_S3:
// return conns3.NewS3Connector(ctx, config.GetS3Config())
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
// case protos.DBType_SQLSERVER:
// return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
default:
return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String())
}
Expand Down
23 changes: 23 additions & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ use pt::{
use serde_json::Value;
use tonic_health::pb::health_client;

pub enum PeerValidationResult {
Valid,
Invalid(String),
}

pub struct FlowGrpcClient {
client: peerdb_route::flow_service_client::FlowServiceClient<tonic::transport::Channel>,
health_client: health_client::HealthClient<tonic::transport::Channel>,
Expand Down Expand Up @@ -82,6 +87,24 @@ impl FlowGrpcClient {
Ok(workflow_id)
}

pub async fn validate_peer(
&mut self,
validate_request: &pt::peerdb_route::ValidatePeerRequest,
) -> anyhow::Result<PeerValidationResult> {
let validate_peer_req = pt::peerdb_route::ValidatePeerRequest {
peer: validate_request.peer.clone(),
};
let response = self.client.validate_peer(validate_peer_req).await?;
let response_body = &response.into_inner();
let message = response_body.message.clone();
let status = response_body.status;
if status == pt::peerdb_route::ValidatePeerStatus::Valid as i32 {
Ok(PeerValidationResult::Valid)
} else {
Ok(PeerValidationResult::Invalid(message))
}
}

async fn start_peer_flow(
&mut self,
peer_flow_config: pt::peerdb_flow::FlowConnectionConfigs,
Expand Down
64 changes: 48 additions & 16 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use catalog::{Catalog, CatalogConfig, WorkflowDetails};
use clap::Parser;
use cursor::PeerCursors;
use dashmap::DashMap;
use flow_rs::grpc::FlowGrpcClient;
use flow_rs::grpc::{FlowGrpcClient, PeerValidationResult};
use peer_bigquery::BigQueryQueryExecutor;
use peer_connections::{PeerConnectionTracker, PeerConnections};
use peer_cursor::{
Expand Down Expand Up @@ -153,7 +153,6 @@ impl NexusBackend {
let unsupported_peer_types = vec![
4, // EVENTHUB
5, // S3
6, // SQLSERVER
7, // EVENTHUBGROUP
];
!unsupported_peer_types.contains(&peer_type)
Expand Down Expand Up @@ -218,20 +217,53 @@ impl NexusBackend {
} => {
let peer_type = peer.r#type;
if Self::is_peer_validity_supported(peer_type) {
let peer_executor = self.get_peer_executor(&peer).await.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer executor: {:?}", err),
}))
})?;
peer_executor.is_connection_valid().await.map_err(|e| {
self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", e),
)))
})?;
self.executors.remove(&peer.name);
if peer_type != 6 {
let peer_executor =
self.get_peer_executor(&peer).await.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer executor: {:?}", err),
}))
})?;
peer_executor.is_connection_valid().await.map_err(|e| {
self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", e),
)))
})?;
self.executors.remove(&peer.name);
} else {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let validate_request = pt::peerdb_route::ValidatePeerRequest {
peer: Some(Peer {
name: peer.name.clone(),
r#type: peer.r#type,
config: peer.config.clone(),
}),
};
let validity = flow_handler
.validate_peer(&validate_request)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"unable to check peer validity: {:?}",
err
),
}))
})?;
if let PeerValidationResult::Invalid(validation_err) = validity {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"error".to_owned(),
format!(
"Peer: {:?} is invalid: {:#?}",
peer.name, validation_err
),
))));
}
}
}

let catalog = self.catalog.lock().await;
Expand Down

0 comments on commit a8f1f1f

Please sign in to comment.