Skip to content

Commit

Permalink
flow tests ain't flowing debug pt.2 + more fix
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 2, 2024
1 parent 826a075 commit 661080d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 52 deletions.
5 changes: 3 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: input.SyncFlowOptions.BatchSize,
IdleTimeout: time.Duration(input.SyncFlowOptions.IdleTimeoutSeconds) *
time.Second,
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.SyncFlowOptions.IdleTimeoutSeconds),
),
TableNameSchemaMapping: input.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
Expand Down
103 changes: 53 additions & 50 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/sha256"
"fmt"
"log/slog"
"regexp"
"time"

"github.com/jackc/pglogrepl"
Expand Down Expand Up @@ -46,7 +45,7 @@ type PostgresCDCSource struct {
catalogPool *pgxpool.Pool
flowJobName string

walSegmentRemovedRegex *regexp.Regexp
signaledAsNonEmpty bool
}

type PostgresCDCConfig struct {
Expand Down Expand Up @@ -74,9 +73,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
}

pattern := "requested WAL segment .* has already been removed.*"
regex := regexp.MustCompile(pattern)

flowName, _ := cdcConfig.AppContext.Value(shared.FlowNameKey).(string)
return &PostgresCDCSource{
ctx: cdcConfig.AppContext,
Expand All @@ -93,7 +89,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
walSegmentRemovedRegex: regex,
signaledAsNonEmpty: false,
}, nil
}

Expand Down Expand Up @@ -239,29 +235,26 @@ func (p *PostgresCDCSource) consumeStream(
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

addRecordWithKey := func(key model.TableWithPkey, rec model.Record) error {
records.AddRecord(rec)
err := cdcRecordsStorage.Set(key, rec)
if err != nil {
return err
addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error {
if key != nil {
err := cdcRecordsStorage.Set(*key, rec)
if err != nil {
return err
}
}
records.AddRecord(rec)

if cdcRecordsStorage.Len() == 1 {
// we are not storing in case of replident full
// so don't tie signalling to length of storage
if !p.signaledAsNonEmpty {
records.SignalAsNotEmpty()
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
p.logger.Info(fmt.Sprintf("pushing the standby deadline to %s", nextStandbyMessageDeadline))
p.signaledAsNonEmpty = true
}
return nil
}

addRecord := func(rec model.Record) error {
key, err := p.recToTablePKey(req, rec)
if err != nil {
return err
}
return addRecordWithKey(*key, rec)
}

pkmRequiresResponse := false
waitingForCommit := false

Expand Down Expand Up @@ -329,7 +322,6 @@ func (p *PostgresCDCSource) consumeStream(
rawMsg, err := conn.ReceiveMessage(ctx)
cancel()

utils.RecordHeartbeat(p.ctx, "consumeStream ReceiveMessage")
ctxErr := p.ctx.Err()
if ctxErr != nil {
return fmt.Errorf("consumeStream preempted: %w", ctxErr)
Expand Down Expand Up @@ -396,7 +388,7 @@ func (p *PostgresCDCSource) consumeStream(
// will change in future
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err = addRecord(rec)
err := addRecordWithKey(nil, rec)
if err != nil {
return err
}
Expand All @@ -411,14 +403,14 @@ func (p *PostgresCDCSource) consumeStream(
return err
}
if !ok {
err = addRecordWithKey(*tablePkeyVal, rec)
err = addRecordWithKey(tablePkeyVal, rec)
} else {
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
for _, col := range updatedCols {
delete(r.UnchangedToastColumns, col)
}
err = addRecordWithKey(*tablePkeyVal, rec)
err = addRecordWithKey(tablePkeyVal, rec)
}
if err != nil {
return err
Expand All @@ -428,7 +420,7 @@ func (p *PostgresCDCSource) consumeStream(
case *model.InsertRecord:
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err = addRecord(rec)
err := addRecordWithKey(nil, rec)
if err != nil {
return err
}
Expand All @@ -438,42 +430,53 @@ func (p *PostgresCDCSource) consumeStream(
return err
}

err = addRecordWithKey(*tablePkeyVal, rec)
err = addRecordWithKey(tablePkeyVal, rec)
if err != nil {
return err
}
}
case *model.DeleteRecord:
tablePkeyVal, err := p.recToTablePKey(req, rec)
if err != nil {
return err
}
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err := addRecordWithKey(nil, rec)
if err != nil {
return err
}
} else {
tablePkeyVal, err := p.recToTablePKey(req, rec)
if err != nil {
return err
}

latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
if err != nil {
return err
}
if ok {
deleteRecord := rec.(*model.DeleteRecord)
deleteRecord.Items = latestRecord.GetItems()
updateRecord, ok := latestRecord.(*model.UpdateRecord)
latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
if err != nil {
return err
}
if ok {
deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
deleteRecord := rec.(*model.DeleteRecord)
deleteRecord.Items = latestRecord.GetItems()
updateRecord, ok := latestRecord.(*model.UpdateRecord)
if ok {
deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
}
} else {
deleteRecord := rec.(*model.DeleteRecord)
// there is nothing to backfill the items in the delete record with,
// so don't update the row with this record
// add sentinel value to prevent update statements from selecting
deleteRecord.UnchangedToastColumns = map[string]struct{}{
"_peerdb_not_backfilled_delete": {},
}
}
} else {
deleteRecord := rec.(*model.DeleteRecord)
// there is nothing to backfill the items in the delete record with,
// so don't update the row with this record
// add sentinel value to prevent update statements from selecting
deleteRecord.UnchangedToastColumns = map[string]struct{}{
"_peerdb_not_backfilled_delete": {},

// A delete can only be followed by an INSERT, which does not need backfilling
// No need to store DeleteRecords in memory or disk.
err = addRecordWithKey(nil, rec)
if err != nil {
return err
}
}

err = addRecord(rec)
if err != nil {
return err
}
case *model.RelationRecord:
tableSchemaDelta := r.TableSchemaDelta
if len(tableSchemaDelta.AddedColumns) > 0 {
Expand Down
12 changes: 12 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
return time.Duration(x) * time.Second
}

// env variable doesn't exist anymore, but tests appear to depend on this
// in lieu of an actual value of IdleTimeoutSeconds
func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
var x int
if providedValue > 0 {
x = providedValue
} else {
x = getEnvInt("", 10)
}
return time.Duration(x) * time.Second
}

// PEERDB_CDC_DISK_SPILL_THRESHOLD
func PeerDBCDCDiskSpillThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000)
Expand Down

0 comments on commit 661080d

Please sign in to comment.