Skip to content

Commit

Permalink
comparing RelationMessage against TableSchema, stop caching RelationMessages (#1438)
Browse files Browse the repository at this point in the history

Previously we diffed the previous and current `RelationMessage`s
received from Postgres during logical replication to determine whether
the schema had changed. This fails when the schema changes after mirror
setup but before we receive the first `RelationMessage`, which can
happen a) during initial load, before CDC, and b) during CDC, if no
rows are synced before the schema changes are made.

This causes a desync between PeerDB's view of the table and the actual
status of the table, causing issues. We now diff against the current
`TableSchema` instead, which should always be present. This also
eliminates having to store and pass `RelationMessageMapping` around; it
is now scoped to reading from the replication connection.

Also fixes bugs with schema changes being handled incorrectly for PG and
BQ, and schema audits being logged without schema deltas actually being
present.
  • Loading branch information
heavycrystal authored Mar 9, 2024
1 parent 120786f commit 9432e24
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 135 deletions.
11 changes: 2 additions & 9 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,6 @@ func (a *FlowableActivity) SyncFlow(

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
if options.RelationMessageMapping == nil {
options.RelationMessageMapping = make(map[uint32]*protos.RelationMessage)
}

return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
Expand All @@ -338,7 +334,6 @@ func (a *FlowableActivity) SyncFlow(
TableNameSchemaMapping: options.TableNameSchemaMapping,
OverridePublicationName: config.PublicationName,
OverrideReplicationSlotName: config.ReplicationSlotName,
RelationMessageMapping: options.RelationMessageMapping,
RecordStream: recordBatch,
})
})
Expand All @@ -361,9 +356,8 @@ func (a *FlowableActivity) SyncFlow(
}

return &model.SyncResponse{
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatch.SchemaDeltas,
RelationMessageMapping: options.RelationMessageMapping,
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatch.SchemaDeltas,
}, nil
}

Expand Down Expand Up @@ -401,7 +395,6 @@ func (a *FlowableActivity) SyncFlow(
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to push records: %w", err)
}
res.RelationMessageMapping = options.RelationMessageMapping

return nil
})
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ func (s *QRepAvroSyncMethod) SyncRecords(
req.FlowJobName, rawTableName, syncBatchID),
)

err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

query := bqClient.Query(insertStmt)
query.DefaultDatasetID = s.connector.datasetID
query.DefaultProjectID = s.connector.projectID
Expand Down Expand Up @@ -112,6 +107,11 @@ func (s *QRepAvroSyncMethod) SyncRecords(
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.String("dstTableName", rawTableName))

