diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 914d5e9159..d25181e75a 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -9,6 +9,7 @@ import ( "math/big" "os" "runtime" + "sync/atomic" "time" "github.com/cockroachdb/pebble" @@ -32,7 +33,7 @@ func encVal(val any) ([]byte, error) { type cdcRecordsStore struct { inMemoryRecords map[model.TableWithPkey]model.Record pebbleDB *pebble.DB - numRecords int + numRecords atomic.Int32 flowJobName string dbFolderName string numRecordsSwitchThreshold int @@ -45,7 +46,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore { return &cdcRecordsStore{ inMemoryRecords: make(map[model.TableWithPkey]model.Record), pebbleDB: nil, - numRecords: 0, + numRecords: atomic.Int32{}, flowJobName: flowJobName, dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)), numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillRecordsThreshold(), @@ -138,7 +139,7 @@ func (c *cdcRecordsStore) Set(logger log.Logger, key *model.TableWithPkey, rec m } } } - c.numRecords++ + c.numRecords.Add(1) return nil } @@ -181,12 +182,12 @@ func (c *cdcRecordsStore) Get(key model.TableWithPkey) (model.Record, bool, erro return nil, false, nil } -func (c *cdcRecordsStore) IsEmpty() bool { - return c.numRecords == 0 +func (c *cdcRecordsStore) Len() int { + return int(c.numRecords.Load()) } -func (c *cdcRecordsStore) Len() int { - return c.numRecords +func (c *cdcRecordsStore) IsEmpty() bool { + return c.Len() == 0 } func (c *cdcRecordsStore) Close() error {