Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
add prefixCompressedPostings
Browse files Browse the repository at this point in the history
Signed-off-by: naivewong <[email protected]>
  • Loading branch information
naivewong committed Aug 3, 2019
1 parent d5b3f07 commit 4bf1288
Show file tree
Hide file tree
Showing 3 changed files with 426 additions and 24 deletions.
52 changes: 28 additions & 24 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
FormatV1 = 1
// FormatV2 represents 2 version of index.
FormatV2 = 2
// FormatV3 represents 3 version of index (using PrefixCompressedPostings for postings).
FormatV3 = 3

labelNameSeperator = "\xff"

Expand Down Expand Up @@ -121,7 +123,7 @@ type Writer struct {
// Reusable memory.
buf1 encoding.Encbuf
buf2 encoding.Encbuf
uint32s []uint32
uint64s []uint64

symbols map[string]uint32 // symbol offsets
seriesOffsets map[uint64]uint64 // offsets of series
Expand Down Expand Up @@ -205,7 +207,7 @@ func NewWriter(fn string) (*Writer, error) {
// Reusable memory.
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
uint32s: make([]uint32, 0, 1<<15),
uint64s: make([]uint64, 0, 1<<15),

// Caches.
symbols: make(map[string]uint32, 1<<13),
Expand Down Expand Up @@ -290,7 +292,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
func (w *Writer) writeMeta() error {
w.buf1.Reset()
w.buf1.PutBE32(MagicIndex)
w.buf1.PutByte(FormatV2)
w.buf1.PutByte(FormatV3)

return w.write(w.buf1.Get())
}
Expand Down Expand Up @@ -522,30 +524,25 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
// Order of the references in the postings list does not imply order
// of the series references within the persisted block they are mapped to.
// We have to sort the new references again.
refs := w.uint32s[:0]
refs := w.uint64s[:0]

for it.Next() {
offset, ok := w.seriesOffsets[it.At()]
if !ok {
return errors.Errorf("%p series for reference %d not found", w, it.At())
}
if offset > (1<<32)-1 {
return errors.Errorf("series offset %d exceeds 4 bytes", offset)
}
refs = append(refs, uint32(offset))
refs = append(refs, offset)
}
if err := it.Err(); err != nil {
return err
}
sort.Sort(uint32slice(refs))
sort.Sort(uint64slice(refs))

w.buf2.Reset()
w.buf2.PutBE32int(len(refs))

for _, r := range refs {
w.buf2.PutBE32(r)
}
w.uint32s = refs
writePrefixCompressedPostings(&w.buf2, refs)
w.uint64s = refs

w.buf1.Reset()
w.buf1.PutBE32int(w.buf2.Len())
Expand All @@ -556,11 +553,11 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
return errors.Wrap(err, "write postings")
}

type uint32slice []uint32
type uint64slice []uint64

func (s uint32slice) Len() int { return len(s) }
func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
func (s uint64slice) Len() int { return len(s) }
func (s uint64slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s uint64slice) Less(i, j int) bool { return s[i] < s[j] }

type labelIndexHashEntry struct {
keys []string
Expand Down Expand Up @@ -678,7 +675,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
}
r.version = int(r.b.Range(4, 5)[0])

if r.version != FormatV1 && r.version != FormatV2 {
if r.version != FormatV1 && r.version != FormatV2 && r.version != FormatV3 {
return nil, errors.Errorf("unknown index file version %d", r.version)
}

Expand Down Expand Up @@ -782,14 +779,14 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
symbolSlice []string
symbols = map[uint32]string{}
)
if version == FormatV2 {
if version == FormatV2 || version == FormatV3 {
symbolSlice = make([]string, 0, cnt)
}

for d.Err() == nil && d.Len() > 0 && cnt > 0 {
s := d.UvarintStr()

if version == FormatV2 {
if version == FormatV2 || version == FormatV3 {
symbolSlice = append(symbolSlice, s)
} else {
symbols[nextPos] = s
Expand Down Expand Up @@ -911,7 +908,7 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == FormatV2 {
if r.version == FormatV2 || r.version == FormatV3 {
offset = id * 16
}
d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
Expand All @@ -935,7 +932,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) {
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "get postings entry")
}
_, p, err := r.dec.Postings(d.Get())
_, p, err := r.dec.Postings(d.Get(), r.version)
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
Expand Down Expand Up @@ -1059,11 +1056,18 @@ type Decoder struct {
}

// Postings returns a postings list for b and its number of elements.
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
func (dec *Decoder) Postings(b []byte, version int) (int, Postings, error) {
d := encoding.Decbuf{B: b}
n := d.Be32int()
if n == 0 {
return n, EmptyPostings(), d.Err()
}
l := d.Get()
return n, newBigEndianPostings(l), d.Err()
if version == FormatV3 {
return n, newPrefixCompressedPostings(l), d.Err()
} else {
return n, newBigEndianPostings(l), d.Err()
}
}

// Series decodes a series entry from the given byte slice into lset and chks.
Expand Down
163 changes: 163 additions & 0 deletions index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"sync"

"github.com/prometheus/tsdb/encoding"
"github.com/prometheus/tsdb/labels"
)

Expand Down Expand Up @@ -689,3 +690,165 @@ func (it *bigEndianPostings) Seek(x uint64) bool {
func (it *bigEndianPostings) Err() error {
return nil
}

type prefixCompressedPostings struct {
bs []byte
cur uint64
inside bool
idx int // The current offset inside the bs.
footerAddr int
key uint64
numBlock int
blockIdx int // The current block idx.
nextBlock int // The starting offset of the next block.
}

func newPrefixCompressedPostings(bstream []byte) *prefixCompressedPostings {
x := binary.BigEndian.Uint32(bstream) // Read the footer address.
return &prefixCompressedPostings{bs: bstream[8:], numBlock: int(binary.BigEndian.Uint32(bstream[4:])), footerAddr: int(x)}
}

func (it *prefixCompressedPostings) At() uint64 {
return it.cur
}

func (it *prefixCompressedPostings) Next() bool {
if it.inside { // Already entered the block.
if it.idx < it.nextBlock {
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
it.idx += 2
return true
}
it.blockIdx += 1 // Go to the next block.
}
// Currently not entered any block.
if it.idx < it.footerAddr {
it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
it.idx += 8
it.inside = true
it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
it.idx += 2
return true
} else {
return false
}
}

func (it *prefixCompressedPostings) seekInBlock(x uint64) bool {
curVal := x & 0xffff
num := (it.nextBlock - it.idx) >> 1
j := sort.Search(num, func(i int) bool {
return uint64(binary.BigEndian.Uint16(it.bs[it.idx+(i<<1):])) >= curVal
})
if j == num {
// Fast-path to the next block.
// The first element in next block should be >= x.
it.idx = it.nextBlock
it.blockIdx += 1
if it.idx < it.footerAddr {
it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
it.idx += 8
it.inside = true
it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
it.idx += 2
return true
} else {
return false
}
}
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx+(j<<1):]))
it.idx += (j + 1) << 1
return true
}

func (it *prefixCompressedPostings) Seek(x uint64) bool {
if it.cur >= x {
return true
}
curKey := (x >> 16) << 16
if it.inside && it.key == curKey {
// Fast path for x in current block.
return it.seekInBlock(x)
} else {
i := sort.Search(it.numBlock-it.blockIdx, func(i int) bool {
off := int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+i)<<2):]))
k := binary.BigEndian.Uint64(it.bs[off:])
return k >= curKey
})
if i == it.numBlock-it.blockIdx {
return false
}
it.blockIdx += i
if i != 0 { // i > 0.
it.inside = false
it.idx = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx)<<2):]))
}
}
it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
it.idx += 8

