diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 9382f59c54..4c7c378dff 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -516,6 +516,10 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No g.SetLimit(8) // limit parallel merges to 8 for _, tableName := range destinationTableNames { + if err := gCtx.Err(); err != nil { + return nil, fmt.Errorf("canceled while normalizing records: %w", err) + } + g.Go(func() error { mergeGen := &mergeStmtGenerator{ rawTableName: getRawTableIdentifier(req.FlowJobName), @@ -547,10 +551,6 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No endTime := time.Now() c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds", tableName, endTime.Sub(startTime)/time.Second)) - if err != nil { - c.logger.Error("[merge] error while normalizing records", "error", err) - return err - } rowsAffected, err := result.RowsAffected() if err != nil {