Skip to content

Commit

Permalink
fix ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 15, 2023
1 parent a2847f9 commit fe3f94d
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
}()

var totalRowsAffected int64 = 0
g, _ := errgroup.WithContext(context.Background())
g, gCtx := errgroup.WithContext(c.ctx)
sem := make(chan struct{}, 8) // semaphore to limit parallel merges

for _, destinationTableName := range destinationTableNames {
Expand All @@ -767,6 +767,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
defer func() { <-sem }() // release semaphore

rowsAffected, err := c.generateAndExecuteMergeStatement(
gCtx,
tableName,
tableNametoUnchangedToastCols[tableName],
getRawTableIdentifier(req.FlowJobName),
Expand Down Expand Up @@ -979,6 +980,7 @@ func (c *SnowflakeConnector) insertRecordsInRawTable(rawTableIdentifier string,
}

func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
ctx context.Context,
destinationTableIdentifier string,
unchangedToastColumns []string,
rawTableIdentifier string,
Expand Down Expand Up @@ -1086,7 +1088,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
fmt.Sprintf("(%s)", strings.Join(normalizedTableSchema.PrimaryKeyColumns, ",")),
pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)

result, err := normalizeRecordsTx.ExecContext(c.ctx, mergeStatement, destinationTableIdentifier)
result, err := normalizeRecordsTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier)
if err != nil {
return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w",
destinationTableIdentifier, mergeStatement, err)
Expand Down

0 comments on commit fe3f94d

Please sign in to comment.