Skip to content

Commit

Permalink
comparing RelationMessage against TableSchema, stop caching RelationM…
Browse files Browse the repository at this point in the history
…essages
  • Loading branch information
heavycrystal committed Mar 5, 2024
1 parent 0a32419 commit ae925c1
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 127 deletions.
11 changes: 2 additions & 9 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,6 @@ func (a *FlowableActivity) SyncFlow(

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
if options.RelationMessageMapping == nil {
options.RelationMessageMapping = make(map[uint32]*protos.RelationMessage)
}

return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
Expand All @@ -337,7 +333,6 @@ func (a *FlowableActivity) SyncFlow(
TableNameSchemaMapping: options.TableNameSchemaMapping,
OverridePublicationName: config.PublicationName,
OverrideReplicationSlotName: config.ReplicationSlotName,
RelationMessageMapping: options.RelationMessageMapping,
RecordStream: recordBatch,
})
})
Expand All @@ -360,9 +355,8 @@ func (a *FlowableActivity) SyncFlow(
}

return &model.SyncResponse{
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatch.SchemaDeltas,
RelationMessageMapping: options.RelationMessageMapping,
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatch.SchemaDeltas,
}, nil
}

Expand Down Expand Up @@ -400,7 +394,6 @@ func (a *FlowableActivity) SyncFlow(
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to push records: %w", err)
}
res.RelationMessageMapping = options.RelationMessageMapping

return nil
})
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ func (s *QRepAvroSyncMethod) SyncRecords(
req.FlowJobName, rawTableName, syncBatchID),
)

err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

query := bqClient.Query(insertStmt)
query.DefaultDatasetID = s.connector.datasetID
query.DefaultProjectID = s.connector.projectID
Expand Down Expand Up @@ -112,6 +107,11 @@ func (s *QRepAvroSyncMethod) SyncRecords(
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.String("dstTableName", rawTableName))

err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCP,
NumRecordsSynced: int64(numRecords),
Expand Down
137 changes: 61 additions & 76 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (

type PostgresCDCSource struct {
*PostgresConnector
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
srcTableIDNameMapping map[uint32]string
tableNameMapping map[string]model.NameAndExclude
tableNameSchemaMapping map[string]*protos.TableSchema
// relationMessageMapping is not an input, but populated by RelationMessages received here.
relationMessageMapping model.RelationMessageMapping
slot string
publication string
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
commitLock bool

Expand All @@ -49,7 +51,7 @@ type PostgresCDCConfig struct {
Publication string
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
TableNameSchemaMapping map[string]*protos.TableSchema
ChildToParentRelIDMap map[uint32]uint32
CatalogPool *pgxpool.Pool
FlowJobName string
Expand All @@ -65,11 +67,12 @@ type startReplicationOpts struct {
func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
return &PostgresCDCSource{
PostgresConnector: c,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
tableNameMapping: cdcConfig.TableNameMapping,
tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping,
relationMessageMapping: make(model.RelationMessageMapping),
slot: cdcConfig.Slot,
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
typeMap: pgtype.NewMap(),
commitLock: false,
Expand Down Expand Up @@ -437,20 +440,14 @@ func (p *PostgresCDCSource) processMessage(
// treat all relation messages as corresponding to parent if partitioned.
msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)

if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists {
if _, exists := p.srcTableIDNameMapping[msg.RelationID]; !exists {
return nil, nil
}

p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))

if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
} else {
// RelationMessages don't contain an LSN, so we use current clientXlogPos instead.
// https://github.com/postgres/postgres/blob/8b965c549dc8753be8a38c4a1b9fabdb535a4338/src/backend/replication/logical/proto.c#L670
return p.processRelationMessage(ctx, currentClientXlogPos, convertRelationMessageToProto(msg))
}
return p.processRelationMessage(ctx, currentClientXlogPos, msg)

case *pglogrepl.TruncateMessage:
p.logger.Warn("TruncateMessage not supported")
Expand All @@ -465,7 +462,7 @@ func (p *PostgresCDCSource) processInsertMessage(
) (model.Record, error) {
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
tableName, exists := p.srcTableIDNameMapping[relID]
if !exists {
return nil, nil
}
Expand All @@ -480,15 +477,15 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// create empty map of string to interface{}
items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.TableNameMapping[tableName].Exclude)
items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.tableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.InsertRecord{
CheckpointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName].Name,
DestinationTableName: p.tableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -500,7 +497,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
) (model.Record, error) {
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
tableName, exists := p.srcTableIDNameMapping[relID]
if !exists {
return nil, nil
}
Expand All @@ -515,13 +512,13 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

// create empty map of string to interface{}
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting old tuple to map: %w", err)
}

newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple,
rel, p.TableNameMapping[tableName].Exclude)
rel, p.tableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting new tuple to map: %w", err)
}
Expand All @@ -530,7 +527,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
CheckpointID: int64(lsn),
OldItems: oldItems,
NewItems: newItems,
DestinationTableName: p.TableNameMapping[tableName].Name,
DestinationTableName: p.tableNameMapping[tableName].Name,
SourceTableName: tableName,
UnchangedToastColumns: unchangedToastColumns,
}, nil
Expand All @@ -543,7 +540,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
) (model.Record, error) {
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
tableName, exists := p.srcTableIDNameMapping[relID]
if !exists {
return nil, nil
}
Expand All @@ -558,15 +555,15 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

// create empty map of string to interface{}
items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}

