From cc41a7b44da5c4491b0dcd3dbb0ecb2731b9bc1f Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 13 Dec 2023 13:48:19 -0500 Subject: [PATCH] add disk spill also as configurable --- .../utils/cdc_records/cdc_records_storage.go | 12 +++--------- flow/peerdbenv/config.go | 5 +++++ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 7b5dd05ef4..5c6fba4d3e 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -12,17 +12,11 @@ import ( "time" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" util "github.com/PeerDB-io/peer-flow/utils" "github.com/cockroachdb/pebble" ) -const ( - /** begin with in-memory store, and then switch to Pebble DB - when the number of stored records crosses 100k - **/ - defaultNumRecordsSwitchThreshold = 1_00_000 -) - func encVal(val any) ([]byte, error) { buf := new(bytes.Buffer) enc := gob.NewEncoder(buf) @@ -49,7 +43,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore { numRecords: 0, flowJobName: flowJobName, dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, util.RandomString(8)), - numRecordsSwitchThreshold: defaultNumRecordsSwitchThreshold, + numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(), } } @@ -80,7 +74,7 @@ func (c *cdcRecordsStore) initPebbleDB() error { func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error { _, ok := c.inMemoryRecords[key] - if ok || len(c.inMemoryRecords) < defaultNumRecordsSwitchThreshold { + if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold { c.inMemoryRecords[key] = rec } else { if c.pebbleDB == nil { diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 6497cf273b..37eefad9b5 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -32,3 +32,8 @@ func GetPeerDBCDCIdleTimeoutSeconds() time.Duration { x := getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60) return time.Duration(x) * time.Second } + +// PEERDB_CDC_DISK_SPILL_THRESHOLD +func GetPeerDBCDCDiskSpillThreshold() int { + return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000) +}