From 755dacf29776f114ce283b75b025c86b2f4ea240 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 11:38:56 -0500 Subject: [PATCH] move merge to own tx --- flow/connectors/snowflake/snowflake.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index d66977ead0..ace3cc2ca3 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -769,8 +769,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest tableNametoUnchangedToastCols[tableName], getRawTableIdentifier(req.FlowJobName), syncBatchID, normalizeBatchID, - req, - normalizeRecordsTx) + req) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, @@ -987,7 +986,6 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( syncBatchID int64, normalizeBatchID int64, normalizeReq *model.NormalizeRecordsRequest, - normalizeRecordsTx *sql.Tx, ) (int64, error) { normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier] columnNames := maps.Keys(normalizedTableSchema.Columns) @@ -1093,16 +1091,28 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( "flowName": destinationTableIdentifier, }).Infof("[merge] merging records into %s...", destinationTableIdentifier) - result, err := normalizeRecordsTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) + // transaction for NormalizeRecords + mergeTx, err := c.database.BeginTx(c.ctx, nil) + if err != nil { + return 0, fmt.Errorf("unable to begin transactions for merge: %w", err) + } + + // in case we return after error, ensure transaction is rolled back + defer func() { + deferErr := mergeTx.Rollback() + if deferErr != sql.ErrTxDone && deferErr != nil { + log.Errorf("unexpected error while rolling back transaction for merge: %v", deferErr) + } + }() + + result, err := mergeTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) if err != nil { return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w", destinationTableIdentifier, mergeStatement, err) } endTime := time.Now() - log.WithFields(log.Fields{ - "flowName": destinationTableIdentifier, - }).Infof("[merge] merged records into %s, took: %d seconds", + log.Infof("[merge] merged records into %s, took: %d seconds", destinationTableIdentifier, endTime.Sub(startTime)/time.Second) return result.RowsAffected()