diff --git a/flow/model/model.go b/flow/model/model.go index 97668102a..4abdcc401 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -6,7 +6,7 @@ import ( "fmt" "math/big" "slices" - "sync" + "sync/atomic" "time" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -319,14 +319,12 @@ type CDCRecordStream struct { SchemaDeltas chan *protos.TableSchemaDelta // Relation message mapping RelationMessageMapping chan *RelationMessageMapping - // Mutex for synchronizing access to the checkpoint fields - checkpointMutex sync.Mutex // firstCheckPointID is the first ID of the commit that corresponds to this batch. - firstCheckPointID int64 + firstCheckPointID atomic.Int64 // Indicates if the last checkpoint has been set. lastCheckpointSet bool // lastCheckPointID is the last ID of the commit that corresponds to this batch. - lastCheckPointID int64 + lastCheckPointID atomic.Int64 // empty signal to indicate if the records are going to be empty or not. emptySignal chan bool } @@ -339,39 +337,31 @@ func NewCDCRecordStream() *CDCRecordStream { emptySignal: make(chan bool, 1), RelationMessageMapping: make(chan *RelationMessageMapping, 1), lastCheckpointSet: false, - lastCheckPointID: 0, - firstCheckPointID: 0, + lastCheckPointID: atomic.Int64{}, + firstCheckPointID: atomic.Int64{}, } } func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) { - r.checkpointMutex.Lock() - defer r.checkpointMutex.Unlock() + r.firstCheckPointID.CompareAndSwap(0, val) - if r.firstCheckPointID == 0 { - r.firstCheckPointID = val - } - - if val > r.lastCheckPointID { - r.lastCheckPointID = val + // TODO update with https://github.com/golang/go/issues/63999 once implemented + // r.lastCheckPointID.Max(val) + oldLast := r.lastCheckPointID.Load() + for oldLast < val && !r.lastCheckPointID.CompareAndSwap(oldLast, val) { + oldLast = r.lastCheckPointID.Load() } } func (r *CDCRecordStream) GetFirstCheckpoint() int64 { - r.checkpointMutex.Lock() - defer r.checkpointMutex.Unlock() - - return r.firstCheckPointID + return r.firstCheckPointID.Load() } func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) { - r.checkpointMutex.Lock() - defer r.checkpointMutex.Unlock() - if !r.lastCheckpointSet { return 0, errors.New("last checkpoint not set, stream is still active") } - return r.lastCheckPointID, nil + return r.lastCheckPointID.Load(), nil } func (r *CDCRecordStream) AddRecord(record Record) {