Skip to content

Commit

Permalink
Merge pull request #69 from starskey-io/starskey-48
Browse files Browse the repository at this point in the history
refactor handleTombstones we shouldn't need to iterate over any sstab…
  • Loading branch information
guycipher authored Feb 21, 2025
2 parents 9f64dcb + f8d1a77 commit 24e5015
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 65 deletions.
3 changes: 1 addition & 2 deletions bloomfilter/bloomfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
"bytes"
"encoding/gob"
"errors"
"github.com/zeebo/xxh3"
"hash"
"hash/fnv"
"math"

"github.com/zeebo/xxh3"
// We could compare against github.com/cespare/xxhash/v2 and github.com/zeebo/xxh3
// but may not be backwards compatible. We use both currently(cespare is used for SuRF) which may be redundant to import 2 packages doing same thing
)
Expand Down
96 changes: 33 additions & 63 deletions starskey.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func Open(config *Config) (*Starskey, error) {

log.Println("Write-ahead log opened successfully")

log.Println("Creating memory table")
log.Println("Creating memory table with threshold: ", config.FlushThreshold, "and min/max degree: ", config.Optional.TTreeMin, config.Optional.TTreeMax)

// We create the memtable
skey.memtable = ttree.New(TTreeMin, TTreeMax)
Expand Down Expand Up @@ -416,7 +416,7 @@ func (skey *Starskey) Close() error {

log.Println("Levels closed")

log.Println("Starskey closed")
log.Println("Starskey closed successfully")

if skey.logFile != nil { // If log configured, we close it
if err := skey.logFile.Close(); err != nil {
Expand All @@ -436,6 +436,7 @@ func (skey *Starskey) appendToWal(record *WALRecord) error {
}

// Write the WAL record to the write-ahead log
// Using the pager we are writing to end of file and with durable sync by default in background
if _, err = skey.wal.Write(walSerialized); err != nil {
return err
}
Expand Down Expand Up @@ -492,14 +493,11 @@ func (skey *Starskey) Put(key, value []byte) error {

// handleTombstones handles tombstones
func (skey *Starskey) handleTombstones(key, value []byte) error {

// We check if the value is a tombstone, if the case we must update the bloomfilter/surf filters
if bytes.Equal(value, Tombstone) {
for _, level := range skey.levels {
for _, sstable := range level.sstables {
klog := sstable.klog
vlog := sstable.vlog

// Create a new iterator for the key log
it := pager.NewIterator(klog)

// If bloom filter is configured we check if key is in the bloom filter
if skey.config.BloomFilter {
Expand All @@ -517,72 +515,44 @@ func (skey *Starskey) handleTombstones(key, value []byte) error {
}
}

for it.Next() {
data, err := it.Read()
if err != nil {
break
}
klogRecord, err := deserializeKLogRecord(data, skey.config.Compression, skey.config.CompressionOption)
// Delete from bloom filter
if skey.config.BloomFilter {
// Bloom filter we must recreate the bloom filter
err := sstable.createBloomFilter(skey)
if err != nil {
return err
}

if bytes.Equal(klogRecord.Key, key) {
// We found the key
// We read the value from the value log
read, _, err := vlog.Read(int(klogRecord.ValPageNum))
if err != nil {
return err
}
vlogRecord, err := deserializeVLogRecord(read, skey.config.Compression, skey.config.CompressionOption)
if err != nil {
return err
}

// Check if the value is a tombstone
if bytes.Equal(vlogRecord.Value, Tombstone) {
continue
}

// Delete from bloom filter
if skey.config.BloomFilter {
// Bloom filter we must recreate the bloom filter
err = sstable.createBloomFilter(skey)
if err != nil {
return err
}

}
}

// Delete from SuRF
if skey.config.SuRF {
sstable.surf.Delete(key)
// We update the surf file
surfFile, err := os.OpenFile(fmt.Sprintf("%s%s", strings.TrimSuffix(sstable.klog.Name(), KLogExtension), SuRFExtension), os.O_CREATE|os.O_RDWR, skey.config.Permission)
if err != nil {
return err
}
// Delete from SuRF
if skey.config.SuRF {
sstable.surf.Delete(key)
// We update the surf file
surfFile, err := os.OpenFile(fmt.Sprintf("%s%s", strings.TrimSuffix(sstable.klog.Name(), KLogExtension), SuRFExtension), os.O_CREATE|os.O_RDWR, skey.config.Permission)
if err != nil {
return err
}

// We truncate
err = surfFile.Truncate(0)
if err != nil {
return err
}
// We truncate
err = surfFile.Truncate(0)
if err != nil {
return err
}

// We serialize the surf
serializedSurf, err := sstable.surf.Serialize()
if err != nil {
return err
}
// We serialize the surf
serializedSurf, err := sstable.surf.Serialize()
if err != nil {
return err
}

// We write the surf to the file
_, err = surfFile.WriteAt(serializedSurf, 0)
if err != nil {
return err
}
}
// We write the surf to the file
_, err = surfFile.WriteAt(serializedSurf, 0)
if err != nil {
return err
}
}

}
}
}
Expand Down

0 comments on commit 24e5015

Please sign in to comment.