Skip to content

Commit

Permalink
Improve Close: use 1 minute timeout instead of potentially canceled c…
Browse files Browse the repository at this point in the history
…ontext (#1313)

Also remove unnecessary nil checks,
& make self nil check in Connector Close methods consistent

This helps gracefully close connections when workflow canceled
  • Loading branch information
serprex authored Feb 16, 2024
1 parent a0fe0cb commit 9910340
Show file tree
Hide file tree
Showing 21 changed files with 51 additions and 77 deletions.
9 changes: 1 addition & 8 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,6 @@ func (a *FlowableActivity) StartNormalize(

dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
input.SyncBatchID)
return nil, err
Expand Down Expand Up @@ -693,8 +687,7 @@ func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.Q
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}

defer dst.Close(ctx)
defer dst.Close()

return dst.CleanupQRepFlow(ctx, config)
}
Expand Down
7 changes: 2 additions & 5 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s

if s, ok := a.SnapshotConnections[flowJobName]; ok {
close(s.signal.CloneComplete)
s.connector.Close(ctx)
connectors.CloseConnector(ctx, s.connector)
}

return nil
Expand Down Expand Up @@ -60,10 +60,7 @@ func (a *SnapshotActivity) SetupReplication(
logger.Error("failed to setup replication", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
// it is important to close the connection here as it is not closed in CloseSlotKeepAlive
connCloseErr := conn.Close(ctx)
if connCloseErr != nil {
logger.Error("failed to close connection", slog.Any("error", connCloseErr))
}
connectors.CloseConnector(ctx, conn)
}

// This now happens in a goroutine
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (h *FlowRequestHandler) GetSlotInfo(
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerSlotResponse{SlotData: nil}, err
}
defer pgConnector.Close(ctx)
defer pgConnector.Close()

slotInfo, err := pgConnector.GetSlotInfo(ctx, "")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
Ok: false,
}, fmt.Errorf("failed to create postgres connector: %v", err)
}
defer pgPeer.Close(ctx)
defer pgPeer.Close()

// Check replication connectivity
err = pgPeer.CheckReplicationConnectivity(ctx)
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (h *FlowRequestHandler) ValidatePeer(
}, nil
}

defer conn.Close(ctx)
defer conn.Close()

if req.Peer.Type == protos.DBType_POSTGRES {
isValid, version, err := conn.(*connpostgres.PostgresConnector).MajorVersionCheck(ctx, connpostgres.POSTGRES_12)
Expand Down
11 changes: 4 additions & 7 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,11 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
}

// Close closes the BigQuery driver.
func (c *BigQueryConnector) Close(_ context.Context) error {
if c == nil || c.client == nil {
return nil
func (c *BigQueryConnector) Close() error {
if c != nil {
return c.client.Close()
}
return c.client.Close()
return nil
}

// ConnectionActive returns nil if the connection is active.
Expand All @@ -250,9 +250,6 @@ func (c *BigQueryConnector) ConnectionActive(ctx context.Context) error {
return fmt.Errorf("failed to get dataset metadata: %v", err)
}

if c.client == nil {
return fmt.Errorf("BigQuery client is nil")
}
return nil
}

Expand Down
21 changes: 7 additions & 14 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,17 @@ func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, err
return conn, nil
}

func (c *ClickhouseConnector) Close(_ context.Context) error {
if c == nil || c.database == nil {
return nil
}

err := c.database.Close()
if err != nil {
return fmt.Errorf("error while closing connection to Clickhouse peer: %w", err)
func (c *ClickhouseConnector) Close() error {
if c != nil {
err := c.database.Close()
if err != nil {
return fmt.Errorf("error while closing connection to Clickhouse peer: %w", err)
}
}
return nil
}

func (c *ClickhouseConnector) ConnectionActive(ctx context.Context) error {
if c == nil || c.database == nil {
return fmt.Errorf("ClickhouseConnector is nil")
}

// This also checks if database exists
err := c.database.PingContext(ctx)
return err
return c.database.PingContext(ctx)
}
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (c *ClickhouseConnector) SetupNormalizedTable(

_, err = c.database.ExecContext(ctx, normalizedTableCreateSQL)
if err != nil {
return false, fmt.Errorf("[sf] error while creating normalized table: %w", err)
return false, fmt.Errorf("[ch] error while creating normalized table: %w", err)
}
return false, nil
}
Expand Down
8 changes: 2 additions & 6 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")

type Connector interface {
Close(context.Context) error
Close() error
ConnectionActive(context.Context) error
}

Expand Down Expand Up @@ -218,11 +218,7 @@ func GetQRepConsolidateConnector(ctx context.Context, config *protos.Peer) (QRep
}

func CloseConnector(ctx context.Context, conn Connector) {
if conn == nil {
return
}

err := conn.Close(ctx)
err := conn.Close()
if err != nil {
logger.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err))
}
Expand Down
14 changes: 9 additions & 5 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ func NewEventHubConnector(
}, nil
}

