Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PG,BQ,SF CDC: PeerDB Columns #845

Merged
merged 5 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
SyncBatchID: syncBatchID,
NormalizeBatchID: normalizeBatchID,
UnchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
},
}
// normalize anything between last normalized batch id to last sync batchid
mergeStmts := mergeGen.generateMergeStmts()
Expand Down Expand Up @@ -972,6 +977,22 @@ func (c *BigQueryConnector) SetupNormalizedTables(
idx++
}

if req.SoftDeleteColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
Name: req.SoftDeleteColName,
Type: bigquery.BooleanFieldType,
Repeated: false,
})
}

if req.SyncedAtColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Name: req.SyncedAtColName,
Type: bigquery.TimestampFieldType,
Repeated: false,
})
}

// create the table using the columns
schema := bigquery.Schema(columns)
err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema})
Expand Down
54 changes: 47 additions & 7 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type mergeStmtGenerator struct {
NormalizedTableSchema *protos.TableSchema
// array of toast column combinations that are unchanged
UnchangedToastColumns []string
// _PEERDB_IS_DELETED and _SYNCED_AT columns
peerdbCols *protos.PeerDBColumns
}

// GenerateMergeStmt generates a merge statements.
Expand All @@ -39,7 +41,7 @@ func (m *mergeStmtGenerator) generateMergeStmts() []string {
"CREATE TEMP TABLE %s AS (%s, %s);",
tempTable, flattenedCTE, deDupedCTE)

mergeStmt := m.generateMergeStmt(tempTable)
mergeStmt := m.generateMergeStmt(tempTable, m.peerdbCols)

dropTempTableStmt := fmt.Sprintf("DROP TABLE %s;", tempTable)

Expand Down Expand Up @@ -127,7 +129,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
}

// generateMergeStmt generates a merge statement.
func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *protos.PeerDBColumns) string {
// comma separated list of column names
backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
Expand All @@ -136,8 +138,19 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
pureColNames = append(pureColNames, colName)
}
csep := strings.Join(backtickColNames, ", ")

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.UnchangedToastColumns)
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName)
insertValuesSQL := csep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.UnchangedToastColumns, peerdbCols)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"

updateStatementsforToastCols = append(updateStatementsforToastCols,
fmt.Sprintf("WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)",
softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL))
}
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(m.NormalizedTableSchema.PrimaryKeyColumns))
Expand All @@ -148,15 +161,26 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
// _peerdb_target.<pkey1> = _peerdb_deduped.<pkey1> AND _peerdb_target.<pkey2> = _peerdb_deduped.<pkey2> ...
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

deletePart := "DELETE"
if peerdbCols.SoftDelete {
colName := peerdbCols.SoftDeleteColName
deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName)
if peerdbCols.SyncedAtColName != "" {
deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP",
deletePart, peerdbCols.SyncedAtColName)
}
}

return fmt.Sprintf(`
MERGE %s.%s _peerdb_target USING %s _peerdb_deduped
ON %s
WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
INSERT (%s) VALUES (%s)
%s
WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
DELETE;
`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, csep, csep, updateStringToastCols)
%s;
`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, insertColumnsSQL, insertValuesSQL,
updateStringToastCols, deletePart)
}

/*
Expand All @@ -174,7 +198,11 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
func (m *mergeStmtGenerator) generateUpdateStatements(
allCols []string,
unchangedToastCols []string,
peerdbCols *protos.PeerDBColumns,
) []string {
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
Expand All @@ -184,6 +212,18 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchange
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName))
}

// set the synced at column to the current timestamp
if peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = CURRENT_TIMESTAMP",
peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = FALSE",
peerdbCols.SoftDeleteColName))
}

ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s'
Expand Down
15 changes: 13 additions & 2 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"reflect"
"strings"
"testing"

"github.com/PeerDB-io/peer-flow/generated/protos"
)

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
Expand All @@ -30,7 +32,11 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
" `col2` = _peerdb_deduped.col2",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand All @@ -56,7 +62,12 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
" `col3` = _peerdb_deduped.col3",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols,
&protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand Down
114 changes: 97 additions & 17 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const (
INSERT (%s) VALUES (%s)
%s
WHEN MATCHED AND src._peerdb_record_type=2 THEN
DELETE`
%s`
fallbackUpsertStatementSQL = `WITH src_rank AS (
SELECT _peerdb_data,_peerdb_record_type,_peerdb_unchanged_toast_columns,
RANK() OVER (PARTITION BY %s ORDER BY _peerdb_timestamp DESC) AS _peerdb_rank
Expand All @@ -71,7 +71,7 @@ const (
RANK() OVER (PARTITION BY %s ORDER BY _peerdb_timestamp DESC) AS _peerdb_rank
FROM %s.%s WHERE _peerdb_batch_id>$1 AND _peerdb_batch_id<=$2 AND _peerdb_destination_table_name=$3
)
DELETE FROM %s USING src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2`
%s src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2`

dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=$1"
Expand Down Expand Up @@ -346,15 +346,28 @@ func getRawTableIdentifier(jobName string) string {
return fmt.Sprintf("%s_%s", rawTablePrefix, strings.ToLower(jobName))
}

func generateCreateTableSQLForNormalizedTable(sourceTableIdentifier string,
func generateCreateTableSQLForNormalizedTable(
sourceTableIdentifier string,
sourceTableSchema *protos.TableSchema,
softDeleteColName string,
syncedAtColName string,
) string {
createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns))
for columnName, genericColumnType := range sourceTableSchema.Columns {
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("\"%s\" %s,", columnName,
qValueKindToPostgresType(genericColumnType)))
}

