diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 81f198620..ffafcb950 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -407,11 +407,11 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
 	resultMap := make(map[string][]string)
 
 	// Process the query results using an iterator.
+	var row struct {
+		Tablename             string   `bigquery:"_peerdb_destination_table_name"`
+		UnchangedToastColumns []string `bigquery:"unchanged_toast_columns"`
+	}
 	for {
-		var row struct {
-			Tablename             string   `bigquery:"_peerdb_destination_table_name"`
-			UnchangedToastColumns []string `bigquery:"unchanged_toast_columns"`
-		}
 		err := it.Next(&row)
 		if err == iterator.Done {
 			break
@@ -529,29 +529,18 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 			},
 			shortColumn: map[string]string{},
 		}
-		// normalize anything between last normalized batch id to last sync batchid
-		// TODO (kaushik): This is so that the statement size for individual merge statements
-		// doesn't exceed the limit. We should make this configurable.
-		const batchSize = 8
-		stmtNum := 0
-		err = utils.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
-			stmtNum += 1
-			mergeStmt := mergeGen.generateMergeStmt(chunk)
-			c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
-				stmtNum, tableName))
-
+		mergeStmts := mergeGen.generateMergeStmts(unchangedToastColumns)
+		for i, mergeStmt := range mergeStmts {
+			c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
+				i+1, len(mergeStmts), tableName))
 			q := c.client.Query(mergeStmt)
 			q.DefaultProjectID = c.projectID
 			q.DefaultDatasetID = dstDatasetTable.dataset
-			_, err := q.Read(c.ctx)
+			_, err = q.Read(c.ctx)
 			if err != nil {
-				return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
+				return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
 			}
-			return nil
-		})
-		if err != nil {
-			return nil, err
 		}
 	}
diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go
index 21350fc76..db7b4c83a 100644
--- a/flow/connectors/bigquery/merge_stmt_generator.go
+++ b/flow/connectors/bigquery/merge_stmt_generator.go
@@ -177,6 +177,20 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s
 		pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
 }
 
+func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastCols []string) []string {
+	// TODO (kaushik): This is so that the statement size for individual merge statements
+	// doesn't exceed the limit. We should make this configurable.
+	const batchSize = 8
+	partitions := utils.ArrayChunks(allUnchangedToastCols, batchSize)
+
+	mergeStmts := make([]string, 0, len(partitions))
+	for _, partition := range partitions {
+		mergeStmts = append(mergeStmts, m.generateMergeStmt(partition))
+	}
+
+	return mergeStmts
+}
+
 /*
 This function takes an array of unique unchanged toast column groups and an array
 of all column names, and returns suitable UPDATE statements as part of a MERGE operation.
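The new `generateMergeStmts` keeps the hard-coded `batchSize` of 8, so N unchanged-toast-column groups yield ceil(N/8) merge statements, with the remainder in the last one. Below is a minimal sketch of that batching math; the chunking helper is inlined so the snippet runs standalone (it mirrors `utils.ArrayChunks` from the second file in this diff), and the group names are purely illustrative:

```go
package main

import "fmt"

// arrayChunks mirrors utils.ArrayChunks: it splits a slice into size-length
// partitions, leaving the remainder in the final partition. Assumes size > 0.
func arrayChunks[T any](slice []T, size int) [][]T {
	var partitions [][]T
	for size < len(slice) {
		slice, partitions = slice[size:], append(partitions, slice[0:size])
	}
	return append(partitions, slice)
}

func main() {
	// Ten unchanged-toast-column groups with batchSize = 8 yield two merge
	// statements: one covering eight groups, one covering the remaining two.
	groups := []string{"g1", "g2", "g3", "g4", "g5", "g6", "g7", "g8", "g9", "g10"}
	chunks := arrayChunks(groups, 8)
	for i, chunk := range chunks {
		fmt.Printf("merge statement [%d/%d] covers %v\n", i+1, len(chunks), chunk)
	}
}
```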
diff --git a/flow/connectors/utils/array.go b/flow/connectors/utils/array.go
index 1646ba022..d131e3c65 100644
--- a/flow/connectors/utils/array.go
+++ b/flow/connectors/utils/array.go
@@ -18,19 +18,17 @@ func ArrayMinus[T comparable](first, second []T) []T {
 	return result
 }
 
-func ArrayIterChunks[T any](slice []T, size int, f func(chunk []T) error) error {
-	if size <= 0 {
-		return nil
-	}
-	lo := 0
-	for lo < len(slice) {
-		hi := min(lo+size, len(slice))
-		if err := f(slice[lo:hi:hi]); err != nil {
-			return err
-		}
-		lo = hi
+func ArrayChunks[T any](slice []T, size int) [][]T {
+	var partitions [][]T
+
+	for size < len(slice) {
+		slice, partitions = slice[size:], append(partitions, slice[0:size])
 	}
-	return nil
+
+	// Add the last remaining values
+	partitions = append(partitions, slice)
+
+	return partitions
 }
 
 func ArraysHaveOverlap[T comparable](first, second []T) bool {
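For reference, a small sketch of the new `ArrayChunks` edge cases, using a standalone copy of the helper. Two behavioral differences from `ArrayIterChunks` are worth noting: an empty input now yields one empty partition (the old helper invoked its callback zero times), and the old `size <= 0` guard is gone, so callers are expected to pass a positive size:

```go
package main

import "fmt"

// Standalone copy of the new utils.ArrayChunks, for illustration only.
func arrayChunks[T any](slice []T, size int) [][]T {
	var partitions [][]T
	for size < len(slice) {
		slice, partitions = slice[size:], append(partitions, slice[0:size])
	}
	return append(partitions, slice)
}

func main() {
	// An evenly divisible input produces no trailing empty chunk: the loop
	// stops once size == len(slice) and the final append emits the last
	// full partition.
	fmt.Println(arrayChunks([]int{1, 2, 3, 4}, 2)) // [[1 2] [3 4]]

	// An empty input yields a single empty partition, whereas the old
	// ArrayIterChunks never invoked its callback for an empty slice.
	fmt.Println(len(arrayChunks([]int{}, 2))) // 1

	// Note: with size == 0 and a non-empty slice, slice[0:] never shrinks,
	// so the loop would never terminate; callers must pass size > 0.
}
```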