diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index 01affcddfd..f3a00da222 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -139,8 +139,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName) insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP" - updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, - m.unchangedToastColumns, m.peerdbCols) + updateStatementsforToastCols := m.generateUpdateStatements(pureColNames) if m.peerdbCols.SoftDelete { softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName) softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE" @@ -196,14 +195,10 @@ and updating the other columns (not the unchanged toast columns) 6. Repeat steps 1-5 for each unique unchanged toast column group. 7. Return the list of generated update statements. */ -func (m *mergeStmtGenerator) generateUpdateStatements( - allCols []string, - unchangedToastCols []string, - peerdbCols *protos.PeerDBColumns, -) []string { - updateStmts := make([]string, 0, len(unchangedToastCols)) - - for _, cols := range unchangedToastCols { +func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string { + updateStmts := make([]string, 0, len(m.unchangedToastColumns)) + + for _, cols := range m.unchangedToastColumns { unchangedColsArray := strings.Split(cols, ",") otherCols := utils.ArrayMinus(allCols, unchangedColsArray) tmpArray := make([]string, 0, len(otherCols)) @@ -212,14 +207,14 @@ func (m *mergeStmtGenerator) generateUpdateStatements( } // set the synced at column to the current timestamp - if peerdbCols.SyncedAtColName != "" { + if m.peerdbCols.SyncedAtColName != "" { tmpArray = append(tmpArray, fmt.Sprintf("`%s`=CURRENT_TIMESTAMP", - peerdbCols.SyncedAtColName)) + m.peerdbCols.SyncedAtColName)) } // set soft-deleted to false, tackles insert after soft-delete - if peerdbCols.SoftDeleteColName != "" { + if m.peerdbCols.SoftDeleteColName != "" { tmpArray = append(tmpArray, fmt.Sprintf("`%s`=FALSE", - peerdbCols.SoftDeleteColName)) + m.peerdbCols.SoftDeleteColName)) } ssep := strings.Join(tmpArray, ",") @@ -231,9 +226,9 @@ func (m *mergeStmtGenerator) generateUpdateStatements( // generates update statements for the case where updates and deletes happen in the same branch // the backfill has happened from the pull side already, so treat the DeleteRecord as an update // and then set soft-delete to true. - if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") { + if m.peerdbCols.SoftDelete && (m.peerdbCols.SoftDeleteColName != "") { tmpArray = append(tmpArray[:len(tmpArray)-1], - fmt.Sprintf("`%s`=TRUE", peerdbCols.SoftDeleteColName)) + fmt.Sprintf("`%s`=TRUE", m.peerdbCols.SoftDeleteColName)) ssep := strings.Join(tmpArray, ",") updateStmt := fmt.Sprintf(`WHEN MATCHED AND _rt=2 AND _ut='%s' diff --git a/flow/connectors/bigquery/merge_stmt_generator_test.go b/flow/connectors/bigquery/merge_stmt_generator_test.go index 141b3999b7..1857b41d76 100644 --- a/flow/connectors/bigquery/merge_stmt_generator_test.go +++ b/flow/connectors/bigquery/merge_stmt_generator_test.go @@ -9,15 +9,21 @@ import ( ) func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { + allCols := []string{"col1", "col2", "col3"} + unchangedToastCols := []string{"", "col2,col3", "col2", "col3"} m := &mergeStmtGenerator{ shortColumn: map[string]string{ "col1": "_c0", "col2": "_c1", "col3": "_c2", }, + unchangedToastColumns: unchangedToastCols, + peerdbCols: &protos.PeerDBColumns{ + SoftDelete: true, + SoftDeleteColName: "deleted", + SyncedAtColName: "synced_at", + }, } - allCols := []string{"col1", "col2", "col3"} - unchangedToastCols := []string{"", "col2,col3", "col2", "col3"} expected := []string{ "WHEN MATCHED AND _rt!=2 AND _ut=''" + @@ -47,11 +53,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { "`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", } - result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{ - SoftDelete: true, - SoftDeleteColName: "deleted", - SyncedAtColName: "synced_at", - }) + result := m.generateUpdateStatements(allCols) for i := range expected { expected[i] = removeSpacesTabsNewlines(expected[i]) @@ -64,15 +66,21 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { } func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) { + allCols := []string{"col1", "col2", "col3"} + unchangedToastCols := []string{""} m := &mergeStmtGenerator{ + unchangedToastColumns: unchangedToastCols, shortColumn: map[string]string{ "col1": "_c0", "col2": "_c1", "col3": "_c2", }, + peerdbCols: &protos.PeerDBColumns{ + SoftDelete: true, + SoftDeleteColName: "deleted", + SyncedAtColName: "synced_at", + }, } - allCols := []string{"col1", "col2", "col3"} - unchangedToastCols := []string{""} expected := []string{ "WHEN MATCHED AND _rt!=2 " + @@ -89,12 +97,7 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) { "`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", } - result := m.generateUpdateStatements(allCols, unchangedToastCols, - &protos.PeerDBColumns{ - SoftDelete: true, - SoftDeleteColName: "deleted", - SyncedAtColName: "synced_at", - }) + result := m.generateUpdateStatements(allCols) for i := range expected { expected[i] = removeSpacesTabsNewlines(expected[i])