From fe3f94dd7055003e9f35393ba697f7edd0ff94bb Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 11:01:25 -0500 Subject: [PATCH] fix ctx --- flow/connectors/snowflake/snowflake.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index cbc1501230..c9409b7b12 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -756,7 +756,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest }() var totalRowsAffected int64 = 0 - g, _ := errgroup.WithContext(context.Background()) + g, gCtx := errgroup.WithContext(c.ctx) sem := make(chan struct{}, 8) // semaphore to limit parallel merges for _, destinationTableName := range destinationTableNames { @@ -767,6 +767,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest defer func() { <-sem }() // release semaphore rowsAffected, err := c.generateAndExecuteMergeStatement( + gCtx, tableName, tableNametoUnchangedToastCols[tableName], getRawTableIdentifier(req.FlowJobName), @@ -979,6 +980,7 @@ func (c *SnowflakeConnector) insertRecordsInRawTable(rawTableIdentifier string, } func (c *SnowflakeConnector) generateAndExecuteMergeStatement( + ctx context.Context, destinationTableIdentifier string, unchangedToastColumns []string, rawTableIdentifier string, @@ -1086,7 +1088,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( fmt.Sprintf("(%s)", strings.Join(normalizedTableSchema.PrimaryKeyColumns, ",")), pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) - result, err := normalizeRecordsTx.ExecContext(c.ctx, mergeStatement, destinationTableIdentifier) + result, err := normalizeRecordsTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) if err != nil { return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w", destinationTableIdentifier, mergeStatement, err)