Skip to content

Commit

Permalink
PG,BQ,SF CDC: PeerDB Columns (#845)
Browse files Browse the repository at this point in the history
Irons out `_PEERDB_IS_DELETED` (Soft Delete feature) and
`_PEERDB_SYNCED_AT` columns for CDC in BQ, SF, and PG.
  • Loading branch information
Amogh-Bharadwaj authored Dec 19, 2023
1 parent 6dbdf61 commit 476549b
Show file tree
Hide file tree
Showing 15 changed files with 924 additions and 185 deletions.
23 changes: 22 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
SyncBatchID: syncBatchID,
NormalizeBatchID: normalizeBatchID,
UnchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
},
}
// normalize anything between last normalized batch id to last sync batchid
mergeStmts := mergeGen.generateMergeStmts()
Expand Down Expand Up @@ -961,7 +966,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

// convert the column names and types to bigquery types
columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns))
columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns), len(tableSchema.Columns)+2)
idx := 0
for colName, genericColType := range tableSchema.Columns {
columns[idx] = &bigquery.FieldSchema{
Expand All @@ -972,6 +977,22 @@ func (c *BigQueryConnector) SetupNormalizedTables(
idx++
}

if req.SoftDeleteColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Name: req.SoftDeleteColName,
Type: bigquery.BooleanFieldType,
Repeated: false,
})
}

if req.SyncedAtColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Name: req.SyncedAtColName,
Type: bigquery.TimestampFieldType,
Repeated: false,
})
}

// create the table using the columns
schema := bigquery.Schema(columns)
err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema})
Expand Down
54 changes: 47 additions & 7 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type mergeStmtGenerator struct {
NormalizedTableSchema *protos.TableSchema
// array of toast column combinations that are unchanged
UnchangedToastColumns []string
// _PEERDB_IS_DELETED and _SYNCED_AT columns
peerdbCols *protos.PeerDBColumns
}

// GenerateMergeStmt generates a merge statements.
Expand All @@ -39,7 +41,7 @@ func (m *mergeStmtGenerator) generateMergeStmts() []string {
"CREATE TEMP TABLE %s AS (%s, %s);",
tempTable, flattenedCTE, deDupedCTE)

mergeStmt := m.generateMergeStmt(tempTable)
mergeStmt := m.generateMergeStmt(tempTable, m.peerdbCols)

dropTempTableStmt := fmt.Sprintf("DROP TABLE %s;", tempTable)

Expand Down Expand Up @@ -127,7 +129,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
}

// generateMergeStmt generates a merge statement.
func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *protos.PeerDBColumns) string {
// comma separated list of column names
backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
Expand All @@ -136,8 +138,19 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
pureColNames = append(pureColNames, colName)
}
csep := strings.Join(backtickColNames, ", ")

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.UnchangedToastColumns)
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName)
insertValuesSQL := csep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.UnchangedToastColumns, peerdbCols)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"

updateStatementsforToastCols = append(updateStatementsforToastCols,
fmt.Sprintf("WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)",
softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL))
}
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(m.NormalizedTableSchema.PrimaryKeyColumns))
Expand All @@ -148,15 +161,26 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
// _peerdb_target.<pkey1> = _peerdb_deduped.<pkey1> AND _peerdb_target.<pkey2> = _peerdb_deduped.<pkey2> ...
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

deletePart := "DELETE"
if peerdbCols.SoftDelete {
colName := peerdbCols.SoftDeleteColName
deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName)
if peerdbCols.SyncedAtColName != "" {
deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP",
deletePart, peerdbCols.SyncedAtColName)
}
}

return fmt.Sprintf(`
MERGE %s.%s _peerdb_target USING %s _peerdb_deduped
ON %s
WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
INSERT (%s) VALUES (%s)
%s
WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
DELETE;
`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, csep, csep, updateStringToastCols)
%s;
`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, insertColumnsSQL, insertValuesSQL,
updateStringToastCols, deletePart)
}

/*
Expand All @@ -174,7 +198,11 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
func (m *mergeStmtGenerator) generateUpdateStatements(
allCols []string,
unchangedToastCols []string,
peerdbCols *protos.PeerDBColumns,
) []string {
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
Expand All @@ -184,6 +212,18 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchange
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName))
}

// set the synced at column to the current timestamp
if peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = CURRENT_TIMESTAMP",
peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = FALSE",
peerdbCols.SoftDeleteColName))
}

ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s'
Expand Down
30 changes: 23 additions & 7 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"reflect"
"strings"
"testing"

"github.com/PeerDB-io/peer-flow/generated/protos"
)

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
Expand All @@ -16,21 +18,28 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
" AND _peerdb_unchanged_toast_columns='' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," +
" `col3` = _peerdb_deduped.col3",
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2, col3' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1",
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col3` = _peerdb_deduped.col3",
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col3'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2",
" `col2` = _peerdb_deduped.col2," + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand All @@ -53,10 +62,17 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
"THEN UPDATE SET " +
"`col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," +
" `col3` = _peerdb_deduped.col3",
" `col3` = _peerdb_deduped.col3," +
" `synced_at`=CURRENT_TIMESTAMP," +
"`deleted`=FALSE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols,
&protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand Down
Loading

0 comments on commit 476549b

Please sign in to comment.