diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 96166de6ef..2086aa8cfd 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -163,7 +163,7 @@ func (a *FlowableActivity) CreateNormalizedTable( if err != nil { return nil, fmt.Errorf("failed to setup normalized tables tx: %w", err) } - defer conn.AbortSetupNormalizedTables(ctx, tx) + defer conn.CleanupSetupNormalizedTables(ctx, tx) numTablesSetup := atomic.Uint32{} totalTables := uint32(len(config.TableNameSchemaMapping)) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index a7cf31ce9c..438994a800 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -656,7 +656,7 @@ func (c *BigQueryConnector) FinishSetupNormalizedTables(_ context.Context, _ int return nil } -func (c *BigQueryConnector) AbortSetupNormalizedTables(_ context.Context, _ interface{}) { +func (c *BigQueryConnector) CleanupSetupNormalizedTables(_ context.Context, _ interface{}) { } // This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided. diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 0a43ed0353..3d92ca9e91 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -27,7 +27,7 @@ func (c *ClickhouseConnector) FinishSetupNormalizedTables(_ context.Context, _ i return nil } -func (c *ClickhouseConnector) AbortSetupNormalizedTables(_ context.Context, _ interface{}) { +func (c *ClickhouseConnector) CleanupSetupNormalizedTables(_ context.Context, _ interface{}) { } func (c *ClickhouseConnector) SetupNormalizedTable( diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 6b2e470ac6..f83a1a7836 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -74,9 +74,9 @@ type NormalizedTablesConnector interface { syncedAtColName string, ) (bool, error) - // AbortSetupNormalizedTables may be used to rollback transaction started by StartSetupNormalizedTables. - // Calling AbortSetupNormalizedTables after FinishSetupNormalizedTables must be a nop. - AbortSetupNormalizedTables(ctx context.Context, tx interface{}) + // CleanupSetupNormalizedTables may be used to rollback transaction started by StartSetupNormalizedTables. + // Calling CleanupSetupNormalizedTables after FinishSetupNormalizedTables must be a nop. + CleanupSetupNormalizedTables(ctx context.Context, tx interface{}) // FinishSetupNormalizedTables may be used to finish transaction started by StartSetupNormalizedTables. FinishSetupNormalizedTables(ctx context.Context, tx interface{}) error diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f9c6d60190..6c9d02631f 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -644,7 +644,7 @@ func (c *PostgresConnector) StartSetupNormalizedTables(ctx context.Context) (int return c.conn.Begin(ctx) } -func (c *PostgresConnector) AbortSetupNormalizedTables(ctx context.Context, tx interface{}) { +func (c *PostgresConnector) CleanupSetupNormalizedTables(ctx context.Context, tx interface{}) { err := tx.(pgx.Tx).Rollback(ctx) if err != pgx.ErrTxClosed && err != nil { c.logger.Error("error rolling back transaction for creating raw table", slog.Any("error", err)) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index befebcf284..1644787119 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -337,7 +337,7 @@ func (c *SnowflakeConnector) FinishSetupNormalizedTables(_ context.Context, _ in return nil } -func (c *SnowflakeConnector) AbortSetupNormalizedTables(_ context.Context, _ interface{}) { +func (c *SnowflakeConnector) CleanupSetupNormalizedTables(_ context.Context, _ interface{}) { } func (c *SnowflakeConnector) SetupNormalizedTable(