Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactored SF and PG to standalone merge generator #1026

Merged
merged 4 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.unchangedToastColumns, m.peerdbCols)
updateStatementsforToastCols := m.generateUpdateStatements(pureColNames)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE"
Expand Down Expand Up @@ -196,14 +195,17 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(
allCols []string,
unchangedToastCols []string,
peerdbCols *protos.PeerDBColumns,
) []string {
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string {
handleSoftDelete := m.peerdbCols.SoftDelete && (m.peerdbCols.SoftDeleteColName != "")
// capacity is computed in an inline closure (doubled when soft delete is handled); odd-looking, but it avoids the prealloc lint
updateStmts := make([]string, 0, func() int {
if handleSoftDelete {
return 2 * len(m.unchangedToastColumns)
}
return len(m.unchangedToastColumns)
}())

for _, cols := range m.unchangedToastColumns {
unchangedColsArray := strings.Split(cols, ",")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0, len(otherCols))
Expand All @@ -212,14 +214,14 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
}

// set the synced at column to the current timestamp
if peerdbCols.SyncedAtColName != "" {
if m.peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=CURRENT_TIMESTAMP",
peerdbCols.SyncedAtColName))
m.peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
if handleSoftDelete {
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=FALSE",
peerdbCols.SoftDeleteColName))
m.peerdbCols.SoftDeleteColName))
}

ssep := strings.Join(tmpArray, ",")
Expand All @@ -231,9 +233,9 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
if handleSoftDelete {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf("`%s`=TRUE", peerdbCols.SoftDeleteColName))
fmt.Sprintf("`%s`=TRUE", m.peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ",")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
_rt=2 AND _ut='%s'
Expand Down
181 changes: 130 additions & 51 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,77 +2,67 @@ package connbigquery

import (
"reflect"
"strings"
"testing"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
func TestGenerateUpdateStatement(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}
m := &mergeStmtGenerator{
unchangedToastColumns: unchangedToastCols,
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
peerdbCols: &protos.PeerDBColumns{
SoftDelete: false,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
},
}
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}

expected := []string{
"WHEN MATCHED AND _rt!=2 AND _ut=''" +
" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND _rt=2 " +
"AND _ut='' " +
"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1," +
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND _rt=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"AND _ut=''" +
"THEN UPDATE SET " +
"`col1`=_d._c0," +
"`col2`=_d._c1," +
"`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})
result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
result[i] = removeSpacesTabsNewlines(result[i])
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
result[i] = utils.RemoveSpacesTabsNewlines(result[i])
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}
}

func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
func TestGenerateUpdateStatement_WithSoftDelete(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}
m := &mergeStmtGenerator{
unchangedToastColumns: unchangedToastCols,
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
peerdbCols: &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
},
}
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}

expected := []string{
"WHEN MATCHED AND _rt!=2 " +
Expand All @@ -89,26 +79,115 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols,
&protos.PeerDBColumns{
SoftDelete: true,
result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
result[i] = utils.RemoveSpacesTabsNewlines(result[i])
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}
}

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}
m := &mergeStmtGenerator{
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
unchangedToastColumns: unchangedToastCols,
peerdbCols: &protos.PeerDBColumns{
SoftDelete: false,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})
},
}

expected := []string{
"WHEN MATCHED AND _rt!=2 AND _ut=''" +
" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP",
"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP",
"WHEN MATCHED AND _rt!=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP",
"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP",
}

result := m.generateUpdateStatements(allCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
result[i] = removeSpacesTabsNewlines(result[i])
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
result[i] = utils.RemoveSpacesTabsNewlines(result[i])
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
}
}

func removeSpacesTabsNewlines(s string) string {
s = strings.ReplaceAll(s, " ", "")
s = strings.ReplaceAll(s, "\t", "")
s = strings.ReplaceAll(s, "\n", "")
return s
// TestGenerateUpdateStatement_WithUnchangedToastColsAndSoftDelete checks the
// WHEN MATCHED clauses produced by generateUpdateStatements when soft delete
// is enabled and several unchanged-toast-column groups are present.
//
// For every group in unchangedToastColumns the expectation lists two clauses:
// one for `_rt!=2` that sets the soft-delete column to FALSE and one for
// `_rt=2` that sets it to TRUE. Columns named in the group are left out of the
// SET list. Both sides are passed through utils.RemoveSpacesTabsNewlines
// before comparison, so the layout of the literals below is not significant.
func TestGenerateUpdateStatement_WithUnchangedToastColsAndSoftDelete(t *testing.T) {
	allCols := []string{"col1", "col2", "col3"}
	unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}
	m := &mergeStmtGenerator{
		shortColumn: map[string]string{
			"col1": "_c0",
			"col2": "_c1",
			"col3": "_c2",
		},
		unchangedToastColumns: unchangedToastCols,
		peerdbCols: &protos.PeerDBColumns{
			SoftDelete:        true,
			SoftDeleteColName: "deleted",
			SyncedAtColName:   "synced_at",
		},
	}

	expected := []string{
		"WHEN MATCHED AND _rt!=2 AND _ut=''" +
			" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
			"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
		"WHEN MATCHED AND _rt=2 " +
			"AND _ut='' " +
			"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1," +
			"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
		"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
			"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
		"WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " +
			"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
		"WHEN MATCHED AND _rt!=2 " +
			"AND _ut='col2' " +
			"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
			"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
		"WHEN MATCHED AND _rt=2 " +
			"AND _ut='col2' " +
			"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
			"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
		"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
			"THEN UPDATE SET `col1`=_d._c0," +
			"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
		"WHEN MATCHED AND _rt=2 AND _ut='col3' " +
			"THEN UPDATE SET `col1`=_d._c0," +
			"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
	}

	result := m.generateUpdateStatements(allCols)

	// Guard the index-based normalisation below: without this check a result
	// shorter than expected would panic with "index out of range" instead of
	// producing a readable test failure.
	if len(result) != len(expected) {
		t.Fatalf("Unexpected number of statements. Expected: %d (%v),\nbut got: %d (%v)",
			len(expected), expected, len(result), result)
	}

	// Strip whitespace on both sides so only the SQL tokens are compared.
	for i := range expected {
		expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
		result[i] = utils.RemoveSpacesTabsNewlines(result[i])
	}

	if !reflect.DeepEqual(result, expected) {
		t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
	}
}
Loading