Skip to content

Commit

Permalink
return value of connectionactive now just error
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 1, 2023
1 parent ea32608 commit 9051209
Show file tree
Hide file tree
Showing 12 changed files with 31 additions and 26 deletions.
2 changes: 1 addition & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (h *FlowRequestHandler) ValidatePeer(
}, nil
}

_, connErr := conn.ConnectionActive()
connErr := conn.ConnectionActive()
if connErr != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Expand Down
9 changes: 6 additions & 3 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,16 @@ func (c *BigQueryConnector) Close() error {
}

// ConnectionActive returns true if the connection is active.
func (c *BigQueryConnector) ConnectionActive() (bool, error) {
func (c *BigQueryConnector) ConnectionActive() error {
_, err := c.client.Dataset(c.datasetID).Metadata(c.ctx)
if err != nil {
return false, fmt.Errorf("failed to get dataset metadata: %v", err)
return fmt.Errorf("failed to get dataset metadata: %v", err)
}

return c.client != nil, fmt.Errorf("BigQuery client is nil")
if c.client == nil {
return fmt.Errorf("BigQuery client is nil")
}
return nil
}

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var ErrUnsupportedFunctionality = errors.New("requested connector does not suppo

type Connector interface {
Close() error
ConnectionActive() (bool, error)
ConnectionActive() error
}

type CDCPullConnector interface {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (c *EventHubConnector) Close() error {
return allErrors
}

func (c *EventHubConnector) ConnectionActive() (bool, error) {
return true, nil
func (c *EventHubConnector) ConnectionActive() error {
return nil
}

func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ func (c *PostgresConnector) Close() error {
}

// ConnectionActive returns true if the connection is active.
func (c *PostgresConnector) ConnectionActive() (bool, error) {
func (c *PostgresConnector) ConnectionActive() error {
if c.pool == nil {
return false, fmt.Errorf("connection pool is nil")
return fmt.Errorf("connection pool is nil")
}
pingErr := c.pool.Ping(c.ctx)
return pingErr == nil, pingErr
return pingErr
}

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() {
err = teardownTx.Commit(context.Background())
require.NoError(suite.T(), err)

suite.True(suite.connector.ConnectionActive())
suite.True(suite.connector.ConnectionActive() == nil)

err = suite.connector.Close()
require.NoError(suite.T(), err)

suite.False(suite.connector.ConnectionActive())
suite.False(suite.connector.ConnectionActive() == nil)
}

func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() {
err = teardownTx.Commit(context.Background())
suite.failTestError(err)

suite.True(suite.connector.ConnectionActive())
suite.True(suite.connector.ConnectionActive() == nil)
err = suite.connector.Close()
suite.failTestError(err)
suite.False(suite.connector.ConnectionActive())
suite.False(suite.connector.ConnectionActive() == nil)
}

func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,19 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos
return nil
}

func (c *S3Connector) ConnectionActive() (bool, error) {
func (c *S3Connector) ConnectionActive() error {
_, listErr := c.client.ListBuckets(nil)
if listErr != nil {
return false, listErr
return listErr
}

validErr := ValidCheck(&c.client, c.url, c.pgMetadata)
if validErr != nil {
log.Errorf("failed to validate s3 connector: %v", validErr)
return false, validErr
return validErr
}

return listErr == nil, listErr
return nil
}

func (c *S3Connector) NeedsSetupMetadataTables() bool {
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,14 @@ func (c *SnowflakeConnector) Close() error {
return nil
}

func (c *SnowflakeConnector) ConnectionActive() (bool, error) {
func (c *SnowflakeConnector) ConnectionActive() error {
if c == nil || c.database == nil {
return false, fmt.Errorf("SnowflakeConnector is nil")
return fmt.Errorf("SnowflakeConnector is nil")
}

// This also checks if database exists
err := c.database.PingContext(c.ctx)
return err == nil, err
return err
}

func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type SQLQueryExecutor interface {
ConnectionActive() (bool, error)
ConnectionActive() error
Close() error

CreateSchema(schemaName string) error
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func (c *SQLServerConnector) Close() error {
}

// ConnectionActive checks if the connection is still active
func (c *SQLServerConnector) ConnectionActive() (bool, error) {
func (c *SQLServerConnector) ConnectionActive() error {
if err := c.db.Ping(); err != nil {
return false, err
return err
}
return true, nil
return nil
}
2 changes: 1 addition & 1 deletion flow/e2e/sqlserver/sqlserver_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewSQLServerHelper(name string) (*SQLServerHelper, error) {
return nil, err
}

_, connErr := connector.ConnectionActive()
connErr := connector.ConnectionActive()
if connErr != nil {
return nil, fmt.Errorf("invalid connection configs: %v", connErr)
}
Expand Down

0 comments on commit 9051209

Please sign in to comment.