Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API Peer Validation for SQL Server #483

Merged
merged 3 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,13 @@ func (h *FlowRequestHandler) CreatePeer(
}
sfConfig := sfConfigObject.SnowflakeConfig
encodedConfig, encodingErr = proto.Marshal(sfConfig)
case protos.DBType_SQLSERVER:
sqlServerConfigObject, ok := config.(*protos.Peer_SqlserverConfig)
if !ok {
return wrongConfigResponse, nil
}
sqlServerConfig := sqlServerConfigObject.SqlserverConfig
encodedConfig, encodingErr = proto.Marshal(sqlServerConfig)

default:
return wrongConfigResponse, nil
Expand Down
9 changes: 7 additions & 2 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,17 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)

case protos.DBType_SQLSERVER:
sqlServerConfig := peer.GetSqlserverConfig()
if sqlServerConfig == nil {
return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig)
// case protos.DBType_S3:
// return conns3.NewS3Connector(ctx, config.GetS3Config())
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
// case protos.DBType_SQLSERVER:
// return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
default:
return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String())
}
Expand Down
23 changes: 23 additions & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ use pt::{
use serde_json::Value;
use tonic_health::pb::health_client;

pub enum PeerValidationResult {
Valid,
Invalid(String),
}

pub struct FlowGrpcClient {
client: peerdb_route::flow_service_client::FlowServiceClient<tonic::transport::Channel>,
health_client: health_client::HealthClient<tonic::transport::Channel>,
Expand Down Expand Up @@ -82,6 +87,24 @@ impl FlowGrpcClient {
Ok(workflow_id)
}

pub async fn validate_peer(
&mut self,
validate_request: &pt::peerdb_route::ValidatePeerRequest,
) -> anyhow::Result<PeerValidationResult> {
let validate_peer_req = pt::peerdb_route::ValidatePeerRequest {
peer: validate_request.peer.clone(),
};
let response = self.client.validate_peer(validate_peer_req).await?;
let response_body = &response.into_inner();
let message = response_body.message.clone();
let status = response_body.status;
if status == pt::peerdb_route::ValidatePeerStatus::Valid as i32 {
Ok(PeerValidationResult::Valid)
} else {
Ok(PeerValidationResult::Invalid(message))
}
}

async fn start_peer_flow(
&mut self,
peer_flow_config: pt::peerdb_flow::FlowConnectionConfigs,
Expand Down
61 changes: 50 additions & 11 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use catalog::{Catalog, CatalogConfig, WorkflowDetails};
use clap::Parser;
use cursor::PeerCursors;
use dashmap::DashMap;
use flow_rs::grpc::FlowGrpcClient;
use flow_rs::grpc::{FlowGrpcClient, PeerValidationResult};
use peer_bigquery::BigQueryQueryExecutor;
use peer_connections::{PeerConnectionTracker, PeerConnections};
use peer_cursor::{
Expand Down Expand Up @@ -153,7 +153,6 @@ impl NexusBackend {
let unsupported_peer_types = vec![
4, // EVENTHUB
5, // S3
6, // SQLSERVER
7, // EVENTHUBGROUP
];
!unsupported_peer_types.contains(&peer_type)
Expand Down Expand Up @@ -205,6 +204,53 @@ impl NexusBackend {
}
}

async fn validate_peer<'a>(&self, peer_type: i32, peer: &Peer) -> anyhow::Result<()> {
if peer_type != 6 {
let peer_executor = self.get_peer_executor(&peer).await.map_err(|err| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [clippy] reported by reviewdog 🐶

warning: this expression creates a reference which is immediately dereferenced by the compiler
   --> server/src/main.rs:209:56
    |
209 |             let peer_executor = self.get_peer_executor(&peer).await.map_err(|err| {
    |                                                        ^^^^^ help: change this to: `peer`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
    = note: `#[warn(clippy::needless_borrow)]` on by default

PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer executor: {:?}", err),
}))
})?;
peer_executor.is_connection_valid().await.map_err(|e| {
self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", e),
)))
})?;
self.executors.remove(&peer.name);
Ok(())
} else {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let validate_request = pt::peerdb_route::ValidatePeerRequest {
peer: Some(Peer {
name: peer.name.clone(),
r#type: peer.r#type,
config: peer.config.clone(),
}),
};
let validity = flow_handler
.validate_peer(&validate_request)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to check peer validity: {:?}", err),
}))
})?;
if let PeerValidationResult::Invalid(validation_err) = validity {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [clippy] reported by reviewdog 🐶

warning: unneeded `return` statement
   --> server/src/main.rs:242:17
    |
242 | /                 return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
243 | |                     "ERROR".to_owned(),
244 | |                     "internal_error".to_owned(),
245 | |                     format!("[peer]: invalid configuration: {}", validation_err),
246 | |                 )))
247 | |                 .into());
    | |________________________^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return
    = note: `#[warn(clippy::needless_return)]` on by default
help: remove `return`
    |
242 ~                 Err(PgWireError::UserError(Box::new(ErrorInfo::new(
243 +                     "ERROR".to_owned(),
244 +                     "internal_error".to_owned(),
245 +                     format!("[peer]: invalid configuration: {}", validation_err),
246 +                 )))
247 ~                 .into())
    |

"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", validation_err),
)))
.into());
} else {
return Ok(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [clippy] reported by reviewdog 🐶

warning: unneeded `return` statement
   --> server/src/main.rs:249:17
    |
249 |                 return Ok(());
    |                 ^^^^^^^^^^^^^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return
help: remove `return`
    |
249 -                 return Ok(());
249 +                 Ok(())
    |

}
}
}

async fn handle_query<'a>(
&self,
nexus_stmt: NexusStatement,
Expand All @@ -218,20 +264,13 @@ impl NexusBackend {
} => {
let peer_type = peer.r#type;
if Self::is_peer_validity_supported(peer_type) {
let peer_executor = self.get_peer_executor(&peer).await.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer executor: {:?}", err),
}))
})?;
peer_executor.is_connection_valid().await.map_err(|e| {
self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor
self.validate_peer(peer_type, &peer).await.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", e),
e.to_string(),
)))
})?;
self.executors.remove(&peer.name);
}

let catalog = self.catalog.lock().await;
Expand Down
Loading