Skip to content

Commit

Permalink
routes peer checks to api
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 25, 2023
1 parent 3b17895 commit d04193f
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 52 deletions.
5 changes: 5 additions & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ func (c *BigQueryConnector) Close() error {

// ConnectionActive returns true if the connection is active.
func (c *BigQueryConnector) ConnectionActive() bool {
_, err := c.client.Dataset(c.datasetID).Metadata(c.ctx)
if err != nil {
return false
}

return c.client != nil
}

Expand Down
11 changes: 6 additions & 5 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,18 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)

case protos.DBType_S3:
s3Config := peer.GetS3Config()
if s3Config == nil {
return nil, fmt.Errorf("missing s3 config for %s peer %s", peer.Type.String(), peer.Name)
}
return conns3.NewS3Connector(ctx, s3Config)
case protos.DBType_SQLSERVER:
sqlServerConfig := peer.GetSqlserverConfig()
if sqlServerConfig == nil {
return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig)
// case protos.DBType_S3:
// return conns3.NewS3Connector(ctx, config.GetS3Config())
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
default:
return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String())
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *EventHubConnector) Close() error {
}

func (c *EventHubConnector) ConnectionActive() bool {
return true
return c.pgMetadata.Ping()
}

func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
Expand Down
10 changes: 10 additions & 0 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf
}, nil
}

func (p *PostgresMetadataStore) Ping() bool {
err := p.pool.Ping(p.ctx)
if err != nil {
log.Errorf("failed to connect to metadata db: %v", err)
return false
}

return true
}

func (p *PostgresMetadataStore) Close() error {
if p.pool != nil {
p.pool.Close()
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ func (c *S3Connector) Close() error {
}

func (c *S3Connector) ConnectionActive() bool {
_, err := c.client.ListBuckets(nil)
return err == nil
_, listErr := c.client.ListBuckets(nil)
pinged := c.pgMetadata.Ping()
return listErr == nil && pinged
}

func (c *S3Connector) NeedsSetupMetadataTables() bool {
Expand Down
2 changes: 1 addition & 1 deletion nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl FlowGrpcClient {
) -> anyhow::Result<String> {
let create_qrep_flow_req = pt::peerdb_route::CreateQRepFlowRequest {
qrep_config: Some(qrep_config.clone()),
create_catalog_entry:false
create_catalog_entry: false,
};
let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?;
let workflow_id = response.into_inner().worflow_id;
Expand Down
67 changes: 24 additions & 43 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ impl NexusBackend {
fn is_peer_validity_supported(peer_type: i32) -> bool {
let unsupported_peer_types = [
4, // EVENTHUB
5, // S3
7,
7, // EVENTHUB_GROUP
];
!unsupported_peer_types.contains(&peer_type)
}
Expand Down Expand Up @@ -204,50 +203,32 @@ impl NexusBackend {
}
}

async fn validate_peer<'a>(&self, peer_type: i32, peer: &Peer) -> anyhow::Result<()> {
if peer_type != 6 {
let peer_executor = self.get_peer_executor(peer).await.map_err(|err| {
async fn validate_peer<'a>(&self, peer: &Peer) -> anyhow::Result<()> {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let validate_request = pt::peerdb_route::ValidatePeerRequest {
peer: Some(Peer {
name: peer.name.clone(),
r#type: peer.r#type,
config: peer.config.clone(),
}),
};
let validity = flow_handler
.validate_peer(&validate_request)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer executor: {:?}", err),
err_msg: format!("unable to check peer validity: {:?}", err),
}))
})?;
peer_executor.is_connection_valid().await.map_err(|e| {
self.executors.remove(&peer.name); // Otherwise it will keep returning the earlier configured executor
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", e),
)))
})?;
self.executors.remove(&peer.name);
Ok(())
if let PeerValidationResult::Invalid(validation_err) = validity {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", validation_err),
)))
.into())
} else {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let validate_request = pt::peerdb_route::ValidatePeerRequest {
peer: Some(Peer {
name: peer.name.clone(),
r#type: peer.r#type,
config: peer.config.clone(),
}),
};
let validity = flow_handler
.validate_peer(&validate_request)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to check peer validity: {:?}", err),
}))
})?;
if let PeerValidationResult::Invalid(validation_err) = validity {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
format!("[peer]: invalid configuration: {}", validation_err),
)))
.into())
} else {
Ok(())
}
Ok(())
}
}

Expand All @@ -264,7 +245,7 @@ impl NexusBackend {
} => {
let peer_type = peer.r#type;
if Self::is_peer_validity_supported(peer_type) {
self.validate_peer(peer_type, peer).await.map_err(|e| {
self.validate_peer(peer).await.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
Expand Down

0 comments on commit d04193f

Please sign in to comment.