Merge branch 'main' into cdc-parallel-sync-normalize
serprex authored Dec 6, 2023
2 parents 1ca45cb + 4eed39a commit ee99926
Showing 15 changed files with 789 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
@@ -11,7 +11,7 @@ jobs:
flow_test:
strategy:
matrix:
-runner: [ubicloud-standard-8]
+runner: [ubicloud-standard-8, ubuntu-latest]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
services:
125 changes: 75 additions & 50 deletions flow/connectors/postgres/cdc.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/cdc_records"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
@@ -188,39 +189,53 @@ func (p *PostgresCDCSource) consumeStream(
}

var standByLastLogged time.Time
-localRecords := make([]model.Record, 0)
+cdcRecordsStorage := cdc_records.NewCDCRecordsStore(req.FlowJobName)
defer func() {
-if len(localRecords) == 0 {
+if cdcRecordsStorage.IsEmpty() {
records.SignalAsEmpty()
}
records.RelationMessageMapping <- &p.relationMessageMapping
log.Infof("[finished] PullRecords streamed %d records", len(localRecords))
log.Infof("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len())
err := cdcRecordsStorage.Close()
if err != nil {
log.Warnf("failed to clean up records storage: %v", err)
}
}()

shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
jobName := req.FlowJobName
-currRecords := len(localRecords)
+currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
})

defer func() {
shutdown <- true
}()

-tablePKeyLastSeen := make(map[model.TableWithPkey]int)
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

-addRecord := func(rec model.Record) {
+addRecordWithKey := func(key model.TableWithPkey, rec model.Record) error {
records.AddRecord(rec)
-localRecords = append(localRecords, rec)
+err := cdcRecordsStorage.Set(key, rec)
+if err != nil {
+return err
+}

-if len(localRecords) == 1 {
+if cdcRecordsStorage.Len() == 1 {
records.SignalAsNotEmpty()
log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout))
log.Infof("num records accumulated: %d", len(localRecords))
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
+return nil
}

+addRecord := func(rec model.Record) error {
+key, err := p.recToTablePKey(req, rec)
+if err != nil {
+return err
+}
+return addRecordWithKey(*key, rec)
+}

pkmRequiresResponse := false
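
The cdc_records package introduced above is not shown in this commit view. A minimal in-memory sketch of the store surface these call sites rely on, inferred from the diff rather than taken from the real package, might look like the following; the actual store is presumably free to keep records outside process memory, which is why Close() can fail and is deferred for cleanup:

    // In-memory stand-in for the CDCRecordsStore surface used above.
    // Inferred from call sites in this diff; the real package may differ.
    package cdc_records

    import "github.com/PeerDB-io/peer-flow/model"

    type CDCRecordsStore struct {
        flowJobName string
        records     map[model.TableWithPkey]model.Record
    }

    func NewCDCRecordsStore(flowJobName string) *CDCRecordsStore {
        return &CDCRecordsStore{
            flowJobName: flowJobName,
            records:     make(map[model.TableWithPkey]model.Record),
        }
    }

    // Set stores rec under key, replacing any earlier record for the same row.
    func (c *CDCRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
        c.records[key] = rec
        return nil
    }

    // Get returns the latest record stored for key and whether one exists.
    func (c *CDCRecordsStore) Get(key model.TableWithPkey) (model.Record, bool, error) {
        rec, ok := c.records[key]
        return rec, ok, nil
    }

    func (c *CDCRecordsStore) Len() int      { return len(c.records) }
    func (c *CDCRecordsStore) IsEmpty() bool { return len(c.records) == 0 }

    // Close releases any resources held by the store; an implementation
    // backed by disk would delete its on-disk state here.
    func (c *CDCRecordsStore) Close() error { return nil }
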
@@ -237,33 +252,33 @@ func (p *PostgresCDCSource) consumeStream(
}

if time.Since(standByLastLogged) > 10*time.Second {
-numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))
+numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
standByLastLogged = time.Now()
}

pkmRequiresResponse = false
}

-if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock {
+if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock {
return nil
}
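// Note (inferred, not part of this diff): p.commitLock appears to be held
// while the stream is inside a source transaction, so this early return and
// the ones below wait for COMMIT instead of splitting a transaction across
// two batches.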

if waitingForCommit && !p.commitLock {
log.Infof(
"[%s] commit received, returning currently accumulated records - %d",
req.FlowJobName,
-len(localRecords),
+cdcRecordsStorage.Len(),
)
return nil
}

// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
-if len(localRecords) > 0 {
+if !cdcRecordsStorage.IsEmpty() {
log.Infof("[%s] standby deadline reached, have %d records, will return at next commit",
req.FlowJobName,
-len(localRecords),
+cdcRecordsStorage.Len(),
)

if !p.commitLock {
@@ -283,7 +298,7 @@ func (p *PostgresCDCSource) consumeStream(
var ctx context.Context
var cancel context.CancelFunc

-if len(localRecords) == 0 {
+if cdcRecordsStorage.IsEmpty() {
ctx, cancel = context.WithCancel(p.ctx)
} else {
ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
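// Note (inferred, not part of this diff): with no records buffered the
// receive may block indefinitely, so a plain cancelable context suffices;
// once at least one record is buffered, the deadline context forces
// ReceiveMessage to time out so the accumulated batch can be flushed.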
@@ -293,7 +308,8 @@
cancel()
if err != nil && !p.commitLock {
if pgconn.Timeout(err) {
log.Infof("Stand-by deadline reached, returning currently accumulated records - %d", len(localRecords))
log.Infof("Stand-by deadline reached, returning currently accumulated records - %d",
cdcRecordsStorage.Len())
return nil
} else {
return fmt.Errorf("ReceiveMessage failed: %w", err)
@@ -306,7 +322,9 @@

msg, ok := rawMsg.(*pgproto3.CopyData)
if !ok {
log.Warnf("unexpected message type: %T", rawMsg)
if rawMsg != nil {
log.Warnf("unexpected message type: %T", rawMsg)
}
continue
}

@@ -351,63 +369,64 @@ func (p *PostgresCDCSource) consumeStream(
// will change in future
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
-addRecord(rec)
+err = addRecord(rec)
+if err != nil {
+return err
+}
} else {
-compositePKeyString, err := p.compositePKeyToString(req, rec)
+tablePkeyVal, err := p.recToTablePKey(req, rec)
if err != nil {
return err
}

-tablePkeyVal := model.TableWithPkey{
-TableName: tableName,
-PkeyColVal: compositePKeyString,
+latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
+if err != nil {
+return err
}
-recIndex, ok := tablePKeyLastSeen[tablePkeyVal]
if !ok {
-addRecord(rec)
-tablePKeyLastSeen[tablePkeyVal] = len(localRecords) - 1
+err = addRecordWithKey(*tablePkeyVal, rec)
} else {
-oldRec := localRecords[recIndex]
// iterate through unchanged toast cols and set them in new record
-updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
+updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
for _, col := range updatedCols {
delete(r.UnchangedToastColumns, col)
}
-addRecord(rec)
-tablePKeyLastSeen[tablePkeyVal] = len(localRecords) - 1
+err = addRecordWithKey(*tablePkeyVal, rec)
}
+if err != nil {
+return err
+}
}
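// Note (inferred, not part of this diff): for tables without REPLICA
// IDENTITY FULL, Postgres omits unchanged TOAST columns from UPDATE
// messages, so UpdateIfNotExists backfills them from the latest record
// stored for the same primary key; e.g. an update carrying only {id: 1}
// inherits big_doc from a stored {id: 1, big_doc: ...}.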

case *model.InsertRecord:
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
-addRecord(rec)
+err = addRecord(rec)
+if err != nil {
+return err
+}
} else {
-compositePKeyString, err := p.compositePKeyToString(req, rec)
+tablePkeyVal, err := p.recToTablePKey(req, rec)
if err != nil {
return err
}

-tablePkeyVal := model.TableWithPkey{
-TableName: tableName,
-PkeyColVal: compositePKeyString,
+err = addRecordWithKey(*tablePkeyVal, rec)
+if err != nil {
+return err
}
-addRecord(rec)
-// all columns will be set in insert record, so add it to the map
-tablePKeyLastSeen[tablePkeyVal] = len(localRecords) - 1
}
case *model.DeleteRecord:
-compositePKeyString, err := p.compositePKeyToString(req, rec)
+tablePkeyVal, err := p.recToTablePKey(req, rec)
if err != nil {
return err
}

-tablePkeyVal := model.TableWithPkey{
-TableName: tableName,
-PkeyColVal: compositePKeyString,
+latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
+if err != nil {
+return err
}
-recIndex, ok := tablePKeyLastSeen[tablePkeyVal]
if ok {
-latestRecord := localRecords[recIndex]
deleteRecord := rec.(*model.DeleteRecord)
deleteRecord.Items = latestRecord.GetItems()
updateRecord, ok := latestRecord.(*model.UpdateRecord)
@@ -423,7 +442,11 @@ func (p *PostgresCDCSource) consumeStream(
"_peerdb_not_backfilled_delete": {},
}
}
-addRecord(rec)
+
+err = addRecord(rec)
+if err != nil {
+return err
+}
case *model.RelationRecord:
tableSchemaDelta := r.TableSchemaDelta
if len(tableSchemaDelta.AddedColumns) > 0 {
@@ -438,7 +461,7 @@ func (p *PostgresCDCSource) consumeStream(
clientXLogPos = xld.WALStart
}

-if len(localRecords) == 0 {
+if cdcRecordsStorage.IsEmpty() {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
consumedXLogPos = clientXLogPos
@@ -790,21 +813,23 @@ func (p *PostgresCDCSource) processRelationMessage(
}, nil
}

-func (p *PostgresCDCSource) compositePKeyToString(req *model.PullRecordsRequest, rec model.Record) (string, error) {
+func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
+rec model.Record) (*model.TableWithPkey, error) {
tableName := rec.GetTableName()
pkeyColsMerged := make([]byte, 0)

for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
if err != nil {
return "", fmt.Errorf("error getting pkey column value: %w", err)
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}
pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...)
}

-hasher := sha256.New()
-hasher.Write(pkeyColsMerged)
-return fmt.Sprintf("%x", hasher.Sum(nil)), nil
+return &model.TableWithPkey{
+TableName: tableName,
+PkeyColVal: sha256.Sum256(pkeyColsMerged),
+}, nil
}
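
The rewritten helper returns sha256.Sum256(pkeyColsMerged), a fixed-size [32]byte, instead of a hex string; fixed-size arrays are comparable in Go, so TableWithPkey can be used directly as a map key in the records store. A standalone sketch of the derivation, with hypothetical column values as the only assumption:

    // Sketch of the key derivation above for a hypothetical composite
    // primary key (id=42, tenant="acme"); the values are assumptions.
    package main

    import (
        "crypto/sha256"
        "fmt"
    )

    func main() {
        pkeyColsMerged := make([]byte, 0)
        for _, v := range []interface{}{42, "acme"} {
            // stringify each pkey column value and concatenate
            pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", v))...)
        }
        key := sha256.Sum256(pkeyColsMerged) // [32]byte, comparable
        fmt.Printf("%x\n", key)
    }

Since the stringified values are concatenated without a separator, distinct composites such as ("ab", "c") and ("a", "bc") hash identically; the digest fixes the key's size, not its uniqueness.
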

func (p *PostgresCDCSource) getParentRelIDIfPartitioned(relID uint32) uint32 {
18 changes: 9 additions & 9 deletions flow/connectors/utils/avro/avro_writer.go
@@ -30,7 +30,7 @@ const (
CompressSnappy
)

-type PeerDBOCFWriter struct {
+type peerDBOCFWriter struct {
ctx context.Context
stream *model.QRecordStream
avroSchema *model.QRecordAvroSchemaDefinition
@@ -45,8 +45,8 @@ func NewPeerDBOCFWriter(
avroSchema *model.QRecordAvroSchemaDefinition,
avroCompressionCodec AvroCompressionCodec,
targetDWH qvalue.QDWHType,
-) *PeerDBOCFWriter {
-return &PeerDBOCFWriter{
+) *peerDBOCFWriter {
+return &peerDBOCFWriter{
ctx: ctx,
stream: stream,
avroSchema: avroSchema,
@@ -55,7 +55,7 @@ func NewPeerDBOCFWriter(
}
}

-func (p *PeerDBOCFWriter) initWriteCloser(w io.Writer) error {
+func (p *peerDBOCFWriter) initWriteCloser(w io.Writer) error {
var err error
switch p.avroCompressionCodec {
case CompressNone:
@@ -77,7 +77,7 @@ func (p *PeerDBOCFWriter) initWriteCloser(w io.Writer) error {
return nil
}

-func (p *PeerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error) {
+func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error) {
err := p.initWriteCloser(w)
if err != nil {
return nil, fmt.Errorf("failed to create compressed writer: %w", err)
@@ -94,7 +94,7 @@ func (p *PeerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error
return ocfWriter, nil
}

-func (p *PeerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (int, error) {
+func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (int, error) {
schema, err := p.stream.Schema()
if err != nil {
log.Errorf("failed to get schema from stream: %v", err)
@@ -149,7 +149,7 @@ func (p *PeerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
return int(numRows.Load()), nil
}

-func (p *PeerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {
+func (p *peerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {
ocfWriter, err := p.createOCFWriter(w)
if err != nil {
return 0, fmt.Errorf("failed to create OCF writer: %w", err)
@@ -164,7 +164,7 @@ func (p *PeerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {
return numRows, nil
}

-func (p *PeerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (int, error) {
+func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (int, error) {
r, w := io.Pipe()
numRowsWritten := make(chan int, 1)
go func() {
@@ -202,7 +202,7 @@ func (p *PeerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils
return <-numRowsWritten, nil
}

-func (p *PeerDBOCFWriter) WriteRecordsToAvroFile(filePath string) (int, error) {
+func (p *peerDBOCFWriter) WriteRecordsToAvroFile(filePath string) (int, error) {
file, err := os.Create(filePath)
if err != nil {
return 0, fmt.Errorf("failed to create file: %w", err)
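
Every change in this file is the same rename: the exported PeerDBOCFWriter struct becomes the unexported peerDBOCFWriter, while the NewPeerDBOCFWriter constructor keeps its exported name. A minimal sketch of the resulting pattern, with illustrative fields:

    // The concrete writer type is unexported, so code outside the package
    // can only obtain one through the constructor; fields are illustrative.
    package avro

    type peerDBOCFWriter struct {
        avroCompressionCodec int
    }

    // NewPeerDBOCFWriter remains exported; callers bind the result with :=
    // and never need to name the concrete type.
    func NewPeerDBOCFWriter(codec int) *peerDBOCFWriter {
        return &peerDBOCFWriter{avroCompressionCodec: codec}
    }

Returning an unexported type from an exported function trips some linters, but it narrows the package's public surface to the constructor and the methods on the returned value.
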
