From 661080d9f7dcf74f291f7d4df630b3ce0e9335ff Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 2 Feb 2024 18:05:38 +0530 Subject: [PATCH] flow tests ain't flowing debug pt.2 + more fix --- flow/activities/flowable.go | 5 +- flow/connectors/postgres/cdc.go | 103 ++++++++++++++++---------------- flow/peerdbenv/config.go | 12 ++++ 3 files changed, 68 insertions(+), 52 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index d7f037bb77..26a6bbe061 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -236,8 +236,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, TableNameMapping: tblNameMapping, LastOffset: input.LastSyncState.Checkpoint, MaxBatchSize: input.SyncFlowOptions.BatchSize, - IdleTimeout: time.Duration(input.SyncFlowOptions.IdleTimeoutSeconds) * - time.Second, + IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds( + int(input.SyncFlowOptions.IdleTimeoutSeconds), + ), TableNameSchemaMapping: input.TableNameSchemaMapping, OverridePublicationName: input.FlowConnectionConfigs.PublicationName, OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index ea0b0394c5..5e8d3b8d81 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -5,7 +5,6 @@ import ( "crypto/sha256" "fmt" "log/slog" - "regexp" "time" "github.com/jackc/pglogrepl" @@ -46,7 +45,7 @@ type PostgresCDCSource struct { catalogPool *pgxpool.Pool flowJobName string - walSegmentRemovedRegex *regexp.Regexp + signaledAsNonEmpty bool } type PostgresCDCConfig struct { @@ -74,9 +73,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 return nil, fmt.Errorf("error getting child to parent relid map: %w", err) } - pattern := "requested WAL segment .* has already been removed.*" - regex := regexp.MustCompile(pattern) - flowName, _ := cdcConfig.AppContext.Value(shared.FlowNameKey).(string) return &PostgresCDCSource{ ctx: cdcConfig.AppContext, @@ -93,7 +89,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), catalogPool: cdcConfig.CatalogPool, flowJobName: cdcConfig.FlowJobName, - walSegmentRemovedRegex: regex, + signaledAsNonEmpty: false, }, nil } @@ -239,29 +235,26 @@ func (p *PostgresCDCSource) consumeStream( standbyMessageTimeout := req.IdleTimeout nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) - addRecordWithKey := func(key model.TableWithPkey, rec model.Record) error { - records.AddRecord(rec) - err := cdcRecordsStorage.Set(key, rec) - if err != nil { - return err + addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error { + if key != nil { + err := cdcRecordsStorage.Set(*key, rec) + if err != nil { + return err + } } + records.AddRecord(rec) - if cdcRecordsStorage.Len() == 1 { + // we are not storing in case of replident full + // so don't tie signalling to length of storage + if !p.signaledAsNonEmpty { records.SignalAsNotEmpty() nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) p.logger.Info(fmt.Sprintf("pushing the standby deadline to %s", nextStandbyMessageDeadline)) + p.signaledAsNonEmpty = true } return nil } - addRecord := func(rec model.Record) error { - key, err := p.recToTablePKey(req, rec) - if err != nil { - return err - } - return addRecordWithKey(*key, rec) - } - pkmRequiresResponse := false waitingForCommit := false @@ -329,7 +322,6 @@ func (p *PostgresCDCSource) consumeStream( rawMsg, err := conn.ReceiveMessage(ctx) cancel() - utils.RecordHeartbeat(p.ctx, "consumeStream ReceiveMessage") ctxErr := p.ctx.Err() if ctxErr != nil { return fmt.Errorf("consumeStream preempted: %w", ctxErr) @@ -396,7 +388,7 @@ func (p *PostgresCDCSource) consumeStream( // will change in future isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull if isFullReplica { - err = addRecord(rec) + err := addRecordWithKey(nil, rec) if err != nil { return err } @@ -411,14 +403,14 @@ func (p *PostgresCDCSource) consumeStream( return err } if !ok { - err = addRecordWithKey(*tablePkeyVal, rec) + err = addRecordWithKey(tablePkeyVal, rec) } else { // iterate through unchanged toast cols and set them in new record updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems()) for _, col := range updatedCols { delete(r.UnchangedToastColumns, col) } - err = addRecordWithKey(*tablePkeyVal, rec) + err = addRecordWithKey(tablePkeyVal, rec) } if err != nil { return err @@ -428,7 +420,7 @@ func (p *PostgresCDCSource) consumeStream( case *model.InsertRecord: isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull if isFullReplica { - err = addRecord(rec) + err := addRecordWithKey(nil, rec) if err != nil { return err } @@ -438,42 +430,53 @@ func (p *PostgresCDCSource) consumeStream( return err } - err = addRecordWithKey(*tablePkeyVal, rec) + err = addRecordWithKey(tablePkeyVal, rec) if err != nil { return err } } case *model.DeleteRecord: - tablePkeyVal, err := p.recToTablePKey(req, rec) - if err != nil { - return err - } + isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull + if isFullReplica { + err := addRecordWithKey(nil, rec) + if err != nil { + return err + } + } else { + tablePkeyVal, err := p.recToTablePKey(req, rec) + if err != nil { + return err + } - latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal) - if err != nil { - return err - } - if ok { - deleteRecord := rec.(*model.DeleteRecord) - deleteRecord.Items = latestRecord.GetItems() - updateRecord, ok := latestRecord.(*model.UpdateRecord) + latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal) + if err != nil { + return err + } if ok { - deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns + deleteRecord := rec.(*model.DeleteRecord) + deleteRecord.Items = latestRecord.GetItems() + updateRecord, ok := latestRecord.(*model.UpdateRecord) + if ok { + deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns + } + } else { + deleteRecord := rec.(*model.DeleteRecord) + // there is nothing to backfill the items in the delete record with, + // so don't update the row with this record + // add sentinel value to prevent update statements from selecting + deleteRecord.UnchangedToastColumns = map[string]struct{}{ + "_peerdb_not_backfilled_delete": {}, + } } - } else { - deleteRecord := rec.(*model.DeleteRecord) - // there is nothing to backfill the items in the delete record with, - // so don't update the row with this record - // add sentinel value to prevent update statements from selecting - deleteRecord.UnchangedToastColumns = map[string]struct{}{ - "_peerdb_not_backfilled_delete": {}, + + // A delete can only be followed by an INSERT, which does not need backfilling + // No need to store DeleteRecords in memory or disk. + err = addRecordWithKey(nil, rec) + if err != nil { + return err } } - err = addRecord(rec) - if err != nil { - return err - } case *model.RelationRecord: tableSchemaDelta := r.TableSchemaDelta if len(tableSchemaDelta.AddedColumns) > 0 { diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 1ba0f1009d..e9c0a2b8fc 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -29,6 +29,18 @@ func PeerDBEventhubFlushTimeoutSeconds() time.Duration { return time.Duration(x) * time.Second } +// env variable doesn't exist anymore, but tests appear to depend on this +// in lieu of an actual value of IdleTimeoutSeconds +func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { + var x int + if providedValue > 0 { + x = providedValue + } else { + x = getEnvInt("", 10) + } + return time.Duration(x) * time.Second +} + // PEERDB_CDC_DISK_SPILL_THRESHOLD func PeerDBCDCDiskSpillThreshold() int { return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000)