Skip to content

Commit

Permalink
remove one more unneeded txn
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 15, 2023
1 parent d92c82e commit c3bbe62
Showing 1 changed file with 4 additions and 26 deletions.
30 changes: 4 additions & 26 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,21 +740,6 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
return nil, fmt.Errorf("couldn't tablename to unchanged cols mapping: %w", err)
}

// transaction for NormalizeRecords
normalizeRecordsTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
return nil, fmt.Errorf("unable to begin transactions for NormalizeRecords: %w", err)
}
// in case we return after error, ensure transaction is rolled back
defer func() {
deferErr := normalizeRecordsTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Errorf("unexpected error while rolling back transaction for NormalizeRecords: %v", deferErr)
}
}()

var totalRowsAffected int64 = 0
g, gCtx := errgroup.WithContext(c.ctx)
g.SetLimit(8) // limit parallel merges to 8
Expand Down Expand Up @@ -787,12 +772,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
}

// updating metadata with new normalizeBatchID
err = c.updateNormalizeMetadata(req.FlowJobName, syncBatchID, normalizeRecordsTx)
if err != nil {
return nil, err
}
// transaction commits
err = normalizeRecordsTx.Commit()
err = c.updateNormalizeMetadata(req.FlowJobName, syncBatchID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1159,8 +1139,7 @@ func (c *SnowflakeConnector) updateSyncMetadata(flowJobName string, lastCP int64
return nil
}

func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string,
normalizeBatchID int64, normalizeRecordsTx *sql.Tx) error {
func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, normalizeBatchID int64) error {
jobMetadataExists, err := c.jobMetadataExists(flowJobName)
if err != nil {
return fmt.Errorf("failed to get sync status for flow job: %w", err)
Expand All @@ -1169,9 +1148,8 @@ func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string,
return fmt.Errorf("job metadata does not exist, unable to update")
}

_, err = normalizeRecordsTx.ExecContext(c.ctx,
fmt.Sprintf(updateMetadataForNormalizeRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier),
normalizeBatchID, flowJobName)
stmt := fmt.Sprintf(updateMetadataForNormalizeRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier)
_, err = c.database.ExecContext(c.ctx, stmt, normalizeBatchID, flowJobName)
if err != nil {
return fmt.Errorf("failed to update metadata for NormalizeTables: %w", err)
}
Expand Down

0 comments on commit c3bbe62

Please sign in to comment.