Skip to content

Commit

Permalink
Reducing size (#8)
Browse files Browse the repository at this point in the history
* feat: add trunk compress for storedFields

* feat: use zstd to replace snappy

* feat: compress docValue

* feat: reback the docValue compress

* feat: packed docNum and Offset for docValue

* doc: update go.mod

* feat: packed numeric of posting list

* feat: compress numeric of posting list

* feat: packed numeric of posting list

* feat: compress intcoder

* feat: run optimize on bitmap

* feat: optimize document values chunk

* doc: add author

* style: change implement

* fix: tests

* test: add test for document coder

* style: format code

* update trunk to chunk

* update sort of Authors

* rename BufferSize to Size and remove Close method

* update version

* fix panic when search memory

* rename variables
  • Loading branch information
hengfeiyang authored May 26, 2022
1 parent 09719ef commit 8b0e8c2
Show file tree
Hide file tree
Showing 20 changed files with 575 additions and 182 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
#
# Please keep the list sorted.

Hengfei Yang <[email protected]>
Marty Schoch <[email protected]>
15 changes: 10 additions & 5 deletions contentcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"bytes"
"encoding/binary"
"io"

"github.com/golang/snappy"
)

var termSeparator byte = 0xff
Expand All @@ -39,7 +37,7 @@ type chunkedContentCoder struct {

chunkMeta []metaData

compressed []byte // temp buf for snappy compression
compressed []byte // temp buf for compression
}

// metaData represents the data information inside a
Expand Down Expand Up @@ -107,18 +105,25 @@ func (c *chunkedContentCoder) flushContents() error {
}

// write out the metaData slice
diffDocNum := uint64(0)
diffDvOffset := uint64(0)
for _, meta := range c.chunkMeta {
err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvOffset)
err = writeUvarints(&c.chunkMetaBuf, meta.DocNum-diffDocNum, meta.DocDvOffset-diffDvOffset)
if err != nil {
return err
}
diffDocNum = meta.DocNum
diffDvOffset = meta.DocDvOffset
}

// write the metadata to final data
metaData := c.chunkMetaBuf.Bytes()
c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
// write the compressed data to the final data
c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), ZSTDCompressionLevel)
if err != nil {
return err
}
c.final = append(c.final, c.compressed...)

c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData))
Expand Down
28 changes: 16 additions & 12 deletions contentcoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ func TestChunkedContentCoder(t *testing.T) {
docNums: []uint64{0},
vals: [][]byte{[]byte("bluge")},
// 1 chunk, chunk-0 length 11(b), value
expected: []byte{0x1, 0x0, 0x5, 0x5, 0x10, 'b', 'l', 'u', 'g', 'e',
0xa,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
expected: []byte{
0x1, 0x0, 0x5, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x29, 0x0, 0x0,
'b', 'l', 'u', 'g', 'e',
0x7e, 0xde, 0xed, 0x4a, 0x15, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1,
},
},
{
maxDocNum: 1,
Expand All @@ -47,11 +49,13 @@ func TestChunkedContentCoder(t *testing.T) {
[]byte("scorch"),
},

expected: []byte{0x1, 0x0, 0x6, 0x6, 0x14, 0x75, 0x70, 0x73, 0x69, 0x64,
0x65, 0x1, 0x1, 0x6, 0x6, 0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68,
0xb, 0x16,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2},
expected: []byte{
0x1, 0x0, 0x6, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x31, 0x0, 0x0,
0x75, 0x70, 0x73, 0x69, 0x64, 0x65, 0x35, 0x89, 0x5a, 0xd,
0x1, 0x1, 0x6, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x31, 0x0, 0x0,
0x73, 0x63, 0x6f, 0x72, 0x63, 0x68, 0xc4, 0x46, 0x89, 0x39, 0x16, 0x2c,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2,
},
},
}

