diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 7ecf248c0a..16f65655c5 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -123,14 +123,16 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa }() for destinationTableName, rowCounts := range tableNameRowsMapping { - numRows := rowCounts.InsertCount + rowCounts.UpdateCount + rowCounts.DeleteCount + inserts := rowCounts.InsertCount.Load() + updates := rowCounts.UpdateCount.Load() + deletes := rowCounts.DeleteCount.Load() _, err = insertBatchTablesTx.Exec(ctx, `INSERT INTO peerdb_stats.cdc_batch_table (flow_name,batch_id,destination_table_name,num_rows, insert_count,update_count,delete_count) VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING`, - flowJobName, batchID, destinationTableName, numRows, - rowCounts.InsertCount, rowCounts.UpdateCount, rowCounts.DeleteCount) + flowJobName, batchID, destinationTableName, + inserts+updates+deletes, inserts, updates, deletes) if err != nil { return fmt.Errorf("error while inserting statistics into cdc_batch_table: %w", err) } diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index 483d0f58ee..00688cc2ce 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -130,11 +130,7 @@ func recordToQRecordOrError[Items model.Items](batchID int64, record model.Recor func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts { tableNameRowsMapping := make(map[string]*model.RecordTypeCounts, len(tableMaps)) for _, mapping := range tableMaps { - tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{ - InsertCount: 0, - UpdateCount: 0, - DeleteCount: 0, - } + tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{} } return tableNameRowsMapping diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 0a0c26d2e4..576074db3b 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -1,13 +1,15 @@ package model import ( + "sync/atomic" + "github.com/PeerDB-io/peer-flow/model/qvalue" ) type RecordTypeCounts struct { - InsertCount int - UpdateCount int - DeleteCount int + InsertCount atomic.Int32 + UpdateCount atomic.Int32 + DeleteCount atomic.Int32 } type QRecordStream struct { diff --git a/flow/model/record.go b/flow/model/record.go index 30f3493cf4..9b728ff705 100644 --- a/flow/model/record.go +++ b/flow/model/record.go @@ -58,7 +58,7 @@ func (r *InsertRecord[T]) GetItems() T { func (r *InsertRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) { recordCount, ok := mapOfCounts[r.DestinationTableName] if ok { - recordCount.InsertCount++ + recordCount.InsertCount.Add(1) } } @@ -91,7 +91,7 @@ func (r *UpdateRecord[T]) GetItems() T { func (r *UpdateRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) { recordCount, ok := mapOfCounts[r.DestinationTableName] if ok { - recordCount.UpdateCount++ + recordCount.UpdateCount.Add(1) } } @@ -122,7 +122,7 @@ func (r *DeleteRecord[T]) GetItems() T { func (r *DeleteRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) { recordCount, ok := mapOfCounts[r.DestinationTableName] if ok { - recordCount.DeleteCount++ + recordCount.DeleteCount.Add(1) } }