From db3d000af575f327058c414c991f550ab1d2c7c1 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 09:01:59 -0500 Subject: [PATCH 01/12] [snowflake] Run merges in parallel during normalize flow --- flow/connectors/snowflake/snowflake.go | 41 ++++++++++++++++++-------- 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4737a6def6..cbc1501230 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -8,6 +8,7 @@ import ( "fmt" "regexp" "strings" + "sync/atomic" "time" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -20,6 +21,7 @@ import ( "github.com/snowflakedb/gosnowflake" "go.temporal.io/sdk/activity" "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" ) //nolint:stylecheck @@ -754,19 +756,34 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest }() var totalRowsAffected int64 = 0 - // execute merge statements per table that uses CTEs to merge data into the normalized table + g, _ := errgroup.WithContext(context.Background()) + sem := make(chan struct{}, 8) // semaphore to limit parallel merges + for _, destinationTableName := range destinationTableNames { - rowsAffected, err := c.generateAndExecuteMergeStatement( - destinationTableName, - tableNametoUnchangedToastCols[destinationTableName], - getRawTableIdentifier(req.FlowJobName), - syncBatchID, normalizeBatchID, - req, - normalizeRecordsTx) - if err != nil { - return nil, err - } - totalRowsAffected += rowsAffected + sem <- struct{}{} // block if semaphore is full + tableName := destinationTableName // local variable for the closure + + g.Go(func() error { + defer func() { <-sem }() // release semaphore + + rowsAffected, err := c.generateAndExecuteMergeStatement( + tableName, + tableNametoUnchangedToastCols[tableName], + getRawTableIdentifier(req.FlowJobName), + syncBatchID, normalizeBatchID, + req, + normalizeRecordsTx) + if err != nil { + return err + } + + atomic.AddInt64(&totalRowsAffected, rowsAffected) + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, fmt.Errorf("error while normalizing records: %w", err) } // updating metadata with new normalizeBatchID From 113dfcc6f68af243268f1b039840aebdcd4ef31f Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 11:01:25 -0500 Subject: [PATCH 02/12] fix ctx --- flow/connectors/snowflake/snowflake.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index cbc1501230..c9409b7b12 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -756,7 +756,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest }() var totalRowsAffected int64 = 0 - g, _ := errgroup.WithContext(context.Background()) + g, gCtx := errgroup.WithContext(c.ctx) sem := make(chan struct{}, 8) // semaphore to limit parallel merges for _, destinationTableName := range destinationTableNames { @@ -767,6 +767,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest defer func() { <-sem }() // release semaphore rowsAffected, err := c.generateAndExecuteMergeStatement( + gCtx, tableName, tableNametoUnchangedToastCols[tableName], getRawTableIdentifier(req.FlowJobName), @@ -979,6 +980,7 @@ func (c *SnowflakeConnector) insertRecordsInRawTable(rawTableIdentifier string, } func (c *SnowflakeConnector) generateAndExecuteMergeStatement( + ctx context.Context, destinationTableIdentifier string, unchangedToastColumns []string, rawTableIdentifier string, @@ -1086,7 +1088,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( fmt.Sprintf("(%s)", strings.Join(normalizedTableSchema.PrimaryKeyColumns, ",")), pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) - result, err := normalizeRecordsTx.ExecContext(c.ctx, mergeStatement, destinationTableIdentifier) + result, err := normalizeRecordsTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) if err != nil { return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w", destinationTableIdentifier, mergeStatement, err) From ac31cf6fea5fcceda70cb7455a2bea8ff4e05e38 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 11:01:58 -0500 Subject: [PATCH 03/12] more error --- flow/connectors/snowflake/snowflake.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index c9409b7b12..3f01fa5e88 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -775,6 +775,9 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest req, normalizeRecordsTx) if err != nil { + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Errorf("[merge] error while normalizing records: %v", err) return err } From e5de1ee0b1cfd040f95aeb1d4492725030e2d948 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 11:03:18 -0500 Subject: [PATCH 04/12] add set limit --- flow/connectors/snowflake/snowflake.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 3f01fa5e88..d4f2889506 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -757,15 +757,12 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest var totalRowsAffected int64 = 0 g, gCtx := errgroup.WithContext(c.ctx) - sem := make(chan struct{}, 8) // semaphore to limit parallel merges + g.SetLimit(8) // limit parallel merges to 8 for _, destinationTableName := range destinationTableNames { - sem <- struct{}{} // block if semaphore is full tableName := destinationTableName // local variable for the closure g.Go(func() error { - defer func() { <-sem }() // release semaphore - rowsAffected, err := c.generateAndExecuteMergeStatement( gCtx, tableName, From 29f5d0780f7381038030c0ecfa59f3af6df2b19d Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 11:13:10 -0500 Subject: [PATCH 05/12] add logging --- flow/connectors/snowflake/snowflake.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index d4f2889506..7c00309098 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -1088,12 +1088,19 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( fmt.Sprintf("(%s)", strings.Join(normalizedTableSchema.PrimaryKeyColumns, ",")), pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) + startTime := time.Now() result, err := normalizeRecordsTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) if err != nil { return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w", destinationTableIdentifier, mergeStatement, err) } + endTime := time.Now() + log.WithFields(log.Fields{ + "flowName": destinationTableIdentifier, + }).Infof("[merge] merged records into %s, took: %d seconds", + destinationTableIdentifier, endTime.Sub(startTime)/time.Second) + return result.RowsAffected() } From e8e47cb70da1e8ff75b1a11a899e66f762dc4407 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 11:21:04 -0500 Subject: [PATCH 06/12] increase timeout --- flow/connectors/snowflake/snowflake.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 7c00309098..93dd1a3fbb 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -138,7 +138,7 @@ func NewSnowflakeConnector(ctx context.Context, Database: snowflakeProtoConfig.Database, Warehouse: snowflakeProtoConfig.Warehouse, Role: snowflakeProtoConfig.Role, - RequestTimeout: time.Duration(snowflakeProtoConfig.QueryTimeout), + RequestTimeout: time.Duration(snowflakeProtoConfig.QueryTimeout * 10), DisableTelemetry: true, } snowflakeConfigDSN, err := gosnowflake.DSN(&snowflakeConfig) From 89de1aa4e26dc76239b665783eaaa36803fb466f Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 11:22:13 -0500 Subject: [PATCH 07/12] add one more useful log --- flow/connectors/snowflake/snowflake.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 93dd1a3fbb..d66977ead0 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -1089,6 +1089,10 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) startTime := time.Now() + log.WithFields(log.Fields{ + "flowName": destinationTableIdentifier, + }).Infof("[merge] merging records into %s...", destinationTableIdentifier) + result, err := normalizeRecordsTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) if err != nil { return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w", From 8e49f5417df9c3f093dc8373e5824382a21b9736 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 11:38:56 -0500 Subject: [PATCH 08/12] move merge to own tx --- flow/connectors/snowflake/snowflake.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index d66977ead0..ace3cc2ca3 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -769,8 +769,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest tableNametoUnchangedToastCols[tableName], getRawTableIdentifier(req.FlowJobName), syncBatchID, normalizeBatchID, - req, - normalizeRecordsTx) + req) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, @@ -987,7 +986,6 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( syncBatchID int64, normalizeBatchID int64, normalizeReq *model.NormalizeRecordsRequest, - normalizeRecordsTx *sql.Tx, ) (int64, error) { normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier] columnNames := maps.Keys(normalizedTableSchema.Columns) @@ -1093,16 +1091,28 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( "flowName": destinationTableIdentifier, }).Infof("[merge] merging records into %s...", destinationTableIdentifier) - result, err := normalizeRecordsTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) + // transaction for NormalizeRecords + mergeTx, err := c.database.BeginTx(c.ctx, nil) + if err != nil { + return 0, fmt.Errorf("unable to begin transactions for merge: %w", err) + } + + // in case we return after error, ensure transaction is rolled back + defer func() { + deferErr := mergeTx.Rollback() + if deferErr != sql.ErrTxDone && deferErr != nil { + log.Errorf("unexpected error while rolling back transaction for merge: %v", deferErr) + } + }() + + result, err := mergeTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) if err != nil { return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w", destinationTableIdentifier, mergeStatement, err) } endTime := time.Now() - log.WithFields(log.Fields{ - "flowName": destinationTableIdentifier, - }).Infof("[merge] merged records into %s, took: %d seconds", + log.Infof("[merge] merged records into %s, took: %d seconds", destinationTableIdentifier, endTime.Sub(startTime)/time.Second) return result.RowsAffected() From 1158cf2933dba704428be4f4c430644c288c9caf Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 14:44:54 -0500 Subject: [PATCH 09/12] commit the tx --- flow/connectors/snowflake/snowflake.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index ace3cc2ca3..e73331be0f 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -1111,6 +1111,11 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( destinationTableIdentifier, mergeStatement, err) } + err = mergeTx.Commit() + if err != nil { + return 0, fmt.Errorf("unable to commit transaction for merge: %w", err) + } + endTime := time.Now() log.Infof("[merge] merged records into %s, took: %d seconds", destinationTableIdentifier, endTime.Sub(startTime)/time.Second) From 4d9aa82f6872a3c4f5d40735a320e5a82b3bd100 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 14:45:43 -0500 Subject: [PATCH 10/12] remove txn for merge --- flow/connectors/snowflake/snowflake.go | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index e73331be0f..59a345923b 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -1091,31 +1091,12 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( "flowName": destinationTableIdentifier, }).Infof("[merge] merging records into %s...", destinationTableIdentifier) - // transaction for NormalizeRecords - mergeTx, err := c.database.BeginTx(c.ctx, nil) - if err != nil { - return 0, fmt.Errorf("unable to begin transactions for merge: %w", err) - } - - // in case we return after error, ensure transaction is rolled back - defer func() { - deferErr := mergeTx.Rollback() - if deferErr != sql.ErrTxDone && deferErr != nil { - log.Errorf("unexpected error while rolling back transaction for merge: %v", deferErr) - } - }() - - result, err := mergeTx.ExecContext(ctx, mergeStatement, destinationTableIdentifier) + result, err := c.database.ExecContext(ctx, mergeStatement, destinationTableIdentifier) if err != nil { return 0, fmt.Errorf("failed to merge records into %s (statement: %s): %w", destinationTableIdentifier, mergeStatement, err) } - err = mergeTx.Commit() - if err != nil { - return 0, fmt.Errorf("unable to commit transaction for merge: %w", err) - } - endTime := time.Now() log.Infof("[merge] merged records into %s, took: %d seconds", destinationTableIdentifier, endTime.Sub(startTime)/time.Second) From d92c82e088c62278e467493e479c1194a7089429 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 14:55:39 -0500 Subject: [PATCH 11/12] remove the timeout hack --- flow/connectors/snowflake/snowflake.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 59a345923b..59ace6d19f 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -138,7 +138,7 @@ func NewSnowflakeConnector(ctx context.Context, Database: snowflakeProtoConfig.Database, Warehouse: snowflakeProtoConfig.Warehouse, Role: snowflakeProtoConfig.Role, - RequestTimeout: time.Duration(snowflakeProtoConfig.QueryTimeout * 10), + RequestTimeout: time.Duration(snowflakeProtoConfig.QueryTimeout), DisableTelemetry: true, } snowflakeConfigDSN, err := gosnowflake.DSN(&snowflakeConfig) From c3bbe62ad8441c07708007a263dad94c0411c2be Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 15 Nov 2023 15:18:48 -0500 Subject: [PATCH 12/12] remove one more unneeded txn --- flow/connectors/snowflake/snowflake.go | 30 ++++---------------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 59ace6d19f..b2065a39c1 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -740,21 +740,6 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest return nil, fmt.Errorf("couldn't tablename to unchanged cols mapping: %w", err) } - // transaction for NormalizeRecords - normalizeRecordsTx, err := c.database.BeginTx(c.ctx, nil) - if err != nil { - return nil, fmt.Errorf("unable to begin transactions for NormalizeRecords: %w", err) - } - // in case we return after error, ensure transaction is rolled back - defer func() { - deferErr := normalizeRecordsTx.Rollback() - if deferErr != sql.ErrTxDone && deferErr != nil { - log.WithFields(log.Fields{ - "flowName": req.FlowJobName, - }).Errorf("unexpected error while rolling back transaction for NormalizeRecords: %v", deferErr) - } - }() - var totalRowsAffected int64 = 0 g, gCtx := errgroup.WithContext(c.ctx) g.SetLimit(8) // limit parallel merges to 8 @@ -787,12 +772,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest } // updating metadata with new normalizeBatchID - err = c.updateNormalizeMetadata(req.FlowJobName, syncBatchID, normalizeRecordsTx) - if err != nil { - return nil, err - } - // transaction commits - err = normalizeRecordsTx.Commit() + err = c.updateNormalizeMetadata(req.FlowJobName, syncBatchID) if err != nil { return nil, err } @@ -1159,8 +1139,7 @@ func (c *SnowflakeConnector) updateSyncMetadata(flowJobName string, lastCP int64 return nil } -func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, - normalizeBatchID int64, normalizeRecordsTx *sql.Tx) error { +func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, normalizeBatchID int64) error { jobMetadataExists, err := c.jobMetadataExists(flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) @@ -1169,9 +1148,8 @@ func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, return fmt.Errorf("job metadata does not exist, unable to update") } - _, err = normalizeRecordsTx.ExecContext(c.ctx, - fmt.Sprintf(updateMetadataForNormalizeRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier), - normalizeBatchID, flowJobName) + stmt := fmt.Sprintf(updateMetadataForNormalizeRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier) + _, err = c.database.ExecContext(c.ctx, stmt, normalizeBatchID, flowJobName) if err != nil { return fmt.Errorf("failed to update metadata for NormalizeTables: %w", err) }