Skip to content

Commit

Permalink
remove txn for merge
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 15, 2023
1 parent 1158cf2 commit 4d9aa82
Showing 1 changed file with 1 addition and 20 deletions.
21 changes: 1 addition & 20 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,31 +1091,12 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
"flowName": destinationTableIdentifier,
}).Infof("[merge] merging records into %s...", destinationTableIdentifier)

// transaction for NormalizeRecords
mergeTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
return 0, fmt.Errorf("unable to begin transactions for merge: %w", err)
}

// in case we return after error, ensure transaction is rolled back
defer func() {
deferErr := mergeTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
log.Errorf("unexpected error while rolling back transaction for merge: %v", deferErr)
}
}()

result, err := mergeTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier)
result, err := c.database.ExecContext(ctx, mergeStatement, destinationTableIdentifier)
if err != nil {
return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w",
destinationTableIdentifier, mergeStatement, err)
}

err = mergeTx.Commit()
if err != nil {
return 0, fmt.Errorf("unable to commit transaction for merge: %w", err)
}

endTime := time.Now()
log.Infof("[merge] merged records into %s, took: %d seconds",
destinationTableIdentifier, endTime.Sub(startTime)/time.Second)
Expand Down

0 comments on commit 4d9aa82

Please sign in to comment.