From 75f15b506c75818130c3964a37d1ab373fb2ce0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 9 Feb 2024 23:27:10 +0000 Subject: [PATCH] BQ merge fixes BQ `it.Next(&row)` seems to mutate the target, https://github.com/googleapis/google-cloud-go/blob/f049c9751415f9fc4c81c1839a8371782cfc016c/bigquery/value.go#L490 so reallocate a fresh row each iteration ArrayChunks: replace with ArrayIterChunks. Document behavior when input is empty slice This reduces allocations, since we've seen people having 1000+ columns go 1.22 has a rangefunc experiment, so eventually we can replace this with that https://go.dev/wiki/RangefuncExperiment --- flow/connectors/bigquery/bigquery.go | 31 +++++++++++++------ .../bigquery/merge_stmt_generator.go | 14 --------- flow/connectors/utils/array.go | 26 ++++++++++------ 3 files changed, 37 insertions(+), 34 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index ffafcb9505..81f1986205 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -407,11 +407,11 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync resultMap := make(map[string][]string) // Process the query results using an iterator. - var row struct { - Tablename string `bigquery:"_peerdb_destination_table_name"` - UnchangedToastColumns []string `bigquery:"unchanged_toast_columns"` - } for { + var row struct { + Tablename string `bigquery:"_peerdb_destination_table_name"` + UnchangedToastColumns []string `bigquery:"unchanged_toast_columns"` + } err := it.Next(&row) if err == iterator.Done { break @@ -529,18 +529,29 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) }, shortColumn: map[string]string{}, } + // normalize anything between last normalized batch id to last sync batchid - mergeStmts := mergeGen.generateMergeStmts(unchangedToastColumns) - for i, mergeStmt := range mergeStmts { - c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..", - i+1, len(mergeStmts), tableName)) + // TODO (kaushik): This is so that the statement size for individual merge statements + // doesn't exceed the limit. We should make this configurable. + const batchSize = 8 + stmtNum := 0 + err = utils.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error { + stmtNum += 1 + mergeStmt := mergeGen.generateMergeStmt(chunk) + c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..", + stmtNum, tableName)) + q := c.client.Query(mergeStmt) q.DefaultProjectID = c.projectID q.DefaultDatasetID = dstDatasetTable.dataset - _, err = q.Read(c.ctx) + _, err := q.Read(c.ctx) if err != nil { - return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err) + return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err) } + return nil + }) + if err != nil { + return nil, err } } diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index db7b4c83af..21350fc76e 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -177,20 +177,6 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) } -func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastColas []string) []string { - // TODO (kaushik): This is so that the statement size for individual merge statements - // doesn't exceed the limit. We should make this configurable. - const batchSize = 8 - partitions := utils.ArrayChunks(allUnchangedToastColas, batchSize) - - mergeStmts := make([]string, 0, len(partitions)) - for _, partition := range partitions { - mergeStmts = append(mergeStmts, m.generateMergeStmt(partition)) - } - - return mergeStmts -} - /* This function takes an array of unique unchanged toast column groups and an array of all column names, and returns suitable UPDATE statements as part of a MERGE operation. diff --git a/flow/connectors/utils/array.go b/flow/connectors/utils/array.go index d131e3c65e..d203beacc2 100644 --- a/flow/connectors/utils/array.go +++ b/flow/connectors/utils/array.go @@ -18,17 +18,23 @@ func ArrayMinus[T comparable](first, second []T) []T { return result } -func ArrayChunks[T any](slice []T, size int) [][]T { - var partitions [][]T - - for size < len(slice) { - slice, partitions = slice[size:], append(partitions, slice[0:size]) +// Call f with subslices of slice. An empty slice will call f once with nil. +func ArrayIterChunks[T any](slice []T, size int, f func(chunk []T) error) error { + if len(slice) == 0 { + return f(nil) } - - // Add the last remaining values - partitions = append(partitions, slice) - - return partitions + if size <= 0 { + return nil + } + lo := 0 + for lo < len(slice) { + hi := min(lo+size, len(slice)) + if err := f(slice[lo:hi:hi]); err != nil { + return err + } + lo = hi + } + return nil } func ArraysHaveOverlap[T comparable](first, second []T) bool {