if softDeleteColName != "" {
createTableSQLArray = append(createTableSQLArray,
fmt.Sprintf(`"%s" BOOL DEFAULT FALSE,`, softDeleteColName))
}

if syncedAtColName != "" {
createTableSQLArray = append(createTableSQLArray,
fmt.Sprintf(`"%s" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,`, syncedAtColName))
}

// add composite primary key to the table
if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
primaryKeyColsQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
Expand Down Expand Up @@ -523,17 +536,19 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, sync

func (c *PostgresConnector) generateNormalizeStatements(destinationTableIdentifier string,
unchangedToastColumns []string, rawTableIdentifier string, supportsMerge bool,
peerdbCols *protos.PeerDBColumns,
) []string {
if supportsMerge {
return []string{c.generateMergeStatement(destinationTableIdentifier, unchangedToastColumns, rawTableIdentifier)}
return []string{c.generateMergeStatement(destinationTableIdentifier, unchangedToastColumns,
rawTableIdentifier, peerdbCols)}
}
c.logger.Warn("Postgres version is not high enough to support MERGE, falling back to UPSERT + DELETE")
c.logger.Warn("TOAST columns will not be updated properly, use REPLICA IDENTITY FULL or upgrade Postgres")
return c.generateFallbackStatements(destinationTableIdentifier, rawTableIdentifier)
return c.generateFallbackStatements(destinationTableIdentifier, rawTableIdentifier, peerdbCols)
}

func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifier string,
rawTableIdentifier string,
rawTableIdentifier string, peerdbCols *protos.PeerDBColumns,
) []string {
normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier]
columnNames := make([]string, 0, len(normalizedTableSchema.Columns))
Expand Down Expand Up @@ -569,20 +584,35 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie
parsedDstTable.String(), columnName, columnCast))
}
deleteWhereClauseSQL := strings.TrimSuffix(strings.Join(deleteWhereClauseArray, ""), "AND ")

deletePart := fmt.Sprintf(
"DELETE FROM %s USING",
parsedDstTable.String())

if peerdbCols.SoftDelete {
deletePart = fmt.Sprintf(`UPDATE %s SET "%s" = TRUE`,
parsedDstTable.String(), peerdbCols.SoftDeleteColName)
if peerdbCols.SyncedAtColName != "" {
deletePart = fmt.Sprintf(`%s, "%s" = CURRENT_TIMESTAMP`,
deletePart, peerdbCols.SyncedAtColName)
}
deletePart += " FROM"
}
fallbackUpsertStatement := fmt.Sprintf(fallbackUpsertStatementSQL,
strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), c.metadataSchema,
rawTableIdentifier, parsedDstTable.String(), insertColumnsSQL, flattenedCastsSQL,
strings.Join(normalizedTableSchema.PrimaryKeyColumns, ","), updateColumnsSQL)
fallbackDeleteStatement := fmt.Sprintf(fallbackDeleteStatementSQL,
strings.Join(maps.Values(primaryKeyColumnCasts), ","), c.metadataSchema,
rawTableIdentifier, parsedDstTable.String(), deleteWhereClauseSQL)
rawTableIdentifier, deletePart, deleteWhereClauseSQL)