err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCP,
NumRecordsSynced: int64(numRecords),
Expand Down
138 changes: 62 additions & 76 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (

type PostgresCDCSource struct {
*PostgresConnector
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
srcTableIDNameMapping map[uint32]string
tableNameMapping map[string]model.NameAndExclude
tableNameSchemaMapping map[string]*protos.TableSchema
relationMessageMapping model.RelationMessageMapping
slot string
publication string
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
commitLock bool

Expand All @@ -49,10 +50,11 @@ type PostgresCDCConfig struct {
Publication string
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
TableNameSchemaMapping map[string]*protos.TableSchema
ChildToParentRelIDMap map[uint32]uint32
CatalogPool *pgxpool.Pool
FlowJobName string
RelationMessageMapping model.RelationMessageMapping
}

type startReplicationOpts struct {
Expand All @@ -65,11 +67,12 @@ type startReplicationOpts struct {
func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
return &PostgresCDCSource{
PostgresConnector: c,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
tableNameMapping: cdcConfig.TableNameMapping,
tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping,
relationMessageMapping: cdcConfig.RelationMessageMapping,
slot: cdcConfig.Slot,
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
typeMap: pgtype.NewMap(),
commitLock: false,
Expand Down Expand Up @@ -437,20 +440,14 @@ func (p *PostgresCDCSource) processMessage(
// treat all relation messages as corresponding to parent if partitioned.
msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)

if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists {
if _, exists := p.srcTableIDNameMapping[msg.RelationID]; !exists {
return nil, nil
}

p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))

if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
} else {
// RelationMessages don't contain an LSN, so we use current clientXlogPos instead.
// https://github.com/postgres/postgres/blob/8b965c549dc8753be8a38c4a1b9fabdb535a4338/src/backend/replication/logical/proto.c#L670
return p.processRelationMessage(ctx, currentClientXlogPos, convertRelationMessageToProto(msg))
}
return p.processRelationMessage(ctx, currentClientXlogPos, msg)

case *pglogrepl.TruncateMessage:
p.logger.Warn("TruncateMessage not supported")
Expand All @@ -465,7 +462,7 @@ func (p *PostgresCDCSource) processInsertMessage(
) (model.Record, error) {
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
tableName, exists := p.srcTableIDNameMapping[relID]
if !exists {
return nil, nil
}
Expand All @@ -480,15 +477,15 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// create empty map of string to interface{}
items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.TableNameMapping[tableName].Exclude)
items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.tableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.InsertRecord{
CheckpointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName].Name,
DestinationTableName: p.tableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -500,7 +497,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
) (model.Record, error) {
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
tableName, exists := p.srcTableIDNameMapping[relID]
if !exists {
return nil, nil
}
Expand All @@ -515,13 +512,13 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

// create empty map of string to interface{}
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting old tuple to map: %w", err)
}

newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple,
rel, p.TableNameMapping[tableName].Exclude)
rel, p.tableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting new tuple to map: %w", err)
}
Expand All @@ -530,7 +527,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
CheckpointID: int64(lsn),
OldItems: oldItems,
NewItems: newItems,
DestinationTableName: p.TableNameMapping[tableName].Name,
DestinationTableName: p.tableNameMapping[tableName].Name,
SourceTableName: tableName,
UnchangedToastColumns: unchangedToastColumns,
}, nil
Expand All @@ -543,7 +540,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
) (model.Record, error) {
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
tableName, exists := p.srcTableIDNameMapping[relID]
if !exists {
return nil, nil
}
Expand All @@ -558,15 +555,15 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

// create empty map of string to interface{}
items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.DeleteRecord{
CheckpointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName].Name,
DestinationTableName: p.tableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -580,7 +577,7 @@ It takes a tuple and a relation message as input and returns
*/
func (p *PostgresCDCSource) convertTupleToMap(
tuple *pglogrepl.TupleData,
rel *protos.RelationMessage,
rel *pglogrepl.RelationMessage,
exclude map[string]struct{},
) (*model.RecordItems, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
Expand Down Expand Up @@ -685,22 +682,6 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil
}

// convertRelationMessageToProto copies the relation ID, relation name and the
// name, flags and data type of every column from a pglogrepl RelationMessage
// into a newly allocated protos.RelationMessage.
func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.RelationMessage {
	// The number of columns is known up front, so allocate the backing array once.
	protoColArray := make([]*protos.RelationMessageColumn, 0, len(msg.Columns))
	for _, column := range msg.Columns {
		protoColArray = append(protoColArray, &protos.RelationMessageColumn{
			Name:     column.Name,
			Flags:    uint32(column.Flags),
			DataType: column.DataType,
		})
	}
	return &protos.RelationMessage{
		RelationId:   msg.RelationID,
		RelationName: msg.RelationName,
		Columns:      protoColArray,
	}
}

func (p *PostgresCDCSource) auditSchemaDelta(ctx context.Context, flowJobName string, rec *model.RelationRecord) error {
activityInfo := activity.GetInfo(ctx)
workflowID := activityInfo.WorkflowExecution.ID
Expand All @@ -721,66 +702,71 @@ func (p *PostgresCDCSource) auditSchemaDelta(ctx context.Context, flowJobName st
func (p *PostgresCDCSource) processRelationMessage(
ctx context.Context,
lsn pglogrepl.LSN,
currRel *protos.RelationMessage,
currRel *pglogrepl.RelationMessage,
) (model.Record, error) {
// retrieve initial RelationMessage for table changed.
prevRel := p.relationMessageMapping[currRel.RelationId]
// not present in tables to sync, return immediately
p.logger.Warn("hello!", slog.Any("mapping", p.srcTableIDNameMapping), slog.Any("currRel", currRel))
if _, ok := p.srcTableIDNameMapping[currRel.RelationID]; !ok {
return nil, nil
}

// retrieve current TableSchema for table changed
// tableNameSchemaMapping uses dst table name as the key, so annoying lookup
prevSchema := p.tableNameSchemaMapping[p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Name]
// creating maps for lookup later
prevRelMap := make(map[string]*protos.PostgresTableIdentifier)
currRelMap := make(map[string]*protos.PostgresTableIdentifier)
for _, column := range prevRel.Columns {
prevRelMap[column.Name] = &protos.PostgresTableIdentifier{
RelId: column.DataType,
}
prevRelMap := make(map[string]qvalue.QValueKind)
currRelMap := make(map[string]qvalue.QValueKind)
for _, column := range prevSchema.Columns {
prevRelMap[column.Name] = qvalue.QValueKind(column.Type)
}
for _, column := range currRel.Columns {
currRelMap[column.Name] = &protos.PostgresTableIdentifier{
RelId: column.DataType,
qKind := p.postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypesMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
}
currRelMap[column.Name] = qKind
}

schemaDelta := &protos.TableSchemaDelta{
// set it to the source table for now, so we can update the schema on the source side
// then at the Workflow level we set it t
SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]].Name,
SrcTableName: p.srcTableIDNameMapping[currRel.RelationID],
DstTableName: p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Name,
AddedColumns: make([]*protos.DeltaAddedColumn, 0),
}
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if prevRelMap[column.Name] == nil {
qKind := p.postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypesMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
}
if _, ok := prevRelMap[column.Name]; !ok {
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(qKind),
ColumnType: string(currRelMap[column.Name]),
})
// present in previous and current relation messages, but data types have changed.
// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
} else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId {
p.logger.Warn(fmt.Sprintf("Detected dropped column %s in table %s, but not propagating", column,
schemaDelta.SrcTableName))
} else if prevRelMap[column.Name] != currRelMap[column.Name] {
p.logger.Warn(fmt.Sprintf("Detected column %s with type changed from %s to %s in table %s, but not propagating",
column.Name, prevRelMap[column.Name], currRelMap[column.Name], schemaDelta.SrcTableName))
}
}
for _, column := range prevRel.Columns {
for _, column := range prevSchema.Columns {
// present in previous relation message, but not in current one, so dropped.
if currRelMap[column.Name] == nil {
if _, ok := currRelMap[column.Name]; !ok {
p.logger.Warn(fmt.Sprintf("Detected dropped column %s in table %s, but not propagating", column,
schemaDelta.SrcTableName))
}
}

p.relationMessageMapping[currRel.RelationId] = currRel
rec := &model.RelationRecord{
TableSchemaDelta: schemaDelta,
CheckpointID: int64(lsn),
p.relationMessageMapping[currRel.RelationID] = currRel
// only log audit if there is actionable delta
if len(schemaDelta.AddedColumns) > 0 {
rec := &model.RelationRecord{
TableSchemaDelta: schemaDelta,
CheckpointID: int64(lsn),
}
return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec)
}
return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec)
return nil, nil
}

func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
Expand Down
Loading

0 comments on commit 9432e24

Please sign in to comment.