From 9432e2417b44d3b78d9e713360697230d73085cb Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Sat, 9 Mar 2024 21:05:21 +0530
Subject: [PATCH] comparing RelationMessage against TableSchema, stop caching
 RelationMessages (#1438)

Previously, we diffed the previous and current `RelationMessage`s received
from Postgres during logical replication to determine whether the schema had
changed. This fails when the schema changes after mirror setup but before we
receive the first `RelationMessage`, which can happen a) during initial load,
before CDC begins, and b) during CDC, if no rows are synced before the schema
change is made. This desyncs PeerDB's view of the table from the table's
actual state, causing issues. Changed to diff against the current
`TableSchema` instead, which should always be present. This also eliminates
having to store and pass `RelationMessageMapping` around; it is now scoped to
reading from the replication connection.

Also fixes bugs where schema changes were handled incorrectly for PG and BQ,
and where schema change audits were logged without any schema deltas actually
being present.
---
 flow/activities/flowable.go                |  11 +-
 flow/connectors/bigquery/qrep_avro_sync.go |  10 +-
 flow/connectors/postgres/cdc.go            | 138 +++++++++------------
 flow/connectors/postgres/postgres.go       |  61 ++++-----
 flow/model/model.go                        |   8 +-
 flow/workflows/cdc_flow.go                 |  10 +-
 flow/workflows/normalize_flow.go           |   5 +-
 flow/workflows/sync_flow.go                |   1 -
 protos/flow.proto                          |   3 +
 9 files changed, 112 insertions(+), 135 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 61985eff8e..287082dcdd 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -322,10 +322,6 @@ func (a *FlowableActivity) SyncFlow(
     errGroup, errCtx := errgroup.WithContext(ctx)
     errGroup.Go(func() error {
-        if options.RelationMessageMapping == nil {
-            options.RelationMessageMapping = make(map[uint32]*protos.RelationMessage)
-        }
-
         return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
             FlowJobName:           flowName,
             SrcTableIDNameMapping: options.SrcTableIdNameMapping,
@@ -338,7 +334,6 @@ func (a *FlowableActivity) SyncFlow(
             TableNameSchemaMapping:      options.TableNameSchemaMapping,
             OverridePublicationName:     config.PublicationName,
             OverrideReplicationSlotName: config.ReplicationSlotName,
-            RelationMessageMapping:      options.RelationMessageMapping,
             RecordStream:                recordBatch,
         })
     })
@@ -361,9 +356,8 @@ func (a *FlowableActivity) SyncFlow(
         }

         return &model.SyncResponse{
-            CurrentSyncBatchID:     -1,
-            TableSchemaDeltas:      recordBatch.SchemaDeltas,
-            RelationMessageMapping: options.RelationMessageMapping,
+            CurrentSyncBatchID: -1,
+            TableSchemaDeltas:  recordBatch.SchemaDeltas,
         }, nil
     }

@@ -401,7 +395,6 @@ func (a *FlowableActivity) SyncFlow(
             a.Alerter.LogFlowError(ctx, flowName, err)
             return fmt.Errorf("failed to push records: %w", err)
         }
-        res.RelationMessageMapping = options.RelationMessageMapping
         return nil
     })

diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index 868bf845a5..a4b4023d1b 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -80,11 +80,6 @@ func (s *QRepAvroSyncMethod) SyncRecords(
             req.FlowJobName, rawTableName, syncBatchID),
     )

-    err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
-    if err != nil {
-        return nil, fmt.Errorf("failed to sync schema changes: %w", err)
-    }
-
     query := bqClient.Query(insertStmt)
     query.DefaultDatasetID = s.connector.datasetID
     query.DefaultProjectID = s.connector.projectID
@@ -112,6 +107,11 @@ func (s *QRepAvroSyncMethod) SyncRecords(
         slog.String(string(shared.FlowNameKey), req.FlowJobName),
         slog.String("dstTableName", rawTableName))

+    err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
+    if err != nil {
+        return nil, fmt.Errorf("failed to sync schema changes: %w", err)
+    }
+
     return &model.SyncResponse{
         LastSyncedCheckpointID: lastCP,
         NumRecordsSynced:       int64(numRecords),
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 4a4ecd3b75..adb6c5daab 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -28,11 +28,12 @@ import (

 type PostgresCDCSource struct {
     *PostgresConnector
-    SrcTableIDNameMapping  map[uint32]string
-    TableNameMapping       map[string]model.NameAndExclude
+    srcTableIDNameMapping  map[uint32]string
+    tableNameMapping       map[string]model.NameAndExclude
+    tableNameSchemaMapping map[string]*protos.TableSchema
+    relationMessageMapping model.RelationMessageMapping
     slot                   string
     publication            string
-    relationMessageMapping model.RelationMessageMapping
     typeMap                *pgtype.Map
     commitLock             bool

@@ -49,10 +50,11 @@ type PostgresCDCConfig struct {
     Publication            string
     SrcTableIDNameMapping  map[uint32]string
     TableNameMapping       map[string]model.NameAndExclude
-    RelationMessageMapping model.RelationMessageMapping
+    TableNameSchemaMapping map[string]*protos.TableSchema
     ChildToParentRelIDMap  map[uint32]uint32
     CatalogPool            *pgxpool.Pool
     FlowJobName            string
+    RelationMessageMapping model.RelationMessageMapping
 }

 type startReplicationOpts struct {
@@ -65,11 +67,12 @@ type startReplicationOpts struct {
 func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
     return &PostgresCDCSource{
         PostgresConnector:         c,
-        SrcTableIDNameMapping:     cdcConfig.SrcTableIDNameMapping,
-        TableNameMapping:          cdcConfig.TableNameMapping,
+        srcTableIDNameMapping:     cdcConfig.SrcTableIDNameMapping,
+        tableNameMapping:          cdcConfig.TableNameMapping,
+        tableNameSchemaMapping:    cdcConfig.TableNameSchemaMapping,
+        relationMessageMapping:    cdcConfig.RelationMessageMapping,
         slot:                      cdcConfig.Slot,
         publication:               cdcConfig.Publication,
-        relationMessageMapping:    cdcConfig.RelationMessageMapping,
         childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
         typeMap:                   pgtype.NewMap(),
         commitLock:                false,
@@ -437,20 +440,14 @@ func (p *PostgresCDCSource) processMessage(
         // treat all relation messages as corresponding to parent if partitioned.
         msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)

-        if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists {
+        if _, exists := p.srcTableIDNameMapping[msg.RelationID]; !exists {
             return nil, nil
         }

         p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
             msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))

-        if p.relationMessageMapping[msg.RelationID] == nil {
-            p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
-        } else {
-            // RelationMessages don't contain an LSN, so we use current clientXlogPos instead.
-            // https://github.com/postgres/postgres/blob/8b965c549dc8753be8a38c4a1b9fabdb535a4338/src/backend/replication/logical/proto.c#L670
-            return p.processRelationMessage(ctx, currentClientXlogPos, convertRelationMessageToProto(msg))
-        }
+        return p.processRelationMessage(ctx, currentClientXlogPos, msg)

     case *pglogrepl.TruncateMessage:
         p.logger.Warn("TruncateMessage not supported")
@@ -465,7 +462,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 ) (model.Record, error) {
     relID := p.getParentRelIDIfPartitioned(msg.RelationID)

-    tableName, exists := p.SrcTableIDNameMapping[relID]
+    tableName, exists := p.srcTableIDNameMapping[relID]
     if !exists {
         return nil, nil
     }
@@ -480,7 +477,7 @@ func (p *PostgresCDCSource) processInsertMessage(
     }

     // create empty map of string to interface{}
-    items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.TableNameMapping[tableName].Exclude)
+    items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.tableNameMapping[tableName].Exclude)
     if err != nil {
         return nil, fmt.Errorf("error converting tuple to map: %w", err)
     }
@@ -488,7 +485,7 @@ func (p *PostgresCDCSource) processInsertMessage(
     return &model.InsertRecord{
         CheckpointID:         int64(lsn),
         Items:                items,
-        DestinationTableName: p.TableNameMapping[tableName].Name,
+        DestinationTableName: p.tableNameMapping[tableName].Name,
         SourceTableName:      tableName,
     }, nil
 }
@@ -500,7 +497,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
 ) (model.Record, error) {
     relID := p.getParentRelIDIfPartitioned(msg.RelationID)

-    tableName, exists := p.SrcTableIDNameMapping[relID]
+    tableName, exists := p.srcTableIDNameMapping[relID]
     if !exists {
         return nil, nil
     }
@@ -515,13 +512,13 @@ func (p *PostgresCDCSource) processUpdateMessage(
     }

     // create empty map of string to interface{}
-    oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
+    oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude)
     if err != nil {
         return nil, fmt.Errorf("error converting old tuple to map: %w", err)
     }

     newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple,
-        rel, p.TableNameMapping[tableName].Exclude)
+        rel, p.tableNameMapping[tableName].Exclude)
     if err != nil {
         return nil, fmt.Errorf("error converting new tuple to map: %w", err)
     }
@@ -530,7 +527,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
         CheckpointID:          int64(lsn),
         OldItems:              oldItems,
         NewItems:              newItems,
-        DestinationTableName:  p.TableNameMapping[tableName].Name,
+        DestinationTableName:  p.tableNameMapping[tableName].Name,
         SourceTableName:       tableName,
         UnchangedToastColumns: unchangedToastColumns,
     }, nil
@@ -543,7 +540,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
 ) (model.Record, error) {
     relID := p.getParentRelIDIfPartitioned(msg.RelationID)

-    tableName, exists := p.SrcTableIDNameMapping[relID]
+    tableName, exists := p.srcTableIDNameMapping[relID]
     if !exists {
         return nil, nil
     }
@@ -558,7 +555,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
     }

     // create empty map of string to interface{}
-    items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
+    items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude)
     if err != nil {
         return nil, fmt.Errorf("error converting tuple to map: %w", err)
     }
@@ -566,7 +563,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
     return &model.DeleteRecord{
         CheckpointID:         int64(lsn),
         Items:                items,
-        DestinationTableName: p.TableNameMapping[tableName].Name,
+        DestinationTableName: p.tableNameMapping[tableName].Name,
         SourceTableName:      tableName,
     }, nil
 }

@@ -580,7 +577,7 @@ It takes a tuple and a relation message as input and returns
  */
 func (p *PostgresCDCSource) convertTupleToMap(
     tuple *pglogrepl.TupleData,
-    rel *protos.RelationMessage,
+    rel *pglogrepl.RelationMessage,
     exclude map[string]struct{},
 ) (*model.RecordItems, map[string]struct{}, error) {
     // if the tuple is nil, return an empty map
@@ -685,22 +682,6 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
     return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil
 }

-func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.RelationMessage {
-    protoColArray := make([]*protos.RelationMessageColumn, 0)
-    for _, column := range msg.Columns {
-        protoColArray = append(protoColArray, &protos.RelationMessageColumn{
-            Name:     column.Name,
-            Flags:    uint32(column.Flags),
-            DataType: column.DataType,
-        })
-    }
-    return &protos.RelationMessage{
-        RelationId:   msg.RelationID,
-        RelationName: msg.RelationName,
-        Columns:      protoColArray,
-    }
-}
-
 func (p *PostgresCDCSource) auditSchemaDelta(ctx context.Context, flowJobName string, rec *model.RelationRecord) error {
     activityInfo := activity.GetInfo(ctx)
     workflowID := activityInfo.WorkflowExecution.ID
@@ -721,66 +702,71 @@ func (p *PostgresCDCSource) auditSchemaDelta(ctx context.Context, flowJobName st
 func (p *PostgresCDCSource) processRelationMessage(
     ctx context.Context,
     lsn pglogrepl.LSN,
-    currRel *protos.RelationMessage,
+    currRel *pglogrepl.RelationMessage,
 ) (model.Record, error) {
-    // retrieve initial RelationMessage for table changed.
-    prevRel := p.relationMessageMapping[currRel.RelationId]
+    // not present in tables to sync, return immediately
+    p.logger.Warn("hello!", slog.Any("mapping", p.srcTableIDNameMapping), slog.Any("currRel", currRel))
+    if _, ok := p.srcTableIDNameMapping[currRel.RelationID]; !ok {
+        return nil, nil
+    }
+
+    // retrieve current TableSchema for table changed
+    // tableNameSchemaMapping uses dst table name as the key, so annoying lookup
+    prevSchema := p.tableNameSchemaMapping[p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Name]
     // creating maps for lookup later
-    prevRelMap := make(map[string]*protos.PostgresTableIdentifier)
-    currRelMap := make(map[string]*protos.PostgresTableIdentifier)
-    for _, column := range prevRel.Columns {
-        prevRelMap[column.Name] = &protos.PostgresTableIdentifier{
-            RelId: column.DataType,
-        }
+    prevRelMap := make(map[string]qvalue.QValueKind)
+    currRelMap := make(map[string]qvalue.QValueKind)
+    for _, column := range prevSchema.Columns {
+        prevRelMap[column.Name] = qvalue.QValueKind(column.Type)
     }
     for _, column := range currRel.Columns {
-        currRelMap[column.Name] = &protos.PostgresTableIdentifier{
-            RelId: column.DataType,
+        qKind := p.postgresOIDToQValueKind(column.DataType)
+        if qKind == qvalue.QValueKindInvalid {
+            typeName, ok := p.customTypesMapping[column.DataType]
+            if ok {
+                qKind = customTypeToQKind(typeName)
+            }
         }
+        currRelMap[column.Name] = qKind
     }

     schemaDelta := &protos.TableSchemaDelta{
-        // set it to the source table for now, so we can update the schema on the source side
-        // then at the Workflow level we set it t
-        SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
-        DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]].Name,
+        SrcTableName: p.srcTableIDNameMapping[currRel.RelationID],
+        DstTableName: p.tableNameMapping[p.srcTableIDNameMapping[currRel.RelationID]].Name,
         AddedColumns: make([]*protos.DeltaAddedColumn, 0),
     }

     for _, column := range currRel.Columns {
         // not present in previous relation message, but in current one, so added.
-        if prevRelMap[column.Name] == nil {
-            qKind := p.postgresOIDToQValueKind(column.DataType)
-            if qKind == qvalue.QValueKindInvalid {
-                typeName, ok := p.customTypesMapping[column.DataType]
-                if ok {
-                    qKind = customTypeToQKind(typeName)
-                }
-            }
+        if _, ok := prevRelMap[column.Name]; !ok {
             schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
                 ColumnName: column.Name,
-                ColumnType: string(qKind),
+                ColumnType: string(currRelMap[column.Name]),
             })
             // present in previous and current relation messages, but data types have changed.
             // so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
-        } else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId {
-            p.logger.Warn(fmt.Sprintf("Detected dropped column %s in table %s, but not propagating", column,
-                schemaDelta.SrcTableName))
+        } else if prevRelMap[column.Name] != currRelMap[column.Name] {
+            p.logger.Warn(fmt.Sprintf("Detected column %s with type changed from %s to %s in table %s, but not propagating",
+                column.Name, prevRelMap[column.Name], currRelMap[column.Name], schemaDelta.SrcTableName))
         }
     }
-    for _, column := range prevRel.Columns {
+    for _, column := range prevSchema.Columns {
         // present in previous relation message, but not in current one, so dropped.
-        if currRelMap[column.Name] == nil {
+        if _, ok := currRelMap[column.Name]; !ok {
             p.logger.Warn(fmt.Sprintf("Detected dropped column %s in table %s, but not propagating", column,
                 schemaDelta.SrcTableName))
         }
     }

-    p.relationMessageMapping[currRel.RelationId] = currRel
-    rec := &model.RelationRecord{
-        TableSchemaDelta: schemaDelta,
-        CheckpointID:     int64(lsn),
+    p.relationMessageMapping[currRel.RelationID] = currRel
+    // only log audit if there is actionable delta
+    if len(schemaDelta.AddedColumns) > 0 {
+        rec := &model.RelationRecord{
+            TableSchemaDelta: schemaDelta,
+            CheckpointID:     int64(lsn),
+        }
+        return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec)
     }
-    return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec)
+    return nil, nil
 }

 func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 49afa99002..f82b867f2b 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -30,18 +30,19 @@ import (
 )

 type PostgresConnector struct {
-    connStr            string
-    config             *protos.PostgresConfig
-    ssh                *SSHTunnel
-    conn               *pgx.Conn
-    replConfig         *pgx.ConnConfig
-    replConn           *pgx.Conn
-    replState          *ReplState
-    replLock           sync.Mutex
-    customTypesMapping map[uint32]string
-    metadataSchema     string
-    hushWarnOID        map[uint32]struct{}
-    logger             log.Logger
+    connStr                string
+    config                 *protos.PostgresConfig
+    ssh                    *SSHTunnel
+    conn                   *pgx.Conn
+    replConfig             *pgx.ConnConfig
+    replConn               *pgx.Conn
+    replState              *ReplState
+    replLock               sync.Mutex
+    customTypesMapping     map[uint32]string
+    metadataSchema         string
+    hushWarnOID            map[uint32]struct{}
+    logger                 log.Logger
+    relationMessageMapping model.RelationMessageMapping
 }

 type ReplState struct {
@@ -90,17 +91,18 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
     }

     return &PostgresConnector{
-        connStr:            connectionString,
-        config:             pgConfig,
-        ssh:                tunnel,
-        conn:               conn,
-        replConfig:         replConfig,
-        replState:          nil,
-        replLock:           sync.Mutex{},
-        customTypesMapping: customTypeMap,
-        metadataSchema:     metadataSchema,
-        hushWarnOID:        make(map[uint32]struct{}),
-        logger:             logger.LoggerFromCtx(ctx),
+        connStr:                connectionString,
+        config:                 pgConfig,
+        ssh:                    tunnel,
+        conn:                   conn,
+        replConfig:             replConfig,
+        replState:              nil,
+        replLock:               sync.Mutex{},
+        customTypesMapping:     customTypeMap,
+        metadataSchema:         metadataSchema,
+        hushWarnOID:            make(map[uint32]struct{}),
+        logger:                 logger.LoggerFromCtx(ctx),
+        relationMessageMapping: make(model.RelationMessageMapping),
     }, nil
 }

@@ -356,10 +358,11 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
         Slot:                   slotName,
         Publication:            publicationName,
         TableNameMapping:       req.TableNameMapping,
-        RelationMessageMapping: req.RelationMessageMapping,
+        TableNameSchemaMapping: req.TableNameSchemaMapping,
         ChildToParentRelIDMap:  childToParentRelIDMap,
         CatalogPool:            catalogPool,
         FlowJobName:            req.FlowJobName,
+        RelationMessageMapping: c.relationMessageMapping,
     })

     err = cdc.PullRecords(ctx, req)
@@ -476,11 +479,6 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
         }
     }

-    err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
-    if err != nil {
-        return nil, fmt.Errorf("failed to sync schema changes: %w", err)
-    }
-
     syncRecordsTx, err := c.conn.Begin(ctx)
     if err != nil {
         return nil, fmt.Errorf("error starting transaction for syncing records: %w", err)
     }
@@ -521,6 +519,11 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
         return nil, err
     }

+    err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
+    if err != nil {
+        return nil, fmt.Errorf("failed to sync schema changes: %w", err)
+    }
+
     return &model.SyncResponse{
         LastSyncedCheckpointID: lastCP,
         NumRecordsSynced:       int64(numRecords),
diff --git a/flow/model/model.go b/flow/model/model.go
index 5a900c928b..9e38e237de 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -3,6 +3,8 @@ package model
 import (
     "time"

+    "github.com/jackc/pglogrepl"
+
     "github.com/PeerDB-io/peer-flow/generated/protos"
 )

@@ -38,8 +40,6 @@ type PullRecordsRequest struct {
     OverridePublicationName string
     // override replication slot name
     OverrideReplicationSlotName string
-    // for supporting schema changes
-    RelationMessageMapping RelationMessageMapping
     // record batch for pushing changes into
     RecordStream *CDCRecordStream
 }
@@ -187,8 +187,6 @@ type SyncResponse struct {
     TableNameRowsMapping map[string]uint32
     // to be carried to parent workflow
     TableSchemaDeltas []*protos.TableSchemaDelta
-    // to be stored in state for future PullFlows
-    RelationMessageMapping RelationMessageMapping
 }

 type NormalizePayload struct {
@@ -223,4 +221,4 @@ func (r *RelationRecord) GetItems() *RecordItems {
     return nil
 }

-type RelationMessageMapping map[uint32]*protos.RelationMessage
+type RelationMessageMapping map[uint32]*pglogrepl.RelationMessage
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index c3de6393de..2bcd21773b 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -24,8 +24,7 @@ import (
 type CDCFlowWorkflowState struct {
     // Current signalled state of the peer flow.
     ActiveSignal model.CDCFlowSignal
-    // Global mapping of relation IDs to RelationMessages sent as a part of logical replication.
-    // Needed to support schema changes.
+    // deprecated field
     RelationMessageMapping model.RelationMessageMapping
     CurrentFlowStatus protos.FlowStatus
     // flow config update request, set to nil after processed
@@ -457,13 +456,6 @@ func CDCFlowWorkflow(
     syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx)
     syncResultChan.AddToSelector(mainLoopSelector, func(result *model.SyncResponse, _ bool) {
         syncCount += 1
-        if result != nil {
-            if state.SyncFlowOptions.RelationMessageMapping == nil {
-                state.SyncFlowOptions.RelationMessageMapping = result.RelationMessageMapping
-            } else {
-                maps.Copy(state.SyncFlowOptions.RelationMessageMapping, result.RelationMessageMapping)
-            }
-        }
     })

     normChan := model.NormalizeSignal.GetSignalChannel(ctx)
diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go
index 949d1016dd..6f6cbdb418 100644
--- a/flow/workflows/normalize_flow.go
+++ b/flow/workflows/normalize_flow.go
@@ -74,7 +74,10 @@ func NormalizeFlowWorkflow(
         if s.SyncBatchID > state.SyncBatchID {
             state.SyncBatchID = s.SyncBatchID
         }
-        state.TableNameSchemaMapping = s.TableNameSchemaMapping
+        if s.TableNameSchemaMapping != nil {
+            state.TableNameSchemaMapping = s.TableNameSchemaMapping
+        }
+
         state.Wait = false
     })
diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go
index bc21c03596..ce6bf8c579 100644
--- a/flow/workflows/sync_flow.go
+++ b/flow/workflows/sync_flow.go
@@ -121,7 +121,6 @@ func SyncFlowWorkflow(
             "",
             childSyncFlowRes,
         ).Get(ctx, nil)
-        options.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping
         totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
         logger.Info("Total records synced: ", slog.Int64("totalRecordsSynced", totalRecordsSynced))
diff --git a/protos/flow.proto b/protos/flow.proto
index 08b87fb9d9..e872b0fa17 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -10,12 +10,14 @@ message TableNameMapping {
   string destination_table_name = 2;
 }

+// deprecated
 message RelationMessageColumn {
   uint32 flags = 1;
   string name = 2;
   uint32 data_type = 3;
 }

+// deprecated
 message RelationMessage {
   uint32 relation_id = 1;
   string relation_name = 2;
@@ -99,6 +101,7 @@ message CreateTablesFromExistingOutput {
 message SyncFlowOptions {
   uint32 batch_size = 1;
+  // deprecated field
   map<uint32, RelationMessage> relation_message_mapping = 2;
   uint64 idle_timeout_seconds = 3;
   map<uint32, string> src_table_id_name_mapping = 4;
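
For readers who want the gist of the new approach without tracing the diff: the sketch below shows, in isolation, how a table schema delta can be derived by comparing the cached table schema against the columns of an incoming relation message, mirroring what processRelationMessage does above. It is a minimal, self-contained sketch, not PeerDB's actual code: the types are simplified stand-ins for protos.TableSchema and pglogrepl.RelationMessage, and column types are plain strings rather than Postgres OIDs resolved to QValueKinds.

package main

import "fmt"

// Simplified stand-in for protos.TableSchema: column name -> type name.
type tableSchema struct {
	columns map[string]string
}

// Simplified stand-in for a column of pglogrepl.RelationMessage; the real
// message carries a type OID that the connector maps to a QValueKind.
type relationColumn struct {
	name     string
	dataType string
}

// computeSchemaDelta returns the columns that appear in the incoming relation
// message but not in the cached schema. Type changes and dropped columns are
// only logged by the real code, so they are ignored here as well.
func computeSchemaDelta(prev tableSchema, curr []relationColumn) map[string]string {
	added := make(map[string]string)
	for _, col := range curr {
		if _, ok := prev.columns[col.name]; !ok {
			added[col.name] = col.dataType
		}
	}
	return added
}

func main() {
	prev := tableSchema{columns: map[string]string{"id": "int64", "name": "string"}}
	curr := []relationColumn{
		{name: "id", dataType: "int64"},
		{name: "name", dataType: "string"},
		{name: "created_at", dataType: "timestamp"},
	}
	// Prints: map[created_at:timestamp]
	fmt.Println(computeSchemaDelta(prev, curr))
}

Because the cached table schema exists from mirror setup onward, this comparison also works for the very first RelationMessage received on a slot, which is exactly the case the previous RelationMessage-to-RelationMessage diff missed.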