CDCRecordStream: replace mutex with atomics #814

Merged · 1 commit · Dec 13, 2023
flow/model/model.go (36 changes: 13 additions & 23 deletions)
```diff
@@ -6,7 +6,7 @@ import (
 	"fmt"
 	"math/big"
 	"slices"
-	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -319,14 +319,12 @@ type CDCRecordStream struct {
 	SchemaDeltas chan *protos.TableSchemaDelta
 	// Relation message mapping
 	RelationMessageMapping chan *RelationMessageMapping
-	// Mutex for synchronizing access to the checkpoint fields
-	checkpointMutex sync.Mutex
 	// firstCheckPointID is the first ID of the commit that corresponds to this batch.
-	firstCheckPointID int64
+	firstCheckPointID atomic.Int64
 	// Indicates if the last checkpoint has been set.
 	lastCheckpointSet bool
 	// lastCheckPointID is the last ID of the commit that corresponds to this batch.
-	lastCheckPointID int64
+	lastCheckPointID atomic.Int64
 	// empty signal to indicate if the records are going to be empty or not.
 	emptySignal chan bool
 }
```
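One detail the new field types rely on: the zero value of `atomic.Int64` is a ready-to-use atomic zero, so the explicit `atomic.Int64{}` initializers in the constructor below are equivalent to omitting the fields entirely. A minimal sketch of that property (the `checkpoints` struct is illustrative, not from the PR):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// checkpoints shows that atomic.Int64 fields need no initialization:
// their zero value is a usable atomic zero. go vet's copylocks check
// also flags accidental copies of structs containing them.
type checkpoints struct {
	first atomic.Int64
	last  atomic.Int64
}

func main() {
	var c checkpoints // no atomic.Int64{} literals required
	c.first.CompareAndSwap(0, 7)
	fmt.Println(c.first.Load(), c.last.Load()) // 7 0
}
```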
```diff
@@ -339,39 +337,31 @@ func NewCDCRecordStream() *CDCRecordStream {
 		emptySignal: make(chan bool, 1),
 		RelationMessageMapping: make(chan *RelationMessageMapping, 1),
 		lastCheckpointSet: false,
-		lastCheckPointID: 0,
-		firstCheckPointID: 0,
+		lastCheckPointID: atomic.Int64{},
+		firstCheckPointID: atomic.Int64{},
 	}
 }

 func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) {
-	r.checkpointMutex.Lock()
-	defer r.checkpointMutex.Unlock()
-
-	if r.firstCheckPointID == 0 {
-		r.firstCheckPointID = val
-	}
-
-	if val > r.lastCheckPointID {
-		r.lastCheckPointID = val
-	}
+	r.firstCheckPointID.CompareAndSwap(0, val)
+
+	// TODO update with https://github.com/golang/go/issues/63999 once implemented
+	// r.lastCheckPointID.Max(val)
+	oldLast := r.lastCheckPointID.Load()
+	for oldLast < val && !r.lastCheckPointID.CompareAndSwap(oldLast, val) {
+		oldLast = r.lastCheckPointID.Load()
+	}
 }
```
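The new body uses two lock-free idioms: `CompareAndSwap(0, val)` stores the first checkpoint only while the field is still zero, so the first writer wins and later calls are no-ops, and the retry loop raises the last checkpoint monotonically, standing in for the atomic `Max` method the TODO refers to. A standalone sketch of the same max-via-CAS pattern, with a hypothetical helper name `storeMax` that does not appear in the PR:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storeMax raises n to val if val is larger, retrying when another
// goroutine writes between the Load and the CompareAndSwap. This is
// the same pattern UpdateLatestCheckpoint uses for lastCheckPointID.
func storeMax(n *atomic.Int64, val int64) {
	for {
		old := n.Load()
		if old >= val || n.CompareAndSwap(old, val) {
			return
		}
	}
}

func main() {
	var last atomic.Int64
	var wg sync.WaitGroup
	for i := int64(1); i <= 100; i++ {
		wg.Add(1)
		go func(v int64) {
			defer wg.Done()
			storeMax(&last, v)
		}(i)
	}
	wg.Wait()
	fmt.Println(last.Load()) // prints 100 regardless of scheduling order
}
```

Under no contention the loop completes in a single iteration; a retry happens only when another goroutine updates the value between the Load and the CompareAndSwap.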

```diff
 func (r *CDCRecordStream) GetFirstCheckpoint() int64 {
-	r.checkpointMutex.Lock()
-	defer r.checkpointMutex.Unlock()
-
-	return r.firstCheckPointID
+	return r.firstCheckPointID.Load()
 }

 func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) {
-	r.checkpointMutex.Lock()
-	defer r.checkpointMutex.Unlock()
-
 	if !r.lastCheckpointSet {
```
Review thread on the line `if !r.lastCheckpointSet {`:

@serprex (Contributor, Author) commented on Dec 13, 2023:

> lastCheckpointSet seems kind of funny, since it gets written to by Close without synchronization and here seems to only be an assertion. It seems like this struct has a mode between collection and read, which would be cleaner as a channel that gets sent the first/last checkpoint ID at the end of the pull (where we need to somehow communicate back to some process that has been storing a pointer to this from before the pull began and is using some external sync to only read first/last after Close).

A contributor replied:

> I agree that channels to communicate most fields in this struct would be a good idea. I only wish channels had a GetOrWait primitive :-)
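A hypothetical sketch of the handoff both comments gesture at, using names (`lastCheckpoint`, `Set`, `Get`) that do not exist in the PR: storing the value and then closing a channel gives readers the wished-for get-or-wait behavior, since a receive from a closed channel never blocks and the close publishes the stored value.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lastCheckpoint hands the final checkpoint ID from the pull side to
// readers: Set stores the value and closes done; Get blocks until then.
// This emulates the "GetOrWait" primitive wished for in the review.
type lastCheckpoint struct {
	val  atomic.Int64
	done chan struct{}
}

func newLastCheckpoint() *lastCheckpoint {
	return &lastCheckpoint{done: make(chan struct{})}
}

// Set must be called exactly once, at the end of the pull.
func (l *lastCheckpoint) Set(v int64) {
	l.val.Store(v)
	close(l.done) // wakes every Get; the close also publishes val
}

// Get blocks until Set has run, then returns the stored ID.
func (l *lastCheckpoint) Get() int64 {
	<-l.done
	return l.val.Load()
}

func main() {
	lc := newLastCheckpoint()
	go lc.Set(42)
	fmt.Println(lc.Get()) // 42
}
```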

```diff
 		return 0, errors.New("last checkpoint not set, stream is still active")
 	}
-	return r.lastCheckPointID, nil
+	return r.lastCheckPointID.Load(), nil
 }

 func (r *CDCRecordStream) AddRecord(record Record) {
```