diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 61985eff8e..287082dcdd 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -322,10 +322,6 @@ func (a *FlowableActivity) SyncFlow( errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { - if options.RelationMessageMapping == nil { - options.RelationMessageMapping = make(map[uint32]*protos.RelationMessage) - } - return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{ FlowJobName: flowName, SrcTableIDNameMapping: options.SrcTableIdNameMapping, @@ -338,7 +334,6 @@ func (a *FlowableActivity) SyncFlow( TableNameSchemaMapping: options.TableNameSchemaMapping, OverridePublicationName: config.PublicationName, OverrideReplicationSlotName: config.ReplicationSlotName, - RelationMessageMapping: options.RelationMessageMapping, RecordStream: recordBatch, }) }) @@ -361,9 +356,8 @@ func (a *FlowableActivity) SyncFlow( } return &model.SyncResponse{ - CurrentSyncBatchID: -1, - TableSchemaDeltas: recordBatch.SchemaDeltas, - RelationMessageMapping: options.RelationMessageMapping, + CurrentSyncBatchID: -1, + TableSchemaDeltas: recordBatch.SchemaDeltas, }, nil } @@ -401,7 +395,6 @@ func (a *FlowableActivity) SyncFlow( a.Alerter.LogFlowError(ctx, flowName, err) return fmt.Errorf("failed to push records: %w", err) } - res.RelationMessageMapping = options.RelationMessageMapping return nil }) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 868bf845a5..a4b4023d1b 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -80,11 +80,6 @@ func (s *QRepAvroSyncMethod) SyncRecords( req.FlowJobName, rawTableName, syncBatchID), ) - err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas) - if err != nil { - return nil, fmt.Errorf("failed to sync schema changes: %w", err) - } - query := bqClient.Query(insertStmt) 
query.DefaultDatasetID = s.connector.datasetID query.DefaultProjectID = s.connector.projectID @@ -112,6 +107,11 @@ func (s *QRepAvroSyncMethod) SyncRecords( slog.String(string(shared.FlowNameKey), req.FlowJobName), slog.String("dstTableName", rawTableName)) + err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas) + if err != nil { + return nil, fmt.Errorf("failed to sync schema changes: %w", err) + } + return &model.SyncResponse{ LastSyncedCheckpointID: lastCP, NumRecordsSynced: int64(numRecords), diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 4a4ecd3b75..adb6c5daab 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -28,11 +28,12 @@ import ( type PostgresCDCSource struct { *PostgresConnector - SrcTableIDNameMapping map[uint32]string - TableNameMapping map[string]model.NameAndExclude + srcTableIDNameMapping map[uint32]string + tableNameMapping map[string]model.NameAndExclude + tableNameSchemaMapping map[string]*protos.TableSchema + relationMessageMapping model.RelationMessageMapping slot string publication string - relationMessageMapping model.RelationMessageMapping typeMap *pgtype.Map commitLock bool @@ -49,10 +50,11 @@ type PostgresCDCConfig struct { Publication string SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude - RelationMessageMapping model.RelationMessageMapping + TableNameSchemaMapping map[string]*protos.TableSchema ChildToParentRelIDMap map[uint32]uint32 CatalogPool *pgxpool.Pool FlowJobName string + RelationMessageMapping model.RelationMessageMapping } type startReplicationOpts struct { @@ -65,11 +67,12 @@ type startReplicationOpts struct { func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource { return &PostgresCDCSource{ PostgresConnector: c, - SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, - TableNameMapping: cdcConfig.TableNameMapping, + 
srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, + tableNameMapping: cdcConfig.TableNameMapping, + tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping, + relationMessageMapping: cdcConfig.RelationMessageMapping, slot: cdcConfig.Slot, publication: cdcConfig.Publication, - relationMessageMapping: cdcConfig.RelationMessageMapping, childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, typeMap: pgtype.NewMap(), commitLock: false, @@ -437,20 +440,14 @@ func (p *PostgresCDCSource) processMessage( // treat all relation messages as corresponding to parent if partitioned. msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID) - if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists { + if _, exists := p.srcTableIDNameMapping[msg.RelationID]; !exists { return nil, nil } p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v", msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns)) - if p.relationMessageMapping[msg.RelationID] == nil { - p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg) - } else { - // RelationMessages don't contain an LSN, so we use current clientXlogPos instead. 
- // https://github.com/postgres/postgres/blob/8b965c549dc8753be8a38c4a1b9fabdb535a4338/src/backend/replication/logical/proto.c#L670 - return p.processRelationMessage(ctx, currentClientXlogPos, convertRelationMessageToProto(msg)) - } + return p.processRelationMessage(ctx, currentClientXlogPos, msg) case *pglogrepl.TruncateMessage: p.logger.Warn("TruncateMessage not supported") @@ -465,7 +462,7 @@ func (p *PostgresCDCSource) processInsertMessage( ) (model.Record, error) { relID := p.getParentRelIDIfPartitioned(msg.RelationID) - tableName, exists := p.SrcTableIDNameMapping[relID] + tableName, exists := p.srcTableIDNameMapping[relID] if !exists { return nil, nil } @@ -480,7 +477,7 @@ func (p *PostgresCDCSource) processInsertMessage( } // create empty map of string to interface{} - items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.TableNameMapping[tableName].Exclude) + items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.tableNameMapping[tableName].Exclude) if err != nil { return nil, fmt.Errorf("error converting tuple to map: %w", err) } @@ -488,7 +485,7 @@ func (p *PostgresCDCSource) processInsertMessage( return &model.InsertRecord{ CheckpointID: int64(lsn), Items: items, - DestinationTableName: p.TableNameMapping[tableName].Name, + DestinationTableName: p.tableNameMapping[tableName].Name, SourceTableName: tableName, }, nil } @@ -500,7 +497,7 @@ func (p *PostgresCDCSource) processUpdateMessage( ) (model.Record, error) { relID := p.getParentRelIDIfPartitioned(msg.RelationID) - tableName, exists := p.SrcTableIDNameMapping[relID] + tableName, exists := p.srcTableIDNameMapping[relID] if !exists { return nil, nil } @@ -515,13 +512,13 @@ func (p *PostgresCDCSource) processUpdateMessage( } // create empty map of string to interface{} - oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude) + oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude) if err != nil { return nil, 
fmt.Errorf("error converting old tuple to map: %w", err) } newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, - rel, p.TableNameMapping[tableName].Exclude) + rel, p.tableNameMapping[tableName].Exclude) if err != nil { return nil, fmt.Errorf("error converting new tuple to map: %w", err) } @@ -530,7 +527,7 @@ func (p *PostgresCDCSource) processUpdateMessage( CheckpointID: int64(lsn), OldItems: oldItems, NewItems: newItems, - DestinationTableName: p.TableNameMapping[tableName].Name, + DestinationTableName: p.tableNameMapping[tableName].Name, SourceTableName: tableName, UnchangedToastColumns: unchangedToastColumns, }, nil @@ -543,7 +540,7 @@ func (p *PostgresCDCSource) processDeleteMessage( ) (model.Record, error) { relID := p.getParentRelIDIfPartitioned(msg.RelationID) - tableName, exists := p.SrcTableIDNameMapping[relID] + tableName, exists := p.srcTableIDNameMapping[relID] if !exists { return nil, nil } @@ -558,7 +555,7 @@ func (p *PostgresCDCSource) processDeleteMessage( } // create empty map of string to interface{} - items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude) + items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude) if err != nil { return nil, fmt.Errorf("error converting tuple to map: %w", err) } @@ -566,7 +563,7 @@ func (p *PostgresCDCSource) processDeleteMessage( return &model.DeleteRecord{ CheckpointID: int64(lsn), Items: items, - DestinationTableName: p.TableNameMapping[tableName].Name, + DestinationTableName: p.tableNameMapping[tableName].Name, SourceTableName: tableName, }, nil } @@ -580,7 +577,7 @@ It takes a tuple and a relation message as input and returns */ func (p *PostgresCDCSource) convertTupleToMap( tuple *pglogrepl.TupleData, - rel *protos.RelationMessage, + rel *pglogrepl.RelationMessage, exclude map[string]struct{}, ) (*model.RecordItems, map[string]struct{}, error) { // if the tuple is nil, return an empty map @@ -685,22 +682,6 @@ 
func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil } -func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.RelationMessage { - protoColArray := make([]*protos.RelationMessageColumn, 0) - for _, column := range msg.Columns { - protoColArray = append(protoColArray, &protos.RelationMessageColumn{ - Name: column.Name, - Flags: uint32(column.Flags), - DataType: column.DataType, - }) - } - return &protos.RelationMessage{ - RelationId: msg.RelationID, - RelationName: msg.RelationName, - Columns: protoColArray, - } -} - func (p *PostgresCDCSource) auditSchemaDelta(ctx context.Context, flowJobName string, rec *model.RelationRecord) error { activityInfo := activity.GetInfo(ctx) workflowID := activityInfo.WorkflowExecution.ID @@ -721,66 +702,71 @@ func (p *PostgresCDCSource) auditSchemaDelta(ctx context.Context, flowJobName st func (p *PostgresCDCSource) processRelationMessage( ctx context.Context, lsn pglogrepl.LSN, - currRel *protos.RelationMessage, + currRel *pglogrepl.RelationMessage, ) (model.Record, error) { - // retrieve initial RelationMessage for table changed. 
- prevRel := p.relationMessageMapping[currRel.RelationId] + // not present in tables to sync, return immediately + p.logger.Warn("hello!", slog.Any("mapping", p.srcTableIDNameMapping), slog.Any("currRel", currRel)) + if _, ok := p.srcTableIDNameMapping[currRel.RelationID]; !ok { + return nil, nil + } + + // retrieve current TableSchema for table changed + // tableNameSchemaMapping uses dst table name as the key, so annoying lookup + prevSchema := p.tableNameSchemaMapping[p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Name] // creating maps for lookup later - prevRelMap := make(map[string]*protos.PostgresTableIdentifier) - currRelMap := make(map[string]*protos.PostgresTableIdentifier) - for _, column := range prevRel.Columns { - prevRelMap[column.Name] = &protos.PostgresTableIdentifier{ - RelId: column.DataType, - } + prevRelMap := make(map[string]qvalue.QValueKind) + currRelMap := make(map[string]qvalue.QValueKind) + for _, column := range prevSchema.Columns { + prevRelMap[column.Name] = qvalue.QValueKind(column.Type) } for _, column := range currRel.Columns { - currRelMap[column.Name] = &protos.PostgresTableIdentifier{ - RelId: column.DataType, + qKind := p.postgresOIDToQValueKind(column.DataType) + if qKind == qvalue.QValueKindInvalid { + typeName, ok := p.customTypesMapping[column.DataType] + if ok { + qKind = customTypeToQKind(typeName) + } } + currRelMap[column.Name] = qKind } schemaDelta := &protos.TableSchemaDelta{ - // set it to the source table for now, so we can update the schema on the source side - // then at the Workflow level we set it t - SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId], - DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]].Name, + SrcTableName: p.srcTableIDNameMapping[currRel.RelationID], + DstTableName: p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Name, AddedColumns: make([]*protos.DeltaAddedColumn, 0), } for _, column := range currRel.Columns { // not present 
in previous relation message, but in current one, so added. - if prevRelMap[column.Name] == nil { - qKind := p.postgresOIDToQValueKind(column.DataType) - if qKind == qvalue.QValueKindInvalid { - typeName, ok := p.customTypesMapping[column.DataType] - if ok { - qKind = customTypeToQKind(typeName) - } - } + if _, ok := prevRelMap[column.Name]; !ok { schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{ ColumnName: column.Name, - ColumnType: string(qKind), + ColumnType: string(currRelMap[column.Name]), }) // present in previous and current relation messages, but data types have changed. // so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first. - } else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId { - p.logger.Warn(fmt.Sprintf("Detected dropped column %s in table %s, but not propagating", column, - schemaDelta.SrcTableName)) + } else if prevRelMap[column.Name] != currRelMap[column.Name] { + p.logger.Warn(fmt.Sprintf("Detected column %s with type changed from %s to %s in table %s, but not propagating", + column.Name, prevRelMap[column.Name], currRelMap[column.Name], schemaDelta.SrcTableName)) } } - for _, column := range prevRel.Columns { + for _, column := range prevSchema.Columns { // present in previous relation message, but not in current one, so dropped. 
- if currRelMap[column.Name] == nil { + if _, ok := currRelMap[column.Name]; !ok { p.logger.Warn(fmt.Sprintf("Detected dropped column %s in table %s, but not propagating", column, schemaDelta.SrcTableName)) } } - p.relationMessageMapping[currRel.RelationId] = currRel - rec := &model.RelationRecord{ - TableSchemaDelta: schemaDelta, - CheckpointID: int64(lsn), + p.relationMessageMapping[currRel.RelationID] = currRel + // only log audit if there is actionable delta + if len(schemaDelta.AddedColumns) > 0 { + rec := &model.RelationRecord{ + TableSchemaDelta: schemaDelta, + CheckpointID: int64(lsn), + } + return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec) } - return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec) + return nil, nil } func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 49afa99002..f82b867f2b 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -30,18 +30,19 @@ import ( ) type PostgresConnector struct { - connStr string - config *protos.PostgresConfig - ssh *SSHTunnel - conn *pgx.Conn - replConfig *pgx.ConnConfig - replConn *pgx.Conn - replState *ReplState - replLock sync.Mutex - customTypesMapping map[uint32]string - metadataSchema string - hushWarnOID map[uint32]struct{} - logger log.Logger + connStr string + config *protos.PostgresConfig + ssh *SSHTunnel + conn *pgx.Conn + replConfig *pgx.ConnConfig + replConn *pgx.Conn + replState *ReplState + replLock sync.Mutex + customTypesMapping map[uint32]string + metadataSchema string + hushWarnOID map[uint32]struct{} + logger log.Logger + relationMessageMapping model.RelationMessageMapping } type ReplState struct { @@ -90,17 +91,18 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) } return &PostgresConnector{ - connStr: connectionString, - config: pgConfig, - ssh: tunnel, - conn: conn, - replConfig: replConfig, - 
replState: nil, - replLock: sync.Mutex{}, - customTypesMapping: customTypeMap, - metadataSchema: metadataSchema, - hushWarnOID: make(map[uint32]struct{}), - logger: logger.LoggerFromCtx(ctx), + connStr: connectionString, + config: pgConfig, + ssh: tunnel, + conn: conn, + replConfig: replConfig, + replState: nil, + replLock: sync.Mutex{}, + customTypesMapping: customTypeMap, + metadataSchema: metadataSchema, + hushWarnOID: make(map[uint32]struct{}), + logger: logger.LoggerFromCtx(ctx), + relationMessageMapping: make(model.RelationMessageMapping), }, nil } @@ -356,10 +358,11 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo Slot: slotName, Publication: publicationName, TableNameMapping: req.TableNameMapping, - RelationMessageMapping: req.RelationMessageMapping, + TableNameSchemaMapping: req.TableNameSchemaMapping, ChildToParentRelIDMap: childToParentRelIDMap, CatalogPool: catalogPool, FlowJobName: req.FlowJobName, + RelationMessageMapping: c.relationMessageMapping, }) err = cdc.PullRecords(ctx, req) @@ -476,11 +479,6 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco } } - err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas) - if err != nil { - return nil, fmt.Errorf("failed to sync schema changes: %w", err) - } - syncRecordsTx, err := c.conn.Begin(ctx) if err != nil { return nil, fmt.Errorf("error starting transaction for syncing records: %w", err) @@ -521,6 +519,11 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco return nil, err } + err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas) + if err != nil { + return nil, fmt.Errorf("failed to sync schema changes: %w", err) + } + return &model.SyncResponse{ LastSyncedCheckpointID: lastCP, NumRecordsSynced: int64(numRecords), diff --git a/flow/model/model.go b/flow/model/model.go index 5a900c928b..9e38e237de 100644 --- a/flow/model/model.go +++ b/flow/model/model.go 
@@ -3,6 +3,8 @@ package model import ( "time" + "github.com/jackc/pglogrepl" + "github.com/PeerDB-io/peer-flow/generated/protos" ) @@ -38,8 +40,6 @@ type PullRecordsRequest struct { OverridePublicationName string // override replication slot name OverrideReplicationSlotName string - // for supporting schema changes - RelationMessageMapping RelationMessageMapping // record batch for pushing changes into RecordStream *CDCRecordStream } @@ -187,8 +187,6 @@ type SyncResponse struct { TableNameRowsMapping map[string]uint32 // to be carried to parent workflow TableSchemaDeltas []*protos.TableSchemaDelta - // to be stored in state for future PullFlows - RelationMessageMapping RelationMessageMapping } type NormalizePayload struct { @@ -223,4 +221,4 @@ func (r *RelationRecord) GetItems() *RecordItems { return nil } -type RelationMessageMapping map[uint32]*protos.RelationMessage +type RelationMessageMapping map[uint32]*pglogrepl.RelationMessage diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index c3de6393de..2bcd21773b 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -24,8 +24,7 @@ import ( type CDCFlowWorkflowState struct { // Current signalled state of the peer flow. ActiveSignal model.CDCFlowSignal - // Global mapping of relation IDs to RelationMessages sent as a part of logical replication. - // Needed to support schema changes. 
+ // deprecated field RelationMessageMapping model.RelationMessageMapping CurrentFlowStatus protos.FlowStatus // flow config update request, set to nil after processed @@ -457,13 +456,6 @@ func CDCFlowWorkflow( syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx) syncResultChan.AddToSelector(mainLoopSelector, func(result *model.SyncResponse, _ bool) { syncCount += 1 - if result != nil { - if state.SyncFlowOptions.RelationMessageMapping == nil { - state.SyncFlowOptions.RelationMessageMapping = result.RelationMessageMapping - } else { - maps.Copy(state.SyncFlowOptions.RelationMessageMapping, result.RelationMessageMapping) - } - } }) normChan := model.NormalizeSignal.GetSignalChannel(ctx) diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 949d1016dd..6f6cbdb418 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -74,7 +74,10 @@ func NormalizeFlowWorkflow( if s.SyncBatchID > state.SyncBatchID { state.SyncBatchID = s.SyncBatchID } - state.TableNameSchemaMapping = s.TableNameSchemaMapping + if s.TableNameSchemaMapping != nil { + state.TableNameSchemaMapping = s.TableNameSchemaMapping + } + state.Wait = false }) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index bc21c03596..ce6bf8c579 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -121,7 +121,6 @@ func SyncFlowWorkflow( "", childSyncFlowRes, ).Get(ctx, nil) - options.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping totalRecordsSynced += childSyncFlowRes.NumRecordsSynced logger.Info("Total records synced: ", slog.Int64("totalRecordsSynced", totalRecordsSynced)) diff --git a/protos/flow.proto b/protos/flow.proto index 08b87fb9d9..e872b0fa17 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -10,12 +10,14 @@ message TableNameMapping { string destination_table_name = 2; } +// deprecated message RelationMessageColumn { uint32 flags = 1; string name = 2; uint32 
data_type = 3; } +// deprecated message RelationMessage { uint32 relation_id = 1; string relation_name = 2; @@ -99,6 +101,7 @@ message CreateTablesFromExistingOutput { message SyncFlowOptions { uint32 batch_size = 1; + // deprecated field map<uint32, RelationMessage> relation_message_mapping = 2; uint64 idle_timeout_seconds = 3; map<uint32, string> src_table_id_name_mapping = 4;