Skip to content

Commit

Permalink
PopulateCountMap: use atomics (#1682)
Browse files Browse the repository at this point in the history
Race existed with parallel queue processing
  • Loading branch information
serprex authored May 7, 2024
1 parent 9967be5 commit 756679e
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
8 changes: 5 additions & 3 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,16 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa
}()

for destinationTableName, rowCounts := range tableNameRowsMapping {
numRows := rowCounts.InsertCount + rowCounts.UpdateCount + rowCounts.DeleteCount
inserts := rowCounts.InsertCount.Load()
updates := rowCounts.UpdateCount.Load()
deletes := rowCounts.DeleteCount.Load()
_, err = insertBatchTablesTx.Exec(ctx,
`INSERT INTO peerdb_stats.cdc_batch_table
(flow_name,batch_id,destination_table_name,num_rows,
insert_count,update_count,delete_count)
VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING`,
flowJobName, batchID, destinationTableName, numRows,
rowCounts.InsertCount, rowCounts.UpdateCount, rowCounts.DeleteCount)
flowJobName, batchID, destinationTableName,
inserts+updates+deletes, inserts, updates, deletes)
if err != nil {
return fmt.Errorf("error while inserting statistics into cdc_batch_table: %w", err)
}
Expand Down
6 changes: 1 addition & 5 deletions flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,7 @@ func recordToQRecordOrError[Items model.Items](batchID int64, record model.Recor
func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts {
tableNameRowsMapping := make(map[string]*model.RecordTypeCounts, len(tableMaps))
for _, mapping := range tableMaps {
tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{
InsertCount: 0,
UpdateCount: 0,
DeleteCount: 0,
}
tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{}
}

return tableNameRowsMapping
Expand Down
8 changes: 5 additions & 3 deletions flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package model

import (
"sync/atomic"

"github.com/PeerDB-io/peer-flow/model/qvalue"
)

type RecordTypeCounts struct {
InsertCount int
UpdateCount int
DeleteCount int
InsertCount atomic.Int32
UpdateCount atomic.Int32
DeleteCount atomic.Int32
}

type QRecordStream struct {
Expand Down
6 changes: 3 additions & 3 deletions flow/model/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r *InsertRecord[T]) GetItems() T {
func (r *InsertRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
recordCount, ok := mapOfCounts[r.DestinationTableName]
if ok {
recordCount.InsertCount++
recordCount.InsertCount.Add(1)
}
}

Expand Down Expand Up @@ -91,7 +91,7 @@ func (r *UpdateRecord[T]) GetItems() T {
func (r *UpdateRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
recordCount, ok := mapOfCounts[r.DestinationTableName]
if ok {
recordCount.UpdateCount++
recordCount.UpdateCount.Add(1)
}
}

Expand Down Expand Up @@ -122,7 +122,7 @@ func (r *DeleteRecord[T]) GetItems() T {
func (r *DeleteRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
recordCount, ok := mapOfCounts[r.DestinationTableName]
if ok {
recordCount.DeleteCount++
recordCount.DeleteCount.Add(1)
}
}

Expand Down

0 comments on commit 756679e

Please sign in to comment.