Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 10, 2024
1 parent 858ece3 commit 04029b5
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 33 deletions.
31 changes: 10 additions & 21 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,11 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
resultMap := make(map[string][]string)

// Process the query results using an iterator.
var row struct {
Tablename string `bigquery:"_peerdb_destination_table_name"`
UnchangedToastColumns []string `bigquery:"unchanged_toast_columns"`
}
for {
var row struct {
Tablename string `bigquery:"_peerdb_destination_table_name"`
UnchangedToastColumns []string `bigquery:"unchanged_toast_columns"`
}
err := it.Next(&row)
if err == iterator.Done {
break
Expand Down Expand Up @@ -529,29 +529,18 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
},
shortColumn: map[string]string{},
}

// normalize anything between last normalized batch id to last sync batchid
// TODO (kaushik): This is so that the statement size for individual merge statements
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
stmtNum := 0
err = utils.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
stmtNum += 1
mergeStmt := mergeGen.generateMergeStmt(chunk)
c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
stmtNum, tableName))

mergeStmts := mergeGen.generateMergeStmts(unchangedToastColumns)
for i, mergeStmt := range mergeStmts {
c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
i+1, len(mergeStmts), tableName))
q := c.client.Query(mergeStmt)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = dstDatasetTable.dataset
_, err := q.Read(c.ctx)
_, err = q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
}
return nil
})
if err != nil {
return nil, err
}
}

Expand Down
14 changes: 14 additions & 0 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,20 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s
pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
}

func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastColas []string) []string {
// TODO (kaushik): This is so that the statement size for individual merge statements
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
partitions := utils.ArrayChunks(allUnchangedToastColas, batchSize)

mergeStmts := make([]string, 0, len(partitions))
for _, partition := range partitions {
mergeStmts = append(mergeStmts, m.generateMergeStmt(partition))
}

return mergeStmts
}

/*
This function takes an array of unique unchanged toast column groups and an array of all column names,
and returns suitable UPDATE statements as part of a MERGE operation.
Expand Down
22 changes: 10 additions & 12 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@ func ArrayMinus[T comparable](first, second []T) []T {
return result
}

func ArrayIterChunks[T any](slice []T, size int, f func(chunk []T) error) error {
if size <= 0 {
return nil
}
lo := 0
for lo < len(slice) {
hi := min(lo+size, len(slice))
if err := f(slice[lo:hi:hi]); err != nil {
return err
}
lo = hi
func ArrayChunks[T any](slice []T, size int) [][]T {
var partitions [][]T

for size < len(slice) {
slice, partitions = slice[size:], append(partitions, slice[0:size])
}
return nil

// Add the last remaining values
partitions = append(partitions, slice)

return partitions
}

func ArraysHaveOverlap[T comparable](first, second []T) bool {
Expand Down

0 comments on commit 04029b5

Please sign in to comment.