diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index e73331be0f..59a345923b 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -1091,31 +1091,12 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( "flowName": destinationTableIdentifier, }).Infof("[merge] merging records into %s...", destinationTableIdentifier) - // transaction for NormalizeRecords - mergeTx, err := c.database.BeginTx(c.ctx, nil) - if err != nil { - return 0, fmt.Errorf("unable to begin transactions for merge: %w", err) - } - - // in case we return after error, ensure transaction is rolled back - defer func() { - deferErr := mergeTx.Rollback() - if deferErr != sql.ErrTxDone && deferErr != nil { - log.Errorf("unexpected error while rolling back transaction for merge: %v", deferErr) - } - }() - - result, err := mergeTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) + result, err := c.database.ExecContext(ctx, mergeStatement, destinationTableIdentifier) if err != nil { return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w", destinationTableIdentifier, mergeStatement, err) } - err = mergeTx.Commit() - if err != nil { - return 0, fmt.Errorf("unable to commit transaction for merge: %w", err) - } - endTime := time.Now() log.Infof("[merge] merged records into %s, took: %d seconds", destinationTableIdentifier, endTime.Sub(startTime)/time.Second)