Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove FirstCheckpointID #816

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: 0,
BatchStartLSN: pglogrepl.LSN(recordBatch.GetFirstCheckpoint()),
BatchEndlSN: 0,
StartTime: startTime,
BatchID: syncBatchID + 1,
RowsInBatch: 0,
BatchEndlSN: 0,
StartTime: startTime,
})
if err != nil {
return nil, err
Expand Down
16 changes: 4 additions & 12 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0
recordStream := model.NewQRecordStream(1 << 20)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []*model.QField{
Expand Down Expand Up @@ -649,11 +647,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("record type %T not supported", r)
}

if first {
firstCP = record.GetCheckPointID()
first = false
}

entries[0] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: uuid.New().String(),
Expand Down Expand Up @@ -703,11 +696,10 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
c.logger.Info(fmt.Sprintf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName))

return &model.SyncResponse{
FirstSyncedCheckPointID: firstCP,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
}, nil
}

Expand Down
7 changes: 3 additions & 4 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

rowsSynced := int64(numRecords)
return &model.SyncResponse{
FirstSyncedCheckPointID: batch.GetFirstCheckpoint(),
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: rowsSynced,
TableNameRowsMapping: make(map[string]uint32),
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: rowsSynced,
TableNameRowsMapping: make(map[string]uint32),
}, nil
}

Expand Down
22 changes: 6 additions & 16 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,6 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
records := make([][]interface{}, 0)
tableNameRowsMapping := make(map[string]uint32)

first := true
var firstCP int64 = 0

for record := range req.Records.GetRecords() {
switch typedRecord := record.(type) {
case *model.InsertRecord:
Expand Down Expand Up @@ -340,18 +337,12 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
default:
return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
}

if first {
firstCP = record.GetCheckPointID()
first = false
}
}

if len(records) == 0 {
return &model.SyncResponse{
FirstSyncedCheckPointID: 0,
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
}, nil
}

Expand Down Expand Up @@ -397,11 +388,10 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

return &model.SyncResponse{
FirstSyncedCheckPointID: firstCP,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(len(records)),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(len(records)),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
}, nil
}

Expand Down
7 changes: 3 additions & 4 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,9 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
}

return &model.SyncResponse{
FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(),
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
TableNameRowsMapping: tableNameRowsMapping,
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
TableNameRowsMapping: tableNameRowsMapping,
}, nil
}

Expand Down
9 changes: 4 additions & 5 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,11 +567,10 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
}

return &model.SyncResponse{
FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(),
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
LastSyncedCheckPointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
}, nil
}

Expand Down
13 changes: 6 additions & 7 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import (
)

type CDCBatchInfo struct {
BatchID int64
RowsInBatch uint32
BatchStartLSN pglogrepl.LSN
BatchEndlSN pglogrepl.LSN
StartTime time.Time
BatchID int64
RowsInBatch uint32
BatchEndlSN pglogrepl.LSN
StartTime time.Time
}

func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error {
Expand Down Expand Up @@ -61,8 +60,8 @@ func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName str
_, err := pool.Exec(ctx,
`INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn,
start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`,
flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch,
uint64(batchInfo.BatchStartLSN), uint64(batchInfo.BatchEndlSN), batchInfo.StartTime)
flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch, 0,
uint64(batchInfo.BatchEndlSN), batchInfo.StartTime)
if err != nil {
return fmt.Errorf("error while inserting batch into cdc_batch: %w", err)
}
Expand Down
11 changes: 0 additions & 11 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,6 @@ type CDCRecordStream struct {
SchemaDeltas chan *protos.TableSchemaDelta
// Relation message mapping
RelationMessageMapping chan *RelationMessageMapping
// firstCheckPointID is the first ID of the commit that corresponds to this batch.
firstCheckPointID atomic.Int64
// Indicates if the last checkpoint has been set.
lastCheckpointSet bool
// lastCheckPointID is the last ID of the commit that corresponds to this batch.
Expand All @@ -338,13 +336,10 @@ func NewCDCRecordStream() *CDCRecordStream {
RelationMessageMapping: make(chan *RelationMessageMapping, 1),
lastCheckpointSet: false,
lastCheckPointID: atomic.Int64{},
firstCheckPointID: atomic.Int64{},
}
}

func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) {
r.firstCheckPointID.CompareAndSwap(0, val)

// TODO update with https://github.com/golang/go/issues/63999 once implemented
// r.lastCheckPointID.Max(val)
oldLast := r.lastCheckPointID.Load()
Expand All @@ -353,10 +348,6 @@ func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) {
}
}

func (r *CDCRecordStream) GetFirstCheckpoint() int64 {
return r.firstCheckPointID.Load()
}

func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) {
if !r.lastCheckpointSet {
return 0, errors.New("last checkpoint not set, stream is still active")
Expand Down Expand Up @@ -445,8 +436,6 @@ type NormalizeRecordsRequest struct {
}

type SyncResponse struct {
// FirstSyncedCheckPointID is the first ID that was synced.
FirstSyncedCheckPointID int64
// LastSyncedCheckPointID is the last ID that was synced.
LastSyncedCheckPointID int64
// NumRecordsSynced is the number of records that were synced.
Expand Down
Loading