Skip to content

Commit

Permalink
more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 6, 2024
1 parent 5f035af commit cfa49a4
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
19 changes: 3 additions & 16 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,25 +120,12 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
return fmt.Sprintf(cte, pkeyColsStr)
}

// TODO move to utils.
func partition[T any](slice []T, size int) [][]T {
var partitions [][]T

for size < len(slice) {
slice, partitions = slice[size:], append(partitions, slice[0:size])
}

// Add the last remaining values
partitions = append(partitions, slice)

return partitions
}

// generateMergeStmts generates merge statements, partitioned by unchanged toast columns.
func (m *mergeStmtGenerator) generateMergeStmts() []string {
// partition unchanged toast columns into batches of 8
// TODO (kaushik): This is so that the statement size for individual merge statements
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
partitions := partition(m.unchangedToastColumns, batchSize)
partitions := utils.ArrayPartition(m.unchangedToastColumns, batchSize)

mergeStmts := make([]string, 0, len(partitions))
for _, partition := range partitions {
Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,16 @@ func ArrayMinus(first []string, second []string) []string {
}
return result
}

func ArrayPartition[T any](slice []T, size int) [][]T {
var partitions [][]T

for size < len(slice) {
slice, partitions = slice[size:], append(partitions, slice[0:size])
}

// Add the last remaining values
partitions = append(partitions, slice)

return partitions
}

0 comments on commit cfa49a4

Please sign in to comment.