Skip to content

Commit

Permalink
move merge to own tx
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 15, 2023
1 parent 657d41c commit 755dacf
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
tableNametoUnchangedToastCols[tableName],
getRawTableIdentifier(req.FlowJobName),
syncBatchID, normalizeBatchID,
req,
normalizeRecordsTx)
req)
if err != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
Expand Down Expand Up @@ -987,7 +986,6 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
syncBatchID int64,
normalizeBatchID int64,
normalizeReq *model.NormalizeRecordsRequest,
normalizeRecordsTx *sql.Tx,
) (int64, error) {
normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier]
columnNames := maps.Keys(normalizedTableSchema.Columns)
Expand Down Expand Up @@ -1093,16 +1091,28 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
"flowName": destinationTableIdentifier,
}).Infof("[merge] merging records into %s...", destinationTableIdentifier)

result, err := normalizeRecordsTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier)
// transaction for NormalizeRecords
mergeTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
return 0, fmt.Errorf("unable to begin transactions for merge: %w", err)
}

// in case we return after error, ensure transaction is rolled back
defer func() {
deferErr := mergeTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
log.Errorf("unexpected error while rolling back transaction for merge: %v", deferErr)
}
}()

result, err := mergeTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier)
if err != nil {
return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w",
destinationTableIdentifier, mergeStatement, err)
}

endTime := time.Now()
log.WithFields(log.Fields{
"flowName": destinationTableIdentifier,
}).Infof("[merge] merged records into %s, took: %d seconds",
log.Infof("[merge] merged records into %s, took: %d seconds",
destinationTableIdentifier, endTime.Sub(startTime)/time.Second)

return result.RowsAffected()
Expand Down

0 comments on commit 755dacf

Please sign in to comment.