Skip to content

Commit

Permalink
BQ merge fixes (#1241)
Browse files Browse the repository at this point in the history
BQ `it.Next(&row)` seems to mutate the target,

https://github.com/googleapis/google-cloud-go/blob/f049c9751415f9fc4c81c1839a8371782cfc016c/bigquery/value.go#L490
so reallocate a fresh row each iteration

ArrayChunks: replace with ArrayIterChunks. Document behavior when input
is empty slice
Also reduces allocations, since we've seen people having 1000+ columns

go 1.22 has a rangefunc experiment, so eventually we can replace this
with that https://go.dev/wiki/RangefuncExperiment
  • Loading branch information
serprex authored Feb 10, 2024
1 parent 774787a commit d7aef0a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 34 deletions.
31 changes: 21 additions & 10 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,11 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
resultMap := make(map[string][]string)

// Process the query results using an iterator.
var row struct {
Tablename string `bigquery:"_peerdb_destination_table_name"`
UnchangedToastColumns []string `bigquery:"unchanged_toast_columns"`
}
for {
var row struct {
Tablename string `bigquery:"_peerdb_destination_table_name"`
UnchangedToastColumns []string `bigquery:"unchanged_toast_columns"`
}
err := it.Next(&row)
if err == iterator.Done {
break
Expand Down Expand Up @@ -529,18 +529,29 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
},
shortColumn: map[string]string{},
}

// normalize anything between last normalized batch id to last sync batchid
mergeStmts := mergeGen.generateMergeStmts(unchangedToastColumns)
for i, mergeStmt := range mergeStmts {
c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
i+1, len(mergeStmts), tableName))
// TODO (kaushik): This is so that the statement size for individual merge statements
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
stmtNum := 0
err = utils.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
stmtNum += 1
mergeStmt := mergeGen.generateMergeStmt(chunk)
c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
stmtNum, tableName))

q := c.client.Query(mergeStmt)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = dstDatasetTable.dataset
_, err = q.Read(c.ctx)
_, err := q.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
}
return nil
})
if err != nil {
return nil, err
}
}

Expand Down
14 changes: 0 additions & 14 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,6 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s
pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
}

func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastColas []string) []string {
// TODO (kaushik): This is so that the statement size for individual merge statements
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
partitions := utils.ArrayChunks(allUnchangedToastColas, batchSize)

mergeStmts := make([]string, 0, len(partitions))
for _, partition := range partitions {
mergeStmts = append(mergeStmts, m.generateMergeStmt(partition))
}

return mergeStmts
}

/*
This function takes an array of unique unchanged toast column groups and an array of all column names,
and returns suitable UPDATE statements as part of a MERGE operation.
Expand Down
26 changes: 16 additions & 10 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@ func ArrayMinus[T comparable](first, second []T) []T {
return result
}

func ArrayChunks[T any](slice []T, size int) [][]T {
var partitions [][]T

for size < len(slice) {
slice, partitions = slice[size:], append(partitions, slice[0:size])
// Call f with subslices of slice. An empty slice will call f once with nil.
func ArrayIterChunks[T any](slice []T, size int, f func(chunk []T) error) error {
if len(slice) == 0 {
return f(nil)
}

// Add the last remaining values
partitions = append(partitions, slice)

return partitions
if size <= 0 {
return nil
}
lo := 0
for lo < len(slice) {
hi := min(lo+size, len(slice))
if err := f(slice[lo:hi:hi]); err != nil {
return err
}
lo = hi
}
return nil
}

func ArraysHaveOverlap[T comparable](first, second []T) bool {
Expand Down

0 comments on commit d7aef0a

Please sign in to comment.