Skip to content

Commit

Permalink
optimize batched iter
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Jan 24, 2025
1 parent e1a3b5a commit 29fc98e
Showing 1 changed file with 36 additions and 19 deletions.
55 changes: 36 additions & 19 deletions internal/unionstore/memdb_art.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ type snapshotBatchedIter struct {
reverse bool

// current batch
kvs []KvPair
keys [][]byte
values [][]byte
pos int
batchSize int
nextKey []byte
Expand All @@ -216,10 +217,9 @@ func (db *artDBWithContext) BatchedSnapshotIter(lower, upper []byte, reverse boo
lower: lower,
upper: upper,
reverse: reverse,
batchSize: 4,
batchSize: 32,
}

// Position at first key immediately
iter.fillBatch()
return iter
}
Expand All @@ -233,10 +233,12 @@ func (it *snapshotBatchedIter) fillBatch() error {
it.db.RLock()
defer it.db.RUnlock()

if it.kvs == nil {
it.kvs = make([]KvPair, 0, it.batchSize)
if it.keys == nil || it.values == nil || cap(it.keys) < it.batchSize || cap(it.values) < it.batchSize {
it.keys = make([][]byte, 0, it.batchSize)
it.values = make([][]byte, 0, it.batchSize)
} else {
it.kvs = it.kvs[:0]
it.keys = it.keys[:0]
it.values = it.values[:0]
}

var snapshotIter Iterator
Expand All @@ -256,24 +258,38 @@ func (it *snapshotBatchedIter) fillBatch() error {
defer snapshotIter.Close()

// fill current batch
// Further optimization: let the underlying memdb support batch iter.
for i := 0; i < it.batchSize && snapshotIter.Valid(); i++ {
it.kvs = append(it.kvs, KvPair{
Key: snapshotIter.Key(),
Value: snapshotIter.Value(),
})
it.keys = it.keys[:i+1]
it.values = it.values[:i+1]
it.keys[i] = snapshotIter.Key()
it.values[i] = snapshotIter.Value()
if err := snapshotIter.Next(); err != nil {
return err
}
}

// update state
it.pos = 0
if len(it.kvs) > 0 {
lastKV := it.kvs[len(it.kvs)-1]
if len(it.keys) > 0 {
lastKey := it.keys[len(it.keys)-1]
keyLen := len(lastKey)

if it.reverse {
it.nextKey = append([]byte(nil), lastKV.Key...)
if cap(it.nextKey) >= keyLen {
it.nextKey = it.nextKey[:keyLen]
} else {
it.nextKey = make([]byte, keyLen)
}
copy(it.nextKey, lastKey)
} else {
it.nextKey = append(append([]byte(nil), lastKV.Key...), 0)
if cap(it.nextKey) >= keyLen+1 {
it.nextKey = it.nextKey[:keyLen+1]
} else {
it.nextKey = make([]byte, keyLen+1)
}
copy(it.nextKey, lastKey)
it.nextKey[keyLen] = 0
}
} else {
it.nextKey = nil
Expand All @@ -285,7 +301,7 @@ func (it *snapshotBatchedIter) fillBatch() error {

func (it *snapshotBatchedIter) Valid() bool {
return it.snapshotTruncateSeqNo == it.db.SnapshotSeqNo &&
it.pos < len(it.kvs)
it.pos < len(it.keys)
}

func (it *snapshotBatchedIter) Next() error {
Expand All @@ -300,7 +316,7 @@ func (it *snapshotBatchedIter) Next() error {
}

it.pos++
if it.pos >= len(it.kvs) {
if it.pos >= len(it.keys) {
return it.fillBatch()
}
return nil
Expand All @@ -310,17 +326,18 @@ func (it *snapshotBatchedIter) Key() []byte {
if !it.Valid() {
return nil
}
return it.kvs[it.pos].Key
return it.keys[it.pos]
}

func (it *snapshotBatchedIter) Value() []byte {
if !it.Valid() {
return nil
}
return it.kvs[it.pos].Value
return it.values[it.pos]
}

func (it *snapshotBatchedIter) Close() {
it.kvs = nil
it.keys = nil
it.values = nil
it.nextKey = nil
}

0 comments on commit 29fc98e

Please sign in to comment.