diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 2ffdc816d3..b44abdfd40 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -405,12 +405,6 @@ func (a *FlowableActivity) StartNormalize( dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination) if errors.Is(err, connectors.ErrUnsupportedFunctionality) { - dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) - if err != nil { - return nil, fmt.Errorf("failed to get connector: %w", err) - } - defer connectors.CloseConnector(ctx, dstConn) - err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID) return nil, err @@ -693,8 +687,7 @@ func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.Q a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err } - - defer dst.Close(ctx) + defer dst.Close() return dst.CleanupQRepFlow(ctx, config) } diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 95faba6b54..7643629b2c 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -27,7 +27,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s if s, ok := a.SnapshotConnections[flowJobName]; ok { close(s.signal.CloneComplete) - s.connector.Close(ctx) + connectors.CloseConnector(ctx, s.connector) } return nil @@ -60,10 +60,7 @@ func (a *SnapshotActivity) SetupReplication( logger.Error("failed to setup replication", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, config.FlowJobName, err) // it is important to close the connection here as it is not closed in CloseSlotKeepAlive - connCloseErr := conn.Close(ctx) - if connCloseErr != nil { - logger.Error("failed to close connection", slog.Any("error", connCloseErr)) - } + connectors.CloseConnector(ctx, conn) } // This now happens in a goroutine diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index a009226929..eb45003b41 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -236,7 +236,7 @@ func (h *FlowRequestHandler) GetSlotInfo( slog.Error("Failed to create postgres connector", slog.Any("error", err)) return &protos.PeerSlotResponse{SlotData: nil}, err } - defer pgConnector.Close(ctx) + defer pgConnector.Close() slotInfo, err := pgConnector.GetSlotInfo(ctx, "") if err != nil { diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 84527fbe94..0ef515a14e 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -32,7 +32,7 @@ func (h *FlowRequestHandler) ValidateCDCMirror( Ok: false, }, fmt.Errorf("failed to create postgres connector: %v", err) } - defer pgPeer.Close(ctx) + defer pgPeer.Close() // Check replication connectivity err = pgPeer.CheckReplicationConnectivity(ctx) diff --git a/flow/cmd/validate_peer.go b/flow/cmd/validate_peer.go index d8ed016bbf..2ef7d6c063 100644 --- a/flow/cmd/validate_peer.go +++ b/flow/cmd/validate_peer.go @@ -37,7 +37,7 @@ func (h *FlowRequestHandler) ValidatePeer( }, nil } - defer conn.Close(ctx) + defer conn.Close() if req.Peer.Type == protos.DBType_POSTGRES { isValid, version, err := conn.(*connpostgres.PostgresConnector).MajorVersionCheck(ctx, connpostgres.POSTGRES_12) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 5aed634e60..749bbcf8c4 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -236,11 +236,11 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* } // Close closes the BigQuery driver. -func (c *BigQueryConnector) Close(_ context.Context) error { - if c == nil || c.client == nil { - return nil +func (c *BigQueryConnector) Close() error { + if c != nil { + return c.client.Close() } - return c.client.Close() + return nil } // ConnectionActive returns nil if the connection is active. @@ -250,9 +250,6 @@ func (c *BigQueryConnector) ConnectionActive(ctx context.Context) error { return fmt.Errorf("failed to get dataset metadata: %v", err) } - if c.client == nil { - return fmt.Errorf("BigQuery client is nil") - } return nil } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index f197880c95..d7f0b39ec3 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -167,24 +167,17 @@ func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, err return conn, nil } -func (c *ClickhouseConnector) Close(_ context.Context) error { - if c == nil || c.database == nil { - return nil - } - - err := c.database.Close() - if err != nil { - return fmt.Errorf("error while closing connection to Clickhouse peer: %w", err) +func (c *ClickhouseConnector) Close() error { + if c != nil { + err := c.database.Close() + if err != nil { + return fmt.Errorf("error while closing connection to Clickhouse peer: %w", err) + } } return nil } func (c *ClickhouseConnector) ConnectionActive(ctx context.Context) error { - if c == nil || c.database == nil { - return fmt.Errorf("ClickhouseConnector is nil") - } - // This also checks if database exists - err := c.database.PingContext(ctx) - return err + return c.database.PingContext(ctx) } diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 87ff157144..a499540aef 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -58,7 +58,7 @@ func (c *ClickhouseConnector) SetupNormalizedTable( _, err = c.database.ExecContext(ctx, normalizedTableCreateSQL) if err != nil { - return false, fmt.Errorf("[sf] error while creating normalized table: %w", err) + return false, fmt.Errorf("[ch] error while creating normalized table: %w", err) } return false, nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index f83a1a7836..1e9401e3b0 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -24,7 +24,7 @@ import ( var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality") type Connector interface { - Close(context.Context) error + Close() error ConnectionActive(context.Context) error } @@ -218,11 +218,7 @@ func GetQRepConsolidateConnector(ctx context.Context, config *protos.Peer) (QRep } func CloseConnector(ctx context.Context, conn Connector) { - if conn == nil { - return - } - - err := conn.Close(ctx) + err := conn.Close() if err != nil { logger.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err)) } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 2da33c1043..bf4a11f3bb 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -53,11 +53,15 @@ func NewEventHubConnector( }, nil } -func (c *EventHubConnector) Close(ctx context.Context) error { - err := c.hubManager.Close(ctx) - if err != nil { - c.logger.Error("failed to close event hub manager", slog.Any("error", err)) - return err +func (c *EventHubConnector) Close() error { + if c != nil { + timeout, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + err := c.hubManager.Close(timeout) + if err != nil { + c.logger.Error("failed to close event hub manager", slog.Any("error", err)) + return err + } } return nil diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index fa2ebdc201..1079431c17 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -100,9 +100,11 @@ func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, erro } // Close closes all connections. -func (c *PostgresConnector) Close(ctx context.Context) error { +func (c *PostgresConnector) Close() error { if c != nil { - c.conn.Close(ctx) + timeout, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + c.conn.Close(timeout) c.ssh.Close() } return nil diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 5e3166a195..6196cbc91f 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -217,8 +217,7 @@ func TestPostgresSchemaDeltaTestSuite(t *testing.T) { require.NoError(s.t, err) require.NoError(s.t, s.connector.ConnectionActive(context.Background())) - err = s.connector.Close(context.Background()) - require.NoError(s.t, err) + require.NoError(s.t, s.connector.Close()) require.Error(s.t, s.connector.ConnectionActive(context.Background())) }) } diff --git a/flow/connectors/postgres/qrep_bench_test.go b/flow/connectors/postgres/qrep_bench_test.go index 1dddc617fc..c36ee844ca 100644 --- a/flow/connectors/postgres/qrep_bench_test.go +++ b/flow/connectors/postgres/qrep_bench_test.go @@ -22,7 +22,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) { if err != nil { b.Fatalf("failed to create connection: %v", err) } - defer connector.Close(ctx) + defer connector.Close() // Create a new QRepQueryExecutor instance qe := connector.NewQRepQueryExecutor("test flow", "test part") diff --git a/flow/connectors/postgres/qrep_query_executor_test.go b/flow/connectors/postgres/qrep_query_executor_test.go index 32d0a1a154..24eb88a0a9 100644 --- a/flow/connectors/postgres/qrep_query_executor_test.go +++ b/flow/connectors/postgres/qrep_query_executor_test.go @@ -54,7 +54,7 @@ func TestExecuteAndProcessQuery(t *testing.T) { ctx := context.Background() connector, schemaName := setupDB(t) conn := connector.conn - defer connector.Close(ctx) + defer connector.Close() defer teardownDB(t, conn, schemaName) query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.test(id SERIAL PRIMARY KEY, data TEXT);", schemaName) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index dabfc75ef5..e3526d9488 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -85,7 +85,7 @@ func (c *S3Connector) CreateRawTable(_ context.Context, req *protos.CreateRawTab return nil, nil } -func (c *S3Connector) Close(_ context.Context) error { +func (c *S3Connector) Close() error { return nil } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 2f6c0210e5..e57e95791d 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -219,26 +219,19 @@ func NewSnowflakeConnector( }, nil } -func (c *SnowflakeConnector) Close(_ context.Context) error { - if c == nil || c.database == nil { - return nil - } - - err := c.database.Close() - if err != nil { - return fmt.Errorf("error while closing connection to Snowflake peer: %w", err) +func (c *SnowflakeConnector) Close() error { + if c != nil { + err := c.database.Close() + if err != nil { + return fmt.Errorf("error while closing connection to Snowflake peer: %w", err) + } } return nil } func (c *SnowflakeConnector) ConnectionActive(ctx context.Context) error { - if c == nil || c.database == nil { - return fmt.Errorf("SnowflakeConnector is nil") - } - // This also checks if database exists - err := c.database.PingContext(ctx) - return err + return c.database.PingContext(ctx) } func (c *SnowflakeConnector) NeedsSetupMetadataTables(_ context.Context) bool { diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 64ff4ecc54..2a06adabdb 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -21,7 +21,7 @@ import ( type SQLQueryExecutor interface { ConnectionActive(context.Context) error - Close(context.Context) error + Close() error CreateSchema(ctx context.Context, schemaName string) error DropSchema(ctx context.Context, schemaName string) error diff --git a/flow/connectors/sqlserver/sqlserver.go b/flow/connectors/sqlserver/sqlserver.go index 4fea9e3f68..505052e0c2 100644 --- a/flow/connectors/sqlserver/sqlserver.go +++ b/flow/connectors/sqlserver/sqlserver.go @@ -50,8 +50,8 @@ func NewSQLServerConnector(ctx context.Context, config *protos.SqlServerConfig) } // Close closes the database connection -func (c *SQLServerConnector) Close(_ context.Context) error { - if c.db != nil { +func (c *SQLServerConnector) Close() error { + if c != nil { return c.db.Close() } return nil diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 8a7ff6235e..a1b1a0601c 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -130,13 +130,13 @@ func SetupPostgres(t *testing.T, suffix string) (*connpostgres.PostgresConnector err = cleanPostgres(conn, suffix) if err != nil { - connector.Close(context.Background()) + connector.Close() return nil, err } err = setupPostgresSchema(t, conn, suffix) if err != nil { - connector.Close(context.Background()) + connector.Close() return nil, err } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 716eee667c..2c346be640 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -70,7 +70,7 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { } } - err := s.connector.Close(context.Background()) + err := s.connector.Close() if err != nil { s.t.Fatalf("failed to close Snowflake connector: %v", err) } diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index 03246ae617..db3459404d 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -307,6 +307,6 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { e2eshared.RunSuite(t, setupSchemaDeltaSuite, func(s SnowflakeSchemaDeltaTestSuite) { require.NoError(s.t, s.sfTestHelper.Cleanup()) - require.NoError(s.t, s.connector.Close(context.Background())) + require.NoError(s.t, s.connector.Close()) }) }