diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 13c884de95..1d49ae795c 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -358,6 +358,7 @@ func (a *FlowableActivity) StartNormalize(
 		return nil, fmt.Errorf("failed to normalized records: %w", err)
 	}
 
+	// normalize flow did not run due to no records, no need to update end time.
 	if res.Done {
 		err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
 			res.EndBatchID)
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index e44ca3a3fd..708f739867 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -100,7 +100,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 
 	maxBatchSize := int(cfg.MaxBatchSize)
 	if maxBatchSize == 0 {
-		maxBatchSize = 200000
+		maxBatchSize = 100000
 		cfg.MaxBatchSize = uint32(maxBatchSize)
 	}
 
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 1fddb5cb93..a00759ee09 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -1288,7 +1288,8 @@ func (m *MergeStmtGenerator) generateDeDupedCTE() string {
 		) _peerdb_ranked
 		WHERE _peerdb_rank = 1
 	) SELECT * FROM _peerdb_de_duplicated_data_res`
-	pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.NormalizedTableSchema.PrimaryKeyColumns, ", ' ', "))
+	pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.NormalizedTableSchema.PrimaryKeyColumns,
+		", '_peerdb_concat_', "))
 	return fmt.Sprintf(cte, pkeyColsStr)
 }
 
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 872dd92c19..b48143150b 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -2,9 +2,9 @@ package connpostgres
 
 import (
 	"context"
+	"crypto/sha256"
 	"fmt"
 	"reflect"
-	"strings"
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
@@ -241,42 +241,52 @@ func (p *PostgresCDCSource) consumeStream(
 			// tableName here is destination tableName.
 			// should be ideally sourceTableName as we are in PullRecords.
 			// will change in future
-			compositePKeyString, err := p.compositePKeyToString(req, rec)
-			if err != nil {
-				return nil, err
-			}
+			isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
+			if isFullReplica {
+				records.Records = append(records.Records, rec)
+			} else {
+				compositePKeyString, err := p.compositePKeyToString(req, rec)
+				if err != nil {
+					return nil, err
+				}
 
-			tablePkeyVal := model.TableWithPkey{
-				TableName:  tableName,
-				PkeyColVal: compositePKeyString,
+				tablePkeyVal := model.TableWithPkey{
+					TableName:  tableName,
+					PkeyColVal: compositePKeyString,
+				}
+				_, ok := records.TablePKeyLastSeen[tablePkeyVal]
+				if !ok {
+					records.Records = append(records.Records, rec)
+					records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
+				} else {
+					oldRec := records.Records[records.TablePKeyLastSeen[tablePkeyVal]]
+					// iterate through unchanged toast cols and set them in new record
+					updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
+					for _, col := range updatedCols {
+						delete(r.UnchangedToastColumns, col)
+					}
+					records.Records = append(records.Records, rec)
+					records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
+				}
 			}
-			_, ok := records.TablePKeyLastSeen[tablePkeyVal]
-			if !ok {
+		case *model.InsertRecord:
+			isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
+			if isFullReplica {
 				records.Records = append(records.Records, rec)
-				records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
 			} else {
-				oldRec := records.Records[records.TablePKeyLastSeen[tablePkeyVal]]
-				// iterate through unchanged toast cols and set them in new record
-				updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
-				for _, col := range updatedCols {
-					delete(r.UnchangedToastColumns, col)
+				compositePKeyString, err := p.compositePKeyToString(req, rec)
+				if err != nil {
+					return nil, err
+				}
+
+				tablePkeyVal := model.TableWithPkey{
+					TableName:  tableName,
+					PkeyColVal: compositePKeyString,
 				}
 				records.Records = append(records.Records, rec)
+				// all columns will be set in insert record, so add it to the map
 				records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
 			}
-		case *model.InsertRecord:
-			compositePKeyString, err := p.compositePKeyToString(req, rec)
-			if err != nil {
-				return nil, err
-			}
-
-			tablePkeyVal := model.TableWithPkey{
-				TableName:  tableName,
-				PkeyColVal: compositePKeyString,
-			}
-			records.Records = append(records.Records, rec)
-			// all columns will be set in insert record, so add it to the map
-			records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
 		case *model.DeleteRecord:
 			records.Records = append(records.Records, rec)
 		case *model.RelationRecord:
@@ -608,15 +618,17 @@ func (p *PostgresCDCSource) processRelationMessage(
 
 func (p *PostgresCDCSource) compositePKeyToString(req *model.PullRecordsRequest, rec model.Record) (string, error) {
 	tableName := rec.GetTableName()
-	pkeyColsMerged := make([]string, 0)
+	pkeyColsMerged := make([]byte, 0)
 
 	for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
 		pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
 		if err != nil {
 			return "", fmt.Errorf("error getting pkey column value: %w", err)
 		}
-		pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal.Value))
+		pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...)
 	}
-	return strings.Join(pkeyColsMerged, " "), nil
+	hasher := sha256.New()
+	hasher.Write(pkeyColsMerged)
+	return fmt.Sprintf("%x", hasher.Sum(nil)), nil
 }
 
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index f963e3e426..4e6a913d38 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -713,7 +713,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
 	// sync hasn't created job metadata yet, chill.
 	if !jobMetadataExists {
 		return &model.NormalizeResponse{
-			Done: true,
+			Done: false,
 		}, nil
 	}
 	destinationTableNames, err := c.getDistinctTableNamesInBatch(req.FlowJobName, syncBatchID, normalizeBatchID)
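
A few notes on the patch, with illustrative Go sketches. None of the code below is part of the change itself, and any stand-in names are flagged as such.

The BigQuery separator change: joining primary-key columns with a bare space lets distinct composite keys collide whenever a value itself contains a space, since ('a b', 'c') and ('a', 'b c') both concatenate to 'a b c'. The token '_peerdb_concat_' is far less likely to appear in real data. A standalone sketch of the expression generateDeDupedCTE builds, using hypothetical column names:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical pkey columns, for illustration only.
	pkeyCols := []string{"region", "city"}

	// Old separator: ("a b", "c") and ("a", "b c") both become "a b c".
	oldExpr := fmt.Sprintf("(CONCAT(%s))", strings.Join(pkeyCols, ", ' ', "))

	// New separator keeps the two composite keys distinct.
	newExpr := fmt.Sprintf("(CONCAT(%s))", strings.Join(pkeyCols, ", '_peerdb_concat_', "))

	fmt.Println(oldExpr) // (CONCAT(region, ' ', city))
	fmt.Println(newExpr) // (CONCAT(region, '_peerdb_concat_', city))
}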
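
The consumeStream rewrite gates primary-key deduplication behind REPLICA IDENTITY FULL: such tables may have no usable primary key, so their records are appended without TablePKeyLastSeen bookkeeping, while every other table tracks the index of the latest image of each row so a later update can recover unchanged TOAST columns from it. A simplified sketch of that control flow, using stand-in types rather than PeerDB's actual model package:

package main

import "fmt"

// Stand-ins for model.TableWithPkey and the batch state in consumeStream.
type tableWithPkey struct {
	tableName string
	pkeyHash  string
}

type recordBatch struct {
	records     []string
	lastSeenIdx map[tableWithPkey]int
}

// addRecord mirrors the patched control flow: full-replica tables skip the
// pkey map entirely; everything else records where the latest image of a
// row lives, so a later update can backfill unchanged TOAST columns from it.
func (b *recordBatch) addRecord(table, pkeyHash, rec string, isFullReplica bool) {
	b.records = append(b.records, rec)
	if isFullReplica {
		return
	}
	key := tableWithPkey{tableName: table, pkeyHash: pkeyHash}
	b.lastSeenIdx[key] = len(b.records) - 1
}

func main() {
	b := &recordBatch{lastSeenIdx: make(map[tableWithPkey]int)}
	b.addRecord("public.users", "abc123", "insert user 1", false)
	b.addRecord("public.logs", "", "insert log line", true) // full replica: untracked
	fmt.Println(b.lastSeenIdx)
}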
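
compositePKeyToString now returns a sha256 hex digest of the concatenated key values instead of a space-joined string, so every TablePKeyLastSeen key is a fixed 64 hex characters regardless of how wide the composite key is. The same idea as a self-contained sketch (hashCompositeKey is illustrative, not the PeerDB function):

package main

import (
	"crypto/sha256"
	"fmt"
)

// hashCompositeKey concatenates the textual form of each key value and
// returns a fixed-width sha256 hex digest, mirroring the patched
// compositePKeyToString.
func hashCompositeKey(vals ...interface{}) string {
	hasher := sha256.New()
	for _, v := range vals {
		hasher.Write([]byte(fmt.Sprintf("%v", v)))
	}
	return fmt.Sprintf("%x", hasher.Sum(nil))
}

func main() {
	// A wide composite key still yields a 64-character map key.
	fmt.Println(hashCompositeKey(42, "2023-09-01", "some very long value"))
}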
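
Finally, the Snowflake and flowable.go changes work together: NormalizeRecords now reports Done: false when sync hasn't written job metadata yet, and StartNormalize only stamps a CDC batch end time when Done is true, per the new comment. A minimal sketch of that interplay, with stand-in types and a hypothetical batch id:

package main

import "fmt"

// Stand-in for model.NormalizeResponse.
type normalizeResponse struct {
	Done       bool
	EndBatchID int64
}

// Mirrors the patched flow: Done is false when sync hasn't written job
// metadata yet, so the caller in StartNormalize skips the end-time update.
func normalize(jobMetadataExists bool) *normalizeResponse {
	if !jobMetadataExists {
		return &normalizeResponse{Done: false}
	}
	return &normalizeResponse{Done: true, EndBatchID: 7} // hypothetical batch id
}

func main() {
	res := normalize(false)
	if res.Done {
		fmt.Println("would update CDC batch end time for batch", res.EndBatchID)
	} else {
		fmt.Println("normalize did not run; no end-time update")
	}
}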