Skip to content

Commit

Permalink
refactor connectionactive to include error
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 1, 2023
1 parent c23c9a7 commit 92945d7
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 31 deletions.
8 changes: 4 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,12 @@ func (h *FlowRequestHandler) ValidatePeer(
}, nil
}

status := conn.ConnectionActive()
if !status {
_, connErr := conn.ConnectionActive()
if connErr != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to establish active connection to %s peer %s.",
req.Peer.Type, req.Peer.Name),
Message: fmt.Sprintf("failed to establish active connection to %s peer %s: %v",
req.Peer.Type, req.Peer.Name, connErr),
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ func (c *BigQueryConnector) Close() error {
}

// ConnectionActive returns true if the connection is active.
func (c *BigQueryConnector) ConnectionActive() bool {
func (c *BigQueryConnector) ConnectionActive() (bool, error) {
_, err := c.client.Dataset(c.datasetID).Metadata(c.ctx)
if err != nil {
return false
return false, fmt.Errorf("failed to get dataset metadata: %v", err)
}

return c.client != nil
return c.client != nil, fmt.Errorf("BigQuery client is nil")
}

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var ErrUnsupportedFunctionality = errors.New("requested connector does not suppo

type Connector interface {
Close() error
ConnectionActive() bool
ConnectionActive() (bool, error)
}

type CDCPullConnector interface {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (c *EventHubConnector) Close() error {
return allErrors
}

func (c *EventHubConnector) ConnectionActive() bool {
return true
func (c *EventHubConnector) ConnectionActive() (bool, error) {
return true, nil
}

func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ func (c *PostgresConnector) Close() error {
}

// ConnectionActive returns true if the connection is active.
func (c *PostgresConnector) ConnectionActive() bool {
func (c *PostgresConnector) ConnectionActive() (bool, error) {
if c.pool == nil {
return false
return false, fmt.Errorf("connection pool is nil")
}
return c.pool.Ping(c.ctx) == nil
pingErr := c.pool.Ping(c.ctx)
return pingErr == nil, pingErr
}

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
Expand Down
20 changes: 12 additions & 8 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ func NewS3Connector(ctx context.Context,
return nil, err
}

validErr := ValidCheck(s3Client, config.Url, pgMetadata)
if validErr != nil {
log.Errorf("failed to validate s3 connector: %v", validErr)
return nil, validErr
}

return &S3Connector{
ctx: ctx,
url: config.Url,
Expand Down Expand Up @@ -140,9 +134,19 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos
return nil
}

func (c *S3Connector) ConnectionActive() bool {
func (c *S3Connector) ConnectionActive() (bool, error) {
_, listErr := c.client.ListBuckets(nil)
return listErr == nil
if listErr != nil {
return false, listErr
}

validErr := ValidCheck(&c.client, c.url, c.pgMetadata)
if validErr != nil {
log.Errorf("failed to validate s3 connector: %v", validErr)
return false, validErr
}

return listErr == nil, listErr
}

func (c *S3Connector) NeedsSetupMetadataTables() bool {
Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,12 @@ func (c *SnowflakeConnector) Close() error {
return nil
}

func (c *SnowflakeConnector) ConnectionActive() bool {
func (c *SnowflakeConnector) ConnectionActive() (bool, error) {
if c == nil || c.database == nil {
return false
return false, fmt.Errorf("SnowflakeConnector is nil")
}
return c.database.PingContext(c.ctx) == nil
err := c.database.PingContext(c.ctx)
return err == nil, err
}

func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type SQLQueryExecutor interface {
ConnectionActive() bool
ConnectionActive() (bool, error)
Close() error

CreateSchema(schemaName string) error
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func (c *SQLServerConnector) Close() error {
}

// ConnectionActive checks if the connection is still active
func (c *SQLServerConnector) ConnectionActive() bool {
func (c *SQLServerConnector) ConnectionActive() (bool, error) {
if err := c.db.Ping(); err != nil {
return false
return false, err
}
return true
return true, nil
}
6 changes: 3 additions & 3 deletions flow/e2e/sqlserver/sqlserver_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func NewSQLServerHelper(name string) (*SQLServerHelper, error) {
return nil, err
}

testConn := connector.ConnectionActive()
if !testConn {
return nil, fmt.Errorf("invalid connection configs")
_, connErr := connector.ConnectionActive()
if connErr != nil {
return nil, fmt.Errorf("invalid connection configs: %v", connErr)
}

rndNum, err := util.RandomUInt64()
Expand Down

0 comments on commit 92945d7

Please sign in to comment.