From f8d1a77edd69e76ff26f0d548f4d47584599dc15 Mon Sep 17 00:00:00 2001 From: Alex Gaetano Padula Date: Fri, 21 Feb 2025 04:50:02 -0500 Subject: [PATCH] refactor handleTombstones we shouldn't need to iterate over any sstables, just update sstable filters on if a key is being deleted. Optimization. I've also added some minor comments --- bloomfilter/bloomfilter.go | 3 +- starskey.go | 96 +++++++++++++------------------------- 2 files changed, 34 insertions(+), 65 deletions(-) diff --git a/bloomfilter/bloomfilter.go b/bloomfilter/bloomfilter.go index bba4e10..3fd0b8b 100644 --- a/bloomfilter/bloomfilter.go +++ b/bloomfilter/bloomfilter.go @@ -21,11 +21,10 @@ import ( "bytes" "encoding/gob" "errors" + "github.com/zeebo/xxh3" "hash" "hash/fnv" "math" - - "github.com/zeebo/xxh3" // We could compare against github.com/cespare/xxhash/v2 and github.com/zeebo/xxh3 // but may not be backwards compatible. We use both currently(cespare is used for SuRF) which may be redundant to import 2 packages doing same thing ) diff --git a/starskey.go b/starskey.go index 1345879..95baf0f 100644 --- a/starskey.go +++ b/starskey.go @@ -352,7 +352,7 @@ func Open(config *Config) (*Starskey, error) { log.Println("Write-ahead log opened successfully") - log.Println("Creating memory table") + log.Println("Creating memory table with threshold: ", config.FlushThreshold, "and min/max degree: ", config.Optional.TTreeMin, config.Optional.TTreeMax) // We create the memtable skey.memtable = ttree.New(TTreeMin, TTreeMax) @@ -416,7 +416,7 @@ func (skey *Starskey) Close() error { log.Println("Levels closed") - log.Println("Starskey closed") + log.Println("Starskey closed successfully") if skey.logFile != nil { // If log configured, we close it if err := skey.logFile.Close(); err != nil { @@ -436,6 +436,7 @@ func (skey *Starskey) appendToWal(record *WALRecord) error { } // Write the WAL record to the write-ahead log + // Using the pager we are writing to end of file and with durable sync by default in background if _, err = skey.wal.Write(walSerialized); err != nil { return err } @@ -492,14 +493,11 @@ func (skey *Starskey) Put(key, value []byte) error { // handleTombstones handles tombstones func (skey *Starskey) handleTombstones(key, value []byte) error { + + // We check if the value is a tombstone, if the case we must update the bloomfilter/surf filters if bytes.Equal(value, Tombstone) { for _, level := range skey.levels { for _, sstable := range level.sstables { - klog := sstable.klog - vlog := sstable.vlog - - // Create a new iterator for the key log - it := pager.NewIterator(klog) // If bloom filter is configured we check if key is in the bloom filter if skey.config.BloomFilter { @@ -517,72 +515,44 @@ func (skey *Starskey) handleTombstones(key, value []byte) error { } } - for it.Next() { - data, err := it.Read() - if err != nil { - break - } - klogRecord, err := deserializeKLogRecord(data, skey.config.Compression, skey.config.CompressionOption) + // Delete from bloom filter + if skey.config.BloomFilter { + // Bloom filter we must recreate the bloom filter + err := sstable.createBloomFilter(skey) if err != nil { return err } - if bytes.Equal(klogRecord.Key, key) { - // We found the key - // We read the value from the value log - read, _, err := vlog.Read(int(klogRecord.ValPageNum)) - if err != nil { - return err - } - vlogRecord, err := deserializeVLogRecord(read, skey.config.Compression, skey.config.CompressionOption) - if err != nil { - return err - } - - // Check if the value is a tombstone - if bytes.Equal(vlogRecord.Value, Tombstone) { - continue - } - - // Delete from bloom filter - if skey.config.BloomFilter { - // Bloom filter we must recreate the bloom filter - err = sstable.createBloomFilter(skey) - if err != nil { - return err - } - - } + } - // Delete from SuRF - if skey.config.SuRF { - sstable.surf.Delete(key) - // We update the surf file - surfFile, err := os.OpenFile(fmt.Sprintf("%s%s", strings.TrimSuffix(sstable.klog.Name(), KLogExtension), SuRFExtension), os.O_CREATE|os.O_RDWR, skey.config.Permission) - if err != nil { - return err - } + // Delete from SuRF + if skey.config.SuRF { + sstable.surf.Delete(key) + // We update the surf file + surfFile, err := os.OpenFile(fmt.Sprintf("%s%s", strings.TrimSuffix(sstable.klog.Name(), KLogExtension), SuRFExtension), os.O_CREATE|os.O_RDWR, skey.config.Permission) + if err != nil { + return err + } - // We truncate - err = surfFile.Truncate(0) - if err != nil { - return err - } + // We truncate + err = surfFile.Truncate(0) + if err != nil { + return err + } - // We serialize the surf - serializedSurf, err := sstable.surf.Serialize() - if err != nil { - return err - } + // We serialize the surf + serializedSurf, err := sstable.surf.Serialize() + if err != nil { + return err + } - // We write the surf to the file - _, err = surfFile.WriteAt(serializedSurf, 0) - if err != nil { - return err - } - } + // We write the surf to the file + _, err = surfFile.WriteAt(serializedSurf, 0) + if err != nil { + return err } } + } } }