From 15f39074adddd74a5e7c97d1d25e43b074da9b54 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sat, 6 Jan 2024 09:48:24 -0500 Subject: [PATCH] Unchanged toast split (#1004) --- .github/workflows/customer-docker.yml | 2 +- flow/connectors/bigquery/bigquery.go | 14 ++++++++--- .../bigquery/merge_statement_generator.go | 25 ++++++++++++++++--- flow/connectors/utils/array.go | 13 ++++++++++ 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/.github/workflows/customer-docker.yml b/.github/workflows/customer-docker.yml index 6d9eb4dbdc..4b25c2d9e4 100644 --- a/.github/workflows/customer-docker.yml +++ b/.github/workflows/customer-docker.yml @@ -12,7 +12,7 @@ jobs: docker-build: strategy: matrix: - runner: [ubicloud-standard-2-ubuntu-2204-arm] + runner: [ubuntu-latest] runs-on: ${{ matrix.runner }} permissions: contents: read diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index c60acaa47b..d08935cf37 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -606,13 +606,19 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) }, shortColumn: map[string]string{}, } + // normalize anything between last normalized batch id to last sync batchid - mergeStmt := mergeGen.generateMergeStmt() + mergeStmts := mergeGen.generateMergeStmts() // run the merge statement - _, err = c.client.Query(mergeStmt).Read(c.ctx) - if err != nil { - return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err) + for i, mergeStmt := range mergeStmts { + c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..", + i+1, len(mergeStmts), tableName)) + q := c.client.Query(mergeStmt) + _, err = q.Read(c.ctx) + if err != nil { + return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err) + } } } // update metadata to make the last normalized batch id to the recent last sync batch id. diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 47dd2b3936..a7a890a543 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -120,8 +120,22 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string { return fmt.Sprintf(cte, pkeyColsStr) } -// generateMergeStmt generates a merge statement. -func (m *mergeStmtGenerator) generateMergeStmt() string { +// generateMergeStmts generates merge statements, partitioned by unchanged toast columns. +func (m *mergeStmtGenerator) generateMergeStmts() []string { + // TODO (kaushik): This is so that the statement size for individual merge statements + // doesn't exceed the limit. We should make this configurable. + const batchSize = 8 + partitions := utils.ArrayPartition(m.unchangedToastColumns, batchSize) + + mergeStmts := make([]string, 0, len(partitions)) + for _, partition := range partitions { + mergeStmts = append(mergeStmts, m.generateMergeStmt(partition)) + } + + return mergeStmts +} + +func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) string { // comma separated list of column names columnCount := utils.TableSchemaColumns(m.normalizedTableSchema) backtickColNames := make([]string, 0, columnCount) @@ -139,8 +153,11 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName) insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP" - updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, - m.unchangedToastColumns, m.peerdbCols) + updateStatementsforToastCols := m.generateUpdateStatements( + pureColNames, + unchangedToastColumns, + m.peerdbCols, + ) if m.peerdbCols.SoftDelete { softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName) softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE" diff --git a/flow/connectors/utils/array.go b/flow/connectors/utils/array.go index 3fe95df21e..0b74edb905 100644 --- a/flow/connectors/utils/array.go +++ b/flow/connectors/utils/array.go @@ -16,3 +16,16 @@ func ArrayMinus(first []string, second []string) []string { } return result } + +func ArrayPartition[T any](slice []T, size int) [][]T { + var partitions [][]T + + for size < len(slice) { + slice, partitions = slice[size:], append(partitions, slice[0:size]) + } + + // Add the last remaining values + partitions = append(partitions, slice) + + return partitions +}