diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index a00759ee09..8579f608db 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -190,6 +190,11 @@ func (c *BigQueryConnector) Close() error { // ConnectionActive returns true if the connection is active. func (c *BigQueryConnector) ConnectionActive() bool { + _, err := c.client.Dataset(c.datasetID).Metadata(c.ctx) + if err != nil { + return false + } + return c.client != nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 3d1c226737..87627fec94 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -221,17 +221,18 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name) } return connsnowflake.NewSnowflakeConnector(ctx, sfConfig) - + case protos.DBType_S3: + s3Config := peer.GetS3Config() + if s3Config == nil { + return nil, fmt.Errorf("missing s3 config for %s peer %s", peer.Type.String(), peer.Name) + } + return conns3.NewS3Connector(ctx, s3Config) case protos.DBType_SQLSERVER: sqlServerConfig := peer.GetSqlserverConfig() if sqlServerConfig == nil { return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name) } return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig) - // case protos.DBType_S3: - // return conns3.NewS3Connector(ctx, config.GetS3Config()) - // case protos.DBType_EVENTHUB: - // return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) default: return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String()) } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 953f44f197..a044af1c81 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -71,7 +71,7 @@ func (c *EventHubConnector) Close() error { } func (c *EventHubConnector) ConnectionActive() bool { - return true + return c.pgMetadata.Ping() } func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error { diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 6ec7c07142..673b185855 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -39,6 +39,16 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf }, nil } +func (p *PostgresMetadataStore) Ping() bool { + err := p.pool.Ping(p.ctx) + if err != nil { + log.Errorf("failed to connect to metadata db: %v", err) + return false + } + + return true +} + func (p *PostgresMetadataStore) Close() error { if p.pool != nil { p.pool.Close() diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 51e163f13e..b0aeee6076 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -88,8 +88,9 @@ func (c *S3Connector) Close() error { } func (c *S3Connector) ConnectionActive() bool { - _, err := c.client.ListBuckets(nil) - return err == nil + _, listErr := c.client.ListBuckets(nil) + pinged := c.pgMetadata.Ping() + return listErr == nil && pinged } func (c *S3Connector) NeedsSetupMetadataTables() bool { diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 88692f3853..b642562ba4 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -81,7 +81,7 @@ impl FlowGrpcClient { ) -> anyhow::Result { let create_qrep_flow_req = pt::peerdb_route::CreateQRepFlowRequest { qrep_config: Some(qrep_config.clone()), - create_catalog_entry:false + create_catalog_entry: false, }; let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?; let workflow_id = response.into_inner().worflow_id; diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index e663b132ce..a6b6b16544 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -152,8 +152,7 @@ impl NexusBackend { fn is_peer_validity_supported(peer_type: i32) -> bool { let unsupported_peer_types = [ 4, // EVENTHUB - 5, // S3 - 7, + 7, // EVENTHUB_GROUP ]; !unsupported_peer_types.contains(&peer_type) } @@ -204,50 +203,32 @@ impl NexusBackend { } } - async fn validate_peer<'a>(&self, peer_type: i32, peer: &Peer) -> anyhow::Result<()> { - if peer_type != 6 { - let peer_executor = self.get_peer_executor(peer).await.map_err(|err| { + async fn validate_peer<'a>(&self, peer: &Peer) -> anyhow::Result<()> { + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let validate_request = pt::peerdb_route::ValidatePeerRequest { + peer: Some(Peer { + name: peer.name.clone(), + r#type: peer.r#type, + config: peer.config.clone(), + }), + }; + let validity = flow_handler + .validate_peer(&validate_request) + .await + .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get peer executor: {:?}", err), + err_msg: format!("unable to check peer validity: {:?}", err), })) })?; - peer_executor.is_connection_valid().await.map_err(|e| { - self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor - PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "internal_error".to_owned(), - format!("[peer]: invalid configuration: {}", e), - ))) - })?; - self.executors.remove(&peer.name); - Ok(()) + if let PeerValidationResult::Invalid(validation_err) = validity { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + format!("[peer]: invalid configuration: {}", validation_err), + ))) + .into()) } else { - let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; - let validate_request = pt::peerdb_route::ValidatePeerRequest { - peer: Some(Peer { - name: peer.name.clone(), - r#type: peer.r#type, - config: peer.config.clone(), - }), - }; - let validity = flow_handler - .validate_peer(&validate_request) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to check peer validity: {:?}", err), - })) - })?; - if let PeerValidationResult::Invalid(validation_err) = validity { - Err(PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "internal_error".to_owned(), - format!("[peer]: invalid configuration: {}", validation_err), - ))) - .into()) - } else { - Ok(()) - } + Ok(()) } } @@ -264,7 +245,7 @@ impl NexusBackend { } => { let peer_type = peer.r#type; if Self::is_peer_validity_supported(peer_type) { - self.validate_peer(peer_type, peer).await.map_err(|e| { + self.validate_peer(peer).await.map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_owned(), "internal_error".to_owned(),