From 39c6e7e380ceb61ba257fc3b43aa7300d2de1c51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 4 Mar 2024 17:45:11 +0000 Subject: [PATCH] Snowflake normalize: check gCtx.Err() while iterating through tables (#1427) errgroup.Go unconditionally executes the function / waits, causing pauses to be ignored for awhile --- flow/connectors/snowflake/snowflake.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 9382f59c54..4c7c378dff 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -516,6 +516,10 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No g.SetLimit(8) // limit parallel merges to 8 for _, tableName := range destinationTableNames { + if err := gCtx.Err(); err != nil { + return nil, fmt.Errorf("canceled while normalizing records: %w", err) + } + g.Go(func() error { mergeGen := &mergeStmtGenerator{ rawTableName: getRawTableIdentifier(req.FlowJobName), @@ -547,10 +551,6 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No endTime := time.Now() c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds", tableName, endTime.Sub(startTime)/time.Second)) - if err != nil { - c.logger.Error("[merge] error while normalizing records", "error", err) - return err - } rowsAffected, err := result.RowsAffected() if err != nil {