Skip to content

Commit

Permalink
refactor to PG as well, fixed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jan 8, 2024
1 parent d67cfd6 commit 25bab37
Show file tree
Hide file tree
Showing 9 changed files with 601 additions and 302 deletions.
13 changes: 10 additions & 3 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,14 @@ and updating the other columns (not the unchanged toast columns)
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string {
updateStmts := make([]string, 0, len(m.unchangedToastColumns))
handleSoftDelete := m.peerdbCols.SoftDelete && (m.peerdbCols.SoftDeleteColName != "")
// capacity is computed by an inline closure (doubled when soft delete is handled) so the slice is sized up front, which avoids the prealloc lint
updateStmts := make([]string, 0, func() int {
if handleSoftDelete {
return 2 * len(m.unchangedToastColumns)
}
return len(m.unchangedToastColumns)
}())

for _, cols := range m.unchangedToastColumns {
unchangedColsArray := strings.Split(cols, ",")
Expand All @@ -212,7 +219,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string
m.peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if m.peerdbCols.SoftDeleteColName != "" {
if handleSoftDelete {
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=FALSE",
m.peerdbCols.SoftDeleteColName))
}
Expand All @@ -226,7 +233,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string
// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if m.peerdbCols.SoftDelete && (m.peerdbCols.SoftDeleteColName != "") {
if handleSoftDelete {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf("`%s`=TRUE", m.peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ",")
Expand Down
156 changes: 116 additions & 40 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,52 @@ package connbigquery

import (
"reflect"
"strings"
"testing"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
func TestGenerateUpdateStatement(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}
unchangedToastCols := []string{""}
m := &mergeStmtGenerator{
unchangedToastColumns: unchangedToastCols,
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
unchangedToastColumns: unchangedToastCols,
peerdbCols: &protos.PeerDBColumns{
SoftDelete: true,
SoftDelete: false,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
},
}

expected := []string{
"WHEN MATCHED AND _rt!=2 AND _ut=''" +
" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND _rt=2 " +
"AND _ut='' " +
"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1," +
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND _rt=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"AND _ut=''" +
"THEN UPDATE SET " +
"`col1`=_d._c0," +
"`col2`=_d._c1," +
"`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP",
}

result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
result[i] = removeSpacesTabsNewlines(result[i])
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
result[i] = utils.RemoveSpacesTabsNewlines(result[i])
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}
}

func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
func TestGenerateUpdateStatement_WithSoftDelete(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}
m := &mergeStmtGenerator{
Expand Down Expand Up @@ -100,18 +82,112 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
result[i] = removeSpacesTabsNewlines(result[i])
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
result[i] = utils.RemoveSpacesTabsNewlines(result[i])
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}
}

func removeSpacesTabsNewlines(s string) string {
s = strings.ReplaceAll(s, " ", "")
s = strings.ReplaceAll(s, "\t", "")
s = strings.ReplaceAll(s, "\n", "")
return s
// TestGenerateUpdateStatement_WithUnchangedToastCols checks the statements
// produced by generateUpdateStatements for several unchangedToastColumns
// entries while SoftDelete is false. The expectation is one `_rt!=2` clause
// per entry, none of which touches the `deleted` column.
func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
	allCols := []string{"col1", "col2", "col3"}
	unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}
	m := &mergeStmtGenerator{
		shortColumn: map[string]string{
			"col1": "_c0",
			"col2": "_c1",
			"col3": "_c2",
		},
		unchangedToastColumns: unchangedToastCols,
		peerdbCols: &protos.PeerDBColumns{
			SoftDelete:        false,
			SoftDeleteColName: "deleted",
			SyncedAtColName:   "synced_at",
		},
	}

	expected := []string{
		"WHEN MATCHED AND _rt!=2 AND _ut=''" +
			" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
			"`synced_at`=CURRENT_TIMESTAMP",
		"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
			"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP",
		"WHEN MATCHED AND _rt!=2 " +
			"AND _ut='col2' " +
			"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
			"`synced_at`=CURRENT_TIMESTAMP",
		"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
			"THEN UPDATE SET `col1`=_d._c0," +
			"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP",
	}

	result := m.generateUpdateStatements(allCols)

	// The loop below indexes result at every position of expected, so a
	// shorter result would panic with index-out-of-range. Fail with a
	// readable message instead of crashing the test binary.
	if len(result) != len(expected) {
		t.Fatalf("Unexpected number of statements. Expected %d: %v,\nbut got %d: %v",
			len(expected), expected, len(result), result)
	}

	// Compare modulo whitespace so formatting of the generated SQL is not pinned.
	for i := range expected {
		expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
		result[i] = utils.RemoveSpacesTabsNewlines(result[i])
	}

	if !reflect.DeepEqual(result, expected) {
		t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
	}
}

// TestGenerateUpdateStatement_WithUnchangedToastColsAndSoftDelete checks the
// statements produced by generateUpdateStatements for several
// unchangedToastColumns entries while SoftDelete is true and a soft-delete
// column name is set. The expectation is two clauses per entry: a `_rt!=2`
// clause setting `deleted`=FALSE followed by a `_rt=2` clause setting
// `deleted`=TRUE.
func TestGenerateUpdateStatement_WithUnchangedToastColsAndSoftDelete(t *testing.T) {
	allCols := []string{"col1", "col2", "col3"}
	unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}
	m := &mergeStmtGenerator{
		shortColumn: map[string]string{
			"col1": "_c0",
			"col2": "_c1",
			"col3": "_c2",
		},
		unchangedToastColumns: unchangedToastCols,
		peerdbCols: &protos.PeerDBColumns{
			SoftDelete:        true,
			SoftDeleteColName: "deleted",
			SyncedAtColName:   "synced_at",
		},
	}

	expected := []string{
		"WHEN MATCHED AND _rt!=2 AND _ut=''" +
			" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
			"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
		"WHEN MATCHED AND _rt=2 " +
			"AND _ut='' " +
			"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1," +
			"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
		"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
			"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
		"WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " +
			"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
		"WHEN MATCHED AND _rt!=2 " +
			"AND _ut='col2' " +
			"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
			"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
		"WHEN MATCHED AND _rt=2 " +
			"AND _ut='col2' " +
			"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
			"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
		"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
			"THEN UPDATE SET `col1`=_d._c0," +
			"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
		"WHEN MATCHED AND _rt=2 AND _ut='col3' " +
			"THEN UPDATE SET `col1`=_d._c0," +
			"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
	}

	result := m.generateUpdateStatements(allCols)

	// The loop below indexes result at every position of expected, so a
	// shorter result would panic with index-out-of-range. Fail with a
	// readable message instead of crashing the test binary.
	if len(result) != len(expected) {
		t.Fatalf("Unexpected number of statements. Expected %d: %v,\nbut got %d: %v",
			len(expected), expected, len(result), result)
	}

	// Compare modulo whitespace so formatting of the generated SQL is not pinned.
	for i := range expected {
		expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
		result[i] = utils.RemoveSpacesTabsNewlines(result[i])
	}

	if !reflect.DeepEqual(result, expected) {
		t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
	}
}
Loading

0 comments on commit 25bab37

Please sign in to comment.