Skip to content

Commit

Permalink
routes peer checks to api
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 1, 2023
1 parent 864e502 commit 29aa53a
Show file tree
Hide file tree
Showing 7 changed files with 485 additions and 383 deletions.
5 changes: 5 additions & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ func (c *BigQueryConnector) Close() error {

// ConnectionActive returns true if the connection is active.
func (c *BigQueryConnector) ConnectionActive() bool {
_, err := c.client.Dataset(c.datasetID).Metadata(c.ctx)
if err != nil {
return false
}

return c.client != nil
}

Expand Down
7 changes: 6 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,12 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)

case protos.DBType_S3:
s3Config := peer.GetS3Config()
if s3Config == nil {
return nil, fmt.Errorf("missing s3 config for %s peer %s", peer.Type.String(), peer.Name)
}
return conns3.NewS3Connector(ctx, s3Config)
case protos.DBType_SQLSERVER:
sqlServerConfig := peer.GetSqlserverConfig()
if sqlServerConfig == nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *EventHubConnector) Close() error {
}

func (c *EventHubConnector) ConnectionActive() bool {
return true
return c.pgMetadata.Ping()
}

func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
Expand Down
10 changes: 10 additions & 0 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf
}, nil
}

func (p *PostgresMetadataStore) Ping() bool {
err := p.pool.Ping(p.ctx)
if err != nil {
log.Errorf("failed to connect to metadata db: %v", err)
return false
}

return true
}

func (p *PostgresMetadataStore) Close() error {
if p.pool != nil {
p.pool.Close()
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos
}

func (c *S3Connector) ConnectionActive() bool {
_, err := c.client.ListBuckets(nil)
return err == nil
_, listErr := c.client.ListBuckets(nil)
pinged := c.pgMetadata.Ping()
return listErr == nil && pinged
}

func (c *S3Connector) NeedsSetupMetadataTables() bool {
Expand Down
Loading

0 comments on commit 29aa53a

Please sign in to comment.