Expand All @@ -61,7 +65,7 @@ func TestChunkedContentCoder(t *testing.T) {
for i, docNum := range test.docNums {
err := cic.Add(docNum, test.vals[i])
if err != nil {
t.Fatalf("error adding to intcoder: %v", err)
t.Fatalf("error adding to contentcoder: %v", err)
}
}
_ = cic.Close()
Expand Down Expand Up @@ -98,11 +102,11 @@ func TestChunkedContentCoders(t *testing.T) {
for i, docNum := range docNums {
err := cic1.Add(docNum, vals[i])
if err != nil {
t.Fatalf("error adding to intcoder: %v", err)
t.Fatalf("error adding to contentcoder: %v", err)
}
err = cic2.Add(docNum, vals[i])
if err != nil {
t.Fatalf("error adding to intcoder: %v", err)
t.Fatalf("error adding to contentcoder: %v", err)
}
}
_ = cic1.Close()
Expand Down
139 changes: 139 additions & 0 deletions documentcoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package ice

import (
"bytes"
"encoding/binary"
"io"
)

// defaultDocumentChunkSize is the default chunk size for document coding.
// NOTE(review): presumably the number of documents per compressed chunk that
// callers pass to newChunkedDocumentCoder (which takes a uint64, so a
// conversion is needed) — confirm at the call sites.
const defaultDocumentChunkSize uint32 = 128

// chunkedDocumentCoder buffers encoded documents, compresses the buffer once
// every chunkSize documents, writes each compressed chunk to w and records the
// byte offset at which every chunk ends.
type chunkedDocumentCoder struct {
// chunkSize is the number of added documents after which the buffer is flushed.
chunkSize uint64
// w receives the compressed chunks and, on Write, the offsets trailer.
w io.Writer
// buf accumulates the uncompressed bytes of the chunk being built.
buf *bytes.Buffer
// metaBuf is scratch space used for uvarint encoding.
metaBuf []byte
// n counts the documents added so far.
n uint64
// bytes counts the compressed bytes written to w so far.
bytes uint64
// compressed is a reusable destination buffer handed to the compressor.
compressed []byte
// offsets holds the chunk boundaries: the constructor seeds it with 0 and
// every flush appends the current value of bytes.
offsets []uint64
}

// newChunkedDocumentCoder returns a coder that compresses documents in groups
// of chunkSize and writes the compressed chunks to w.
//
// The coder starts with an empty buffer, a scratch buffer large enough for any
// uvarint, and an offsets list seeded with a single 0 marking the start of the
// first chunk.
func newChunkedDocumentCoder(chunkSize uint64, w io.Writer) *chunkedDocumentCoder {
	return &chunkedDocumentCoder{
		chunkSize: chunkSize,
		w:         w,
		buf:       bytes.NewBuffer(nil),
		metaBuf:   make([]byte, binary.MaxVarintLen64),
		offsets:   []uint64{0},
	}
}

// Add appends one document to the current chunk buffer.
//
// The record layout is: uvarint(len(meta)), uvarint(len(data)), meta, data.
// docNum is accepted for interface symmetry but is not used by this method.
//
// It returns the number of uncompressed bytes buffered for this document. If
// buffering fails it returns 0 and the error; after buffering, the document
// counter is advanced via newLine and any error from that step is returned
// together with the byte count.
func (c *chunkedDocumentCoder) Add(docNum uint64, meta, data []byte) (int, error) {
	total := 0

	// Header: both lengths, each encoded through the shared scratch buffer and
	// copied into the chunk buffer before the scratch space is reused.
	for _, size := range [...]int{len(meta), len(data)} {
		k := binary.PutUvarint(c.metaBuf, uint64(size))
		written, err := c.writeToBuf(c.metaBuf[:k])
		if err != nil {
			return 0, err
		}
		total += written
	}

	// Payload: meta first, then data.
	for _, part := range [...][]byte{meta, data} {
		written, err := c.writeToBuf(part)
		if err != nil {
			return 0, err
		}
		total += written
	}

	return total, c.newLine()
}

// writeToBuf appends data to the uncompressed buffer of the current chunk and
// reports how many bytes were appended.
func (c *chunkedDocumentCoder) writeToBuf(data []byte) (int, error) {
	appended, err := c.buf.Write(data)
	return appended, err
}

// newLine records that one more document has been added and flushes the
// current chunk once chunkSize documents have accumulated.
//
// A chunkSize of 0 disables automatic flushing instead of panicking with an
// integer divide-by-zero; everything buffered is then emitted as one chunk by
// the next explicit flush.
func (c *chunkedDocumentCoder) newLine() error {
	c.n++
	if c.chunkSize == 0 {
		return nil
	}
	if c.n%c.chunkSize == 0 {
		return c.flush()
	}
	return nil
}

// flush closes the current chunk and records its end offset.
//
// When the buffer holds data it is compressed with ZSTDCompress (reusing the
// full capacity of c.compressed as destination), written to w, added to the
// running byte count, and the buffer is cleared. Whether or not anything was
// written, the current byte count is appended to offsets, so flushing an
// empty buffer repeats the previous offset.
func (c *chunkedDocumentCoder) flush() error {
	if c.buf.Len() == 0 {
		c.offsets = append(c.offsets, c.bytes)
		return nil
	}

	var err error
	if c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.buf.Bytes(), ZSTDCompressionLevel); err != nil {
		return err
	}

	written, err := c.w.Write(c.compressed)
	if err != nil {
		return err
	}
	c.bytes += uint64(written)
	c.buf.Reset()

	c.offsets = append(c.offsets, c.bytes)
	return nil
}

// Write finishes the encoding: it flushes whatever is still buffered and then
// appends the trailer to w.
//
// Trailer layout:
//   - every recorded offset as a uvarint, in order;
//   - the total byte length of those uvarints as a big-endian uint32;
//   - the number of offsets as a big-endian uint32.
//
// The first error from flushing or writing is returned.
func (c *chunkedDocumentCoder) Write() error {
	if err := c.flush(); err != nil {
		return err
	}

	// Offsets table.
	trailerLen := 0
	for i := range c.offsets {
		k := binary.PutUvarint(c.metaBuf, c.offsets[i])
		if _, err := c.w.Write(c.metaBuf[:k]); err != nil {
			return err
		}
		trailerLen += k
	}

	// Fixed-width footer: table length, then entry count.
	for _, v := range [...]uint32{uint32(trailerLen), uint32(len(c.offsets))} {
		if err := binary.Write(c.w, binary.BigEndian, v); err != nil {
			return err
		}
	}
	return nil
}

// Reset returns the coder to the state produced by newChunkedDocumentCoder so
// it can be reused, keeping the allocated buffers. The writer and chunk size
// are left untouched.
//
// The offsets list is re-seeded with the leading 0 that the constructor
// installs; truncating it to empty would make a reused coder emit a trailer
// with one offset fewer than a fresh coder for the same input.
func (c *chunkedDocumentCoder) Reset() {
	c.compressed = c.compressed[:0]
	c.offsets = append(c.offsets[:0], 0)
	c.n = 0
	c.bytes = 0
	c.buf.Reset()
}

// Size reports how many uncompressed bytes are currently buffered for the
// chunk that has not been flushed yet.
func (c *chunkedDocumentCoder) Size() uint64 {
	pending := c.buf.Len()
	return uint64(pending)
}

// Len reports the number of recorded offsets. This includes the leading 0
// seeded by the constructor and one entry per flush, so it is larger than the
// number of compressed chunks actually written.
func (c *chunkedDocumentCoder) Len() int {
	count := len(c.offsets)
	return count
}

// Offsets returns a copy of the recorded chunk offsets. The returned slice is
// never nil and does not alias the coder's internal state, so callers may
// modify it freely.
func (c *chunkedDocumentCoder) Offsets() []uint64 {
	snapshot := make([]uint64, len(c.offsets))
	copy(snapshot, c.offsets)
	return snapshot
}
124 changes: 124 additions & 0 deletions documentcoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package ice

import (
"bytes"
"testing"
)

// TestChunkedDocumentCoder pins the exact byte stream produced by the document
// coder for small inputs with one document per chunk, and the number of
// offsets it records (leading 0, one per chunk, one from the final flush).
func TestChunkedDocumentCoder(t *testing.T) {
	type testCase struct {
		chunkSize        uint64
		docNums          []uint64
		metas            [][]byte
		datas            [][]byte
		expected         []byte
		expectedChunkNum int
	}

	cases := []testCase{
		{
			chunkSize: 1,
			docNums:   []uint64{0},
			metas:     [][]byte{{0}},
			datas:     [][]byte{[]byte("bluge")},
			expected: []byte{
				// compressed chunk (0x15 bytes)
				0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x41, 0x0, 0x0,
				0x1, 0x5, 0x0, 0x62, 0x6c, 0x75, 0x67, 0x65,
				0x2b, 0x30, 0x97, 0x33,
				// offsets as uvarints
				0x0, 0x15, 0x15,
				// offsets byte length, then offsets count
				0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x0, 0x3,
			},
			expectedChunkNum: 3, // left, chunk, right
		},
		{
			chunkSize: 1,
			docNums:   []uint64{0, 1},
			metas:     [][]byte{{0}, {1}},
			datas:     [][]byte{[]byte("upside"), []byte("scorch")},
			expected: []byte{
				// first compressed chunk (0x16 bytes)
				0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x49, 0x0, 0x0,
				0x1, 0x6, 0x0, 0x75, 0x70, 0x73, 0x69, 0x64, 0x65,
				0x36, 0x6e, 0x7e, 0x39,
				// second compressed chunk (ends at 0x2c)
				0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x49, 0x0, 0x0,
				0x1, 0x6, 0x1, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68,
				0x8f, 0x83, 0xa3, 0x37,
				// offsets as uvarints
				0x0, 0x16, 0x2c, 0x2c,
				// offsets byte length, then offsets count
				0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x4,
			},
			expectedChunkNum: 4, // left, chunk, chunk, right
		},
	}

	for i := range cases {
		tc := &cases[i]

		var out bytes.Buffer
		coder := newChunkedDocumentCoder(tc.chunkSize, &out)
		for j := range tc.docNums {
			if _, err := coder.Add(tc.docNums[j], tc.metas[j], tc.datas[j]); err != nil {
				t.Fatalf("error adding to documentcoder: %v", err)
			}
		}
		if err := coder.Write(); err != nil {
			t.Fatalf("error writing: %v", err)
		}

		if !bytes.Equal(tc.expected, out.Bytes()) {
			t.Errorf("got:%s, expected:%s", out.String(), string(tc.expected))
		}
		if got := coder.Len(); got != tc.expectedChunkNum {
			t.Errorf("got:%d, expected:%d", got, tc.expectedChunkNum)
		}
	}
}

// TestChunkedDocumentCoders feeds the same six documents to two independent
// document coders (two documents per chunk) and checks that both produce
// identical output and record the expected number of offsets.
func TestChunkedDocumentCoders(t *testing.T) {
	const (
		chunkSize = uint64(2)
		chunkNum  = 5 // left, chunk, chunk, chunk, right
	)

	// Document i uses docNum i and the single meta byte i.
	words := []string{"scorch", "does", "better", "than", "upside", "down"}

	var out1, out2 bytes.Buffer
	first := newChunkedDocumentCoder(chunkSize, &out1)
	second := newChunkedDocumentCoder(chunkSize, &out2)

	for i, word := range words {
		docNum := uint64(i)
		meta := []byte{byte(i)}
		data := []byte(word)

		if _, err := first.Add(docNum, meta, data); err != nil {
			t.Fatalf("error adding to documentcoder: %v", err)
		}
		if _, err := second.Add(docNum, meta, data); err != nil {
			t.Fatalf("error adding to documentcoder: %v", err)
		}
	}

	if err := first.Write(); err != nil {
		t.Fatalf("error writing: %v", err)
	}
	if err := second.Write(); err != nil {
		t.Fatalf("error writing: %v", err)
	}

	if !bytes.Equal(out1.Bytes(), out2.Bytes()) {
		t.Errorf("%s != %s", out1.String(), out2.String())
	}
	if got := first.Len(); got != chunkNum {
		t.Errorf("got:%d, expected:%d", got, chunkNum)
	}
	if got := second.Len(); got != chunkNum {
		t.Errorf("got:%d, expected:%d", got, chunkNum)
	}
}
Loading

0 comments on commit 8b0e8c2

Please sign in to comment.