This repository has been archived by the owner on Feb 11, 2022. It is now read-only.
forked from fraugster/parquet-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hybrid_decoder.go
166 lines (141 loc) · 3.19 KB
/
hybrid_decoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package goparquet
// This file is based on the code from https://github.com/kostya-sh/parquet-go
// Copyright (c) 2015 Konstantin Shaposhnikov
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math/bits"
"github.com/pkg/errors"
)
// decoder produces a stream of int32 values decoded from an io.Reader.
type decoder interface {
	// init attaches the decoder to the encoded data in r.
	init(r io.Reader) error
	// initSize attaches the decoder to r when the encoded data is preceded by
	// its length (hybridDecoder expects a 4-byte little-endian prefix).
	initSize(r io.Reader) error
	// next returns the next decoded value.
	next() (int32, error)
}
// levelDecoder is a decoder that can also report its maximum level.
// NOTE(review): presumably the column's max definition/repetition level —
// confirm against the implementations.
type levelDecoder interface {
	maxLevel() uint16
	decoder
}
// hybridDecoder reads int32 values stored as a sequence of runs. Each run is
// either run-length encoded (one value repeated) or bit-packed (groups of 8
// values), as selected by the low bit of the run's header (see readRunHeader).
type hybridDecoder struct {
	// r is the source of encoded bytes; it is set by init.
	r io.Reader

	// bitWidth is the number of bits per value. Zero means every value is 0
	// and nothing is read from r.
	bitWidth int
	// unpackerFn decodes one group of 8 bit-packed values from bitWidth bytes.
	unpackerFn unpack8int32Func
	// rleValueSize is the number of bytes storing an RLE run's repeated value.
	rleValueSize int

	// bpRun holds the most recently decoded group of 8 bit-packed values.
	bpRun [8]int32

	// rleCount is how many copies of rleValue remain in the current RLE run.
	rleCount uint32
	// rleValue is the value repeated by the current RLE run.
	rleValue int32

	// bpCount is how many 8-value groups of the current bit-packed run have
	// not yet been read from r.
	bpCount uint32
	// bpRunPos is the index in bpRun of the next value to return; 0 means a
	// new group must be read before the next bit-packed value is available.
	bpRunPos uint8

	// buffered makes init read its whole input into memory up front.
	buffered bool
}
// newHybridDecoder returns a hybridDecoder for values that are bitWidth bits
// wide. The decoder must be attached to input with init or initSize before
// next is called.
func newHybridDecoder(bitWidth int) *hybridDecoder {
	d := new(hybridDecoder)
	d.bitWidth = bitWidth
	d.unpackerFn = unpack8Int32FuncByWidth[bitWidth]
	// An RLE run stores its value in the smallest number of whole bytes that
	// can hold bitWidth bits.
	d.rleValueSize = (bitWidth + 7) / 8
	return d
}
// initSize attaches the decoder to r, whose encoded data is preceded by a
// 4-byte little-endian length. Only that many bytes are consumed from r by
// subsequent reads. When bitWidth is 0, nothing is read and no prefix is
// consumed.
func (hd *hybridDecoder) initSize(r io.Reader) error {
	if hd.bitWidth == 0 {
		return nil
	}

	var prefix [4]byte
	if _, err := io.ReadFull(r, prefix[:]); err != nil {
		return err
	}
	length := binary.LittleEndian.Uint32(prefix[:])

	return hd.init(io.LimitReader(r, int64(length)))
}
// init attaches the decoder to the encoded data in r. If hd.buffered is set,
// all of r is read into memory first and decoding proceeds from that copy.
//
// On success, any run state left over from a previous input is discarded,
// so the first call to next starts by reading a fresh run header. Without
// this, a decoder re-initialized on new data could keep returning values
// from the old input's unfinished run. On error the decoder is left
// unchanged.
func (hd *hybridDecoder) init(r io.Reader) error {
	src := r
	if hd.buffered {
		buf, err := ioutil.ReadAll(r)
		if err != nil {
			return err
		}
		src = bytes.NewReader(buf)
	}

	hd.r = src
	hd.rleCount = 0
	hd.rleValue = 0
	hd.bpCount = 0
	hd.bpRunPos = 0
	return nil
}
// next returns the next decoded value.
//
// When bitWidth is 0, every value is 0 and the reader is never touched.
// Otherwise a new run header is read whenever the current run is exhausted.
// io.EOF is returned (from the header read) once the input has no more runs.
func (hd *hybridDecoder) next() (int32, error) {
	if hd.bitWidth == 0 {
		return 0, nil
	}
	if hd.r == nil {
		return 0, errors.New("reader is not initialized")
	}

	runExhausted := hd.rleCount == 0 && hd.bpCount == 0 && hd.bpRunPos == 0
	if runExhausted {
		if err := hd.readRunHeader(); err != nil {
			return 0, err
		}
	}

	// Inside an RLE run: hand out the repeated value.
	if hd.rleCount > 0 {
		hd.rleCount--
		return hd.rleValue, nil
	}

	if hd.bpCount == 0 && hd.bpRunPos == 0 {
		return 0, io.EOF
	}

	// Inside a bit-packed run: decode a new group of 8 when the previous one
	// has been fully handed out.
	if hd.bpRunPos == 0 {
		if err := hd.readBitPackedRun(); err != nil {
			return 0, err
		}
		hd.bpCount--
	}
	value := hd.bpRun[hd.bpRunPos]
	hd.bpRunPos = (hd.bpRunPos + 1) % 8
	return value, nil
}
// readRLERunValue reads the repeated value of the current RLE run, which is
// stored in hd.rleValueSize bytes, into hd.rleValue.
//
// It returns io.ErrUnexpectedEOF if the input ends before the value is
// complete; the run header has already been consumed, so this is a truncated
// stream, not a clean end. It returns an error if the value needs more than
// hd.bitWidth bits.
func (hd *hybridDecoder) readRLERunValue() error {
	v := make([]byte, hd.rleValueSize)
	// io.ReadFull rather than a single Read: io.Reader may legally return
	// fewer bytes than requested without an error.
	if _, err := io.ReadFull(hd.r, v); err != nil {
		if err == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return err
	}

	hd.rleValue = decodeRLEValue(v)
	if bits.LeadingZeros32(uint32(hd.rleValue)) < 32-hd.bitWidth {
		return errors.New("rle: RLE run value is too large")
	}
	return nil
}
// readBitPackedRun reads the next group of 8 bit-packed values (hd.bitWidth
// bytes) and decodes it into hd.bpRun.
//
// io.ReadFull is used because a single Read may legally return fewer bytes
// than requested while more data remains; decoding such a partial buffer
// would misalign the rest of the stream.
//
// If the input ends partway through the group, the missing bytes are left as
// zero and the group is still decoded, tolerating a final group that is cut
// short. If no bytes at all are left, io.EOF is returned.
func (hd *hybridDecoder) readBitPackedRun() error {
	data := make([]byte, hd.bitWidth)
	_, err := io.ReadFull(hd.r, data)
	if err != nil && err != io.ErrUnexpectedEOF {
		return err
	}
	hd.bpRun = hd.unpackerFn(data)
	return nil
}
// readRunHeader reads the varint header of the next run and prepares the
// decoder state for it.
//
// The header's lowest bit selects the run kind: 1 for a bit-packed run, whose
// remaining bits count 8-value groups (stored in hd.bpCount), or 0 for an RLE
// run, whose remaining bits count repetitions (stored in hd.rleCount) and
// which is followed by the repeated value. Empty runs are rejected.
//
// If the RLE value cannot be read or is invalid, hd.rleCount is cleared, so a
// later call to next does not hand out a stale value.
func (hd *hybridDecoder) readRunHeader() error {
	h, err := readUVariant32(hd.r)
	if err != nil {
		// io.EOF here is the normal end of the stream. NOTE(review): per the
		// original note, the varint reader may report io.EOF (not
		// io.ErrUnexpectedEOF) even after a partial read, so a header truncated
		// mid-varint cannot be distinguished from a clean end.
		return err
	}

	if h&1 == 1 {
		hd.bpCount = uint32(h >> 1)
		if hd.bpCount == 0 {
			return fmt.Errorf("rle: empty bit-packed run")
		}
		hd.bpRunPos = 0
		return nil
	}

	hd.rleCount = uint32(h >> 1)
	if hd.rleCount == 0 {
		return fmt.Errorf("rle: empty RLE run")
	}
	if err := hd.readRLERunValue(); err != nil {
		hd.rleCount = 0
		return err
	}
	return nil
}