Skip to content

Commit

Permalink
[snowflake] Run merges in parallel during normalize flow
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 15, 2023
1 parent 173142e commit a2847f9
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"regexp"
"strings"
"sync/atomic"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/snowflakedb/gosnowflake"
"go.temporal.io/sdk/activity"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
)

//nolint:stylecheck
Expand Down Expand Up @@ -754,19 +756,34 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
}()

var totalRowsAffected int64 = 0
// execute merge statements per table that uses CTEs to merge data into the normalized table
g, _ := errgroup.WithContext(context.Background())
sem := make(chan struct{}, 8) // semaphore to limit parallel merges

for _, destinationTableName := range destinationTableNames {
rowsAffected, err := c.generateAndExecuteMergeStatement(
destinationTableName,
tableNametoUnchangedToastCols[destinationTableName],
getRawTableIdentifier(req.FlowJobName),
syncBatchID, normalizeBatchID,
req,
normalizeRecordsTx)
if err != nil {
return nil, err
}
totalRowsAffected += rowsAffected
sem <- struct{}{} // block if semaphore is full
tableName := destinationTableName // local variable for the closure

g.Go(func() error {
defer func() { <-sem }() // release semaphore

rowsAffected, err := c.generateAndExecuteMergeStatement(
tableName,
tableNametoUnchangedToastCols[tableName],
getRawTableIdentifier(req.FlowJobName),
syncBatchID, normalizeBatchID,
req,
normalizeRecordsTx)
if err != nil {
return err
}

atomic.AddInt64(&totalRowsAffected, rowsAffected)
return nil
})
}

if err := g.Wait(); err != nil {
return nil, fmt.Errorf("error while normalizing records: %w", err)
}

// updating metadata with new normalizeBatchID
Expand Down

0 comments on commit a2847f9

Please sign in to comment.