Allow configurable column names for soft-delete and synced-at (#653)
In `soft_delete` mode, PeerDB creates a new `soft_delete_col_name`
column in each destination table. This mode is currently supported only
on the Snowflake destination. Rows deleted on the source (matched by
primary key) are not deleted on the destination; instead, the column
value is set to `true`.
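
For context, a minimal sketch (not part of this commit; the job name and values are illustrative) of how a caller populates the two new `FlowConnectionConfigs` fields:

package main

import "github.com/PeerDB-io/peer-flow/generated/protos"

func main() {
    // Hypothetical flow configuration; only the SoftDelete* and SyncedAt*
    // fields are new in this commit.
    cfg := &protos.FlowConnectionConfigs{
        FlowJobName:       "users_mirror", // illustrative job name
        SoftDelete:        true,
        SoftDeleteColName: "is_deleted", // the handler uppercases this to IS_DELETED
        SyncedAtColName:   "",           // empty string => defaults to _PEERDB_SYNCED_AT
    }
    _ = cfg
}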
iskakaushik authored Nov 14, 2023
1 parent c482129 commit 173142e
Showing 21 changed files with 2,215 additions and 600 deletions.
6 changes: 4 additions & 2 deletions flow/activities/flowable.go
@@ -374,8 +374,10 @@ func (a *FlowableActivity) StartNormalize(
}

res, err := dstConn.NormalizeRecords(&model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
})
if err != nil {
return nil, fmt.Errorf("failed to normalized records: %w", err)
14 changes: 14 additions & 0 deletions flow/cmd/handler.go
@@ -141,6 +141,20 @@ func (h *FlowRequestHandler) CreateCDCFlow(
MaxBatchSize: maxBatchSize,
}

if req.ConnectionConfigs.SoftDeleteColName == "" {
req.ConnectionConfigs.SoftDeleteColName = "_PEERDB_IS_DELETED"
} else {
// make them all uppercase
req.ConnectionConfigs.SoftDeleteColName = strings.ToUpper(req.ConnectionConfigs.SoftDeleteColName)
}

if req.ConnectionConfigs.SyncedAtColName == "" {
req.ConnectionConfigs.SyncedAtColName = "_PEERDB_SYNCED_AT"
} else {
// make them all uppercase
req.ConnectionConfigs.SyncedAtColName = strings.ToUpper(req.ConnectionConfigs.SyncedAtColName)
}

if req.CreateCatalogEntry {
err := h.createCdcJobEntry(ctx, req, workflowID)
if err != nil {
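
The handler above fills in defaults for empty column names and uppercases caller-supplied ones, presumably to match Snowflake's folding of unquoted identifiers to uppercase. A minimal sketch of that rule as a standalone helper (this helper does not exist in the codebase; it is illustrative only):

package main

import (
    "fmt"
    "strings"
)

// defaultColName returns the fallback when no name was requested,
// and otherwise uppercases the requested name for Snowflake.
func defaultColName(requested, fallback string) string {
    if requested == "" {
        return fallback
    }
    return strings.ToUpper(requested)
}

func main() {
    fmt.Println(defaultColName("", "_PEERDB_IS_DELETED"))             // _PEERDB_IS_DELETED
    fmt.Println(defaultColName("deleted_flag", "_PEERDB_IS_DELETED")) // DELETED_FLAG
}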
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/merge_stmt_generator_test.go
@@ -22,7 +22,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2"`,
}

result := c.generateUpdateStatement(allCols, unchangedToastCols)
result := c.generateUpdateStatement("", allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
@@ -44,7 +44,7 @@ func TestGenerateUpdateStatement_EmptyColumns(t *testing.T) {
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3"`,
}

result := c.generateUpdateStatement(allCols, unchangedToastCols)
result := c.generateUpdateStatement("", allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
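
Both existing tests pass "" for the new leading syncedAtCol parameter, which leaves the generated statements unchanged. A sketch of a companion test for a non-empty synced-at column, based on the generator change in flow/connectors/snowflake/snowflake.go below (not in this commit; the package name follows the connsnowflake import alias used elsewhere, and the bare-connector construction is assumed):

package connsnowflake

import (
    "strings"
    "testing"
)

func TestGenerateUpdateStatement_WithSyncedAtCol(t *testing.T) {
    c := &SnowflakeConnector{}
    allCols := []string{"col1", "col2"}
    unchangedToastCols := []string{""}

    result := c.generateUpdateStatement("_PEERDB_SYNCED_AT", allCols, unchangedToastCols)

    // Every generated UPDATE should now also stamp the synced-at column.
    for _, stmt := range result {
        if !strings.Contains(stmt, `"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP`) {
            t.Errorf("expected synced-at assignment in: %s", stmt)
        }
    }
}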
7 changes: 3 additions & 4 deletions flow/connectors/snowflake/qrep_avro_sync.go
@@ -339,7 +339,9 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(localFilePath string, stage str
return nil
}

func (sc *SnowflakeConnector) GetCopyTransformation(dstTableName string) (*CopyInfo, error) {
func (sc *SnowflakeConnector) GetCopyTransformation(
dstTableName string,
) (*CopyInfo, error) {
colInfo, colsErr := sc.getColsFromTable(dstTableName)
if colsErr != nil {
return nil, fmt.Errorf("failed to get columns from destination table: %w", colsErr)
@@ -348,9 +350,6 @@ func (sc *SnowflakeConnector) GetCopyTransformation(dstTableName string) (*CopyI
var transformations []string
var columnOrder []string
for colName, colType := range colInfo.ColumnMap {
if colName == "_PEERDB_IS_DELETED" {
continue
}
columnOrder = append(columnOrder, fmt.Sprintf("\"%s\"", colName))
switch colType {
case "GEOGRAPHY":
68 changes: 58 additions & 10 deletions flow/connectors/snowflake/snowflake.go
@@ -73,7 +73,6 @@ const (
getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
isDeletedColumnName = "_PEERDB_IS_DELETED"
checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"

syncRecordsChunkSize = 1024
@@ -422,7 +421,8 @@ func (c *SnowflakeConnector) SetupNormalizedTables(
continue
}

normalizedTableCreateSQL := generateCreateTableSQLForNormalizedTable(tableIdentifier, tableSchema)
normalizedTableCreateSQL := generateCreateTableSQLForNormalizedTable(
tableIdentifier, tableSchema, req.SoftDeleteColName, req.SyncedAtColName)
_, err = c.database.ExecContext(c.ctx, normalizedTableCreateSQL)
if err != nil {
return nil, fmt.Errorf("[sf] error while creating normalized table: %w", err)
@@ -761,7 +761,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
tableNametoUnchangedToastCols[destinationTableName],
getRawTableIdentifier(req.FlowJobName),
syncBatchID, normalizeBatchID,
req.SoftDelete,
req,
normalizeRecordsTx)
if err != nil {
return nil, err
@@ -889,6 +889,8 @@ func (c *SnowflakeConnector) checkIfTableExists(schemaIdentifier string, tableId
func generateCreateTableSQLForNormalizedTable(
sourceTableIdentifier string,
sourceTableSchema *protos.TableSchema,
softDeleteColName string,
syncedAtColName string,
) string {
createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns))
for columnName, genericColumnType := range sourceTableSchema.Columns {
@@ -903,8 +905,18 @@

// add a soft-delete column to the normalized table
// this is boolean default false, and is used to mark records as deleted
createTableSQLArray = append(createTableSQLArray,
fmt.Sprintf(`"%s" BOOLEAN DEFAULT FALSE,`, isDeletedColumnName))
if softDeleteColName != "" {
createTableSQLArray = append(createTableSQLArray,
fmt.Sprintf(`"%s" BOOLEAN DEFAULT FALSE,`, softDeleteColName))
}

// add a synced-at column to the normalized table
// this is a timestamp column used to mark when records were last synced
// default value is the current timestamp (snowflake)
if syncedAtColName != "" {
createTableSQLArray = append(createTableSQLArray,
fmt.Sprintf(`"%s" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,`, syncedAtColName))
}

// add composite primary key to the table
primaryKeyColsUpperQuoted := make([]string, 0)
@@ -955,7 +967,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
rawTableIdentifier string,
syncBatchID int64,
normalizeBatchID int64,
softDelete bool,
normalizeReq *model.NormalizeRecordsRequest,
normalizeRecordsTx *sql.Tx,
) (int64, error) {
normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier]
@@ -998,16 +1010,41 @@
for _, columnName := range columnNames {
quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(columnName)))
}

// add soft delete and synced at columns to the list of columns
if normalizeReq.SoftDelete {
colName := normalizeReq.SoftDeleteColName
quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(colName)))
}

if normalizeReq.SyncedAtColName != "" {
colName := normalizeReq.SyncedAtColName
quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(colName)))
}

insertColumnsSQL := strings.TrimSuffix(strings.Join(quotedUpperColNames, ","), ",")

insertValuesSQLArray := make([]string, 0, len(columnNames))
for _, columnName := range columnNames {
quotedUpperColumnName := fmt.Sprintf(`"%s"`, strings.ToUpper(columnName))
insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("SOURCE.%s,", quotedUpperColumnName))
}

// add soft delete and synced at columns to the list of columns
if normalizeReq.SoftDelete {
// add false as default value for soft delete column
insertValuesSQLArray = append(insertValuesSQLArray, "FALSE,")
}

if normalizeReq.SyncedAtColName != "" {
// add current timestamp as default value for synced at column
insertValuesSQLArray = append(insertValuesSQLArray, "CURRENT_TIMESTAMP,")
}

insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",")

updateStatementsforToastCols := c.generateUpdateStatement(columnNames, unchangedToastColumns)
updateStatementsforToastCols := c.generateUpdateStatement(
normalizeReq.SyncedAtColName, columnNames, unchangedToastColumns)
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
@@ -1019,8 +1056,12 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

deletePart := "DELETE"
if softDelete {
deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", isDeletedColumnName)
if normalizeReq.SoftDelete {
colName := normalizeReq.SoftDeleteColName
deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName)
if normalizeReq.SyncedAtColName != "" {
deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP", deletePart, normalizeReq.SyncedAtColName)
}
}

mergeStatement := fmt.Sprintf(mergeStatementSQL, destinationTableIdentifier, toVariantColumnName,
@@ -1142,7 +1183,8 @@ and updating the other columns.
6. Repeat steps 1-5 for each unique set of unchanged toast column groups.
7. Return the list of generated update statements.
*/
func (c *SnowflakeConnector) generateUpdateStatement(allCols []string, unchangedToastCols []string) []string {
func (c *SnowflakeConnector) generateUpdateStatement(
syncedAtCol string, allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)

for _, cols := range unchangedToastCols {
@@ -1153,6 +1195,12 @@ func (c *SnowflakeConnector) generateUpdateStatement(allCols []string, unchanged
quotedUpperColName := fmt.Sprintf(`"%s"`, strings.ToUpper(colName))
tmpArray = append(tmpArray, fmt.Sprintf("%s = SOURCE.%s", quotedUpperColName, quotedUpperColName))
}

// set the synced at column to the current timestamp
if syncedAtCol != "" {
tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`, syncedAtCol))
}

ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
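
Putting the merge-statement changes together: with SoftDelete enabled and both column names set, the INSERT branch stamps FALSE and CURRENT_TIMESTAMP, and the delete branch of the MERGE becomes an UPDATE. A runnable sketch of the deletePart construction above (column names are the defaults; output shown in the comment):

package main

import "fmt"

func main() {
    // Mirrors the deletePart logic in generateAndExecuteMergeStatement,
    // assuming SoftDelete is enabled and both configurable columns are set.
    softDelete := true
    softDeleteCol := "_PEERDB_IS_DELETED"
    syncedAtCol := "_PEERDB_SYNCED_AT"

    deletePart := "DELETE"
    if softDelete {
        deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", softDeleteCol)
        if syncedAtCol != "" {
            deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP", deletePart, syncedAtCol)
        }
    }

    // Prints: UPDATE SET _PEERDB_IS_DELETED = TRUE, _PEERDB_SYNCED_AT = CURRENT_TIMESTAMP
    fmt.Println(deletePart)
}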
2 changes: 2 additions & 0 deletions flow/e2e/congen.go
@@ -185,6 +185,8 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto
ret.Destination = c.Destination
ret.CdcSyncMode = c.CDCSyncMode
ret.CdcStagingPath = c.CdcStagingPath
ret.SoftDeleteColName = "_PEERDB_IS_DELETED"
ret.SyncedAtColName = "_PEERDB_SYNCED_AT"
return ret, nil
}

13 changes: 13 additions & 0 deletions flow/e2e/snowflake/peer_flow_sf_test.go
@@ -211,6 +211,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) {
require.NoError(t, err)
s.Equal(10, count)

// check the number of rows where _PEERDB_SYNCED_AT is newer than 30 minutes ago;
// it should match the count.
newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE'
`, dstTableName)
numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
require.NoError(t, err)
s.Equal(10, numNewRows)

// TODO: verify that the data is correctly synced to the destination table
// on the snowflake side

@@ -1002,6 +1011,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) {
"ID": string(qvalue.QValueKindNumeric),
"C1": string(qvalue.QValueKindNumeric),
"_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean),
"_PEERDB_SYNCED_AT": string(qvalue.QValueKindTimestamp),
},
}
output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
@@ -1030,6 +1040,7 @@
"C1": string(qvalue.QValueKindNumeric),
"C2": string(qvalue.QValueKindNumeric),
"_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean),
"_PEERDB_SYNCED_AT": string(qvalue.QValueKindTimestamp),
},
}
output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
@@ -1059,6 +1070,7 @@
"C2": string(qvalue.QValueKindNumeric),
"C3": string(qvalue.QValueKindNumeric),
"_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean),
"_PEERDB_SYNCED_AT": string(qvalue.QValueKindTimestamp),
},
}
output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
@@ -1088,6 +1100,7 @@
"C2": string(qvalue.QValueKindNumeric),
"C3": string(qvalue.QValueKindNumeric),
"_PEERDB_IS_DELETED": string(qvalue.QValueKindBoolean),
"_PEERDB_SYNCED_AT": string(qvalue.QValueKindTimestamp),
},
}
output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
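
A sketch (not in this commit) of a companion assertion for the soft-delete column that would sit inside Test_Complete_Simple_Flow_SF above; since that test only inserts rows, the soft-delete count should be zero:

    // check that no rows are marked soft-deleted, as this test issues no deletes
    softDeleteQuery := fmt.Sprintf(`
    SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE
    `, dstTableName)
    numDeleted, err := s.sfHelper.RunIntQuery(softDeleteQuery)
    require.NoError(t, err)
    s.Equal(0, numDeleted)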
36 changes: 36 additions & 0 deletions flow/e2e/snowflake/snowflake_helper.go
@@ -4,12 +4,14 @@ import (
"context"
"encoding/json"
"fmt"
"math/big"
"os"

connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
)

@@ -139,3 +141,37 @@ func (s *SnowflakeTestHelper) ExecuteAndProcessQuery(query string) (*model.QReco
func (s *SnowflakeTestHelper) CreateTable(tableName string, schema *model.QRecordSchema) error {
return s.testClient.CreateTable(schema, s.testSchemaName, tableName)
}

// runs a query that returns an int result
func (s *SnowflakeTestHelper) RunIntQuery(query string) (int, error) {
rows, err := s.testClient.ExecuteAndProcessQuery(query)
if err != nil {
return 0, err
}

numRecords := 0
if rows == nil || len(rows.Records) != 1 {
if rows != nil {
numRecords = len(rows.Records)
}
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 rows", query, numRecords)
}

rec := rows.Records[0]
if rec.NumEntries != 1 {
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, rec.NumEntries)
}

switch rec.Entries[0].Kind {
case qvalue.QValueKindInt32:
return int(rec.Entries[0].Value.(int32)), nil
case qvalue.QValueKindInt64:
return int(rec.Entries[0].Value.(int64)), nil
case qvalue.QValueKindNumeric:
// get big.Rat and convert to int
rat := rec.Entries[0].Value.(*big.Rat)
return int(rat.Num().Int64() / rat.Denom().Int64()), nil
default:
return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec.Entries[0].Kind)
}
}
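
Usage sketch for the helper (illustrative fragment; assumes an initialized *SnowflakeTestHelper named s and a hypothetical table). Snowflake returns COUNT(*) as a NUMERIC, so the big.Rat branch applies; its integer division is exact for whole-number counts:

    count, err := s.RunIntQuery("SELECT COUNT(*) FROM MY_SCHEMA.MY_TABLE")
    if err != nil {
        log.Fatal(err) // hypothetical error handling
    }
    fmt.Println("rows:", count)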