composite primary key support for SF, PG and BQ (#499)
Fixes issues: #491 #492 #493 #494 #495 
Supersedes: #129 and #164
heavycrystal authored Oct 17, 2023
1 parent 8cb21cf commit 17549b2
Showing 22 changed files with 1,204 additions and 447 deletions.
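
At the heart of the diff, the per-table schema tracks a list of primary-key columns instead of a single column, and every consumer of that field now iterates over the list. Below is a minimal sketch of the shape implied by the accessors used in the hunks that follow, assuming a simplified struct; the real type is generated from protobuf definitions that are not among the files shown here.

package main

import "fmt"

// Illustrative only: this mirrors the PrimaryKeyColumns accessor that the
// hunks below rely on; other schema fields (column names, types) are omitted.
type TableSchema struct {
	// Was: PrimaryKeyColumn string
	PrimaryKeyColumns []string // a composite primary key is now representable
}

func main() {
	// Hypothetical table keyed on (customer_id, order_id).
	schema := TableSchema{PrimaryKeyColumns: []string{"customer_id", "order_id"}}
	fmt.Println(schema.PrimaryKeyColumns)
}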
11 changes: 7 additions & 4 deletions flow/activities/flowable.go
@@ -358,10 +358,13 @@ func (a *FlowableActivity) StartNormalize(
return nil, fmt.Errorf("failed to normalized records: %w", err)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID)
if err != nil {
return nil, err
// normalize flow did not run due to no records, no need to update end time.
if res.Done {
err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID)
if err != nil {
return nil, err
}
}

// log the number of batches normalized
37 changes: 22 additions & 15 deletions flow/connectors/bigquery/bigquery.go
@@ -512,7 +512,7 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
matchData: "",
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
unchangedToastColumns: "",
})
tableNameRowsMapping[r.DestinationTableName] += 1
case *model.UpdateRecord:
@@ -571,7 +571,7 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
matchData: itemsJSON,
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
unchangedToastColumns: "",
})

tableNameRowsMapping[r.DestinationTableName] += 1
@@ -741,7 +741,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}
entries[9] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: utils.KeysToString(r.UnchangedToastColumns),
Value: "",
}

tableNameRowsMapping[r.DestinationTableName] += 1
@@ -802,7 +802,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}
entries[9] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: utils.KeysToString(r.UnchangedToastColumns),
Value: "",
}

tableNameRowsMapping[r.DestinationTableName] += 1
@@ -893,7 +893,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
if !hasJob || normalizeBatchID == syncBatchID {
log.Printf("waiting for sync to catch up for job %s, so finishing", req.FlowJobName)
return &model.NormalizeResponse{
Done: true,
Done: false,
StartBatchID: normalizeBatchID,
EndBatchID: syncBatchID,
}, nil
@@ -1035,7 +1035,7 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec

// create the job in the metadata table
jobStatement := fmt.Sprintf(
"INSERT INTO %s.%s (mirror_job_name, offset,sync_batch_id) VALUES ('%s',%d,%d);",
"INSERT INTO %s.%s (mirror_job_name,offset,sync_batch_id) VALUES ('%s',%d,%d);",
c.datasetID, MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID)
if hasJob {
jobStatement = fmt.Sprintf(
@@ -1288,14 +1288,13 @@ func (m *MergeStmtGenerator) generateDeDupedCTE() string {
) _peerdb_ranked
WHERE _peerdb_rank = 1
) SELECT * FROM _peerdb_de_duplicated_data_res`
pkey := m.NormalizedTableSchema.PrimaryKeyColumn
return fmt.Sprintf(cte, pkey)
pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.NormalizedTableSchema.PrimaryKeyColumns,
", '_peerdb_concat_', "))
return fmt.Sprintf(cte, pkeyColsStr)
}

// generateMergeStmt generates a merge statement.
func (m *MergeStmtGenerator) generateMergeStmt(tempTable string) string {
pkey := m.NormalizedTableSchema.PrimaryKeyColumn

// comma separated list of column names
backtickColNames := make([]string, 0)
pureColNames := make([]string, 0)
@@ -1305,18 +1304,26 @@ func (m *MergeStmtGenerator) generateMergeStmt(tempTable string) string {
}
csep := strings.Join(backtickColNames, ", ")

udateStatementsforToastCols := m.generateUpdateStatement(pureColNames, m.UnchangedToastColumns)
updateStringToastCols := strings.Join(udateStatementsforToastCols, " ")
updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.UnchangedToastColumns)
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(m.NormalizedTableSchema.PrimaryKeyColumns))
for _, pkeyColName := range m.NormalizedTableSchema.PrimaryKeyColumns {
pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_peerdb_target.%s = _peerdb_deduped.%s",
pkeyColName, pkeyColName))
}
// _peerdb_target.<pkey1> = _peerdb_deduped.<pkey1> AND _peerdb_target.<pkey2> = _peerdb_deduped.<pkey2> ...
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

return fmt.Sprintf(`
MERGE %s.%s _peerdb_target USING %s _peerdb_deduped
ON _peerdb_target.%s = _peerdb_deduped.%s
ON %s
WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
INSERT (%s) VALUES (%s)
%s
WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
DELETE;
`, m.Dataset, m.NormalizedTable, tempTable, pkey, pkey, csep, csep, updateStringToastCols)
`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, csep, csep, updateStringToastCols)
}

/*
@@ -1334,7 +1341,7 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
func (m *MergeStmtGenerator) generateUpdateStatement(allCols []string, unchangedToastCols []string) []string {
func (m *MergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)

for _, cols := range unchangedToastCols {
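
A small, self-contained illustration of what the two generators above produce for a table with a composite primary key. The table and column names are invented for the example; the separator literal and clause shapes come straight from generateDeDupedCTE and generateMergeStmt in this diff.

package main

import (
	"fmt"
	"strings"
)

func main() {
	pkeyCols := []string{"customer_id", "order_id"}

	// De-dup CTE partition key: all primary-key columns concatenated with a
	// sentinel separator, so the CTE's ranking (WHERE _peerdb_rank = 1)
	// deduplicates per composite key rather than per single column.
	dedupKey := fmt.Sprintf("(CONCAT(%s))", strings.Join(pkeyCols, ", '_peerdb_concat_', "))
	fmt.Println(dedupKey)
	// prints: (CONCAT(customer_id, '_peerdb_concat_', order_id))

	// MERGE ON clause: one equality per primary-key column, joined with AND.
	onClauses := make([]string, 0, len(pkeyCols))
	for _, col := range pkeyCols {
		onClauses = append(onClauses,
			fmt.Sprintf("_peerdb_target.%s = _peerdb_deduped.%s", col, col))
	}
	fmt.Println(strings.Join(onClauses, " AND "))
	// prints: _peerdb_target.customer_id = _peerdb_deduped.customer_id AND
	//         _peerdb_target.order_id = _peerdb_deduped.order_id
}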
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
@@ -30,7 +30,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
" `col2` = _peerdb_deduped.col2",
}

result := m.generateUpdateStatement(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
@@ -56,7 +56,7 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
" `col3` = _peerdb_deduped.col3",
}

result := m.generateUpdateStatement(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
1 change: 0 additions & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
@@ -387,7 +387,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
return 0, fmt.Errorf("failed to write record to OCF file: %w", err)
}
numRecords++

}
activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
"Writing OCF contents to BigQuery for partition/batch ID %s",
63 changes: 39 additions & 24 deletions flow/connectors/postgres/cdc.go
@@ -2,6 +2,7 @@ package connpostgres

import (
"context"
"crypto/sha256"
"fmt"
"reflect"
"time"
@@ -244,15 +245,14 @@ func (p *PostgresCDCSource) consumeStream(
if isFullReplica {
records.Records = append(records.Records, rec)
} else {
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
compositePKeyString, err := p.compositePKeyToString(req, rec)
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
return nil, err
}

tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: *pkeyColVal,
PkeyColVal: compositePKeyString,
}
_, ok := records.TablePKeyLastSeen[tablePkeyVal]
if !ok {
@@ -274,14 +274,14 @@
if isFullReplica {
records.Records = append(records.Records, rec)
} else {
pkeyCol := req.TableNameSchemaMapping[tableName].PrimaryKeyColumn
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
compositePKeyString, err := p.compositePKeyToString(req, rec)
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
return nil, err
}

tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: *pkeyColVal,
PkeyColVal: compositePKeyString,
}
records.Records = append(records.Records, rec)
// all columns will be set in insert record, so add it to the map
@@ -370,17 +370,16 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// create empty map of string to interface{}
items, unchangedToastColumns, err := p.convertTupleToMap(msg.Tuple, rel)
items, _, err := p.convertTupleToMap(msg.Tuple, rel)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.InsertRecord{
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
SourceTableName: tableName,
UnchangedToastColumns: unchangedToastColumns,
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
SourceTableName: tableName,
}, nil
}

@@ -442,17 +441,16 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

// create empty map of string to interface{}
items, unchangedToastColumns, err := p.convertTupleToMap(msg.OldTuple, rel)
items, _, err := p.convertTupleToMap(msg.OldTuple, rel)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.DeleteRecord{
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
SourceTableName: tableName,
UnchangedToastColumns: unchangedToastColumns,
CheckPointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName],
SourceTableName: tableName,
}, nil
}

@@ -466,15 +464,15 @@ It takes a tuple and a relation message as input and returns
func (p *PostgresCDCSource) convertTupleToMap(
tuple *pglogrepl.TupleData,
rel *protos.RelationMessage,
) (*model.RecordItems, map[string]bool, error) {
) (*model.RecordItems, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
if tuple == nil {
return model.NewRecordItems(), make(map[string]bool), nil
return model.NewRecordItems(), make(map[string]struct{}), nil
}

// create empty map of string to interface{}
items := model.NewRecordItems()
unchangedToastColumns := make(map[string]bool)
unchangedToastColumns := make(map[string]struct{})

for idx, col := range tuple.Columns {
colName := rel.Columns[idx].Name
@@ -496,7 +494,7 @@ func (p *PostgresCDCSource) convertTupleToMap(
}
items.AddColumn(colName, data)
case 'u': // unchanged toast
unchangedToastColumns[colName] = true
unchangedToastColumns[colName] = struct{}{}
default:
return nil, nil, fmt.Errorf("unknown column data type: %s", string(col.DataType))
}
@@ -617,3 +615,20 @@ func (p *PostgresCDCSource) processRelationMessage(
CheckPointID: int64(lsn),
}, nil
}

func (p *PostgresCDCSource) compositePKeyToString(req *model.PullRecordsRequest, rec model.Record) (string, error) {
tableName := rec.GetTableName()
pkeyColsMerged := make([]byte, 0)

for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
if err != nil {
return "", fmt.Errorf("error getting pkey column value: %w", err)
}
pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...)
}

hasher := sha256.New()
hasher.Write(pkeyColsMerged)
return fmt.Sprintf("%x", hasher.Sum(nil)), nil
}
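
On the Postgres side, the same composite key is reduced to a single string: each primary-key value is formatted with %v, the bytes are concatenated in column order, and the SHA-256 digest of the result keys the TablePKeyLastSeen map that consumeStream uses to track the last record seen per row in a batch. A self-contained sketch of the key construction, with invented example values:

package main

import (
	"crypto/sha256"
	"fmt"
)

// compositeKey mirrors compositePKeyToString above, minus the record and
// schema plumbing: format each value, append the bytes, hash the result.
func compositeKey(pkeyVals ...interface{}) string {
	merged := make([]byte, 0)
	for _, v := range pkeyVals {
		merged = append(merged, []byte(fmt.Sprintf("%v", v))...)
	}
	hasher := sha256.New()
	hasher.Write(merged)
	return fmt.Sprintf("%x", hasher.Sum(nil))
}

func main() {
	// Hypothetical row with composite primary key (customer_id=42, order_id="A-7").
	key := compositeKey(42, "A-7")
	fmt.Println(key)
	// Paired with the table name, this digest plays the role of
	// model.TableWithPkey{TableName: ..., PkeyColVal: key} in consumeStream.
}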
(The remaining 17 changed files in this commit are not shown in this view.)
