From 5b486ce76fe6fcb385d8103018ad5bff7bfbd56a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 15 Feb 2024 13:54:53 +0000 Subject: [PATCH] Make cdcRecordsStorage Len threadsafe Since we use it in HeartbeatRoutine --- .../utils/cdc_records/cdc_records_storage.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 914d5e9159..d25181e75a 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -9,6 +9,7 @@ import ( "math/big" "os" "runtime" + "sync/atomic" "time" "github.com/cockroachdb/pebble" @@ -32,7 +33,7 @@ func encVal(val any) ([]byte, error) { type cdcRecordsStore struct { inMemoryRecords map[model.TableWithPkey]model.Record pebbleDB *pebble.DB - numRecords int + numRecords atomic.Int32 flowJobName string dbFolderName string numRecordsSwitchThreshold int @@ -45,7 +46,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore { return &cdcRecordsStore{ inMemoryRecords: make(map[model.TableWithPkey]model.Record), pebbleDB: nil, - numRecords: 0, + numRecords: atomic.Int32{}, flowJobName: flowJobName, dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)), numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillRecordsThreshold(), @@ -138,7 +139,7 @@ func (c *cdcRecordsStore) Set(logger log.Logger, key *model.TableWithPkey, rec m } } } - c.numRecords++ + c.numRecords.Add(1) return nil } @@ -181,12 +182,12 @@ func (c *cdcRecordsStore) Get(key model.TableWithPkey) (model.Record, bool, erro return nil, false, nil } -func (c *cdcRecordsStore) IsEmpty() bool { - return c.numRecords == 0 +func (c *cdcRecordsStore) Len() int { + return int(c.numRecords.Load()) } -func (c *cdcRecordsStore) Len() int { - return c.numRecords +func (c *cdcRecordsStore) IsEmpty() bool { + return c.Len() == 0 } func (c *cdcRecordsStore) Close() error {