Skip to content

Commit

Permalink
Make cdcRecordsStorage Len threadsafe (#1301)
Browse files Browse the repository at this point in the history
Since we use it in HeartbeatRoutine
  • Loading branch information
serprex authored Feb 15, 2024
1 parent 59d7ee2 commit f4be58b
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/big"
"os"
"runtime"
"sync/atomic"
"time"

"github.com/cockroachdb/pebble"
Expand All @@ -32,7 +33,7 @@ func encVal(val any) ([]byte, error) {
type cdcRecordsStore struct {
inMemoryRecords map[model.TableWithPkey]model.Record
pebbleDB *pebble.DB
numRecords int
numRecords atomic.Int32
flowJobName string
dbFolderName string
numRecordsSwitchThreshold int
Expand All @@ -45,7 +46,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore {
return &cdcRecordsStore{
inMemoryRecords: make(map[model.TableWithPkey]model.Record),
pebbleDB: nil,
numRecords: 0,
numRecords: atomic.Int32{},
flowJobName: flowJobName,
dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)),
numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillRecordsThreshold(),
Expand Down Expand Up @@ -138,7 +139,7 @@ func (c *cdcRecordsStore) Set(logger log.Logger, key *model.TableWithPkey, rec m
}
}
}
c.numRecords++
c.numRecords.Add(1)
return nil
}

Expand Down Expand Up @@ -181,12 +182,12 @@ func (c *cdcRecordsStore) Get(key model.TableWithPkey) (model.Record, bool, erro
return nil, false, nil
}

func (c *cdcRecordsStore) IsEmpty() bool {
return c.numRecords == 0
func (c *cdcRecordsStore) Len() int {
return int(c.numRecords.Load())
}

func (c *cdcRecordsStore) Len() int {
return c.numRecords
func (c *cdcRecordsStore) IsEmpty() bool {
return c.Len() == 0
}

func (c *cdcRecordsStore) Close() error {
Expand Down

0 comments on commit f4be58b

Please sign in to comment.