diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 3f01fa5e88..d4f2889506 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -757,15 +757,12 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest var totalRowsAffected int64 = 0 g, gCtx := errgroup.WithContext(c.ctx) - sem := make(chan struct{}, 8) // semaphore to limit parallel merges + g.SetLimit(8) // limit parallel merges to 8 for _, destinationTableName := range destinationTableNames { - sem <- struct{}{} // block if semaphore is full tableName := destinationTableName // local variable for the closure g.Go(func() error { - defer func() { <-sem }() // release semaphore - rowsAffected, err := c.generateAndExecuteMergeStatement( gCtx, tableName,