Skip to content

Commit

Permalink
add disk spill also as configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 13, 2023
1 parent ce70006 commit cc41a7b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
12 changes: 3 additions & 9 deletions flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,11 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/cockroachdb/pebble"
)

const (
/** begin with in-memory store, and then switch to Pebble DB
when the number of stored records crosses 100k
**/
defaultNumRecordsSwitchThreshold = 1_00_000
)

func encVal(val any) ([]byte, error) {
buf := new(bytes.Buffer)
enc := gob.NewEncoder(buf)
Expand All @@ -49,7 +43,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore {
numRecords: 0,
flowJobName: flowJobName,
dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, util.RandomString(8)),
numRecordsSwitchThreshold: defaultNumRecordsSwitchThreshold,
numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(),
}
}

Expand Down Expand Up @@ -80,7 +74,7 @@ func (c *cdcRecordsStore) initPebbleDB() error {

func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
_, ok := c.inMemoryRecords[key]
if ok || len(c.inMemoryRecords) < defaultNumRecordsSwitchThreshold {
if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
c.inMemoryRecords[key] = rec
} else {
if c.pebbleDB == nil {
Expand Down
5 changes: 5 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ func GetPeerDBCDCIdleTimeoutSeconds() time.Duration {
x := getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
return time.Duration(x) * time.Second
}

// PEERDB_CDC_DISK_SPILL_THRESHOLD
func GetPeerDBCDCDiskSpillThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000)
}

0 comments on commit cc41a7b

Please sign in to comment.