diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index d0ff430888..940f171751 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -549,53 +549,40 @@ func (c *SnowflakeConnector) mergeTablesForBatch( } var totalRowsAffected int64 = 0 - g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(8) // limit parallel merges to 8 for _, tableName := range destinationTableNames { - if err := gCtx.Err(); err != nil { - return fmt.Errorf("canceled while normalizing records: %w", err) + mergeGen := &mergeStmtGenerator{ + rawTableName: getRawTableIdentifier(flowName), + dstTableName: tableName, + mergeBatchId: batchId, + normalizedTableSchema: tableToSchema[tableName], + unchangedToastColumns: tableNameToUnchangedToastCols[tableName], + peerdbCols: peerdbCols, + } + mergeStatement, err := mergeGen.generateMergeStmt() + if err != nil { + return err } - g.Go(func() error { - mergeGen := &mergeStmtGenerator{ - rawTableName: getRawTableIdentifier(flowName), - dstTableName: tableName, - mergeBatchId: batchId, - normalizedTableSchema: tableToSchema[tableName], - unchangedToastColumns: tableNameToUnchangedToastCols[tableName], - peerdbCols: peerdbCols, - } - mergeStatement, err := mergeGen.generateMergeStmt() - if err != nil { - return err - } - - startTime := time.Now() - c.logger.Info("[merge] merging records...", "destTable", tableName, "batchId", batchId) - - result, err := c.database.ExecContext(gCtx, mergeStatement, tableName) - if err != nil { - return fmt.Errorf("failed to merge records into %s (statement: %s): %w", - tableName, mergeStatement, err) - } + startTime := time.Now() + c.logger.Info("[merge] merging records...", "destTable", tableName, "batchId", batchId) - endTime := time.Now() - c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds", - tableName, endTime.Sub(startTime)/time.Second), "batchId", batchId) + result, err := c.database.ExecContext(ctx, mergeStatement, tableName) + if err != nil { + return fmt.Errorf("failed to merge records into %s (statement: %s): %w", + tableName, mergeStatement, err) + } - rowsAffected, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err) - } + endTime := time.Now() + c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds", + tableName, endTime.Sub(startTime)/time.Second), "batchId", batchId) - atomic.AddInt64(&totalRowsAffected, rowsAffected) - return nil - }) - } + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err) + } - if err := g.Wait(); err != nil { - return fmt.Errorf("error while normalizing records: %w", err) + atomic.AddInt64(&totalRowsAffected, rowsAffected) } return nil