Skip to content

Commit

Permalink
Merge branch 'main' into qvalue-by-value
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 13, 2023
2 parents d61361e + f33c34f commit 8487187
Showing 1 changed file with 13 additions and 23 deletions.
36 changes: 13 additions & 23 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"math/big"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -319,14 +319,12 @@ type CDCRecordStream struct {
SchemaDeltas chan *protos.TableSchemaDelta
// Relation message mapping
RelationMessageMapping chan *RelationMessageMapping
// Mutex for synchronizing access to the checkpoint fields
checkpointMutex sync.Mutex
// firstCheckPointID is the first ID of the commit that corresponds to this batch.
firstCheckPointID int64
firstCheckPointID atomic.Int64
// Indicates if the last checkpoint has been set.
lastCheckpointSet bool
// lastCheckPointID is the last ID of the commit that corresponds to this batch.
lastCheckPointID int64
lastCheckPointID atomic.Int64
// empty signal to indicate if the records are going to be empty or not.
emptySignal chan bool
}
Expand All @@ -339,39 +337,31 @@ func NewCDCRecordStream() *CDCRecordStream {
emptySignal: make(chan bool, 1),
RelationMessageMapping: make(chan *RelationMessageMapping, 1),
lastCheckpointSet: false,
lastCheckPointID: 0,
firstCheckPointID: 0,
lastCheckPointID: atomic.Int64{},
firstCheckPointID: atomic.Int64{},
}
}

func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) {
r.checkpointMutex.Lock()
defer r.checkpointMutex.Unlock()
r.firstCheckPointID.CompareAndSwap(0, val)

if r.firstCheckPointID == 0 {
r.firstCheckPointID = val
}

if val > r.lastCheckPointID {
r.lastCheckPointID = val
// TODO update with https://github.com/golang/go/issues/63999 once implemented
// r.lastCheckPointID.Max(val)
oldLast := r.lastCheckPointID.Load()
for oldLast < val && !r.lastCheckPointID.CompareAndSwap(oldLast, val) {
oldLast = r.lastCheckPointID.Load()
}
}

func (r *CDCRecordStream) GetFirstCheckpoint() int64 {
r.checkpointMutex.Lock()
defer r.checkpointMutex.Unlock()

return r.firstCheckPointID
return r.firstCheckPointID.Load()
}

func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) {
r.checkpointMutex.Lock()
defer r.checkpointMutex.Unlock()

if !r.lastCheckpointSet {
return 0, errors.New("last checkpoint not set, stream is still active")
}
return r.lastCheckPointID, nil
return r.lastCheckPointID.Load(), nil
}

func (r *CDCRecordStream) AddRecord(record Record) {
Expand Down

0 comments on commit 8487187

Please sign in to comment.