Fix data panic #18

Open · wants to merge 2 commits into base: master
3 changes: 3 additions & 0 deletions intdecoder.go
@@ -87,6 +87,9 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error {
if err != nil {
return err
}
if len(curChunkBytesData) == 0 {
return nil
}
d.uncompressed, err = ZSTDDecompress(d.uncompressed[:cap(d.uncompressed)], curChunkBytesData)
if err != nil {
return err
2 changes: 2 additions & 0 deletions load.go
@@ -60,6 +60,8 @@ func load(data *segment.Data) (*Segment, error) {
return nil, err
}

rv.initDecompressedStoredFieldChunks(len(rv.storedFieldChunkOffsets))

err = rv.loadDvReaders()
if err != nil {
return nil, err
1 change: 1 addition & 0 deletions new.go
@@ -99,6 +99,7 @@ func initSegmentBase(mem []byte, footer *footer,
fieldFSTs: make(map[uint16]*vellum.FST),
storedFieldChunkOffsets: storedFieldChunkOffsets,
}
sb.initDecompressedStoredFieldChunks(len(storedFieldChunkOffsets))
sb.updateSize()

err := sb.loadDvReaders()
73 changes: 46 additions & 27 deletions read.go
@@ -18,47 +18,66 @@ import (
"encoding/binary"
)

func (s *Segment) getDocStoredMetaAndUnCompressed(docNum uint64) (meta, data []byte, err error) {
_, storedOffset, n, metaLen, dataLen, err := s.getDocStoredOffsets(docNum)
if err != nil {
return nil, nil, err
func (s *Segment) initDecompressedStoredFieldChunks(n int) {
s.m.Lock()
s.decompressedStoredFieldChunks = make(map[uint32]*segmentCacheData, n)
for i := uint32(0); i < uint32(n); i++ {
s.decompressedStoredFieldChunks[i] = &segmentCacheData{}
}

meta = s.storedFieldChunkUncompressed[int(storedOffset+n):int(storedOffset+n+metaLen)]
data = s.storedFieldChunkUncompressed[int(storedOffset+n+metaLen):int(storedOffset+n+metaLen+dataLen)]
return meta, data, nil
s.m.Unlock()
}

func (s *Segment) getDocStoredOffsets(docNum uint64) (indexOffset, storedOffset, n, metaLen, dataLen uint64, err error) {
indexOffset, storedOffset, err = s.getDocStoredOffsetsOnly(docNum)
func (s *Segment) getDocStoredMetaAndUnCompressed(docNum uint64) (meta, data []byte, err error) {
_, storedOffset, err := s.getDocStoredOffsetsOnly(docNum)
if err != nil {
return 0, 0, 0, 0, 0, err
return nil, nil, err
}

// document chunk coder
chunkI := docNum / uint64(defaultDocumentChunkSize)
chunkOffsetStart := s.storedFieldChunkOffsets[int(chunkI)]
chunkOffsetEnd := s.storedFieldChunkOffsets[int(chunkI)+1]
compressed, err := s.data.Read(int(chunkOffsetStart), int(chunkOffsetEnd))
if err != nil {
return 0, 0, 0, 0, 0, err
var uncompressed []byte
chunkI := uint32(docNum) / defaultDocumentChunkSize
storedFieldDecompressed := s.decompressedStoredFieldChunks[chunkI]
storedFieldDecompressed.m.Lock()
if storedFieldDecompressed.data == nil {
Review comment:

This section tries to reduce the amount of repeated work by doing it inside a locked section. Have you considered performing the decompression outside the lock, followed by a lock -> check -> update?

We might end up with repeated work across concurrent requests, but with less contention between them.

What do you think? (and also @mschoch)

hengfeiyang (Contributor, Author) commented on Jul 6, 2022:

@voldyman We don't want to decompress every chunk, only the chunk that is actually visited, so we decompress on access and use the lock to make sure the decompression happens only once.

As for the approach you described, I'm not sure how to implement it; feel free to open a new PR or share some code for it.
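
For illustration, a rough sketch of the decompress-outside-the-lock idea suggested above, reusing the names from this PR (decompressedStoredFieldChunks, segmentCacheData, storedFieldChunkOffsets, ZSTDDecompress); the helper name loadChunkOptimistic and its exact shape are assumptions, not part of this diff:

func (s *Segment) loadChunkOptimistic(chunkI uint32) ([]byte, error) {
	cache := s.decompressedStoredFieldChunks[chunkI]

	// fast path: another goroutine may already have decompressed this chunk
	cache.m.RLock()
	data := cache.data
	cache.m.RUnlock()
	if data != nil {
		return data, nil
	}

	// do the expensive read + decompress without holding the lock; concurrent
	// callers may duplicate this work, but they never block each other on it
	chunkOffsetStart := s.storedFieldChunkOffsets[int(chunkI)]
	chunkOffsetEnd := s.storedFieldChunkOffsets[int(chunkI)+1]
	compressed, err := s.data.Read(int(chunkOffsetStart), int(chunkOffsetEnd))
	if err != nil {
		return nil, err
	}
	uncompressed, err := ZSTDDecompress(nil, compressed)
	if err != nil {
		return nil, err
	}

	// short critical section: lock -> check -> update, keeping whichever
	// result was stored first
	cache.m.Lock()
	if cache.data == nil {
		cache.data = uncompressed
	}
	data = cache.data
	cache.m.Unlock()
	return data, nil
}

Whether the occasional duplicated decompression is cheaper than the contention avoided would need benchmarking against the per-chunk lock used in this PR.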

// we haven't already loaded and decompressed this chunk
chunkOffsetStart := s.storedFieldChunkOffsets[int(chunkI)]
chunkOffsetEnd := s.storedFieldChunkOffsets[int(chunkI)+1]
compressed, err := s.data.Read(int(chunkOffsetStart), int(chunkOffsetEnd))
if err != nil {
return nil, nil, err
}

// decompress it
storedFieldDecompressed.data, err = ZSTDDecompress(nil, compressed)
if err != nil {
return nil, nil, err
}
}
s.storedFieldChunkUncompressed = s.storedFieldChunkUncompressed[:0]
s.storedFieldChunkUncompressed, err = ZSTDDecompress(s.storedFieldChunkUncompressed[:cap(s.storedFieldChunkUncompressed)], compressed)
if err != nil {
return 0, 0, 0, 0, 0, err
// once initialized it won't change, so we can unlock the mutex
uncompressed = storedFieldDecompressed.data
storedFieldDecompressed.m.Unlock()

metaDataLenEnd := storedOffset + binary.MaxVarintLen64
if metaDataLenEnd > uint64(len(uncompressed)) {
metaDataLenEnd = uint64(len(uncompressed))
}
metaLenData := uncompressed[storedOffset:metaDataLenEnd]

metaLenData := s.storedFieldChunkUncompressed[int(storedOffset):int(storedOffset+binary.MaxVarintLen64)]
var read int
metaLen, read = binary.Uvarint(metaLenData)
var n uint64
metaLen, read := binary.Uvarint(metaLenData)
n += uint64(read)

dataLenData := s.storedFieldChunkUncompressed[int(storedOffset+n):int(storedOffset+n+binary.MaxVarintLen64)]
dataLen, read = binary.Uvarint(dataLenData)
dataLenEnd := storedOffset + n + binary.MaxVarintLen64
if dataLenEnd > uint64(len(uncompressed)) {
dataLenEnd = uint64(len(uncompressed))
}
dataLenData := uncompressed[int(storedOffset+n):dataLenEnd]
dataLen, read := binary.Uvarint(dataLenData)
n += uint64(read)

return indexOffset, storedOffset, n, metaLen, dataLen, nil
meta = uncompressed[int(storedOffset+n):int(storedOffset+n+metaLen)]
data = uncompressed[int(storedOffset+n+metaLen):int(storedOffset+n+metaLen+dataLen)]
return meta, data, nil
}

func (s *Segment) getDocStoredOffsetsOnly(docNum uint64) (indexOffset, storedOffset uint64, err error) {
13 changes: 9 additions & 4 deletions segment.go
@@ -40,17 +40,22 @@ type Segment struct {
fieldDocs map[uint16]uint64 // fieldID -> # docs with value in field
fieldFreqs map[uint16]uint64 // fieldID -> # total tokens in field

storedFieldChunkOffsets []uint64 // stored field chunk offset
storedFieldChunkUncompressed []byte // for uncompress cache
storedFieldChunkOffsets []uint64 // stored field chunk offset

dictLocs []uint64
fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field
fieldDvNames []string // field names cached in fieldDvReaders
size uint64

// state loaded dynamically
m sync.Mutex
fieldFSTs map[uint16]*vellum.FST
m sync.RWMutex
fieldFSTs map[uint16]*vellum.FST
decompressedStoredFieldChunks map[uint32]*segmentCacheData
}

type segmentCacheData struct {
data []byte
m sync.RWMutex
}

func (s *Segment) WriteTo(w io.Writer, _ chan struct{}) (int64, error) {
35 changes: 35 additions & 0 deletions segment_test.go
@@ -17,6 +17,7 @@ package ice
import (
"path/filepath"
"reflect"
"sync"
"testing"

segment "github.com/blugelabs/bluge_segment_api"
@@ -549,3 +550,37 @@ func checkExpectedFields(t *testing.T, fields []string) {
}
}
}

func TestSegmentConcurrency(t *testing.T) {
path, cleanup := setupTestDir(t)
defer cleanup()

segPath := filepath.Join(path, "segment.ice")
seg, closeF, err := createDiskSegment(buildTestSegmentMulti, segPath)
if err != nil {
t.Fatal(err)
}
defer func() {
cerr := closeF()
if cerr != nil {
t.Fatalf("error closing segment: %v", err)
}
}()

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
meta, data, err := seg.getDocStoredMetaAndUnCompressed(0)
if err != nil {
t.Errorf("getDocStoredMetaAndUnCompressed err: %v", err)
}
if meta == nil || data == nil {
t.Errorf("getDocStoredMetaAndUnCompressed meta or data should not be nil")
}
wg.Done()
}()
}

wg.Wait()
}