diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 99c25cbd28..cf86e97f57 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -503,12 +503,12 @@ func (h *FlowRequestHandler) ValidatePeer( }, nil } - status := conn.ConnectionActive() - if !status { + _, connErr := conn.ConnectionActive() + if connErr != nil { return &protos.ValidatePeerResponse{ Status: protos.ValidatePeerStatus_INVALID, - Message: fmt.Sprintf("failed to establish active connection to %s peer %s.", - req.Peer.Type, req.Peer.Name), + Message: fmt.Sprintf("failed to establish active connection to %s peer %s: %v", + req.Peer.Type, req.Peer.Name, connErr), }, nil } diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3e33dd3853..f1ee54f4f2 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -204,13 +204,13 @@ func (c *BigQueryConnector) Close() error { } // ConnectionActive returns true if the connection is active. -func (c *BigQueryConnector) ConnectionActive() bool { +func (c *BigQueryConnector) ConnectionActive() (bool, error) { _, err := c.client.Dataset(c.datasetID).Metadata(c.ctx) if err != nil { - return false + return false, fmt.Errorf("failed to get dataset metadata: %v", err) } - return c.client != nil + return c.client != nil, fmt.Errorf("BigQuery client is nil") } // NeedsSetupMetadataTables returns true if the metadata tables need to be set up. diff --git a/flow/connectors/core.go b/flow/connectors/core.go index f39e8f397f..c87869509e 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -21,7 +21,7 @@ var ErrUnsupportedFunctionality = errors.New("requested connector does not suppo type Connector interface { Close() error - ConnectionActive() bool + ConnectionActive() (bool, error) } type CDCPullConnector interface { diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 73c8ed528c..ed91314885 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -65,8 +65,8 @@ func (c *EventHubConnector) Close() error { return allErrors } -func (c *EventHubConnector) ConnectionActive() bool { - return true +func (c *EventHubConnector) ConnectionActive() (bool, error) { + return true, nil } func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index e46c383d4c..f411f3ca2e 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -104,11 +104,12 @@ func (c *PostgresConnector) Close() error { } // ConnectionActive returns true if the connection is active. -func (c *PostgresConnector) ConnectionActive() bool { +func (c *PostgresConnector) ConnectionActive() (bool, error) { if c.pool == nil { - return false + return false, fmt.Errorf("connection pool is nil") } - return c.pool.Ping(c.ctx) == nil + pingErr := c.pool.Ping(c.ctx) + return pingErr == nil, pingErr } // NeedsSetupMetadataTables returns true if the metadata tables need to be set up. diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 1b81cf17df..277c454669 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -68,12 +68,6 @@ func NewS3Connector(ctx context.Context, return nil, err } - validErr := ValidCheck(s3Client, config.Url, pgMetadata) - if validErr != nil { - log.Errorf("failed to validate s3 connector: %v", validErr) - return nil, validErr - } - return &S3Connector{ ctx: ctx, url: config.Url, @@ -140,9 +134,19 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos return nil } -func (c *S3Connector) ConnectionActive() bool { +func (c *S3Connector) ConnectionActive() (bool, error) { _, listErr := c.client.ListBuckets(nil) - return listErr == nil + if listErr != nil { + return false, listErr + } + + validErr := ValidCheck(&c.client, c.url, c.pgMetadata) + if validErr != nil { + log.Errorf("failed to validate s3 connector: %v", validErr) + return false, validErr + } + + return listErr == nil, listErr } func (c *S3Connector) NeedsSetupMetadataTables() bool { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 08ed4b2d35..7d7c0e934d 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -168,11 +168,12 @@ func (c *SnowflakeConnector) Close() error { return nil } -func (c *SnowflakeConnector) ConnectionActive() bool { +func (c *SnowflakeConnector) ConnectionActive() (bool, error) { if c == nil || c.database == nil { - return false + return false, fmt.Errorf("SnowflakeConnector is nil") } - return c.database.PingContext(c.ctx) == nil + err := c.database.PingContext(c.ctx) + return err == nil, err } func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool { diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 29291b40ff..02c70c0d22 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -17,7 +17,7 @@ import ( ) type SQLQueryExecutor interface { - ConnectionActive() bool + ConnectionActive() (bool, error) Close() error CreateSchema(schemaName string) error diff --git a/flow/connectors/sqlserver/sqlserver.go b/flow/connectors/sqlserver/sqlserver.go index 5a529c9408..fe75cb74fc 100644 --- a/flow/connectors/sqlserver/sqlserver.go +++ b/flow/connectors/sqlserver/sqlserver.go @@ -53,9 +53,9 @@ func (c *SQLServerConnector) Close() error { } // ConnectionActive checks if the connection is still active -func (c *SQLServerConnector) ConnectionActive() bool { +func (c *SQLServerConnector) ConnectionActive() (bool, error) { if err := c.db.Ping(); err != nil { - return false + return false, err } - return true + return true, nil } diff --git a/flow/e2e/sqlserver/sqlserver_helper.go b/flow/e2e/sqlserver/sqlserver_helper.go index 1be94fc83e..8c2abba9ec 100644 --- a/flow/e2e/sqlserver/sqlserver_helper.go +++ b/flow/e2e/sqlserver/sqlserver_helper.go @@ -41,9 +41,9 @@ func NewSQLServerHelper(name string) (*SQLServerHelper, error) { return nil, err } - testConn := connector.ConnectionActive() - if !testConn { - return nil, fmt.Errorf("invalid connection configs") + _, connErr := connector.ConnectionActive() + if connErr != nil { + return nil, fmt.Errorf("invalid connection configs: %v", connErr) } rndNum, err := util.RandomUInt64()