added cpkey for BQ as well, code cleanup and more tests
heavycrystal committed Oct 16, 2023
1 parent a3e5b44 commit 03f9d94
Showing 12 changed files with 468 additions and 70 deletions.
34 changes: 20 additions & 14 deletions flow/connectors/bigquery/bigquery.go
@@ -512,7 +512,7 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
matchData: "",
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
- unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
+ unchangedToastColumns: "",
})
tableNameRowsMapping[r.DestinationTableName] += 1
case *model.UpdateRecord:
@@ -571,7 +571,7 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
matchData: itemsJSON,
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
- unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
+ unchangedToastColumns: "",
})

tableNameRowsMapping[r.DestinationTableName] += 1
@@ -741,7 +741,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}
entries[9] = qvalue.QValue{
Kind: qvalue.QValueKindString,
- Value: utils.KeysToString(r.UnchangedToastColumns),
+ Value: "",
}

tableNameRowsMapping[r.DestinationTableName] += 1
@@ -802,7 +802,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}
entries[9] = qvalue.QValue{
Kind: qvalue.QValueKindString,
- Value: utils.KeysToString(r.UnchangedToastColumns),
+ Value: "",
}

tableNameRowsMapping[r.DestinationTableName] += 1
@@ -1035,7 +1035,7 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec

// create the job in the metadata table
jobStatement := fmt.Sprintf(
"INSERT INTO %s.%s (mirror_job_name, offset,sync_batch_id) VALUES ('%s',%d,%d);",
"INSERT INTO %s.%s (mirror_job_name,offset,sync_batch_id) VALUES ('%s',%d,%d);",
c.datasetID, MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID)
if hasJob {
jobStatement = fmt.Sprintf(
@@ -1288,14 +1288,12 @@ func (m *MergeStmtGenerator) generateDeDupedCTE() string {
) _peerdb_ranked
WHERE _peerdb_rank = 1
) SELECT * FROM _peerdb_de_duplicated_data_res`
- pkey := m.NormalizedTableSchema.PrimaryKeyColumns[0]
- return fmt.Sprintf(cte, pkey)
+ pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.NormalizedTableSchema.PrimaryKeyColumns, ", ' ', "))
+ return fmt.Sprintf(cte, pkeyColsStr)
}
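With a composite primary key, the deduplication CTE can no longer PARTITION BY a single column, so the key columns are concatenated into one expression. A minimal standalone sketch of the string being built (column names are illustrative, not from this commit):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// hypothetical composite key
	primaryKeyColumns := []string{"id", "region"}
	// mirrors the pkeyColsStr construction above
	pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(primaryKeyColumns, ", ' ', "))
	fmt.Println(pkeyColsStr) // prints: (CONCAT(id, ' ', region))
}

BigQuery evaluates the CONCAT per row inside PARTITION BY, so rows sharing all key values land in the same partition.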

// generateMergeStmt generates a merge statement.
func (m *MergeStmtGenerator) generateMergeStmt(tempTable string) string {
- pkey := m.NormalizedTableSchema.PrimaryKeyColumns[0]
-
// comma separated list of column names
backtickColNames := make([]string, 0)
pureColNames := make([]string, 0)
@@ -1305,18 +1303,26 @@ func (m *MergeStmtGenerator) generateMergeStmt(tempTable string) string {
}
csep := strings.Join(backtickColNames, ", ")

- udateStatementsforToastCols := m.generateUpdateStatement(pureColNames, m.UnchangedToastColumns)
- updateStringToastCols := strings.Join(udateStatementsforToastCols, " ")
+ updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.UnchangedToastColumns)
+ updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

+ pkeySelectSQLArray := make([]string, 0, len(m.NormalizedTableSchema.PrimaryKeyColumns))
+ for _, pkeyColName := range m.NormalizedTableSchema.PrimaryKeyColumns {
+ pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_peerdb_target.%s = _peerdb_deduped.%s",
+ pkeyColName, pkeyColName))
+ }
+ // _peerdb_target.<pkey1> = _peerdb_deduped.<pkey1> AND _peerdb_target.<pkey2> = _peerdb_deduped.<pkey2> ...
+ pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

return fmt.Sprintf(`
MERGE %s.%s _peerdb_target USING %s _peerdb_deduped
- ON _peerdb_target.%s = _peerdb_deduped.%s
+ ON %s
WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
INSERT (%s) VALUES (%s)
%s
WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
DELETE;
- `, m.Dataset, m.NormalizedTable, tempTable, pkey, pkey, csep, csep, updateStringToastCols)
+ `, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, csep, csep, updateStringToastCols)
}
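The MERGE join condition is likewise generalized from a single pkey equality to one equality per key column, joined with AND. A runnable sketch of the clause the loop above produces (column names illustrative):

package main

import (
	"fmt"
	"strings"
)

func main() {
	pkeyCols := []string{"id", "region"} // hypothetical composite key
	conds := make([]string, 0, len(pkeyCols))
	for _, col := range pkeyCols {
		conds = append(conds, fmt.Sprintf("_peerdb_target.%s = _peerdb_deduped.%s", col, col))
	}
	fmt.Println(strings.Join(conds, " AND "))
	// prints: _peerdb_target.id = _peerdb_deduped.id AND _peerdb_target.region = _peerdb_deduped.region
}

For a single-column key this degenerates to the old ON _peerdb_target.<pkey> = _peerdb_deduped.<pkey> condition.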

/*
Expand All @@ -1334,7 +1340,7 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
-func (m *MergeStmtGenerator) generateUpdateStatement(allCols []string, unchangedToastCols []string) []string {
+func (m *MergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)

for _, cols := range unchangedToastCols {
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
@@ -30,7 +30,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
" `col2` = _peerdb_deduped.col2",
}

- result := m.generateUpdateStatement(allCols, unchangedToastCols)
+ result := m.generateUpdateStatements(allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
@@ -56,7 +56,7 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
" `col3` = _peerdb_deduped.col3",
}

- result := m.generateUpdateStatement(allCols, unchangedToastCols)
+ result := m.generateUpdateStatements(allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
70 changes: 38 additions & 32 deletions flow/connectors/postgres/cdc.go
@@ -239,17 +239,14 @@ func (p *PostgresCDCSource) consumeStream(
// tableName here is destination tableName.
// should be ideally sourceTableName as we are in PullRecords.
// will change in future
- pkeyColsMerged := make([]string, 0)
- for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
- pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
- if err != nil {
- return nil, fmt.Errorf("error getting pkey column value: %w", err)
- }
- pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal.Value))
- }
+ compositePKeyString, err := p.compositePKeyToString(req, rec)
+ if err != nil {
+ return nil, err
+ }

tablePkeyVal := model.TableWithPkey{
TableName: tableName,
- PkeyColVal: strings.Join(pkeyColsMerged, " "),
+ PkeyColVal: compositePKeyString,
}
_, ok := records.TablePKeyLastSeen[tablePkeyVal]
if !ok {
@@ -266,18 +263,14 @@ func (p *PostgresCDCSource) consumeStream(
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
}
case *model.InsertRecord:
- pkeyColsMerged := make([]string, 0)
- for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
- pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
- if err != nil {
- return nil, fmt.Errorf("error getting pkey column value: %w", err)
- }
- pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal))
- }
+ compositePKeyString, err := p.compositePKeyToString(req, rec)
+ if err != nil {
+ return nil, err
+ }

tablePkeyVal := model.TableWithPkey{
TableName: tableName,
- PkeyColVal: strings.Join(pkeyColsMerged, " "),
+ PkeyColVal: compositePKeyString,
}
records.Records = append(records.Records, rec)
// all columns will be set in insert record, so add it to the map
@@ -365,17 +358,16 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// create empty map of string to interface{}
- items, unchangedToastColumns, err := p.convertTupleToMap(msg.Tuple, rel)
+ items, _, err := p.convertTupleToMap(msg.Tuple, rel)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.InsertRecord{
- CheckPointID: int64(lsn),
- Items: items,
- DestinationTableName: p.TableNameMapping[tableName],
- SourceTableName: tableName,
- UnchangedToastColumns: unchangedToastColumns,
+ CheckPointID: int64(lsn),
+ Items: items,
+ DestinationTableName: p.TableNameMapping[tableName],
+ SourceTableName: tableName,
}, nil
}

@@ -437,17 +429,16 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

// create empty map of string to interface{}
- items, unchangedToastColumns, err := p.convertTupleToMap(msg.OldTuple, rel)
+ items, _, err := p.convertTupleToMap(msg.OldTuple, rel)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.DeleteRecord{
- CheckPointID: int64(lsn),
- Items: items,
- DestinationTableName: p.TableNameMapping[tableName],
- SourceTableName: tableName,
- UnchangedToastColumns: unchangedToastColumns,
+ CheckPointID: int64(lsn),
+ Items: items,
+ DestinationTableName: p.TableNameMapping[tableName],
+ SourceTableName: tableName,
}, nil
}

@@ -461,15 +452,15 @@ It takes a tuple and a relation message as input and returns
func (p *PostgresCDCSource) convertTupleToMap(
tuple *pglogrepl.TupleData,
rel *protos.RelationMessage,
-) (*model.RecordItems, map[string]bool, error) {
+) (*model.RecordItems, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
if tuple == nil {
- return model.NewRecordItems(), make(map[string]bool), nil
+ return model.NewRecordItems(), make(map[string]struct{}), nil
}

// create empty map of string to interface{}
items := model.NewRecordItems()
- unchangedToastColumns := make(map[string]bool)
+ unchangedToastColumns := make(map[string]struct{})

for idx, col := range tuple.Columns {
colName := rel.Columns[idx].Name
@@ -491,7 +482,7 @@ func (p *PostgresCDCSource) convertTupleToMap(
}
items.AddColumn(colName, data)
case 'u': // unchanged toast
- unchangedToastColumns[colName] = true
+ unchangedToastColumns[colName] = struct{}{}
default:
return nil, nil, fmt.Errorf("unknown column data type: %s", string(col.DataType))
}
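Switching the set type from map[string]bool to map[string]struct{} is the usual Go set idiom: the empty struct takes no storage, and membership is checked with the comma-ok form instead of relying on the bool zero value. A minimal illustration:

package main

import "fmt"

func main() {
	unchangedToastColumns := make(map[string]struct{})
	unchangedToastColumns["n_t"] = struct{}{}

	// the value carries no information; membership is the ", ok" form
	if _, ok := unchangedToastColumns["n_t"]; ok {
		fmt.Println("n_t is an unchanged TOAST column")
	}
}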
@@ -599,3 +590,18 @@ func (p *PostgresCDCSource) processRelationMessage(
CheckPointID: int64(lsn),
}, nil
}

+func (p *PostgresCDCSource) compositePKeyToString(req *model.PullRecordsRequest, rec model.Record) (string, error) {
+ tableName := rec.GetTableName()
+ pkeyColsMerged := make([]string, 0)
+
+ for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
+ pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
+ if err != nil {
+ return "", fmt.Errorf("error getting pkey column value: %w", err)
+ }
+ pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal.Value))
+ }
+
+ return strings.Join(pkeyColsMerged, " "), nil
+}
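The new helper renders each key value with %v and joins the pieces with a single space, so a record whose (hypothetical) key columns hold 7 and "us-east" maps to the dedup key "7 us-east". A standalone sketch of just that joining behaviour:

package main

import (
	"fmt"
	"strings"
)

func main() {
	pkeyVals := []interface{}{7, "us-east"} // hypothetical key values
	parts := make([]string, 0, len(pkeyVals))
	for _, v := range pkeyVals {
		parts = append(parts, fmt.Sprintf("%v", v))
	}
	fmt.Println(strings.Join(parts, " ")) // prints: 7 us-east
}

Note that a plain space join is ambiguous for string key values that themselves contain spaces; whether that matters depends on the key columns in use.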
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres.go
@@ -286,7 +286,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
0,
"{}",
syncBatchID,
- utils.KeysToString(typedRecord.UnchangedToastColumns),
+ "",
})
tableNameRowsMapping[typedRecord.DestinationTableName] += 1
case *model.UpdateRecord:
@@ -324,7 +324,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
2,
itemsJSON,
syncBatchID,
- utils.KeysToString(typedRecord.UnchangedToastColumns),
+ "",
})
tableNameRowsMapping[typedRecord.DestinationTableName] += 1
default:
8 changes: 4 additions & 4 deletions flow/connectors/postgres/postgres_cdc_test.go
@@ -235,9 +235,9 @@ func (suite *PostgresCDCTestSuite) validateMutatedToastRecords(records []model.R
suite.Equal(qvalue.QValueKindString, v.Kind)
suite.Equal(65536, len(v.Value.(string)))
suite.Equal(3, len(updateRecord.UnchangedToastColumns))
- suite.True(updateRecord.UnchangedToastColumns["lz4_t"])
- suite.True(updateRecord.UnchangedToastColumns["n_b"])
- suite.True(updateRecord.UnchangedToastColumns["lz4_b"])
+ suite.Contains(updateRecord.UnchangedToastColumns, "lz4_t")
+ suite.Contains(updateRecord.UnchangedToastColumns, "n_b")
+ suite.Contains(updateRecord.UnchangedToastColumns, "lz4_b")
suite.IsType(&model.UpdateRecord{}, records[1])
updateRecord = records[1].(*model.UpdateRecord)
suite.Equal(srcTableName, updateRecord.SourceTableName)
@@ -258,7 +258,7 @@ func (suite *PostgresCDCTestSuite) validateMutatedToastRecords(records []model.R
suite.Equal(qvalue.QValueKindBytes, v.Kind)
suite.Equal(65536, len(v.Value.([]byte)))
suite.Equal(1, len(updateRecord.UnchangedToastColumns))
- suite.True(updateRecord.UnchangedToastColumns["n_t"])
+ suite.Contains(updateRecord.UnchangedToastColumns, "n_t")
// Test case for records[2]
suite.IsType(&model.UpdateRecord{}, records[2])
updateRecord = records[2].(*model.UpdateRecord)
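The test changes follow from the map type switch: suite.True(m["n_t"]) no longer type-checks once the map value is struct{} rather than bool, while testify's Contains asserts key membership when given a map. A small sketch of the assertion (using the stretchr/testify package the suite is built on; test name is illustrative):

package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestUnchangedToastColumnsMembership(t *testing.T) {
	set := map[string]struct{}{"n_t": {}}
	assert.Contains(t, set, "n_t") // Contains checks keys when given a map
}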
8 changes: 4 additions & 4 deletions flow/connectors/snowflake/snowflake.go
@@ -565,7 +565,7 @@ func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, ra
recordType: 0,
matchData: "",
batchID: syncBatchID,
- unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns),
+ unchangedToastColumns: "",
})
tableNameRowsMapping[typedRecord.DestinationTableName] += 1
case *model.UpdateRecord:
@@ -605,7 +605,7 @@ func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, ra
recordType: 2,
matchData: itemsJSON,
batchID: syncBatchID,
- unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns),
+ unchangedToastColumns: "",
})
tableNameRowsMapping[typedRecord.DestinationTableName] += 1
default:
@@ -995,7 +995,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
pkeyColName, pkeyColName))
}
// TARGET.<pkey1> = SOURCE.<pkey1> AND TARGET.<pkey2> = SOURCE.<pkey2> ...
- pkeyColStr := strings.Join(pkeySelectSQLArray, " AND ")
+ pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

deletePart := "DELETE"
if softDelete {
Expand All @@ -1005,7 +1005,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
mergeStatement := fmt.Sprintf(mergeStatementSQL, destinationTableIdentifier, toVariantColumnName,
rawTableIdentifier, normalizeBatchID, syncBatchID, flattenedCastsSQL,
fmt.Sprintf("(%s)", strings.Join(normalizedTableSchema.PrimaryKeyColumns, ",")),
- pkeyColStr, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
+ pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)

result, err := normalizeRecordsTx.ExecContext(c.ctx, mergeStatement, destinationTableIdentifier)
if err != nil {
6 changes: 5 additions & 1 deletion flow/connectors/utils/map.go
@@ -2,7 +2,11 @@ package utils

import "strings"

-func KeysToString(m map[string]bool) string {
+func KeysToString(m map[string]struct{}) string {
+ if m == nil {
+ return ""
+ }
+
var keys []string
for k := range m {
keys = append(keys, k)
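The map.go hunk is truncated here. For completeness, a sketch of the whole helper under the assumption that the remaining, unshown lines simply join the collected keys (the comma separator is a guess, not confirmed by this diff):

package utils

import "strings"

// KeysToString renders the keys of a set-like map as a single string.
// Map iteration order is unspecified in Go, so the output order can
// vary between calls unless the caller sorts.
func KeysToString(m map[string]struct{}) string {
	if m == nil {
		return ""
	}

	var keys []string
	for k := range m {
		keys = append(keys, k)
	}

	return strings.Join(keys, ",")
}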
6 changes: 3 additions & 3 deletions flow/connectors/utils/stream.go
@@ -90,7 +90,7 @@ func RecordsToRawTableStream(req model.RecordsToStreamRequest) (*model.RecordsTo
}
entries[7] = qvalue.QValue{
Kind: qvalue.QValueKindString,
- Value: KeysToString(typedRecord.UnchangedToastColumns),
+ Value: "",
}
req.TableMapping[typedRecord.DestinationTableName] += 1
case *model.UpdateRecord:
@@ -149,11 +149,11 @@ func RecordsToRawTableStream(req model.RecordsToStreamRequest) (*model.RecordsTo
}
entries[7] = qvalue.QValue{
Kind: qvalue.QValueKindString,
- Value: KeysToString(typedRecord.UnchangedToastColumns),
+ Value: "",
}
req.TableMapping[typedRecord.DestinationTableName] += 1
default:
return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord)
return nil, fmt.Errorf("record type %T not supported", typedRecord)
}

if first {