diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 4683adb072..4f61388477 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -35,7 +35,7 @@ type PostgresCDCSource struct {
 	slot        string
 	publication string
 	typeMap     *pgtype.Map
-	commitLock  bool
+	commitLock  *pglogrepl.BeginMessage
 
 	// for partitioned tables, maps child relid to parent relid
 	childToParentRelIDMapping map[uint32]uint32
@@ -75,7 +75,7 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *
 		publication:               cdcConfig.Publication,
 		childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
 		typeMap:                   pgtype.NewMap(),
-		commitLock:                false,
+		commitLock:                nil,
 		catalogPool:               cdcConfig.CatalogPool,
 		flowJobName:               cdcConfig.FlowJobName,
 	}
@@ -188,7 +188,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
 			}
 		}
 
-		if !p.commitLock {
+		if p.commitLock == nil {
 			if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) {
 				return nil
 			}
@@ -208,7 +208,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
 			if !cdcRecordsStorage.IsEmpty() {
 				p.logger.Info(fmt.Sprintf("standby deadline reached, have %d records", cdcRecordsStorage.Len()))
 
-				if !p.commitLock {
+				if p.commitLock == nil {
 					p.logger.Info(
 						fmt.Sprintf("no commit lock, returning currently accumulated records - %d",
 							cdcRecordsStorage.Len()))
@@ -241,7 +241,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
 			return fmt.Errorf("consumeStream preempted: %w", ctxErr)
 		}
 
-		if err != nil && !p.commitLock {
+		if err != nil && p.commitLock == nil {
 			if pgconn.Timeout(err) {
 				p.logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d",
 					cdcRecordsStorage.Len()))
@@ -408,6 +408,13 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
 	}
 }
 
+func (p *PostgresCDCSource) commitTime() time.Time {
+	if p.commitLock != nil {
+		return p.commitLock.CommitTime
+	}
+	return time.Time{}
+}
+
 func (p *PostgresCDCSource) processMessage(
 	ctx context.Context,
 	batch *model.CDCRecordStream,
@@ -423,7 +430,7 @@ func (p *PostgresCDCSource) processMessage(
 	case *pglogrepl.BeginMessage:
 		p.logger.Debug(fmt.Sprintf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid))
 		p.logger.Debug("Locking PullRecords at BeginMessage, awaiting CommitMessage")
-		p.commitLock = true
+		p.commitLock = msg
 	case *pglogrepl.InsertMessage:
 		return p.processInsertMessage(xld.WALStart, msg)
 	case *pglogrepl.UpdateMessage:
@@ -435,7 +442,7 @@ func (p *PostgresCDCSource) processMessage(
 		p.logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
 			msg.CommitLSN, msg.TransactionEndLSN))
 		batch.UpdateLatestCheckpoint(int64(msg.CommitLSN))
-		p.commitLock = false
+		p.commitLock = nil
 	case *pglogrepl.RelationMessage:
 		// treat all relation messages as corresponding to parent if partitioned.
 		msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)
@@ -483,7 +490,10 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}
 
 	return &model.InsertRecord{
-		CheckpointID:         int64(lsn),
+		BaseRecord: model.BaseRecord{
+			CheckpointID: int64(lsn),
+			CommitTime:   p.commitTime(),
+		},
 		Items:                items,
 		DestinationTableName: p.tableNameMapping[tableName].Name,
 		SourceTableName:      tableName,
@@ -524,7 +534,10 @@ func (p *PostgresCDCSource) processUpdateMessage(
 	}
 
 	return &model.UpdateRecord{
-		CheckpointID:          int64(lsn),
+		BaseRecord: model.BaseRecord{
+			CheckpointID: int64(lsn),
+			CommitTime:   p.commitTime(),
+		},
 		OldItems:              oldItems,
 		NewItems:              newItems,
 		DestinationTableName:  p.tableNameMapping[tableName].Name,
@@ -561,7 +574,10 @@ func (p *PostgresCDCSource) processDeleteMessage(
 	}
 
 	return &model.DeleteRecord{
-		CheckpointID:          int64(lsn),
+		BaseRecord: model.BaseRecord{
+			CheckpointID: int64(lsn),
+			CommitTime:   p.commitTime(),
+		},
 		Items:                 items,
 		DestinationTableName:  p.tableNameMapping[tableName].Name,
 		SourceTableName:       tableName,
@@ -762,8 +778,11 @@ func (p *PostgresCDCSource) processRelationMessage(
 	// only log audit if there is actionable delta
 	if len(schemaDelta.AddedColumns) > 0 {
 		rec := &model.RelationRecord{
+			BaseRecord: model.BaseRecord{
+				CheckpointID: int64(lsn),
+				CommitTime:   p.commitTime(),
+			},
 			TableSchemaDelta: schemaDelta,
-			CheckpointID:     int64(lsn),
 		}
 		return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec)
 	}
diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
index 50796109f3..a56c06e0b7 100644
--- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
+++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
@@ -47,9 +47,12 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) {
 		PkeyColVal: [32]byte(pkeyColVal),
 	}
 	rec := &model.InsertRecord{
+		BaseRecord: model.BaseRecord{
+			CheckpointID: 1,
+			CommitTime:   time.Now(),
+		},
 		SourceTableName:      "test_src_tbl",
 		DestinationTableName: "test_dst_tbl",
-		CheckpointID:         1,
 		CommitID:             2,
 		Items: &model.RecordItems{
 			ColToValIdx: map[string]int{
diff --git a/flow/e2e/kafka/kafka_test.go b/flow/e2e/kafka/kafka_test.go
index 3670706281..52fbe98b0f 100644
--- a/flow/e2e/kafka/kafka_test.go
+++ b/flow/e2e/kafka/kafka_test.go
@@ -92,7 +92,7 @@ func (s KafkaSuite) TestSimple() {
 	require.NoError(s.t, err)
 
 	connectionGen := e2e.FlowConnectionGenerationConfig{
-		FlowJobName:      srcTableName,
+		FlowJobName:      e2e.AddSuffix(s, "kasimple"),
 		TableNameMapping: map[string]string{srcTableName: "katest"},
 		Destination:      s.Peer(),
 	}
diff --git a/flow/model/model.go b/flow/model/model.go
index 06f56ec2f3..55e3334f94 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -49,6 +49,7 @@ type PullRecordsRequest struct {
 
 type Record interface {
 	GetCheckpointID() int64
+	GetCommitTime() time.Time
 	GetDestinationTableName() string
 	GetSourceTableName() string
 	// get columns and values for the record
@@ -74,23 +75,33 @@ func NewToJSONOptions(unnestCols []string, hstoreAsJSON bool) *ToJSONOptions {
 	}
 }
 
+type BaseRecord struct {
+	// CheckpointID is the ID of the record.
+	CheckpointID int64 `json:"checkpointId"`
+	// CommitTime from BeginMessage
+	CommitTime time.Time `json:"commitTime"`
+}
+
 type InsertRecord struct {
+	BaseRecord
 	// Name of the source table
 	SourceTableName string
 	// Name of the destination table
 	DestinationTableName string
-	// CheckpointID is the ID of the record.
-	CheckpointID int64
 	// CommitID is the ID of the commit corresponding to this record.
 	CommitID int64
 	// Items is a map of column name to value.
 	Items *RecordItems
 }
 
-func (r *InsertRecord) GetCheckpointID() int64 {
+func (r *BaseRecord) GetCheckpointID() int64 {
 	return r.CheckpointID
 }
 
+func (r *BaseRecord) GetCommitTime() time.Time {
+	return r.CommitTime
+}
+
 func (r *InsertRecord) GetDestinationTableName() string {
 	return r.DestinationTableName
 }
@@ -104,10 +115,9 @@ func (r *InsertRecord) GetItems() *RecordItems {
 }
 
 type UpdateRecord struct {
+	BaseRecord
 	// Name of the source table
 	SourceTableName string
-	// CheckpointID is the ID of the record.
-	CheckpointID int64
 	// Name of the destination table
 	DestinationTableName string
 	// OldItems is a map of column name to value.
@@ -118,10 +128,6 @@ type UpdateRecord struct {
 	UnchangedToastColumns map[string]struct{}
 }
 
-func (r *UpdateRecord) GetCheckpointID() int64 {
-	return r.CheckpointID
-}
-
 func (r *UpdateRecord) GetDestinationTableName() string {
 	return r.DestinationTableName
 }
@@ -135,22 +141,17 @@ func (r *UpdateRecord) GetItems() *RecordItems {
 }
 
 type DeleteRecord struct {
+	BaseRecord
 	// Name of the source table
 	SourceTableName string
 	// Name of the destination table
 	DestinationTableName string
-	// CheckpointID is the ID of the record.
-	CheckpointID int64
 	// Items is a map of column name to value.
 	Items *RecordItems
 	// unchanged toast columns, filled from latest UpdateRecord
 	UnchangedToastColumns map[string]struct{}
 }
 
-func (r *DeleteRecord) GetCheckpointID() int64 {
-	return r.CheckpointID
-}
-
 func (r *DeleteRecord) GetDestinationTableName() string {
 	return r.DestinationTableName
 }
@@ -219,14 +220,10 @@ type NormalizeResponse struct {
 
 // being clever and passing the delta back as a regular record instead of heavy CDC refactoring.
 type RelationRecord struct {
-	CheckpointID     int64                    `json:"checkpointId"`
+	BaseRecord
 	TableSchemaDelta *protos.TableSchemaDelta `json:"tableSchemaDelta"`
 }
 
-func (r *RelationRecord) GetCheckpointID() int64 {
-	return r.CheckpointID
-}
-
 func (r *RelationRecord) GetDestinationTableName() string {
 	return r.TableSchemaDelta.DstTableName
 }
diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go
index 0044e56371..313b4cacfb 100644
--- a/flow/pua/peerdb.go
+++ b/flow/pua/peerdb.go
@@ -217,6 +217,8 @@ func LuaRecordIndex(ls *lua.LState) int {
 		}
 	case "checkpoint":
 		ls.Push(LuaI64.New(ls, record.GetCheckpointID()))
+	case "commit_time":
+		ls.Push(LuaTime.New(ls, record.GetCommitTime()))
 	case "target":
 		ls.Push(lua.LString(record.GetDestinationTableName()))
 	case "source":
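For reference, a minimal sketch (not part of the patch) of the mechanism the model.go change relies on: embedding BaseRecord promotes GetCheckpointID and GetCommitTime onto every concrete record type, so InsertRecord, UpdateRecord, DeleteRecord, and RelationRecord satisfy the widened Record interface without per-type accessors. The package name, field values, and table name below are hypothetical.

// Illustrative sketch, assuming only the BaseRecord definitions introduced above.
package main

import (
	"fmt"
	"time"
)

type BaseRecord struct {
	CheckpointID int64     `json:"checkpointId"`
	CommitTime   time.Time `json:"commitTime"`
}

func (r *BaseRecord) GetCheckpointID() int64   { return r.CheckpointID }
func (r *BaseRecord) GetCommitTime() time.Time { return r.CommitTime }

// InsertRecord embeds BaseRecord, so both getters above are promoted to it.
type InsertRecord struct {
	BaseRecord
	SourceTableName string
}

func main() {
	rec := &InsertRecord{
		BaseRecord:      BaseRecord{CheckpointID: 42, CommitTime: time.Now()},
		SourceTableName: "public.users", // hypothetical table name
	}
	// Promoted methods from the embedded struct:
	fmt.Println(rec.GetCheckpointID(), rec.GetCommitTime())
}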