diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9d98f53136..bbcea455c8 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -274,11 +274,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, monitoring.CDCBatchInfo{ - BatchID: syncBatchID + 1, - RowsInBatch: 0, - BatchStartLSN: pglogrepl.LSN(recordBatch.GetFirstCheckpoint()), - BatchEndlSN: 0, - StartTime: startTime, + BatchID: syncBatchID + 1, + RowsInBatch: 0, + BatchEndlSN: 0, + StartTime: startTime, }) if err != nil { return nil, err diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index e18ab4e318..b40c1ed765 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -490,8 +490,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro( syncBatchID int64, ) (*model.SyncResponse, error) { tableNameRowsMapping := make(map[string]uint32) - first := true - var firstCP int64 = 0 recordStream := model.NewQRecordStream(1 << 20) err := recordStream.SetSchema(&model.QRecordSchema{ Fields: []*model.QField{ @@ -649,11 +647,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro( return nil, fmt.Errorf("record type %T not supported", r) } - if first { - firstCP = record.GetCheckPointID() - first = false - } - entries[0] = qvalue.QValue{ Kind: qvalue.QValueKindString, Value: uuid.New().String(), @@ -703,11 +696,10 @@ func (c *BigQueryConnector) syncRecordsViaAvro( c.logger.Info(fmt.Sprintf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)) return &model.SyncResponse{ - FirstSyncedCheckPointID: firstCP, - LastSyncedCheckPointID: lastCP, - NumRecordsSynced: int64(numRecords), - CurrentSyncBatchID: syncBatchID, - TableNameRowsMapping: tableNameRowsMapping, + LastSyncedCheckPointID: lastCP, + NumRecordsSynced: int64(numRecords), + CurrentSyncBatchID: syncBatchID, + TableNameRowsMapping: tableNameRowsMapping, }, nil } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 38802c26e3..ee9d9c8012 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -269,10 +269,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S rowsSynced := int64(numRecords) return &model.SyncResponse{ - FirstSyncedCheckPointID: batch.GetFirstCheckpoint(), - LastSyncedCheckPointID: lastCheckpoint, - NumRecordsSynced: rowsSynced, - TableNameRowsMapping: make(map[string]uint32), + LastSyncedCheckPointID: lastCheckpoint, + NumRecordsSynced: rowsSynced, + TableNameRowsMapping: make(map[string]uint32), }, nil } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 6f0deaeb63..e5bec86b06 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -277,9 +277,6 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S records := make([][]interface{}, 0) tableNameRowsMapping := make(map[string]uint32) - first := true - var firstCP int64 = 0 - for record := range req.Records.GetRecords() { switch typedRecord := record.(type) { case *model.InsertRecord: @@ -340,18 +337,12 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S default: return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord) } - - if first { - firstCP = record.GetCheckPointID() - first = false - } } if len(records) == 0 { return &model.SyncResponse{ - FirstSyncedCheckPointID: 0, - LastSyncedCheckPointID: 0, - NumRecordsSynced: 0, + LastSyncedCheckPointID: 0, + NumRecordsSynced: 0, }, nil } @@ -397,11 +388,10 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } return &model.SyncResponse{ - FirstSyncedCheckPointID: firstCP, - LastSyncedCheckPointID: lastCP, - NumRecordsSynced: int64(len(records)), - CurrentSyncBatchID: syncBatchID, - TableNameRowsMapping: tableNameRowsMapping, + LastSyncedCheckPointID: lastCP, + NumRecordsSynced: int64(len(records)), + CurrentSyncBatchID: syncBatchID, + TableNameRowsMapping: tableNameRowsMapping, }, nil } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index a91ba98b70..7dd102a305 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -239,10 +239,9 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes } return &model.SyncResponse{ - FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(), - LastSyncedCheckPointID: lastCheckpoint, - NumRecordsSynced: int64(numRecords), - TableNameRowsMapping: tableNameRowsMapping, + LastSyncedCheckPointID: lastCheckpoint, + NumRecordsSynced: int64(numRecords), + TableNameRowsMapping: tableNameRowsMapping, }, nil } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index ef05d84127..f854647e98 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -567,11 +567,10 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( } return &model.SyncResponse{ - FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(), - LastSyncedCheckPointID: lastCheckpoint, - NumRecordsSynced: int64(numRecords), - CurrentSyncBatchID: syncBatchID, - TableNameRowsMapping: tableNameRowsMapping, + LastSyncedCheckPointID: lastCheckpoint, + NumRecordsSynced: int64(numRecords), + CurrentSyncBatchID: syncBatchID, + TableNameRowsMapping: tableNameRowsMapping, }, nil } diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index c8082ca858..857a33c398 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -17,11 +17,10 @@ import ( ) type CDCBatchInfo struct { - BatchID int64 - RowsInBatch uint32 - BatchStartLSN pglogrepl.LSN - BatchEndlSN pglogrepl.LSN - StartTime time.Time + BatchID int64 + RowsInBatch uint32 + BatchEndlSN pglogrepl.LSN + StartTime time.Time } func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error { @@ -61,8 +60,8 @@ func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName str _, err := pool.Exec(ctx, `INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn, start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`, - flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch, - uint64(batchInfo.BatchStartLSN), uint64(batchInfo.BatchEndlSN), batchInfo.StartTime) + flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch, 0, + uint64(batchInfo.BatchEndlSN), batchInfo.StartTime) if err != nil { return fmt.Errorf("error while inserting batch into cdc_batch: %w", err) } diff --git a/flow/model/model.go b/flow/model/model.go index 8818d7610f..ecc6c0fc26 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -319,8 +319,6 @@ type CDCRecordStream struct { SchemaDeltas chan *protos.TableSchemaDelta // Relation message mapping RelationMessageMapping chan *RelationMessageMapping - // firstCheckPointID is the first ID of the commit that corresponds to this batch. - firstCheckPointID atomic.Int64 // Indicates if the last checkpoint has been set. lastCheckpointSet bool // lastCheckPointID is the last ID of the commit that corresponds to this batch. @@ -338,13 +336,10 @@ func NewCDCRecordStream() *CDCRecordStream { RelationMessageMapping: make(chan *RelationMessageMapping, 1), lastCheckpointSet: false, lastCheckPointID: atomic.Int64{}, - firstCheckPointID: atomic.Int64{}, } } func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) { - r.firstCheckPointID.CompareAndSwap(0, val) - // TODO update with https://github.com/golang/go/issues/63999 once implemented // r.lastCheckPointID.Max(val) oldLast := r.lastCheckPointID.Load() @@ -353,10 +348,6 @@ func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) { } } -func (r *CDCRecordStream) GetFirstCheckpoint() int64 { - return r.firstCheckPointID.Load() -} - func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) { if !r.lastCheckpointSet { return 0, errors.New("last checkpoint not set, stream is still active") @@ -445,8 +436,6 @@ type NormalizeRecordsRequest struct { } type SyncResponse struct { - // FirstSyncedCheckPointID is the first ID that was synced. - FirstSyncedCheckPointID int64 // LastSyncedCheckPointID is the last ID that was synced. LastSyncedCheckPointID int64 // NumRecordsSynced is the number of records that were synced.