Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PG,BQ,SF CDC: PeerDB Columns #845

Merged
merged 5 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
SyncBatchID: syncBatchID,
NormalizeBatchID: normalizeBatchID,
UnchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
},
}
// normalize anything between last normalized batch id to last sync batchid
mergeStmts := mergeGen.generateMergeStmts()
Expand Down Expand Up @@ -961,7 +966,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

// convert the column names and types to bigquery types
columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns))
columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns), len(tableSchema.Columns)+2)
idx := 0
for colName, genericColType := range tableSchema.Columns {
columns[idx] = &bigquery.FieldSchema{
Expand All @@ -972,6 +977,22 @@ func (c *BigQueryConnector) SetupNormalizedTables(
idx++
}

if req.SoftDeleteColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
Name: req.SoftDeleteColName,
Type: bigquery.BooleanFieldType,
Repeated: false,
})
}

if req.SyncedAtColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Name: req.SyncedAtColName,
Type: bigquery.TimestampFieldType,
Repeated: false,
})
}

// create the table using the columns
schema := bigquery.Schema(columns)
err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema})
Expand Down
54 changes: 47 additions & 7 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type mergeStmtGenerator struct {
NormalizedTableSchema *protos.TableSchema
// array of toast column combinations that are unchanged
UnchangedToastColumns []string
// _PEERDB_IS_DELETED and _SYNCED_AT columns
peerdbCols *protos.PeerDBColumns
}

// GenerateMergeStmt generates a merge statements.
Expand All @@ -39,7 +41,7 @@ func (m *mergeStmtGenerator) generateMergeStmts() []string {
"CREATE TEMP TABLE %s AS (%s, %s);",
tempTable, flattenedCTE, deDupedCTE)

mergeStmt := m.generateMergeStmt(tempTable)
mergeStmt := m.generateMergeStmt(tempTable, m.peerdbCols)

dropTempTableStmt := fmt.Sprintf("DROP TABLE %s;", tempTable)

Expand Down Expand Up @@ -127,7 +129,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
}

// generateMergeStmt generates a merge statement.
func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *protos.PeerDBColumns) string {
// comma separated list of column names
backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
Expand All @@ -136,8 +138,19 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
pureColNames = append(pureColNames, colName)
}
csep := strings.Join(backtickColNames, ", ")

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.UnchangedToastColumns)
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName)
insertValuesSQL := csep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.UnchangedToastColumns, peerdbCols)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"

updateStatementsforToastCols = append(updateStatementsforToastCols,
fmt.Sprintf("WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)",
softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL))
}
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(m.NormalizedTableSchema.PrimaryKeyColumns))
Expand All @@ -148,15 +161,26 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
// _peerdb_target.<pkey1> = _peerdb_deduped.<pkey1> AND _peerdb_target.<pkey2> = _peerdb_deduped.<pkey2> ...
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

deletePart := "DELETE"
if peerdbCols.SoftDelete {
colName := peerdbCols.SoftDeleteColName
deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName)
if peerdbCols.SyncedAtColName != "" {
deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP",
deletePart, peerdbCols.SyncedAtColName)
}
}

return fmt.Sprintf(`
MERGE %s.%s _peerdb_target USING %s _peerdb_deduped
ON %s
WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
INSERT (%s) VALUES (%s)
%s
WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
DELETE;
`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, csep, csep, updateStringToastCols)
%s;
`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, insertColumnsSQL, insertValuesSQL,
updateStringToastCols, deletePart)
}

/*
Expand All @@ -174,7 +198,11 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
func (m *mergeStmtGenerator) generateUpdateStatements(
allCols []string,
unchangedToastCols []string,
peerdbCols *protos.PeerDBColumns,
) []string {
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
Expand All @@ -184,6 +212,18 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchange
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName))
}

// set the synced at column to the current timestamp
if peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = CURRENT_TIMESTAMP",
peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = FALSE",
peerdbCols.SoftDeleteColName))
}

ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s'
Expand Down
30 changes: 23 additions & 7 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"reflect"
"strings"
"testing"

"github.com/PeerDB-io/peer-flow/generated/protos"
)

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
Expand All @@ -16,21 +18,28 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
" AND _peerdb_unchanged_toast_columns='' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," +
" `col3` = _peerdb_deduped.col3",
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2, col3' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1",
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col3` = _peerdb_deduped.col3",
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col3'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2",
" `col2` = _peerdb_deduped.col2," + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand All @@ -53,10 +62,17 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
"THEN UPDATE SET " +
"`col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," +
" `col3` = _peerdb_deduped.col3",
" `col3` = _peerdb_deduped.col3," +
" `synced_at`=CURRENT_TIMESTAMP," +
"`deleted`=FALSE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols,
&protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand Down
Loading
Loading