Skip to content

Commit

Permalink
Allow configurable column names for soft-delete and synced-at
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 13, 2023
1 parent 13e0e0f commit 851440b
Show file tree
Hide file tree
Showing 18 changed files with 2,302 additions and 598 deletions.
6 changes: 4 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,10 @@ func (a *FlowableActivity) StartNormalize(
}

res, err := dstConn.NormalizeRecords(&model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
})
if err != nil {
return nil, fmt.Errorf("failed to normalized records: %w", err)
Expand Down
14 changes: 14 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ func (h *FlowRequestHandler) CreateCDCFlow(
MaxBatchSize: maxBatchSize,
}

if req.ConnectionConfigs.SoftDeleteColName == "" {
req.ConnectionConfigs.SoftDeleteColName = "_PEERDB_IS_DELETED"
} else {
// make them all uppercase
req.ConnectionConfigs.SoftDeleteColName = strings.ToUpper(req.ConnectionConfigs.SoftDeleteColName)
}

if req.ConnectionConfigs.SyncedAtColName == "" {
req.ConnectionConfigs.SyncedAtColName = "_PEERDB_SYNCED_AT"
} else {
// make them all uppercase
req.ConnectionConfigs.SyncedAtColName = strings.ToUpper(req.ConnectionConfigs.SyncedAtColName)
}

if req.CreateCatalogEntry {
err := h.createCdcJobEntry(ctx, req, workflowID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2"`,
}

result := c.generateUpdateStatement(allCols, unchangedToastCols)
result := c.generateUpdateStatement("", allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand All @@ -44,7 +44,7 @@ func TestGenerateUpdateStatement_EmptyColumns(t *testing.T) {
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3"`,
}

result := c.generateUpdateStatement(allCols, unchangedToastCols)
result := c.generateUpdateStatement("", allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand Down
3 changes: 0 additions & 3 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,6 @@ func (sc *SnowflakeConnector) GetCopyTransformation(dstTableName string) (*CopyI
var transformations []string
var columnOrder []string
for colName, colType := range colInfo.ColumnMap {
if colName == "_PEERDB_IS_DELETED" {
continue
}
columnOrder = append(columnOrder, fmt.Sprintf("\"%s\"", colName))
switch colType {
case "GEOGRAPHY":
Expand Down
34 changes: 25 additions & 9 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ const (
getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
isDeletedColumnName = "_PEERDB_IS_DELETED"
checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"

syncRecordsChunkSize = 1024
Expand Down Expand Up @@ -422,7 +421,8 @@ func (c *SnowflakeConnector) SetupNormalizedTables(
continue
}

normalizedTableCreateSQL := generateCreateTableSQLForNormalizedTable(tableIdentifier, tableSchema)
normalizedTableCreateSQL := generateCreateTableSQLForNormalizedTable(
tableIdentifier, tableSchema, req.SoftDeleteColName, req.SyncedAtColName)
_, err = c.database.ExecContext(c.ctx, normalizedTableCreateSQL)
if err != nil {
return nil, fmt.Errorf("[sf] error while creating normalized table: %w", err)
Expand Down Expand Up @@ -761,7 +761,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
tableNametoUnchangedToastCols[destinationTableName],
getRawTableIdentifier(req.FlowJobName),
syncBatchID, normalizeBatchID,
req.SoftDelete,
req,
normalizeRecordsTx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -889,6 +889,8 @@ func (c *SnowflakeConnector) checkIfTableExists(schemaIdentifier string, tableId
func generateCreateTableSQLForNormalizedTable(
sourceTableIdentifier string,
sourceTableSchema *protos.TableSchema,
softDeleteColName string,
syncedAtColName string,
) string {
createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns))
for columnName, genericColumnType := range sourceTableSchema.Columns {
Expand All @@ -904,7 +906,13 @@ func generateCreateTableSQLForNormalizedTable(
// add a _peerdb_is_deleted column to the normalized table
// this is boolean default false, and is used to mark records as deleted
createTableSQLArray = append(createTableSQLArray,
fmt.Sprintf(`"%s" BOOLEAN DEFAULT FALSE,`, isDeletedColumnName))
fmt.Sprintf(`"%s" BOOLEAN DEFAULT FALSE,`, softDeleteColName))

// add a _peerdb_synced column to the normalized table
// this is a timestamp column that is used to mark records as synced
// default value is the current timestamp (snowflake)
createTableSQLArray = append(createTableSQLArray,
fmt.Sprintf(`"%s" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,`, syncedAtColName))

// add composite primary key to the table
primaryKeyColsUpperQuoted := make([]string, 0)
Expand Down Expand Up @@ -955,7 +963,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
rawTableIdentifier string,
syncBatchID int64,
normalizeBatchID int64,
softDelete bool,
normalizeReq *model.NormalizeRecordsRequest,
normalizeRecordsTx *sql.Tx,
) (int64, error) {
normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier]
Expand Down Expand Up @@ -1007,7 +1015,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
}
insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",")

updateStatementsforToastCols := c.generateUpdateStatement(columnNames, unchangedToastColumns)
updateStatementsforToastCols := c.generateUpdateStatement(normalizeReq.SyncedAtColName, columnNames, unchangedToastColumns)
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
Expand All @@ -1019,8 +1027,9 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

deletePart := "DELETE"
if softDelete {
deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", isDeletedColumnName)
if normalizeReq.SoftDelete {
colName := normalizeReq.SoftDeleteColName
deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName)
}

mergeStatement := fmt.Sprintf(mergeStatementSQL, destinationTableIdentifier, toVariantColumnName,
Expand Down Expand Up @@ -1142,7 +1151,8 @@ and updating the other columns.
6. Repeat steps 1-5 for each unique set of unchanged toast column groups.
7. Return the list of generated update statements.
*/
func (c *SnowflakeConnector) generateUpdateStatement(allCols []string, unchangedToastCols []string) []string {
func (c *SnowflakeConnector) generateUpdateStatement(
syncedAtCol string, allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)

for _, cols := range unchangedToastCols {
Expand All @@ -1153,6 +1163,12 @@ func (c *SnowflakeConnector) generateUpdateStatement(allCols []string, unchanged
quotedUpperColName := fmt.Sprintf(`"%s"`, strings.ToUpper(colName))
tmpArray = append(tmpArray, fmt.Sprintf("%s = SOURCE.%s", quotedUpperColName, quotedUpperColName))
}

// set the synced at column to the current timestamp
if syncedAtCol != "" {
tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`, syncedAtCol))
}

ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
Expand Down
Loading

0 comments on commit 851440b

Please sign in to comment.