From 90512092421fb1acc0764868053735538e76a079 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 1 Dec 2023 22:11:12 +0530 Subject: [PATCH] return value of connectionactive now just error --- flow/cmd/handler.go | 2 +- flow/connectors/bigquery/bigquery.go | 9 ++++++--- flow/connectors/core.go | 2 +- flow/connectors/eventhub/eventhub.go | 4 ++-- flow/connectors/postgres/postgres.go | 6 +++--- flow/connectors/postgres/postgres_repl_test.go | 4 ++-- flow/connectors/postgres/postgres_schema_delta_test.go | 4 ++-- flow/connectors/s3/s3.go | 8 ++++---- flow/connectors/snowflake/snowflake.go | 8 +++++--- flow/connectors/sql/query_executor.go | 2 +- flow/connectors/sqlserver/sqlserver.go | 6 +++--- flow/e2e/sqlserver/sqlserver_helper.go | 2 +- 12 files changed, 31 insertions(+), 26 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index cf86e97f57..14a9603125 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -503,7 +503,7 @@ func (h *FlowRequestHandler) ValidatePeer( }, nil } - _, connErr := conn.ConnectionActive() + connErr := conn.ConnectionActive() if connErr != nil { return &protos.ValidatePeerResponse{ Status: protos.ValidatePeerStatus_INVALID, diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index f1ee54f4f2..4d642f402f 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -204,13 +204,16 @@ func (c *BigQueryConnector) Close() error { } // ConnectionActive returns true if the connection is active. -func (c *BigQueryConnector) ConnectionActive() (bool, error) { +func (c *BigQueryConnector) ConnectionActive() error { _, err := c.client.Dataset(c.datasetID).Metadata(c.ctx) if err != nil { - return false, fmt.Errorf("failed to get dataset metadata: %v", err) + return fmt.Errorf("failed to get dataset metadata: %v", err) } - return c.client != nil, fmt.Errorf("BigQuery client is nil") + if c.client == nil { + return fmt.Errorf("BigQuery client is nil") + } + return nil } // NeedsSetupMetadataTables returns true if the metadata tables need to be set up. diff --git a/flow/connectors/core.go b/flow/connectors/core.go index c87869509e..6327ed4bb9 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -21,7 +21,7 @@ var ErrUnsupportedFunctionality = errors.New("requested connector does not suppo type Connector interface { Close() error - ConnectionActive() (bool, error) + ConnectionActive() error } type CDCPullConnector interface { diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index ed91314885..961fb979ff 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -65,8 +65,8 @@ func (c *EventHubConnector) Close() error { return allErrors } -func (c *EventHubConnector) ConnectionActive() (bool, error) { - return true, nil +func (c *EventHubConnector) ConnectionActive() error { + return nil } func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f411f3ca2e..469775e49e 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -104,12 +104,12 @@ func (c *PostgresConnector) Close() error { } // ConnectionActive returns true if the connection is active. -func (c *PostgresConnector) ConnectionActive() (bool, error) { +func (c *PostgresConnector) ConnectionActive() error { if c.pool == nil { - return false, fmt.Errorf("connection pool is nil") + return fmt.Errorf("connection pool is nil") } pingErr := c.pool.Ping(c.ctx) - return pingErr == nil, pingErr + return pingErr } // NeedsSetupMetadataTables returns true if the metadata tables need to be set up. diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index 11ef7e2be8..7e10d9a154 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -94,12 +94,12 @@ func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() { err = teardownTx.Commit(context.Background()) require.NoError(suite.T(), err) - suite.True(suite.connector.ConnectionActive()) + suite.True(suite.connector.ConnectionActive() == nil) err = suite.connector.Close() require.NoError(suite.T(), err) - suite.False(suite.connector.ConnectionActive()) + suite.False(suite.connector.ConnectionActive() == nil) } func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index eec14c915a..98a6a47b99 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -67,10 +67,10 @@ func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() { err = teardownTx.Commit(context.Background()) suite.failTestError(err) - suite.True(suite.connector.ConnectionActive()) + suite.True(suite.connector.ConnectionActive() == nil) err = suite.connector.Close() suite.failTestError(err) - suite.False(suite.connector.ConnectionActive()) + suite.False(suite.connector.ConnectionActive() == nil) } func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 277c454669..1650303235 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -134,19 +134,19 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos return nil } -func (c *S3Connector) ConnectionActive() (bool, error) { +func (c *S3Connector) ConnectionActive() error { _, listErr := c.client.ListBuckets(nil) if listErr != nil { - return false, listErr + return listErr } validErr := ValidCheck(&c.client, c.url, c.pgMetadata) if validErr != nil { log.Errorf("failed to validate s3 connector: %v", validErr) - return false, validErr + return validErr } - return listErr == nil, listErr + return nil } func (c *S3Connector) NeedsSetupMetadataTables() bool { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 7d7c0e934d..ba78223a42 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -168,12 +168,14 @@ func (c *SnowflakeConnector) Close() error { return nil } -func (c *SnowflakeConnector) ConnectionActive() (bool, error) { +func (c *SnowflakeConnector) ConnectionActive() error { if c == nil || c.database == nil { - return false, fmt.Errorf("SnowflakeConnector is nil") + return fmt.Errorf("SnowflakeConnector is nil") } + + // This also checks if database exists err := c.database.PingContext(c.ctx) - return err == nil, err + return err } func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool { diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 02c70c0d22..7600099f88 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -17,7 +17,7 @@ import ( ) type SQLQueryExecutor interface { - ConnectionActive() (bool, error) + ConnectionActive() error Close() error CreateSchema(schemaName string) error diff --git a/flow/connectors/sqlserver/sqlserver.go b/flow/connectors/sqlserver/sqlserver.go index fe75cb74fc..8e8d402828 100644 --- a/flow/connectors/sqlserver/sqlserver.go +++ b/flow/connectors/sqlserver/sqlserver.go @@ -53,9 +53,9 @@ func (c *SQLServerConnector) Close() error { } // ConnectionActive checks if the connection is still active -func (c *SQLServerConnector) ConnectionActive() (bool, error) { +func (c *SQLServerConnector) ConnectionActive() error { if err := c.db.Ping(); err != nil { - return false, err + return err } - return true, nil + return nil } diff --git a/flow/e2e/sqlserver/sqlserver_helper.go b/flow/e2e/sqlserver/sqlserver_helper.go index 8c2abba9ec..ed6ccfa97e 100644 --- a/flow/e2e/sqlserver/sqlserver_helper.go +++ b/flow/e2e/sqlserver/sqlserver_helper.go @@ -41,7 +41,7 @@ func NewSQLServerHelper(name string) (*SQLServerHelper, error) { return nil, err } - _, connErr := connector.ConnectionActive() + connErr := connector.ConnectionActive() if connErr != nil { return nil, fmt.Errorf("invalid connection configs: %v", connErr) }