return &model.DeleteRecord{
CheckpointID: int64(lsn),
Items: items,
DestinationTableName: p.TableNameMapping[tableName].Name,
DestinationTableName: p.tableNameMapping[tableName].Name,
SourceTableName: tableName,
}, nil
}
Expand All @@ -580,7 +577,7 @@ It takes a tuple and a relation message as input and returns
*/
func (p *PostgresCDCSource) convertTupleToMap(
tuple *pglogrepl.TupleData,
rel *protos.RelationMessage,
rel *pglogrepl.RelationMessage,
exclude map[string]struct{},
) (*model.RecordItems, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
Expand Down Expand Up @@ -685,22 +682,6 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil
}

func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.RelationMessage {
protoColArray := make([]*protos.RelationMessageColumn, 0)
for _, column := range msg.Columns {
protoColArray = append(protoColArray, &protos.RelationMessageColumn{
Name: column.Name,
Flags: uint32(column.Flags),
DataType: column.DataType,
})
}
return &protos.RelationMessage{
RelationId: msg.RelationID,
RelationName: msg.RelationName,
Columns: protoColArray,
}
}

func (p *PostgresCDCSource) auditSchemaDelta(ctx context.Context, flowJobName string, rec *model.RelationRecord) error {
activityInfo := activity.GetInfo(ctx)
workflowID := activityInfo.WorkflowExecution.ID
Expand All @@ -721,66 +702,70 @@ func (p *PostgresCDCSource) auditSchemaDelta(ctx context.Context, flowJobName st
func (p *PostgresCDCSource) processRelationMessage(
ctx context.Context,
lsn pglogrepl.LSN,
currRel *protos.RelationMessage,
currRel *pglogrepl.RelationMessage,
) (model.Record, error) {
// retrieve initial RelationMessage for table changed.
prevRel := p.relationMessageMapping[currRel.RelationId]
// not present in tables to sync, return immediately
if _, ok := p.srcTableIDNameMapping[currRel.RelationID]; !ok {
return nil, nil
}

// retrieve current TableSchema for table changed
// tableNameSchemaMapping uses dst table name as the key, so annoying lookup
prevSchema := p.tableNameSchemaMapping[p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Name]
// creating maps for lookup later
prevRelMap := make(map[string]*protos.PostgresTableIdentifier)
currRelMap := make(map[string]*protos.PostgresTableIdentifier)
for _, column := range prevRel.Columns {
prevRelMap[column.Name] = &protos.PostgresTableIdentifier{
RelId: column.DataType,
}
prevRelMap := make(map[string]qvalue.QValueKind)
currRelMap := make(map[string]qvalue.QValueKind)
for _, column := range prevSchema.Columns {
prevRelMap[column.Name] = qvalue.QValueKind(column.Type)
}
for _, column := range currRel.Columns {
currRelMap[column.Name] = &protos.PostgresTableIdentifier{
RelId: column.DataType,
qKind := p.postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypesMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
}
currRelMap[column.Name] = qKind
}

schemaDelta := &protos.TableSchemaDelta{
// set it to the source table for now, so we can update the schema on the source side
// then at the Workflow level we set it t
SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]].Name,
SrcTableName: p.srcTableIDNameMapping[currRel.RelationID],
DstTableName: p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Name,
AddedColumns: make([]*protos.DeltaAddedColumn, 0),
}
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if prevRelMap[column.Name] == nil {
qKind := p.postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypesMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
}
if _, ok := prevRelMap[column.Name]; !ok {
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(qKind),
ColumnType: string(currRelMap[column.Name]),
})
// present in previous and current relation messages, but data types have changed.
// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
} else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId {
p.logger.Warn(fmt.Sprintf("Detected dropped column %s in table %s, but not propagating", column,
schemaDelta.SrcTableName))
} else if prevRelMap[column.Name] != currRelMap[column.Name] {
p.logger.Warn(fmt.Sprintf("Detected column %s with type changed from %s to %s in table %s, but not propagating",
column.Name, prevRelMap[column.Name], currRelMap[column.Name], schemaDelta.SrcTableName))
}
}
for _, column := range prevRel.Columns {
for _, column := range prevSchema.Columns {
// present in previous relation message, but not in current one, so dropped.
if currRelMap[column.Name] == nil {
if _, ok := currRelMap[column.Name]; !ok {
p.logger.Warn(fmt.Sprintf("Detected dropped column %s in table %s, but not propagating", column,
schemaDelta.SrcTableName))
}
}

p.relationMessageMapping[currRel.RelationId] = currRel
rec := &model.RelationRecord{
TableSchemaDelta: schemaDelta,
CheckpointID: int64(lsn),
p.relationMessageMapping[currRel.RelationID] = currRel
// only log audit if there is actually is an actionable delta
if len(schemaDelta.AddedColumns) > 0 {
rec := &model.RelationRecord{
TableSchemaDelta: schemaDelta,
CheckpointID: int64(lsn),
}
return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec)
}
return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec)
return nil, nil
}

func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
Expand Down
12 changes: 6 additions & 6 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
Slot: slotName,
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
TableNameSchemaMapping: req.TableNameSchemaMapping,
ChildToParentRelIDMap: childToParentRelIDMap,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
Expand Down Expand Up @@ -476,11 +476,6 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
}
}

err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

syncRecordsTx, err := c.conn.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("error starting transaction for syncing records: %w", err)
Expand Down Expand Up @@ -521,6 +516,11 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
return nil, err
}

err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCP,
NumRecordsSynced: int64(numRecords),
Expand Down
Loading

0 comments on commit ae925c1

Please sign in to comment.