Skip to content

Commit

Permalink
sf normalize: merge one at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Mar 11, 2024
1 parent 7d6c489 commit 652d5da
Showing 1 changed file with 26 additions and 39 deletions.
65 changes: 26 additions & 39 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,53 +549,40 @@ func (c *SnowflakeConnector) mergeTablesForBatch(
}

var totalRowsAffected int64 = 0
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(8) // limit parallel merges to 8

for _, tableName := range destinationTableNames {
if err := gCtx.Err(); err != nil {
return fmt.Errorf("canceled while normalizing records: %w", err)
mergeGen := &mergeStmtGenerator{
rawTableName: getRawTableIdentifier(flowName),
dstTableName: tableName,
mergeBatchId: batchId,
normalizedTableSchema: tableToSchema[tableName],
unchangedToastColumns: tableNameToUnchangedToastCols[tableName],
peerdbCols: peerdbCols,
}
mergeStatement, err := mergeGen.generateMergeStmt()
if err != nil {
return err
}

g.Go(func() error {
mergeGen := &mergeStmtGenerator{
rawTableName: getRawTableIdentifier(flowName),
dstTableName: tableName,
mergeBatchId: batchId,
normalizedTableSchema: tableToSchema[tableName],
unchangedToastColumns: tableNameToUnchangedToastCols[tableName],
peerdbCols: peerdbCols,
}
mergeStatement, err := mergeGen.generateMergeStmt()
if err != nil {
return err
}

startTime := time.Now()
c.logger.Info("[merge] merging records...", "destTable", tableName, "batchId", batchId)

result, err := c.database.ExecContext(gCtx, mergeStatement, tableName)
if err != nil {
return fmt.Errorf("failed to merge records into %s (statement: %s): %w",
tableName, mergeStatement, err)
}
startTime := time.Now()
c.logger.Info("[merge] merging records...", "destTable", tableName, "batchId", batchId)

endTime := time.Now()
c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds",
tableName, endTime.Sub(startTime)/time.Second), "batchId", batchId)
result, err := c.database.ExecContext(ctx, mergeStatement, tableName)
if err != nil {
return fmt.Errorf("failed to merge records into %s (statement: %s): %w",
tableName, mergeStatement, err)
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err)
}
endTime := time.Now()
c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds",
tableName, endTime.Sub(startTime)/time.Second), "batchId", batchId)

atomic.AddInt64(&totalRowsAffected, rowsAffected)
return nil
})
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err)
}

if err := g.Wait(); err != nil {
return fmt.Errorf("error while normalizing records: %w", err)
atomic.AddInt64(&totalRowsAffected, rowsAffected)
}

return nil
Expand Down

0 comments on commit 652d5da

Please sign in to comment.