Skip to content

Commit

Permalink
Remove FirstCheckpointID
Browse files Browse the repository at this point in the history
This value is not currently being computed consistently;
to be correct it should take the minimum checkpoint ID, not the first.

Ultimately it is only used when writing to monitoring,
so remove it.
  • Loading branch information
serprex committed Dec 13, 2023
1 parent f33c34f commit bc4c1f5
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 64 deletions.
9 changes: 4 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: 0,
BatchStartLSN: pglogrepl.LSN(recordBatch.GetFirstCheckpoint()),
BatchEndlSN: 0,
StartTime: startTime,
BatchID: syncBatchID + 1,
RowsInBatch: 0,
BatchEndlSN: 0,
StartTime: startTime,
})
if err != nil {
return nil, err
Expand Down
16 changes: 4 additions & 12 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0
recordStream := model.NewQRecordStream(1 << 20)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []*model.QField{
Expand Down Expand Up @@ -649,11 +647,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("record type %T not supported", r)
}

if first {
firstCP = record.GetCheckPointID()
first = false
}

entries[0] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: uuid.New().String(),
Expand Down Expand Up @@ -703,11 +696,10 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
c.logger.Info(fmt.Sprintf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName))

return &model.SyncResponse{
FirstSyncedCheckPointID: firstCP,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
}, nil
}

Expand Down
7 changes: 3 additions & 4 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

rowsSynced := int64(numRecords)
return &model.SyncResponse{
FirstSyncedCheckPointID: batch.GetFirstCheckpoint(),
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: rowsSynced,
TableNameRowsMapping: make(map[string]uint32),
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: rowsSynced,
TableNameRowsMapping: make(map[string]uint32),
}, nil
}

Expand Down
22 changes: 6 additions & 16 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,6 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
records := make([][]interface{}, 0)
tableNameRowsMapping := make(map[string]uint32)

first := true
var firstCP int64 = 0

for record := range req.Records.GetRecords() {
switch typedRecord := record.(type) {
case *model.InsertRecord:
Expand Down Expand Up @@ -340,18 +337,12 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
default:
return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
}

if first {
firstCP = record.GetCheckPointID()
first = false
}
}

if len(records) == 0 {
return &model.SyncResponse{
FirstSyncedCheckPointID: 0,
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
}, nil
}

Expand Down Expand Up @@ -397,11 +388,10 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

return &model.SyncResponse{
FirstSyncedCheckPointID: firstCP,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(len(records)),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(len(records)),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
}, nil
}

Expand Down
7 changes: 3 additions & 4 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,9 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
}

return &model.SyncResponse{
FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(),
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
TableNameRowsMapping: tableNameRowsMapping,
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
TableNameRowsMapping: tableNameRowsMapping,
}, nil
}

Expand Down
9 changes: 4 additions & 5 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,11 +567,10 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
}

return &model.SyncResponse{
FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(),
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
}, nil
}

Expand Down
13 changes: 6 additions & 7 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import (
)

type CDCBatchInfo struct {
BatchID int64
RowsInBatch uint32
BatchStartLSN pglogrepl.LSN
BatchEndlSN pglogrepl.LSN
StartTime time.Time
BatchID int64
RowsInBatch uint32
BatchEndlSN pglogrepl.LSN
StartTime time.Time
}

func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error {
Expand Down Expand Up @@ -61,8 +60,8 @@ func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName str
_, err := pool.Exec(ctx,
`INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn,
start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`,
flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch,
uint64(batchInfo.BatchStartLSN), uint64(batchInfo.BatchEndlSN), batchInfo.StartTime)
flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch, 0,
uint64(batchInfo.BatchEndlSN), batchInfo.StartTime)
if err != nil {
return fmt.Errorf("error while inserting batch into cdc_batch: %w", err)
}
Expand Down
11 changes: 0 additions & 11 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,6 @@ type CDCRecordStream struct {
SchemaDeltas chan *protos.TableSchemaDelta
// Relation message mapping
RelationMessageMapping chan *RelationMessageMapping
// firstCheckPointID is the first ID of the commit that corresponds to this batch.
firstCheckPointID atomic.Int64
// Indicates if the last checkpoint has been set.
lastCheckpointSet bool
// lastCheckPointID is the last ID of the commit that corresponds to this batch.
Expand All @@ -338,13 +336,10 @@ func NewCDCRecordStream() *CDCRecordStream {
RelationMessageMapping: make(chan *RelationMessageMapping, 1),
lastCheckpointSet: false,
lastCheckPointID: atomic.Int64{},
firstCheckPointID: atomic.Int64{},
}
}

func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) {
r.firstCheckPointID.CompareAndSwap(0, val)

// TODO update with https://github.com/golang/go/issues/63999 once implemented
// r.lastCheckPointID.Max(val)
oldLast := r.lastCheckPointID.Load()
Expand All @@ -353,10 +348,6 @@ func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) {
}
}

func (r *CDCRecordStream) GetFirstCheckpoint() int64 {
return r.firstCheckPointID.Load()
}

func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) {
if !r.lastCheckpointSet {
return 0, errors.New("last checkpoint not set, stream is still active")
Expand Down Expand Up @@ -445,8 +436,6 @@ type NormalizeRecordsRequest struct {
}

type SyncResponse struct {
// FirstSyncedCheckPointID is the first ID that was synced.
FirstSyncedCheckPointID int64
// LastSyncedCheckPointID is the last ID that was synced.
LastSyncedCheckPointID int64
// NumRecordsSynced is the number of records that were synced.
Expand Down

0 comments on commit bc4c1f5

Please sign in to comment.