From 8e06505f4bd515ee22e93bd38740f4b6a8364265 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 14:45:43 -0500 Subject: [PATCH] remove txn for merge --- flow/connectors/snowflake/snowflake.go | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index e73331be0f..59a345923b 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -1091,31 +1091,12 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( "flowName": destinationTableIdentifier, }).Infof("[merge] merging records into %s...", destinationTableIdentifier) - // transaction for NormalizeRecords - mergeTx, err := c.database.BeginTx(c.ctx, nil) - if err != nil { - return 0, fmt.Errorf("unable to begin transactions for merge: %w", err) - } - - // in case we return after error, ensure transaction is rolled back - defer func() { - deferErr := mergeTx.Rollback() - if deferErr != sql.ErrTxDone && deferErr != nil { - log.Errorf("unexpected error while rolling back transaction for merge: %v", deferErr) - } - }() - - result, err := mergeTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) + result, err := c.database.ExecContext(ctx, mergeStatement, destinationTableIdentifier) if err != nil { return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w", destinationTableIdentifier, mergeStatement, err) } - err = mergeTx.Commit() - if err != nil { - return 0, fmt.Errorf("unable to commit transaction for merge: %w", err) - } - endTime := time.Now() log.Infof("[merge] merged records into %s, took: %d seconds", destinationTableIdentifier, endTime.Sub(startTime)/time.Second)