From bc4c1f5196b3c7d24fdf969543e00eff511e5f8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 13 Dec 2023 14:12:55 +0000 Subject: [PATCH] Remove FirstCheckpointID This value is not currently being computed consistently, to be correct it should be taking the minimum checkpoint id, not the first Ultimately it only serves for being written to monitoring, so remove it --- flow/activities/flowable.go | 9 ++++---- flow/connectors/bigquery/bigquery.go | 16 ++++---------- flow/connectors/eventhub/eventhub.go | 7 +++--- flow/connectors/postgres/postgres.go | 22 +++++-------------- flow/connectors/s3/s3.go | 7 +++--- flow/connectors/snowflake/snowflake.go | 9 ++++---- .../connectors/utils/monitoring/monitoring.go | 13 +++++------ flow/model/model.go | 11 ---------- 8 files changed, 30 insertions(+), 64 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9d98f53136..bbcea455c8 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -274,11 +274,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, monitoring.CDCBatchInfo{ - BatchID: syncBatchID + 1, - RowsInBatch: 0, - BatchStartLSN: pglogrepl.LSN(recordBatch.GetFirstCheckpoint()), - BatchEndlSN: 0, - StartTime: startTime, + BatchID: syncBatchID + 1, + RowsInBatch: 0, + BatchEndlSN: 0, + StartTime: startTime, }) if err != nil { return nil, err diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index e18ab4e318..b40c1ed765 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -490,8 +490,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro( syncBatchID int64, ) (*model.SyncResponse, error) { tableNameRowsMapping := make(map[string]uint32) - first := true - var firstCP int64 = 0 recordStream := model.NewQRecordStream(1 << 20) err := recordStream.SetSchema(&model.QRecordSchema{ Fields: []*model.QField{ @@ -649,11 +647,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro( return nil, fmt.Errorf("record type %T not supported", r) } - if first { - firstCP = record.GetCheckPointID() - first = false - } - entries[0] = qvalue.QValue{ Kind: qvalue.QValueKindString, Value: uuid.New().String(), @@ -703,11 +696,10 @@ func (c *BigQueryConnector) syncRecordsViaAvro( c.logger.Info(fmt.Sprintf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)) return &model.SyncResponse{ - FirstSyncedCheckPointID: firstCP, - LastSyncedCheckPointID: lastCP, - NumRecordsSynced: int64(numRecords), - CurrentSyncBatchID: syncBatchID, - TableNameRowsMapping: tableNameRowsMapping, + LastSyncedCheckPointID: lastCP, + NumRecordsSynced: int64(numRecords), + CurrentSyncBatchID: syncBatchID, + TableNameRowsMapping: tableNameRowsMapping, }, nil } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 38802c26e3..ee9d9c8012 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -269,10 +269,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S rowsSynced := int64(numRecords) return &model.SyncResponse{ - FirstSyncedCheckPointID: batch.GetFirstCheckpoint(), - LastSyncedCheckPointID: lastCheckpoint, - NumRecordsSynced: rowsSynced, - TableNameRowsMapping: make(map[string]uint32), + LastSyncedCheckPointID: lastCheckpoint, + NumRecordsSynced: rowsSynced, + TableNameRowsMapping: make(map[string]uint32), }, nil } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 6f0deaeb63..e5bec86b06 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -277,9 +277,6 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S records := make([][]interface{}, 0) tableNameRowsMapping := make(map[string]uint32) - first := true - var firstCP int64 = 0 - for record := range req.Records.GetRecords() { switch typedRecord := record.(type) { case *model.InsertRecord: @@ -340,18 +337,12 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S default: return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord) } - - if first { - firstCP = record.GetCheckPointID() - first = false - } } if len(records) == 0 { return &model.SyncResponse{ - FirstSyncedCheckPointID: 0, - LastSyncedCheckPointID: 0, - NumRecordsSynced: 0, + LastSyncedCheckPointID: 0, + NumRecordsSynced: 0, }, nil } @@ -397,11 +388,10 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } return &model.SyncResponse{ - FirstSyncedCheckPointID: firstCP, - LastSyncedCheckPointID: lastCP, - NumRecordsSynced: int64(len(records)), - CurrentSyncBatchID: syncBatchID, - TableNameRowsMapping: tableNameRowsMapping, + LastSyncedCheckPointID: lastCP, + NumRecordsSynced: int64(len(records)), + CurrentSyncBatchID: syncBatchID, + TableNameRowsMapping: tableNameRowsMapping, }, nil } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index a91ba98b70..7dd102a305 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -239,10 +239,9 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes } return &model.SyncResponse{ - FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(), - LastSyncedCheckPointID: lastCheckpoint, - NumRecordsSynced: int64(numRecords), - TableNameRowsMapping: tableNameRowsMapping, + LastSyncedCheckPointID: lastCheckpoint, + NumRecordsSynced: int64(numRecords), + TableNameRowsMapping: tableNameRowsMapping, }, nil } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index ef05d84127..f854647e98 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -567,11 +567,10 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( } return &model.SyncResponse{ - FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(), - LastSyncedCheckPointID: lastCheckpoint, - NumRecordsSynced: int64(numRecords), - CurrentSyncBatchID: syncBatchID, - TableNameRowsMapping: tableNameRowsMapping, + LastSyncedCheckPointID: lastCheckpoint, + NumRecordsSynced: int64(numRecords), + CurrentSyncBatchID: syncBatchID, + TableNameRowsMapping: tableNameRowsMapping, }, nil } diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index c8082ca858..857a33c398 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -17,11 +17,10 @@ import ( ) type CDCBatchInfo struct { - BatchID int64 - RowsInBatch uint32 - BatchStartLSN pglogrepl.LSN - BatchEndlSN pglogrepl.LSN - StartTime time.Time + BatchID int64 + RowsInBatch uint32 + BatchEndlSN pglogrepl.LSN + StartTime time.Time } func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error { @@ -61,8 +60,8 @@ func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName str _, err := pool.Exec(ctx, `INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn, start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`, - flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch, - uint64(batchInfo.BatchStartLSN), uint64(batchInfo.BatchEndlSN), batchInfo.StartTime) + flowJobName, batchInfo.BatchID, batchInfo.RowsInBatch, 0, + uint64(batchInfo.BatchEndlSN), batchInfo.StartTime) if err != nil { return fmt.Errorf("error while inserting batch into cdc_batch: %w", err) } diff --git a/flow/model/model.go b/flow/model/model.go index 8818d7610f..ecc6c0fc26 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -319,8 +319,6 @@ type CDCRecordStream struct { SchemaDeltas chan *protos.TableSchemaDelta // Relation message mapping RelationMessageMapping chan *RelationMessageMapping - // firstCheckPointID is the first ID of the commit that corresponds to this batch. - firstCheckPointID atomic.Int64 // Indicates if the last checkpoint has been set. lastCheckpointSet bool // lastCheckPointID is the last ID of the commit that corresponds to this batch. @@ -338,13 +336,10 @@ func NewCDCRecordStream() *CDCRecordStream { RelationMessageMapping: make(chan *RelationMessageMapping, 1), lastCheckpointSet: false, lastCheckPointID: atomic.Int64{}, - firstCheckPointID: atomic.Int64{}, } } func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) { - r.firstCheckPointID.CompareAndSwap(0, val) - // TODO update with https://github.com/golang/go/issues/63999 once implemented // r.lastCheckPointID.Max(val) oldLast := r.lastCheckPointID.Load() @@ -353,10 +348,6 @@ func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) { } } -func (r *CDCRecordStream) GetFirstCheckpoint() int64 { - return r.firstCheckPointID.Load() -} - func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) { if !r.lastCheckpointSet { return 0, errors.New("last checkpoint not set, stream is still active") @@ -445,8 +436,6 @@ type NormalizeRecordsRequest struct { } type SyncResponse struct { - // FirstSyncedCheckPointID is the first ID that was synced. - FirstSyncedCheckPointID int64 // LastSyncedCheckPointID is the last ID that was synced. LastSyncedCheckPointID int64 // NumRecordsSynced is the number of records that were synced.