diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go
index 6d77932b83..a7a890a543 100644
--- a/flow/connectors/bigquery/merge_statement_generator.go
+++ b/flow/connectors/bigquery/merge_statement_generator.go
@@ -120,25 +120,12 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
 	return fmt.Sprintf(cte, pkeyColsStr)
 }
 
-// TODO move to utils.
-func partition[T any](slice []T, size int) [][]T {
-	var partitions [][]T
-
-	for size < len(slice) {
-		slice, partitions = slice[size:], append(partitions, slice[0:size])
-	}
-
-	// Add the last remaining values
-	partitions = append(partitions, slice)
-
-	return partitions
-}
-
 // generateMergeStmts generates merge statements, partitioned by unchanged toast columns.
 func (m *mergeStmtGenerator) generateMergeStmts() []string {
-	// partition unchanged toast columns into batches of 8
+	// TODO (kaushik): This is so that the statement size for individual merge statements
+	// doesn't exceed the limit. We should make this configurable.
 	const batchSize = 8
-	partitions := partition(m.unchangedToastColumns, batchSize)
+	partitions := utils.ArrayPartition(m.unchangedToastColumns, batchSize)
 
 	mergeStmts := make([]string, 0, len(partitions))
 	for _, partition := range partitions {
diff --git a/flow/connectors/utils/array.go b/flow/connectors/utils/array.go
index 3fe95df21e..0b74edb905 100644
--- a/flow/connectors/utils/array.go
+++ b/flow/connectors/utils/array.go
@@ -16,3 +16,16 @@ func ArrayMinus(first []string, second []string) []string {
 	}
 	return result
 }
+
+func ArrayPartition[T any](slice []T, size int) [][]T {
+	var partitions [][]T
+
+	for size < len(slice) {
+		slice, partitions = slice[size:], append(partitions, slice[0:size])
+	}
+
+	// Add the last remaining values
+	partitions = append(partitions, slice)
+
+	return partitions
+}
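
For reviewers, here is a minimal, self-contained sketch of how the relocated helper behaves. The `main` scaffolding and the sample column names below are illustrative only and not part of the change: `ArrayPartition` splits a slice into chunks of at most `size` elements, with any remainder going into a final shorter chunk.

```go
package main

import "fmt"

// ArrayPartition is copied verbatim from flow/connectors/utils/array.go above:
// it repeatedly slices off size-element chunks, then appends whatever remains.
func ArrayPartition[T any](slice []T, size int) [][]T {
	var partitions [][]T

	for size < len(slice) {
		slice, partitions = slice[size:], append(partitions, slice[0:size])
	}

	// Add the last remaining values
	partitions = append(partitions, slice)

	return partitions
}

func main() {
	// Hypothetical input: ten unchanged-toast-column entries with batchSize = 8,
	// which partitions into one batch of 8 and one batch of 2.
	cols := []string{"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"}
	for i, batch := range ArrayPartition(cols, 8) {
		fmt.Printf("batch %d: %v\n", i, batch)
	}
	// Output:
	// batch 0: [c1 c2 c3 c4 c5 c6 c7 c8]
	// batch 1: [c9 c10]
}
```

One behavior worth noting: an empty input slice still yields a single (empty) partition, so the caller's loop over partitions in `generateMergeStmts` runs once even when `unchangedToastColumns` is empty. Since the function body is moved unchanged, this matches the pre-move behavior.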