Skip to content

Commit

Permalink
Merge branch 'main' into merge-schema-schema-ref
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jan 9, 2024
2 parents 490e14f + 9124df2 commit 35c5df8
Show file tree
Hide file tree
Showing 15 changed files with 978 additions and 608 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.unchangedToastColumns, m.peerdbCols)
updateStatementsforToastCols := m.generateUpdateStatements(pureColNames)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE"
Expand Down Expand Up @@ -196,14 +195,17 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(
allCols []string,
unchangedToastCols []string,
peerdbCols *protos.PeerDBColumns,
) []string {
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string {
handleSoftDelete := m.peerdbCols.SoftDelete && (m.peerdbCols.SoftDeleteColName != "")
// weird way of doing it but avoids prealloc lint
updateStmts := make([]string, 0, func() int {
if handleSoftDelete {
return 2 * len(m.unchangedToastColumns)
}
return len(m.unchangedToastColumns)
}())

for _, cols := range m.unchangedToastColumns {
unchangedColsArray := strings.Split(cols, ",")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0, len(otherCols))
Expand All @@ -212,14 +214,14 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
}

// set the synced at column to the current timestamp
if peerdbCols.SyncedAtColName != "" {
if m.peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=CURRENT_TIMESTAMP",
peerdbCols.SyncedAtColName))
m.peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
if handleSoftDelete {
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=FALSE",
peerdbCols.SoftDeleteColName))
m.peerdbCols.SoftDeleteColName))
}

ssep := strings.Join(tmpArray, ",")
Expand All @@ -231,9 +233,9 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
if handleSoftDelete {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf("`%s`=TRUE", peerdbCols.SoftDeleteColName))
fmt.Sprintf("`%s`=TRUE", m.peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ",")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
_rt=2 AND _ut='%s'
Expand Down
181 changes: 130 additions & 51 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,77 +2,67 @@ package connbigquery

import (
"reflect"
"strings"
"testing"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
func TestGenerateUpdateStatement(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}
m := &mergeStmtGenerator{
unchangedToastColumns: unchangedToastCols,
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
peerdbCols: &protos.PeerDBColumns{
SoftDelete: false,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
},
}
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}

expected := []string{
"WHEN MATCHED AND _rt!=2 AND _ut=''" +
" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND _rt=2 " +
"AND _ut='' " +
"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1," +
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND _rt=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"AND _ut=''" +
"THEN UPDATE SET " +
"`col1`=_d._c0," +
"`col2`=_d._c1," +
"`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})
result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
result[i] = removeSpacesTabsNewlines(result[i])
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
result[i] = utils.RemoveSpacesTabsNewlines(result[i])
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}
}

func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
func TestGenerateUpdateStatement_WithSoftDelete(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}
m := &mergeStmtGenerator{
unchangedToastColumns: unchangedToastCols,
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
peerdbCols: &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
},
}
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}

expected := []string{
"WHEN MATCHED AND _rt!=2 " +
Expand All @@ -89,26 +79,115 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols,
&protos.PeerDBColumns{
SoftDelete: true,
result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
result[i] = utils.RemoveSpacesTabsNewlines(result[i])
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}
}

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}
m := &mergeStmtGenerator{
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
unchangedToastColumns: unchangedToastCols,
peerdbCols: &protos.PeerDBColumns{
SoftDelete: false,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})
},
}

expected := []string{
"WHEN MATCHED AND _rt!=2 AND _ut=''" +
" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP",
"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP",
"WHEN MATCHED AND _rt!=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP",
"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP",
}

result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
result[i] = removeSpacesTabsNewlines(result[i])
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
result[i] = utils.RemoveSpacesTabsNewlines(result[i])
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
}
}

func removeSpacesTabsNewlines(s string) string {
s = strings.ReplaceAll(s, " ", "")
s = strings.ReplaceAll(s, "\t", "")
s = strings.ReplaceAll(s, "\n", "")
return s
func TestGenerateUpdateStatement_WithUnchangedToastColsAndSoftDelete(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}
m := &mergeStmtGenerator{
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
unchangedToastColumns: unchangedToastCols,
peerdbCols: &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
},
}

expected := []string{
"WHEN MATCHED AND _rt!=2 AND _ut=''" +
" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND _rt=2 " +
"AND _ut='' " +
"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1," +
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND _rt=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
result[i] = utils.RemoveSpacesTabsNewlines(result[i])
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
}
}
Loading

0 comments on commit 35c5df8

Please sign in to comment.