From ff7dbee9010554f008463d2b0279238f46ab575d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 11 Mar 2024 05:25:20 +0000 Subject: [PATCH] sf normalize: merge one at a time --- flow/connectors/snowflake/snowflake.go | 66 ++++++++++---------------- 1 file changed, 26 insertions(+), 40 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index d0ff430888..e1594388f9 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -17,7 +17,6 @@ import ( "github.com/snowflakedb/gosnowflake" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" - "golang.org/x/sync/errgroup" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -549,53 +548,40 @@ func (c *SnowflakeConnector) mergeTablesForBatch( } var totalRowsAffected int64 = 0 - g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(8) // limit parallel merges to 8 for _, tableName := range destinationTableNames { - if err := gCtx.Err(); err != nil { - return fmt.Errorf("canceled while normalizing records: %w", err) + mergeGen := &mergeStmtGenerator{ + rawTableName: getRawTableIdentifier(flowName), + dstTableName: tableName, + mergeBatchId: batchId, + normalizedTableSchema: tableToSchema[tableName], + unchangedToastColumns: tableNameToUnchangedToastCols[tableName], + peerdbCols: peerdbCols, + } + mergeStatement, err := mergeGen.generateMergeStmt() + if err != nil { + return err } - g.Go(func() error { - mergeGen := &mergeStmtGenerator{ - rawTableName: getRawTableIdentifier(flowName), - dstTableName: tableName, - mergeBatchId: batchId, - normalizedTableSchema: tableToSchema[tableName], - unchangedToastColumns: tableNameToUnchangedToastCols[tableName], - peerdbCols: peerdbCols, - } - mergeStatement, err := mergeGen.generateMergeStmt() - if err != nil { - return err - } - - startTime := time.Now() - c.logger.Info("[merge] merging records...", "destTable", tableName, "batchId", batchId) - - result, err := c.database.ExecContext(gCtx, mergeStatement, tableName) - if err != nil { - return fmt.Errorf("failed to merge records into %s (statement: %s): %w", - tableName, mergeStatement, err) - } + startTime := time.Now() + c.logger.Info("[merge] merging records...", "destTable", tableName, "batchId", batchId) - endTime := time.Now() - c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds", - tableName, endTime.Sub(startTime)/time.Second), "batchId", batchId) + result, err := c.database.ExecContext(ctx, mergeStatement, tableName) + if err != nil { + return fmt.Errorf("failed to merge records into %s (statement: %s): %w", + tableName, mergeStatement, err) + } - rowsAffected, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err) - } + endTime := time.Now() + c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds", + tableName, endTime.Sub(startTime)/time.Second), "batchId", batchId) - atomic.AddInt64(&totalRowsAffected, rowsAffected) - return nil - }) - } + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err) + } - if err := g.Wait(); err != nil { - return fmt.Errorf("error while normalizing records: %w", err) + atomic.AddInt64(&totalRowsAffected, rowsAffected) } return nil