Skip to content

Commit

Permalink
minor refactor to BQ merge generator too
Browse files: browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jan 8, 2024
1 parent 40e56d4 commit d67cfd6
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 31 deletions.
27 changes: 11 additions & 16 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.unchangedToastColumns, m.peerdbCols)
updateStatementsforToastCols := m.generateUpdateStatements(pureColNames)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE"
Expand Down Expand Up @@ -196,14 +195,10 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(
allCols []string,
unchangedToastCols []string,
peerdbCols *protos.PeerDBColumns,
) []string {
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string {
updateStmts := make([]string, 0, len(m.unchangedToastColumns))

for _, cols := range m.unchangedToastColumns {
unchangedColsArray := strings.Split(cols, ",")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0, len(otherCols))
Expand All @@ -212,14 +207,14 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
}

// set the synced at column to the current timestamp
if peerdbCols.SyncedAtColName != "" {
if m.peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=CURRENT_TIMESTAMP",
peerdbCols.SyncedAtColName))
m.peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
if m.peerdbCols.SoftDeleteColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=FALSE",
peerdbCols.SoftDeleteColName))
m.peerdbCols.SoftDeleteColName))
}

ssep := strings.Join(tmpArray, ",")
Expand All @@ -231,9 +226,9 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
if m.peerdbCols.SoftDelete && (m.peerdbCols.SoftDeleteColName != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf("`%s`=TRUE", peerdbCols.SoftDeleteColName))
fmt.Sprintf("`%s`=TRUE", m.peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ",")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
_rt=2 AND _ut='%s'
Expand Down
33 changes: 18 additions & 15 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ import (
)

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}
m := &mergeStmtGenerator{
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
unchangedToastColumns: unchangedToastCols,
peerdbCols: &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
},
}
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}

expected := []string{
"WHEN MATCHED AND _rt!=2 AND _ut=''" +
Expand Down Expand Up @@ -47,11 +53,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})
result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand All @@ -64,15 +66,21 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
}

func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}
m := &mergeStmtGenerator{
unchangedToastColumns: unchangedToastCols,
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
peerdbCols: &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
},
}
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}

expected := []string{
"WHEN MATCHED AND _rt!=2 " +
Expand All @@ -89,12 +97,7 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols,
&protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})
result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand Down

0 comments on commit d67cfd6

Please sign in to comment.