fixes for UI and general (#1197)
1. `TableMappings` is now cloned and stored in workflow state, so modifications affect only the state copy; the state copy is passed through to `SyncFlow` (a sketch of this pattern follows the list).
2. Fixed some log statements.
3. Removed the env var for idle timeout seconds, but retained the function for tests.
4. Renamed a misspelled proto field (`WorflowId` → `WorkflowId`).
5. Fixed a UI bug that read the wrong version of `additionalTables`.
6. Optimization: don't store CDC records in some cases (full-replica tables and deletes).
7. Don't heartbeat after every record.
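
A minimal sketch of the cloning pattern from item 1, since the workflow code itself (`cdc_flow.go`) is not among the diffs shown below. The helper name and import paths are assumptions; only the `protos.TableMapping` type and the idea of a state-owned copy handed to `SyncFlowOptions` come from this commit.

```go
package peerflow

import (
	"google.golang.org/protobuf/proto"

	"github.com/PeerDB-io/peer-flow/generated/protos" // import path assumed
)

// cloneTableMappings deep-copies the table mappings from the flow config so
// that later edits (for example, tables added to the mirror) only touch the
// workflow-state copy, which is what gets passed to SyncFlow via
// SyncFlowOptions.TableMappings.
func cloneTableMappings(src []*protos.TableMapping) []*protos.TableMapping {
	out := make([]*protos.TableMapping, 0, len(src))
	for _, tm := range src {
		out = append(out, proto.Clone(tm).(*protos.TableMapping))
	}
	return out
}
```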
heavycrystal authored Feb 4, 2024
1 parent 645f587 commit 169112a
Showing 20 changed files with 168 additions and 132 deletions.
16 changes: 8 additions & 8 deletions flow/activities/flowable.go
@@ -192,21 +192,21 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName)
logger := activity.GetLogger(ctx)
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
config := input.FlowConnectionConfigs
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

logger.Info("pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude)
for _, v := range input.FlowConnectionConfigs.TableMappings {
tblNameMapping := make(map[string]model.NameAndExclude, len(input.SyncFlowOptions.TableMappings))
for _, v := range input.SyncFlowOptions.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

errGroup, errCtx := errgroup.WithContext(ctx)
srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
srcConn, err := connectors.GetCDCPullConnector(errCtx, config.Source)
if err != nil {
return nil, fmt.Errorf("failed to get source connector: %w", err)
}
@@ -251,7 +251,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})

hasRecords := !recordBatch.WaitAndCheckEmpty()
logger.Info("current sync flow has records?", hasRecords)
logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

if !hasRecords {
// wait for the pull goroutine to finish
@@ -275,7 +275,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}

syncBatchID, err := dstConn.GetLastSyncBatchID(flowName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
if err != nil && config.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}
syncBatchID += 1
@@ -297,7 +297,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
SyncBatchID: syncBatchID,
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
TableMappings: input.FlowConnectionConfigs.TableMappings,
TableMappings: input.SyncFlowOptions.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
})
if err != nil {
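
The `flowable.go` hunk above replaces a bare boolean log argument with an `slog.Bool` attribute (commit item 2). A standalone illustration of why that matters with `log/slog`; this snippet is not from the repository:

```go
package main

import "log/slog"

func main() {
	logger := slog.Default()
	hasRecords := true

	// Old call: the trailing argument has no key, so slog renders it as
	// !BADKEY=true rather than a named attribute.
	logger.Info("current sync flow has records?", hasRecords)

	// Fixed call: the value is paired with an explicit key.
	logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))
}
```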
4 changes: 2 additions & 2 deletions flow/cmd/handler.go
@@ -188,7 +188,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}

return &protos.CreateCDCFlowResponse{
WorflowId: workflowID,
WorkflowId: workflowID,
}, nil
}

@@ -290,7 +290,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

return &protos.CreateQRepFlowResponse{
WorflowId: workflowID,
WorkflowId: workflowID,
}, nil
}

1 change: 1 addition & 0 deletions flow/cmd/mirror_status.go
@@ -94,6 +94,7 @@ func (h *FlowRequestHandler) CDCFlowStatus(
// patching config to show latest values from state
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
config.TableMappings = state.TableMappings

var initialCopyStatus *protos.SnapshotStatus

98 changes: 45 additions & 53 deletions flow/connectors/postgres/cdc.go
@@ -5,7 +5,6 @@ import (
"crypto/sha256"
"fmt"
"log/slog"
"regexp"
"time"

"github.com/jackc/pglogrepl"
@@ -45,8 +44,6 @@ type PostgresCDCSource struct {
// for storing chema delta audit logs to catalog
catalogPool *pgxpool.Pool
flowJobName string

walSegmentRemovedRegex *regexp.Regexp
}

type PostgresCDCConfig struct {
@@ -74,9 +71,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
}

pattern := "requested WAL segment .* has already been removed.*"
regex := regexp.MustCompile(pattern)

flowName, _ := cdcConfig.AppContext.Value(shared.FlowNameKey).(string)
return &PostgresCDCSource{
ctx: cdcConfig.AppContext,
@@ -93,7 +87,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
walSegmentRemovedRegex: regex,
}, nil
}

@@ -239,12 +232,12 @@ func (p *PostgresCDCSource) consumeStream(
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

addRecordWithKey := func(key model.TableWithPkey, rec model.Record) error {
records.AddRecord(rec)
addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error {
err := cdcRecordsStorage.Set(key, rec)
if err != nil {
return err
}
records.AddRecord(rec)

if cdcRecordsStorage.Len() == 1 {
records.SignalAsNotEmpty()
@@ -254,14 +247,6 @@ func (p *PostgresCDCSource) consumeStream(
return nil
}

addRecord := func(rec model.Record) error {
key, err := p.recToTablePKey(req, rec)
if err != nil {
return err
}
return addRecordWithKey(*key, rec)
}

pkmRequiresResponse := false
waitingForCommit := false

@@ -329,7 +314,6 @@ func (p *PostgresCDCSource) consumeStream(
rawMsg, err := conn.ReceiveMessage(ctx)
cancel()

utils.RecordHeartbeat(p.ctx, "consumeStream ReceiveMessage")
ctxErr := p.ctx.Err()
if ctxErr != nil {
return fmt.Errorf("consumeStream preempted: %w", ctxErr)
@@ -396,7 +380,7 @@ func (p *PostgresCDCSource) consumeStream(
// will change in future
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err = addRecord(rec)
err := addRecordWithKey(nil, rec)
if err != nil {
return err
}
@@ -411,14 +395,14 @@ func (p *PostgresCDCSource) consumeStream(
return err
}
if !ok {
err = addRecordWithKey(*tablePkeyVal, rec)
err = addRecordWithKey(tablePkeyVal, rec)
} else {
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
for _, col := range updatedCols {
delete(r.UnchangedToastColumns, col)
}
err = addRecordWithKey(*tablePkeyVal, rec)
err = addRecordWithKey(tablePkeyVal, rec)
}
if err != nil {
return err
@@ -428,7 +412,7 @@ func (p *PostgresCDCSource) consumeStream(
case *model.InsertRecord:
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err = addRecord(rec)
err := addRecordWithKey(nil, rec)
if err != nil {
return err
}
@@ -438,42 +422,53 @@ func (p *PostgresCDCSource) consumeStream(
return err
}

err = addRecordWithKey(*tablePkeyVal, rec)
err = addRecordWithKey(tablePkeyVal, rec)
if err != nil {
return err
}
}
case *model.DeleteRecord:
tablePkeyVal, err := p.recToTablePKey(req, rec)
if err != nil {
return err
}
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err := addRecordWithKey(nil, rec)
if err != nil {
return err
}
} else {
tablePkeyVal, err := p.recToTablePKey(req, rec)
if err != nil {
return err
}

latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
if err != nil {
return err
}
if ok {
deleteRecord := rec.(*model.DeleteRecord)
deleteRecord.Items = latestRecord.GetItems()
updateRecord, ok := latestRecord.(*model.UpdateRecord)
latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
if err != nil {
return err
}
if ok {
deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
deleteRecord := rec.(*model.DeleteRecord)
deleteRecord.Items = latestRecord.GetItems()
updateRecord, ok := latestRecord.(*model.UpdateRecord)
if ok {
deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
}
} else {
deleteRecord := rec.(*model.DeleteRecord)
// there is nothing to backfill the items in the delete record with,
// so don't update the row with this record
// add sentinel value to prevent update statements from selecting
deleteRecord.UnchangedToastColumns = map[string]struct{}{
"_peerdb_not_backfilled_delete": {},
}
}
} else {
deleteRecord := rec.(*model.DeleteRecord)
// there is nothing to backfill the items in the delete record with,
// so don't update the row with this record
// add sentinel value to prevent update statements from selecting
deleteRecord.UnchangedToastColumns = map[string]struct{}{
"_peerdb_not_backfilled_delete": {},

// A delete can only be followed by an INSERT, which does not need backfilling
// No need to store DeleteRecords in memory or disk.
err = addRecordWithKey(nil, rec)
if err != nil {
return err
}
}

err = addRecord(rec)
if err != nil {
return err
}
case *model.RelationRecord:
tableSchemaDelta := r.TableSchemaDelta
if len(tableSchemaDelta.AddedColumns) > 0 {
@@ -524,9 +519,6 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
return nil, nil
}

// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
// the state of the relation message somewhere
p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))
if p.relationMessageMapping[msg.RelationID] == nil {
@@ -556,7 +548,7 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// log lsn and relation id for debugging
p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Info(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
@@ -591,7 +583,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

// log lsn and relation id for debugging
p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Info(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
@@ -634,7 +626,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

// log lsn and relation id for debugging
p.logger.Debug(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Info(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
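
The `consumeStream` hunks above also drop the per-message `utils.RecordHeartbeat` call (commit item 7); heartbeats recorded elsewhere in the activity are unchanged. For context only, and not code from this commit: one common way to keep Temporal heartbeats flowing without emitting one per record is to gate them on elapsed time, roughly as below (the helper is hypothetical).

```go
package flowutils

import (
	"context"
	"time"

	"go.temporal.io/sdk/activity"
)

// recordHeartbeatEvery sends a Temporal activity heartbeat only when at least
// interval has elapsed since the previous one, so a hot record loop does not
// issue a heartbeat call per record.
func recordHeartbeatEvery(ctx context.Context, last *time.Time, interval time.Duration, details string) {
	if time.Since(*last) >= interval {
		activity.RecordHeartbeat(ctx, details)
		*last = time.Now()
	}
}
```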
58 changes: 30 additions & 28 deletions flow/connectors/utils/cdc_records/cdc_records_storage.go
@@ -72,37 +72,39 @@ func (c *cdcRecordsStore) initPebbleDB() error {
return nil
}

func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
_, ok := c.inMemoryRecords[key]
if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
c.inMemoryRecords[key] = rec
} else {
if c.pebbleDB == nil {
slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
c.numRecordsSwitchThreshold),
slog.String(string(shared.FlowNameKey), c.flowJobName))
err := c.initPebbleDB()
func (c *cdcRecordsStore) Set(key *model.TableWithPkey, rec model.Record) error {
if key != nil {
_, ok := c.inMemoryRecords[*key]
if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
c.inMemoryRecords[*key] = rec
} else {
if c.pebbleDB == nil {
slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
c.numRecordsSwitchThreshold),
slog.String(string(shared.FlowNameKey), c.flowJobName))
err := c.initPebbleDB()
if err != nil {
return err
}
}

encodedKey, err := encVal(key)
if err != nil {
return err
}
}

encodedKey, err := encVal(key)
if err != nil {
return err
}
// necessary to point pointer to interface so the interface is exposed
// instead of the underlying type
encodedRec, err := encVal(&rec)
if err != nil {
return err
}
// we're using Pebble as a cache, no need for durability here.
err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
Sync: false,
})
if err != nil {
return fmt.Errorf("unable to store value in Pebble: %w", err)
// necessary to point pointer to interface so the interface is exposed
// instead of the underlying type
encodedRec, err := encVal(&rec)
if err != nil {
return err
}
// we're using Pebble as a cache, no need for durability here.
err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
Sync: false,
})
if err != nil {
return fmt.Errorf("unable to store value in Pebble: %w", err)
}
}
}
c.numRecords++
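
To make the new `Set` contract concrete: the key is now a pointer, and callers pass `nil` for records that never need to be looked up again (full-replica tables and deletes), so those records are counted but not stored in memory or Pebble. The toy store below models only that behavior; it is a simplification for illustration, not the real `cdcRecordsStore` (which also spills keyed records to Pebble past a threshold).

```go
package main

import "fmt"

// toyStore models just the nil-key behaviour of the updated Set: keyed
// records are kept for later lookup and de-duplication, nil-key records are
// only counted.
type toyStore struct {
	inMemory   map[string]string
	numRecords int
}

func (s *toyStore) Set(key *string, rec string) {
	if key != nil {
		s.inMemory[*key] = rec
	}
	s.numRecords++
}

func main() {
	s := &toyStore{inMemory: map[string]string{}}

	pkey := "public.users|42"
	s.Set(&pkey, "update record") // keyed: stored, later updates overwrite it
	s.Set(nil, "delete record")   // nil key: counted, never stored

	fmt.Println(len(s.inMemory), s.numRecords) // prints: 1 2
}
```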
(Diffs for the remaining 15 changed files are not shown.)
