Commit d130609

fixed review comments

heavycrystal committed Oct 17, 2023
1 parent 7ca5521 commit d130609

Showing 5 changed files with 49 additions and 35 deletions.
1 change: 1 addition & 0 deletions flow/activities/flowable.go
@@ -358,6 +358,7 @@ func (a *FlowableActivity) StartNormalize(
 		return nil, fmt.Errorf("failed to normalized records: %w", err)
 	}
 
+	// normalize flow did not run due to no records, no need to update end time.
 	if res.Done {
 		err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
 			res.EndBatchID)
2 changes: 1 addition & 1 deletion flow/cmd/handler.go
@@ -100,7 +100,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(

 	maxBatchSize := int(cfg.MaxBatchSize)
 	if maxBatchSize == 0 {
-		maxBatchSize = 200000
+		maxBatchSize = 100000
 		cfg.MaxBatchSize = uint32(maxBatchSize)
 	}

3 changes: 2 additions & 1 deletion flow/connectors/bigquery/bigquery.go
@@ -1288,7 +1288,8 @@ func (m *MergeStmtGenerator) generateDeDupedCTE() string
 		) _peerdb_ranked
 		WHERE _peerdb_rank = 1
 	) SELECT * FROM _peerdb_de_duplicated_data_res`
-	pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.NormalizedTableSchema.PrimaryKeyColumns, ", ' ', "))
+	pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.NormalizedTableSchema.PrimaryKeyColumns,
+		", '_peerdb_concat_', "))
 	return fmt.Sprintf(cte, pkeyColsStr)
 }

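Why the separator swap: joining primary-key columns with a plain space is ambiguous, since different composite keys can concatenate to the same string. A standalone Go sketch of the collision, using strings.Join in place of BigQuery's CONCAT (illustrative only, not PeerDB code):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// With a space separator, two distinct composite keys collide:
	a := strings.Join([]string{"a b", "c"}, " ") // "a b c"
	b := strings.Join([]string{"a", "b c"}, " ") // "a b c"
	fmt.Println(a == b) // true: different keys, same concatenation

	// An unlikely sentinel such as _peerdb_concat_ keeps them distinct:
	c := strings.Join([]string{"a b", "c"}, "_peerdb_concat_")
	d := strings.Join([]string{"a", "b c"}, "_peerdb_concat_")
	fmt.Println(c == d) // false
}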
76 changes: 44 additions & 32 deletions flow/connectors/postgres/cdc.go
@@ -2,9 +2,9 @@ package connpostgres

 import (
 	"context"
+	"crypto/sha256"
 	"fmt"
 	"reflect"
-	"strings"
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
@@ -241,42 +241,52 @@ func (p *PostgresCDCSource) consumeStream(
 					// tableName here is destination tableName.
 					// should be ideally sourceTableName as we are in PullRecords.
 					// will change in future
-					compositePKeyString, err := p.compositePKeyToString(req, rec)
-					if err != nil {
-						return nil, err
-					}
-
-					tablePkeyVal := model.TableWithPkey{
-						TableName:  tableName,
-						PkeyColVal: compositePKeyString,
-					}
-					_, ok := records.TablePKeyLastSeen[tablePkeyVal]
-					if !ok {
-						records.Records = append(records.Records, rec)
-						records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
-					} else {
-						oldRec := records.Records[records.TablePKeyLastSeen[tablePkeyVal]]
-						// iterate through unchanged toast cols and set them in new record
-						updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
-						for _, col := range updatedCols {
-							delete(r.UnchangedToastColumns, col)
-						}
-						records.Records = append(records.Records, rec)
-						records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
-					}
+					isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
+					if isFullReplica {
+						records.Records = append(records.Records, rec)
+					} else {
+						compositePKeyString, err := p.compositePKeyToString(req, rec)
+						if err != nil {
+							return nil, err
+						}
+
+						tablePkeyVal := model.TableWithPkey{
+							TableName:  tableName,
+							PkeyColVal: compositePKeyString,
+						}
+						_, ok := records.TablePKeyLastSeen[tablePkeyVal]
+						if !ok {
+							records.Records = append(records.Records, rec)
+							records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
+						} else {
+							oldRec := records.Records[records.TablePKeyLastSeen[tablePkeyVal]]
+							// iterate through unchanged toast cols and set them in new record
+							updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
+							for _, col := range updatedCols {
+								delete(r.UnchangedToastColumns, col)
+							}
+							records.Records = append(records.Records, rec)
+							records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
+						}
+					}
 				case *model.InsertRecord:
-					compositePKeyString, err := p.compositePKeyToString(req, rec)
-					if err != nil {
-						return nil, err
-					}
-
-					tablePkeyVal := model.TableWithPkey{
-						TableName:  tableName,
-						PkeyColVal: compositePKeyString,
-					}
-					records.Records = append(records.Records, rec)
-					// all columns will be set in insert record, so add it to the map
-					records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
+					isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
+					if isFullReplica {
+						records.Records = append(records.Records, rec)
+					} else {
+						compositePKeyString, err := p.compositePKeyToString(req, rec)
+						if err != nil {
+							return nil, err
+						}
+
+						tablePkeyVal := model.TableWithPkey{
+							TableName:  tableName,
+							PkeyColVal: compositePKeyString,
+						}
+						records.Records = append(records.Records, rec)
+						// all columns will be set in insert record, so add it to the map
+						records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
+					}
 				case *model.DeleteRecord:
 					records.Records = append(records.Records, rec)
 				case *model.RelationRecord:
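The hunk above skips primary-key de-duplication when a table has REPLICA IDENTITY FULL, where there is no usable composite key to track; otherwise it keeps the existing TablePKeyLastSeen bookkeeping. A minimal, standalone sketch of that last-seen-index pattern (simplified names, not PeerDB's actual types):

package main

import "fmt"

// pkey stands in for model.TableWithPkey: a table name plus the
// stringified composite primary key.
type pkey struct{ table, key string }

func main() {
	records := []string{}
	lastSeen := map[pkey]int{} // key -> index of the latest record for it

	push := func(k pkey, rec string) {
		if idx, ok := lastSeen[k]; ok {
			// the real code merges unchanged TOAST columns from records[idx]
			fmt.Printf("key %v previously stored at index %d\n", k, idx)
		}
		records = append(records, rec)
		lastSeen[k] = len(records) - 1
	}

	push(pkey{"public.users", "42"}, "insert")
	push(pkey{"public.users", "42"}, "update") // finds the insert at index 0
}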
@@ -608,15 +618,17 @@ func (p *PostgresCDCSource) processRelationMessage(

 func (p *PostgresCDCSource) compositePKeyToString(req *model.PullRecordsRequest, rec model.Record) (string, error) {
 	tableName := rec.GetTableName()
-	pkeyColsMerged := make([]string, 0)
+	pkeyColsMerged := make([]byte, 0)
 
 	for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
 		pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
 		if err != nil {
 			return "", fmt.Errorf("error getting pkey column value: %w", err)
 		}
-		pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal.Value))
+		pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...)
 	}
 
-	return strings.Join(pkeyColsMerged, " "), nil
+	hasher := sha256.New()
+	hasher.Write(pkeyColsMerged)
+	return fmt.Sprintf("%x", hasher.Sum(nil)), nil
 }
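compositePKeyToString now hashes the merged key bytes instead of joining strings with a space, giving a fixed-width map key (note the column values are concatenated with no separator before hashing). A self-contained sketch of the same pattern, with hypothetical plain-value inputs:

package main

import (
	"crypto/sha256"
	"fmt"
)

// compositeKey mirrors the new helper: merge the primary-key column values
// into one byte slice, then return a fixed-width SHA-256 hex digest.
func compositeKey(pkeyVals ...any) string {
	merged := make([]byte, 0)
	for _, v := range pkeyVals {
		merged = append(merged, []byte(fmt.Sprintf("%v", v))...)
	}
	sum := sha256.Sum256(merged)
	return fmt.Sprintf("%x", sum)
}

func main() {
	fmt.Println(compositeKey(42, "us-east")) // 64 hex chars
	fmt.Println(compositeKey(42, "us-east")) // deterministic: same digest
}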
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
@@ -713,7 +713,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
 	// sync hasn't created job metadata yet, chill.
 	if !jobMetadataExists {
 		return &model.NormalizeResponse{
-			Done: true,
+			Done: false,
 		}, nil
 	}
 	destinationTableNames, err := c.getDistinctTableNamesInBatch(req.FlowJobName, syncBatchID, normalizeBatchID)
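This flip pairs with the flowable.go change above: NormalizeRecords now reports Done: false when sync hasn't written job metadata yet, so StartNormalize skips the end-time update for a normalize pass that never ran. A condensed, runnable sketch of the caller-side contract (simplified from the diff, not a verbatim excerpt):

package main

import "fmt"

// NormalizeResponse stands in for model.NormalizeResponse.
type NormalizeResponse struct {
	Done       bool
	EndBatchID int64
}

func main() {
	// Snowflake now answers with Done: false when job metadata is missing...
	res := NormalizeResponse{Done: false}

	// ...so the activity skips recording an end time for a pass that never ran.
	if res.Done {
		fmt.Println("update CDC batch end time for batch", res.EndBatchID)
	} else {
		fmt.Println("no records normalized; end time left untouched")
	}
}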
