diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 2d1eacd029..0f8bcd99cc 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -587,6 +587,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) c.datasetID, rawTableName, distinctTableNames)) for _, tableName := range distinctTableNames { + unchangedToastColumns := tableNametoUnchangedToastCols[tableName] dstDatasetTable, _ := c.convertToDatasetTable(tableName) mergeGen := &mergeStmtGenerator{ rawDatasetTable: &datasetTable{ @@ -598,7 +599,6 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) normalizedTableSchema: c.tableNameSchemaMapping[tableName], syncBatchID: batchIDs.SyncBatchID, normalizeBatchID: batchIDs.NormalizeBatchID, - unchangedToastColumns: tableNametoUnchangedToastCols[tableName], peerdbCols: &protos.PeerDBColumns{ SoftDeleteColName: req.SoftDeleteColName, SyncedAtColName: req.SyncedAtColName, @@ -607,7 +607,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) shortColumn: map[string]string{}, } // normalize anything between last normalized batch id to last sync batchid - mergeStmts := mergeGen.generateMergeStmts() + mergeStmts := mergeGen.generateMergeStmts(unchangedToastColumns) for i, mergeStmt := range mergeStmts { c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..", i+1, len(mergeStmts), tableName)) diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index d93e31ad89..eb861b4570 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -23,8 +23,6 @@ type mergeStmtGenerator struct { normalizeBatchID int64 // the schema of the table to merge into normalizedTableSchema *protos.TableSchema - // array of toast column combinations that are unchanged - unchangedToastColumns []string // _PEERDB_IS_DELETED and _SYNCED_AT columns peerdbCols *protos.PeerDBColumns // map for shorter columns @@ -139,7 +137,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName) insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP" - updateStatementsforToastCols := m.generateUpdateStatements(pureColNames) + updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, unchangedToastColumns) if m.peerdbCols.SoftDelete { softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName) softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE" @@ -180,11 +178,11 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) } -func (m *mergeStmtGenerator) generateMergeStmts() []string { +func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastColas []string) []string { // TODO (kaushik): This is so that the statement size for individual merge statements // doesn't exceed the limit. We should make this configurable. const batchSize = 8 - partitions := utils.ArrayChunks(m.unchangedToastColumns, batchSize) + partitions := utils.ArrayChunks(allUnchangedToastColas, batchSize) mergeStmts := make([]string, 0, len(partitions)) for _, partition := range partitions { @@ -209,17 +207,17 @@ and updating the other columns (not the unchanged toast columns) 6. Repeat steps 1-5 for each unique unchanged toast column group. 7. Return the list of generated update statements. */ -func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string { +func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastColumns []string) []string { handleSoftDelete := m.peerdbCols.SoftDelete && (m.peerdbCols.SoftDeleteColName != "") // weird way of doing it but avoids prealloc lint updateStmts := make([]string, 0, func() int { if handleSoftDelete { - return 2 * len(m.unchangedToastColumns) + return 2 * len(unchangedToastColumns) } - return len(m.unchangedToastColumns) + return len(unchangedToastColumns) }()) - for _, cols := range m.unchangedToastColumns { + for _, cols := range unchangedToastColumns { unchangedColsArray := strings.Split(cols, ",") otherCols := utils.ArrayMinus(allCols, unchangedColsArray) tmpArray := make([]string, 0, len(otherCols)) diff --git a/flow/connectors/bigquery/merge_stmt_generator_test.go b/flow/connectors/bigquery/merge_stmt_generator_test.go index cc49b17cbd..55759f6d39 100644 --- a/flow/connectors/bigquery/merge_stmt_generator_test.go +++ b/flow/connectors/bigquery/merge_stmt_generator_test.go @@ -12,7 +12,6 @@ func TestGenerateUpdateStatement(t *testing.T) { allCols := []string{"col1", "col2", "col3"} unchangedToastCols := []string{""} m := &mergeStmtGenerator{ - unchangedToastColumns: unchangedToastCols, shortColumn: map[string]string{ "col1": "_c0", "col2": "_c1", @@ -35,7 +34,7 @@ func TestGenerateUpdateStatement(t *testing.T) { "`synced_at`=CURRENT_TIMESTAMP", } - result := m.generateUpdateStatements(allCols) + result := m.generateUpdateStatements(allCols, unchangedToastCols) for i := range expected { expected[i] = utils.RemoveSpacesTabsNewlines(expected[i]) @@ -51,7 +50,6 @@ func TestGenerateUpdateStatement_WithSoftDelete(t *testing.T) { allCols := []string{"col1", "col2", "col3"} unchangedToastCols := []string{""} m := &mergeStmtGenerator{ - unchangedToastColumns: unchangedToastCols, shortColumn: map[string]string{ "col1": "_c0", "col2": "_c1", @@ -79,7 +77,7 @@ func TestGenerateUpdateStatement_WithSoftDelete(t *testing.T) { "`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", } - result := m.generateUpdateStatements(allCols) + result := m.generateUpdateStatements(allCols, unchangedToastCols) for i := range expected { expected[i] = utils.RemoveSpacesTabsNewlines(expected[i]) @@ -100,7 +98,6 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { "col2": "_c1", "col3": "_c2", }, - unchangedToastColumns: unchangedToastCols, peerdbCols: &protos.PeerDBColumns{ SoftDelete: false, SoftDeleteColName: "deleted", @@ -123,7 +120,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { "`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP", } - result := m.generateUpdateStatements(allCols) + result := m.generateUpdateStatements(allCols, unchangedToastCols) for i := range expected { expected[i] = utils.RemoveSpacesTabsNewlines(expected[i]) @@ -144,7 +141,6 @@ func TestGenerateUpdateStatement_WithUnchangedToastColsAndSoftDelete(t *testing. "col2": "_c1", "col3": "_c2", }, - unchangedToastColumns: unchangedToastCols, peerdbCols: &protos.PeerDBColumns{ SoftDelete: true, SoftDeleteColName: "deleted", @@ -180,7 +176,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastColsAndSoftDelete(t *testing. "`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", } - result := m.generateUpdateStatements(allCols) + result := m.generateUpdateStatements(allCols, unchangedToastCols) for i := range expected { expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])