Skip to content

Commit

Permalink
rename Abort to Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 13, 2024
1 parent 770cdc3 commit 4fb2739
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
if err != nil {
return nil, fmt.Errorf("failed to setup normalized tables tx: %w", err)
}
defer conn.AbortSetupNormalizedTables(ctx, tx)
defer conn.CleanupSetupNormalizedTables(ctx, tx)

numTablesSetup := atomic.Uint32{}
totalTables := uint32(len(config.TableNameSchemaMapping))
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func (c *BigQueryConnector) FinishSetupNormalizedTables(_ context.Context, _ int
return nil
}

func (c *BigQueryConnector) AbortSetupNormalizedTables(_ context.Context, _ interface{}) {
func (c *BigQueryConnector) CleanupSetupNormalizedTables(_ context.Context, _ interface{}) {
}

// This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided.
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *ClickhouseConnector) FinishSetupNormalizedTables(_ context.Context, _ i
return nil
}

func (c *ClickhouseConnector) AbortSetupNormalizedTables(_ context.Context, _ interface{}) {
func (c *ClickhouseConnector) CleanupSetupNormalizedTables(_ context.Context, _ interface{}) {
}

func (c *ClickhouseConnector) SetupNormalizedTable(
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ type NormalizedTablesConnector interface {
syncedAtColName string,
) (bool, error)

// AbortSetupNormalizedTables may be used to rollback transaction started by StartSetupNormalizedTables.
// Calling AbortSetupNormalizedTables after FinishSetupNormalizedTables must be a nop.
AbortSetupNormalizedTables(ctx context.Context, tx interface{})
// CleanupSetupNormalizedTables may be used to rollback transaction started by StartSetupNormalizedTables.
// Calling CleanupSetupNormalizedTables after FinishSetupNormalizedTables must be a nop.
CleanupSetupNormalizedTables(ctx context.Context, tx interface{})

// FinishSetupNormalizedTables may be used to finish transaction started by StartSetupNormalizedTables.
FinishSetupNormalizedTables(ctx context.Context, tx interface{}) error
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func (c *PostgresConnector) StartSetupNormalizedTables(ctx context.Context) (int
return c.conn.Begin(ctx)
}

func (c *PostgresConnector) AbortSetupNormalizedTables(ctx context.Context, tx interface{}) {
func (c *PostgresConnector) CleanupSetupNormalizedTables(ctx context.Context, tx interface{}) {
err := tx.(pgx.Tx).Rollback(ctx)
if err != pgx.ErrTxClosed && err != nil {
c.logger.Error("error rolling back transaction for creating raw table", slog.Any("error", err))
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (c *SnowflakeConnector) FinishSetupNormalizedTables(_ context.Context, _ in
return nil
}

func (c *SnowflakeConnector) AbortSetupNormalizedTables(_ context.Context, _ interface{}) {
func (c *SnowflakeConnector) CleanupSetupNormalizedTables(_ context.Context, _ interface{}) {
}

func (c *SnowflakeConnector) SetupNormalizedTable(
Expand Down

0 comments on commit 4fb2739

Please sign in to comment.