return []string{fallbackUpsertStatement, fallbackDeleteStatement}
}

func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier string, unchangedToastColumns []string,
func (c *PostgresConnector) generateMergeStatement(
destinationTableIdentifier string,
unchangedToastColumns []string,
rawTableIdentifier string,
peerdbCols *protos.PeerDBColumns,
) string {
normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier]
columnNames := maps.Keys(normalizedTableSchema.Columns)
Expand Down Expand Up @@ -612,21 +642,60 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st
}
}
flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",")

insertColumnsSQL := strings.TrimSuffix(strings.Join(columnNames, ","), ",")
insertValuesSQLArray := make([]string, 0, len(columnNames))
for _, columnName := range columnNames {
insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("src.%s", columnName))
}

updateStatementsforToastCols := c.generateUpdateStatement(columnNames, unchangedToastColumns, peerdbCols)
// append synced_at column
columnNames = append(columnNames, fmt.Sprintf(`"%s"`, peerdbCols.SyncedAtColName))
insertColumnsSQL := strings.Join(columnNames, ",")
// fill in synced_at column
insertValuesSQLArray = append(insertValuesSQLArray, "CURRENT_TIMESTAMP")
insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ","), ",")
updateStatements := c.generateUpdateStatement(columnNames, unchangedToastColumns)

return fmt.Sprintf(mergeStatementSQL, strings.Join(maps.Values(primaryKeyColumnCasts), ","),
c.metadataSchema, rawTableIdentifier, parsedDstTable.String(), flattenedCastsSQL,
strings.Join(primaryKeySelectSQLArray, " AND "), insertColumnsSQL, insertValuesSQL, updateStatements)
if peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := strings.TrimSuffix(strings.Join(append(columnNames,
fmt.Sprintf(`"%s"`, peerdbCols.SoftDeleteColName)), ","), ",")
softDeleteInsertValuesSQL := strings.Join(append(insertValuesSQLArray, "TRUE"), ",")

updateStatementsforToastCols = append(updateStatementsforToastCols,
fmt.Sprintf("WHEN NOT MATCHED AND (src._peerdb_record_type = 2) THEN INSERT (%s) VALUES(%s)",
softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL))
}
updateStringToastCols := strings.Join(updateStatementsforToastCols, "\n")

deletePart := "DELETE"
if peerdbCols.SoftDelete {
colName := peerdbCols.SoftDeleteColName
deletePart = fmt.Sprintf(`UPDATE SET "%s" = TRUE`, colName)
if peerdbCols.SyncedAtColName != "" {
deletePart = fmt.Sprintf(`%s, "%s" = CURRENT_TIMESTAMP`,
deletePart, peerdbCols.SyncedAtColName)
}
}

mergeStmt := fmt.Sprintf(
mergeStatementSQL,
strings.Join(maps.Values(primaryKeyColumnCasts), ","),
c.metadataSchema,
rawTableIdentifier,
parsedDstTable.String(),
flattenedCastsSQL,
strings.Join(primaryKeySelectSQLArray, " AND "),
insertColumnsSQL,
insertValuesSQL,
updateStringToastCols,
deletePart,
)

return mergeStmt
}

func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedToastColsLists []string) string {
func (c *PostgresConnector) generateUpdateStatement(allCols []string,
unchangedToastColsLists []string, peerdbCols *protos.PeerDBColumns,
) []string {
updateStmts := make([]string, 0, len(unchangedToastColsLists))

for _, cols := range unchangedToastColsLists {
Expand All @@ -640,13 +709,24 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedT
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("%s=src.%s", colName, colName))
}
// set the synced at column to the current timestamp
if peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`,
peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = FALSE`,
peerdbCols.SoftDeleteColName))
}

ssep := strings.Join(tmpArray, ",")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
return strings.Join(updateStmts, "\n")
return updateStmts
}

func (c *PostgresConnector) getCurrentLSN() (pglogrepl.LSN, error) {
Expand Down
Loading
Loading