Skip to content

Commit

Permalink
Snowflake normalize: check gCtx.Err() while iterating through tables (#…
Browse files Browse the repository at this point in the history
…1427)

errgroup.Go unconditionally executes the function / waits,
causing pauses to be ignored for awhile
  • Loading branch information
serprex authored Mar 4, 2024
1 parent 15c9a0e commit 39c6e7e
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
g.SetLimit(8) // limit parallel merges to 8

for _, tableName := range destinationTableNames {
if err := gCtx.Err(); err != nil {
return nil, fmt.Errorf("canceled while normalizing records: %w", err)
}

g.Go(func() error {
mergeGen := &mergeStmtGenerator{
rawTableName: getRawTableIdentifier(req.FlowJobName),
Expand Down Expand Up @@ -547,10 +551,6 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
endTime := time.Now()
c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds",
tableName, endTime.Sub(startTime)/time.Second))
if err != nil {
c.logger.Error("[merge] error while normalizing records", "error", err)
return err
}

rowsAffected, err := result.RowsAffected()
if err != nil {
Expand Down

0 comments on commit 39c6e7e

Please sign in to comment.