Skip to content

Commit

Permalink
add CommitTime to cdc messages, expose to script
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Mar 20, 2024
1 parent 9ba55ca commit e656775
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 33 deletions.
39 changes: 28 additions & 11 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type PostgresCDCSource struct {
slot string
publication string
typeMap *pgtype.Map
commitLock bool
commitLock *pglogrepl.BeginMessage

// for partitioned tables, maps child relid to parent relid
childToParentRelIDMapping map[uint32]uint32
Expand Down Expand Up @@ -75,7 +75,7 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *
publication: cdcConfig.Publication,
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
typeMap: pgtype.NewMap(),
commitLock: false,
commitLock: nil,
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
}
}

if !p.commitLock {
if p.commitLock == nil {
if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) {
return nil
}
Expand All @@ -208,7 +208,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
if !cdcRecordsStorage.IsEmpty() {
p.logger.Info(fmt.Sprintf("standby deadline reached, have %d records", cdcRecordsStorage.Len()))

if !p.commitLock {
if p.commitLock == nil {
p.logger.Info(
fmt.Sprintf("no commit lock, returning currently accumulated records - %d",
cdcRecordsStorage.Len()))
Expand Down Expand Up @@ -241,7 +241,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
return fmt.Errorf("consumeStream preempted: %w", ctxErr)
}

if err != nil && !p.commitLock {
if err != nil && p.commitLock == nil {
if pgconn.Timeout(err) {
p.logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d",
cdcRecordsStorage.Len()))
Expand Down Expand Up @@ -408,6 +408,13 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
}
}

func (p *PostgresCDCSource) commitTime() time.Time {
if p.commitLock != nil {
return p.commitLock.CommitTime
}
return time.Time{}
}

func (p *PostgresCDCSource) processMessage(
ctx context.Context,
batch *model.CDCRecordStream,
Expand All @@ -423,7 +430,7 @@ func (p *PostgresCDCSource) processMessage(
case *pglogrepl.BeginMessage:
p.logger.Debug(fmt.Sprintf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid))
p.logger.Debug("Locking PullRecords at BeginMessage, awaiting CommitMessage")
p.commitLock = true
p.commitLock = msg
case *pglogrepl.InsertMessage:
return p.processInsertMessage(xld.WALStart, msg)
case *pglogrepl.UpdateMessage:
Expand All @@ -435,7 +442,7 @@ func (p *PostgresCDCSource) processMessage(
p.logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
msg.CommitLSN, msg.TransactionEndLSN))
batch.UpdateLatestCheckpoint(int64(msg.CommitLSN))
p.commitLock = false
p.commitLock = nil
case *pglogrepl.RelationMessage:
// treat all relation messages as corresponding to parent if partitioned.
msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)
Expand Down Expand Up @@ -483,7 +490,10 @@ func (p *PostgresCDCSource) processInsertMessage(
}

return &model.InsertRecord{
CheckpointID: int64(lsn),
BaseRecord: model.BaseRecord{
CheckpointID: int64(lsn),
CommitTime: p.commitTime(),
},
Items: items,
DestinationTableName: p.tableNameMapping[tableName].Name,
SourceTableName: tableName,
Expand Down Expand Up @@ -524,7 +534,10 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

return &model.UpdateRecord{
CheckpointID: int64(lsn),
BaseRecord: model.BaseRecord{
CheckpointID: int64(lsn),
CommitTime: p.commitTime(),
},
OldItems: oldItems,
NewItems: newItems,
DestinationTableName: p.tableNameMapping[tableName].Name,
Expand Down Expand Up @@ -561,7 +574,10 @@ func (p *PostgresCDCSource) processDeleteMessage(
}

return &model.DeleteRecord{
CheckpointID: int64(lsn),
BaseRecord: model.BaseRecord{
CheckpointID: int64(lsn),
CommitTime: p.commitTime(),
},
Items: items,
DestinationTableName: p.tableNameMapping[tableName].Name,
SourceTableName: tableName,
Expand Down Expand Up @@ -762,8 +778,9 @@ func (p *PostgresCDCSource) processRelationMessage(
// only log audit if there is actionable delta
if len(schemaDelta.AddedColumns) > 0 {
rec := &model.RelationRecord{
TableSchemaDelta: schemaDelta,
CheckpointID: int64(lsn),
CommitTime: p.commitTime(),
TableSchemaDelta: schemaDelta,
}
return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) {
PkeyColVal: [32]byte(pkeyColVal),
}
rec := &model.InsertRecord{
BaseRecord: model.BaseRecord{
CheckpointID: 1,
CommitTime: time.Now(),
},
SourceTableName: "test_src_tbl",
DestinationTableName: "test_dst_tbl",
CheckpointID: 1,
CommitID: 2,
Items: &model.RecordItems{
ColToValIdx: map[string]int{
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s KafkaSuite) TestSimple() {
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: srcTableName,
FlowJobName: e2e.AddSuffix(s, "kasimple"),
TableNameMapping: map[string]string{srcTableName: "katest"},
Destination: s.Peer(),
}
Expand Down
37 changes: 17 additions & 20 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type PullRecordsRequest struct {

type Record interface {
GetCheckpointID() int64
GetCommitTime() time.Time
GetDestinationTableName() string
GetSourceTableName() string
// get columns and values for the record
Expand All @@ -74,23 +75,33 @@ func NewToJSONOptions(unnestCols []string, hstoreAsJSON bool) *ToJSONOptions {
}
}

type BaseRecord struct {
// CheckpointID is the ID of the record.
CheckpointID int64 `json:"checkpointId"`
// CommitTime from BeginMessage
CommitTime time.Time `json:"commitTime"`
}

type InsertRecord struct {
BaseRecord
// Name of the source table
SourceTableName string
// Name of the destination table
DestinationTableName string
// CheckpointID is the ID of the record.
CheckpointID int64
// CommitID is the ID of the commit corresponding to this record.
CommitID int64
// Items is a map of column name to value.
Items *RecordItems
}

func (r *InsertRecord) GetCheckpointID() int64 {
func (r *BaseRecord) GetCheckpointID() int64 {
return r.CheckpointID
}

func (r *BaseRecord) GetCommitTime() time.Time {
return r.CommitTime
}

func (r *InsertRecord) GetDestinationTableName() string {
return r.DestinationTableName
}
Expand All @@ -104,10 +115,9 @@ func (r *InsertRecord) GetItems() *RecordItems {
}

type UpdateRecord struct {
BaseRecord
// Name of the source table
SourceTableName string
// CheckpointID is the ID of the record.
CheckpointID int64
// Name of the destination table
DestinationTableName string
// OldItems is a map of column name to value.
Expand All @@ -118,10 +128,6 @@ type UpdateRecord struct {
UnchangedToastColumns map[string]struct{}
}

func (r *UpdateRecord) GetCheckpointID() int64 {
return r.CheckpointID
}

func (r *UpdateRecord) GetDestinationTableName() string {
return r.DestinationTableName
}
Expand All @@ -135,22 +141,17 @@ func (r *UpdateRecord) GetItems() *RecordItems {
}

type DeleteRecord struct {
BaseRecord
// Name of the source table
SourceTableName string
// Name of the destination table
DestinationTableName string
// CheckpointID is the ID of the record.
CheckpointID int64
// Items is a map of column name to value.
Items *RecordItems
// unchanged toast columns, filled from latest UpdateRecord
UnchangedToastColumns map[string]struct{}
}

func (r *DeleteRecord) GetCheckpointID() int64 {
return r.CheckpointID
}

func (r *DeleteRecord) GetDestinationTableName() string {
return r.DestinationTableName
}
Expand Down Expand Up @@ -219,14 +220,10 @@ type NormalizeResponse struct {

// being clever and passing the delta back as a regular record instead of heavy CDC refactoring.
type RelationRecord struct {
CheckpointID int64 `json:"checkpointId"`
BaseRecord
TableSchemaDelta *protos.TableSchemaDelta `json:"tableSchemaDelta"`
}

func (r *RelationRecord) GetCheckpointID() int64 {
return r.CheckpointID
}

func (r *RelationRecord) GetDestinationTableName() string {
return r.TableSchemaDelta.DstTableName
}
Expand Down
2 changes: 2 additions & 0 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func LuaRecordIndex(ls *lua.LState) int {
}
case "checkpoint":
ls.Push(LuaI64.New(ls, record.GetCheckpointID()))
case "commit_time":
ls.Push(LuaTime.New(ls, record.GetCommitTime()))
case "target":
ls.Push(lua.LString(record.GetDestinationTableName()))
case "source":
Expand Down

0 comments on commit e656775

Please sign in to comment.