Skip to content

Commit

Permalink
perf(storage): improve scanning with GLSN
Browse files Browse the repository at this point in the history
This pull request enhances the performance of scanning with GLSN by using a
pebble iterator to retrieve written data instead of calling Get for each key.

```
goos: darwin
goarch: amd64
pkg: github.com/kakao/varlog/internal/storage
cpu: Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
                       │    base     │                diff                 │
                       │   sec/op    │   sec/op     vs base                │
Storage_ScanWithGLSN-8   2.149m ± 1%   1.021m ± 1%  -52.51% (p=0.000 n=20)

                       │     base     │                diff                 │
                       │     B/op     │     B/op      vs base               │
Storage_ScanWithGLSN-8   32.12Ki ± 0%   31.73Ki ± 0%  -1.21% (p=0.000 n=20)

                       │    base     │                diff                │
                       │  allocs/op  │  allocs/op   vs base               │
Storage_ScanWithGLSN-8   2.004k ± 0%   2.006k ± 0%  +0.10% (p=0.000 n=20)

```
  • Loading branch information
ijsong committed Feb 22, 2024
1 parent 0f79e30 commit e6c11f2
Showing 1 changed file with 54 additions and 12 deletions.
66 changes: 54 additions & 12 deletions internal/storage/scanner.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package storage

import (
"errors"
"fmt"
"io"
"slices"
"sync"

"github.com/cockroachdb/pebble"
Expand All @@ -23,17 +25,19 @@ var scannerPool = sync.Pool{
}

type Scanner struct {
scanConfig
stg *Storage
it *pebble.Iterator
cks struct {
stg *Storage
it *pebble.Iterator
lazyIt *pebble.Iterator
cks struct {
lower []byte
upper []byte
}
dks struct {
lower []byte
upper []byte
}

scanConfig
}

func newScanner() *Scanner {
Expand All @@ -56,37 +60,74 @@ func (s *Scanner) Value() (le varlogpb.LogEntry, err error) {
}

func (s *Scanner) Next() bool {
if s.lazyIt != nil {
_ = s.lazyIt.Next()
}
return s.it.Next()
}

func (s *Scanner) Close() (err error) {
if s.it != nil {
err = s.it.Close()
}
if s.lazyIt != nil {
if e := s.lazyIt.Close(); e != nil {
if err != nil {
err = errors.Join(err, e)
} else {
err = e

Check warning on line 78 in internal/storage/scanner.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/scanner.go#L75-L78

Added lines #L75 - L78 were not covered by tests
}
}
}
s.release()
return err
}

func (s *Scanner) valueByGLSN() (le varlogpb.LogEntry, err error) {
ck := s.it.Key()
dk := s.it.Value()
data, closer, err := s.stg.dataDB.Get(dk)
if err != nil {
if err == pebble.ErrNotFound {
return le, fmt.Errorf("%s: %w", s.stg.path, ErrInconsistentWriteCommitState)
llsn := decodeDataKey(dk)

if s.lazyIt == nil {
err = s.initLazyIterator(dk, llsn)
if err != nil {
return le, err
}
return le, err
}

if slices.Compare(dk, s.lazyIt.Key()) != 0 {
return le, fmt.Errorf("%s: %w", s.stg.path, ErrInconsistentWriteCommitState)

Check warning on line 99 in internal/storage/scanner.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/scanner.go#L99

Added line #L99 was not covered by tests
}

le.GLSN = decodeCommitKey(ck)
le.LLSN = decodeDataKey(dk)
le.LLSN = llsn
data := s.lazyIt.Value()
if len(data) > 0 {
le.Data = make([]byte, len(data))
copy(le.Data, data)
}
_ = closer.Close()
return le, nil
}

func (s *Scanner) initLazyIterator(beginKey []byte, beginLLSN types.LLSN) (err error) {
endLLSN := beginLLSN + types.LLSN(s.end.GLSN-s.begin.GLSN)
endKey := make([]byte, dataKeyLength)
endKey = encodeDataKeyInternal(endLLSN, endKey)
itOpt := &pebble.IterOptions{
LowerBound: beginKey,
UpperBound: endKey,
}

s.lazyIt, err = s.stg.dataDB.NewIter(itOpt)
if err != nil {
return err

Check warning on line 123 in internal/storage/scanner.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/scanner.go#L123

Added line #L123 was not covered by tests
}
if !s.lazyIt.First() {
return fmt.Errorf("%s: %w", s.stg.path, ErrInconsistentWriteCommitState)
}
return nil
}

func (s *Scanner) valueByLLSN() (le varlogpb.LogEntry, err error) {
le.LLSN = decodeDataKey(s.it.Key())
if len(s.it.Value()) > 0 {
Expand All @@ -97,9 +138,10 @@ func (s *Scanner) valueByLLSN() (le varlogpb.LogEntry, err error) {
}

func (s *Scanner) release() {
s.scanConfig = scanConfig{}
s.stg = nil
s.it = nil
s.lazyIt = nil
s.scanConfig = scanConfig{}
scannerPool.Put(s)
}

Expand Down

0 comments on commit e6c11f2

Please sign in to comment.