func (c *EventHubConnector) Close(ctx context.Context) error {
err := c.hubManager.Close(ctx)
if err != nil {
c.logger.Error("failed to close event hub manager", slog.Any("error", err))
return err
func (c *EventHubConnector) Close() error {
if c != nil {
timeout, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err := c.hubManager.Close(timeout)
if err != nil {
c.logger.Error("failed to close event hub manager", slog.Any("error", err))
return err
}
}

return nil
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,11 @@ func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, erro
}

// Close closes all connections.
func (c *PostgresConnector) Close(ctx context.Context) error {
func (c *PostgresConnector) Close() error {
if c != nil {
c.conn.Close(ctx)
timeout, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
c.conn.Close(timeout)
c.ssh.Close()
}
return nil
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ func TestPostgresSchemaDeltaTestSuite(t *testing.T) {
require.NoError(s.t, err)

require.NoError(s.t, s.connector.ConnectionActive(context.Background()))
err = s.connector.Close(context.Background())
require.NoError(s.t, err)
require.NoError(s.t, s.connector.Close())
require.Error(s.t, s.connector.ConnectionActive(context.Background()))
})
}
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) {
if err != nil {
b.Fatalf("failed to create connection: %v", err)
}
defer connector.Close(ctx)
defer connector.Close()

// Create a new QRepQueryExecutor instance
qe := connector.NewQRepQueryExecutor("test flow", "test part")
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestExecuteAndProcessQuery(t *testing.T) {
ctx := context.Background()
connector, schemaName := setupDB(t)
conn := connector.conn
defer connector.Close(ctx)
defer connector.Close()
defer teardownDB(t, conn, schemaName)

query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.test(id SERIAL PRIMARY KEY, data TEXT);", schemaName)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *S3Connector) CreateRawTable(_ context.Context, req *protos.CreateRawTab
return nil, nil
}

func (c *S3Connector) Close(_ context.Context) error {
func (c *S3Connector) Close() error {
return nil
}

Expand Down
21 changes: 7 additions & 14 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,26 +219,19 @@ func NewSnowflakeConnector(
}, nil
}

func (c *SnowflakeConnector) Close(_ context.Context) error {
if c == nil || c.database == nil {
return nil
}

err := c.database.Close()
if err != nil {
return fmt.Errorf("error while closing connection to Snowflake peer: %w", err)
func (c *SnowflakeConnector) Close() error {
if c != nil {
err := c.database.Close()
if err != nil {
return fmt.Errorf("error while closing connection to Snowflake peer: %w", err)
}
}
return nil
}

func (c *SnowflakeConnector) ConnectionActive(ctx context.Context) error {
if c == nil || c.database == nil {
return fmt.Errorf("SnowflakeConnector is nil")
}

// This also checks if database exists
err := c.database.PingContext(ctx)
return err
return c.database.PingContext(ctx)
}

func (c *SnowflakeConnector) NeedsSetupMetadataTables(_ context.Context) bool {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

type SQLQueryExecutor interface {
ConnectionActive(context.Context) error
Close(context.Context) error
Close() error

CreateSchema(ctx context.Context, schemaName string) error
DropSchema(ctx context.Context, schemaName string) error
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func NewSQLServerConnector(ctx context.Context, config *protos.SqlServerConfig)
}

// Close closes the database connection
func (c *SQLServerConnector) Close(_ context.Context) error {
if c.db != nil {
func (c *SQLServerConnector) Close() error {
if c != nil {
return c.db.Close()
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ func SetupPostgres(t *testing.T, suffix string) (*connpostgres.PostgresConnector

err = cleanPostgres(conn, suffix)
if err != nil {
connector.Close(context.Background())
connector.Close()
return nil, err
}

err = setupPostgresSchema(t, conn, suffix)
if err != nil {
connector.Close(context.Background())
connector.Close()
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
}
}

err := s.connector.Close(context.Background())
err := s.connector.Close()
if err != nil {
s.t.Fatalf("failed to close Snowflake connector: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/snowflake/snowflake_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,6 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) {
e2eshared.RunSuite(t, setupSchemaDeltaSuite, func(s SnowflakeSchemaDeltaTestSuite) {
require.NoError(s.t, s.sfTestHelper.Cleanup())
require.NoError(s.t, s.connector.Close(context.Background()))
require.NoError(s.t, s.connector.Close())
})
}

0 comments on commit 9910340

Please sign in to comment.