From 0c736504d40ec8608e37869c1272057f9243c93b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 11 Jan 2024 18:58:23 -0500 Subject: [PATCH] fix some error (#1063) --- flow/connectors/bigquery/merge_stmt_generator.go | 2 +- flow/connectors/utils/array.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index d93e31ad89..6637678226 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -183,7 +183,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s func (m *mergeStmtGenerator) generateMergeStmts() []string { // TODO (kaushik): This is so that the statement size for individual merge statements // doesn't exceed the limit. We should make this configurable. - const batchSize = 8 + const batchSize = 4 partitions := utils.ArrayChunks(m.unchangedToastColumns, batchSize) mergeStmts := make([]string, 0, len(partitions)) diff --git a/flow/connectors/utils/array.go b/flow/connectors/utils/array.go index 487e8791ad..3db4d53bb3 100644 --- a/flow/connectors/utils/array.go +++ b/flow/connectors/utils/array.go @@ -18,13 +18,14 @@ func ArrayMinus(first []string, second []string) []string { } func ArrayChunks[T any](slice []T, size int) [][]T { - var chunks [][]T + var partitions [][]T for size < len(slice) { - chunks = append(chunks, slice[:size]) - slice = slice[size:] + slice, partitions = slice[size:], append(partitions, slice[0:size]) } // Add the last remaining values - return append(chunks, slice) + partitions = append(partitions, slice) + + return partitions }