it.inside = true

it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
return it.seekInBlock(x)
}

func (it *prefixCompressedPostings) Err() error {
return nil
}

// The size of values inside the block is 2 bytes.
func writePrefixCompressedPostingsBlock(e *encoding.Encbuf, vals []uint16, key uint64) {
e.PutBE64(key)
c := make([]byte, 2)
for _, val := range vals {
binary.BigEndian.PutUint16(c[:], val)
e.PutByte(c[0])
e.PutByte(c[1])
}
}

func writePrefixCompressedPostings(e *encoding.Encbuf, arr []uint64) {
if len(arr) == 0 {
return
}
key := uint64(0xffffffff) // The initial key should be unique.
mask := uint64((1 << uint(16)) - 1) // Mask for the elements in the block.
invertedMask := ^mask
var (
curKey uint64
curVal uint64
idx int // Index of current element in arr.
startingOffs []uint32 // The starting offsets of each block.
vals []uint16 // The converted values in the current block.
startOff = len(e.Get())
)
e.PutBE32(0) // Footer starting offset.
e.PutBE32(0) // Number of blocks.
for idx < len(arr) {
curKey = arr[idx] & invertedMask // Key of block.
curVal = arr[idx] & mask // Value inside block.
if curKey != key {
// Move to next block.
if idx != 0 {
startingOffs = append(startingOffs, uint32(len(e.B)))
writePrefixCompressedPostingsBlock(e, vals, key)
vals = vals[:0]
}
key = curKey
}
vals = append(vals, uint16(curVal))
idx += 1
}
startingOffs = append(startingOffs, uint32(len(e.B)))
writePrefixCompressedPostingsBlock(e, vals, key)
startingOffs = append(startingOffs, uint32(len(e.B)))

binary.BigEndian.PutUint32(e.B[startOff:], uint32(len(e.B)-8-startOff)) // Put footer starting offset.
binary.BigEndian.PutUint32(e.B[startOff+4:], uint32(len(startingOffs)-1)) // Put number of blocks.
for _, off := range startingOffs {
e.PutBE32(off - 8 - uint32(startOff))
}
}
Loading

0 comments on commit 4bf1288

Please sign in to comment.