forked from pierrec/lz4
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.go
364 lines (330 loc) · 9.71 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
package lz4
import (
"encoding/binary"
"errors"
"fmt"
"hash"
"io"
"io/ioutil"
"runtime"
"sync"
"sync/atomic"
)
// ErrInvalid is returned when the data being read is not an LZ4 archive
// (LZ4 frame magic number detection failed).
var ErrInvalid = errors.New("invalid lz4 data")
// errEndOfBlock is returned by readBlock when it has reached the last block of the frame.
// It is not an error: it simply signals that no more blocks remain in the current frame.
var errEndOfBlock = errors.New("end of block")
// Reader implements the LZ4 frame decoder.
// The Header is set after the first call to Read().
// The Header may change between Read() calls in case of concatenated frames.
type Reader struct {
	Pos int64 // position within the source (updated with atomic ops: blocks may decompress concurrently)
	Header
	src      io.Reader
	checksum hash.Hash32    // frame hash, fed with the header bytes and decompressed data
	wg       sync.WaitGroup // decompressing go routine wait group
	data     []byte         // buffered decompressed data left over from a previous Read()
	window   []byte         // 64Kb decompressed data window (block-dependency mode only)
}
// NewReader returns a new LZ4 frame decoder.
// No access to the underlying io.Reader is performed.
func NewReader(src io.Reader) *Reader {
	z := new(Reader)
	z.src = src
	z.checksum = hashPool.Get()
	return z
}
// readHeader checks the frame magic number and parses the frame descriptor.
// Skippable frames are supported even as a first frame although the LZ4
// specifications recommends skippable frames not to be used as first frames.
//
// first reports whether this is the first frame of the stream: on subsequent
// frames, an unexpected EOF while reading the magic number simply means the
// stream ended and is reported as io.EOF.
func (z *Reader) readHeader(first bool) error {
	defer z.checksum.Reset()
	for {
		var magic uint32
		if err := binary.Read(z.src, binary.LittleEndian, &magic); err != nil {
			if !first && err == io.ErrUnexpectedEOF {
				return io.EOF
			}
			return err
		}
		z.Pos += 4
		// Skippable frame: only the high 24 bits of the magic are compared,
		// the low byte carries user data. The frame payload is discarded.
		if magic>>8 == frameSkipMagic>>8 {
			var skipSize uint32
			if err := binary.Read(z.src, binary.LittleEndian, &skipSize); err != nil {
				return err
			}
			z.Pos += 4
			m, err := io.CopyN(ioutil.Discard, z.src, int64(skipSize))
			z.Pos += m
			if err != nil {
				return err
			}
			continue
		}
		if magic != frameMagic {
			return ErrInvalid
		}
		break
	}
	// Frame descriptor: FLG byte then BD byte.
	var buf [8]byte
	if _, err := io.ReadFull(z.src, buf[:2]); err != nil {
		return err
	}
	z.Pos += 2
	b := buf[0]
	// Top two bits of FLG carry the format version.
	if b>>6 != Version {
		return fmt.Errorf("lz4.Read: invalid version: got %d expected %d", b>>6, Version)
	}
	// FLG bit 5 is "block independence": blocks depend on each other when it is 0.
	z.BlockDependency = b>>5&1 == 0
	z.BlockChecksum = b>>4&1 > 0
	frameSize := b>>3&1 > 0 // content-size field present
	z.NoChecksum = b>>2&1 == 0
	// z.Dict = b&1 > 0
	// BD byte: bits 4-6 encode the block maximum size.
	bmsID := buf[1] >> 4 & 0x7
	bSize, ok := bsMapID[bmsID]
	if !ok {
		return fmt.Errorf("lz4.Read: invalid block max size: %d", bmsID)
	}
	z.BlockMaxSize = bSize
	// The header checksum covers the descriptor bytes, so feed them to the hash.
	z.checksum.Write(buf[0:2])
	if frameSize {
		if err := binary.Read(z.src, binary.LittleEndian, &z.Size); err != nil {
			return err
		}
		z.Pos += 8
		binary.LittleEndian.PutUint64(buf[:], z.Size)
		z.checksum.Write(buf[0:8])
	}
	// if z.Dict {
	// if err := binary.Read(z.src, binary.LittleEndian, &z.DictID); err != nil {
	// return err
	// }
	// z.Pos += 4
	// binary.LittleEndian.PutUint32(buf[:], z.DictID)
	// z.checksum.Write(buf[0:4])
	// }
	// Header checksum byte: compared against byte 1 (second-lowest) of the
	// 32-bit hash of the descriptor bytes written above.
	if _, err := io.ReadFull(z.src, buf[:1]); err != nil {
		return err
	}
	z.Pos++
	if h := byte(z.checksum.Sum32() >> 8 & 0xFF); h != buf[0] {
		return fmt.Errorf("lz4.Read: invalid header checksum: got %v expected %v", buf[0], h)
	}
	z.Header.done = true
	return nil
}
// Read decompresses data from the underlying source into the supplied buffer.
//
// Since there can be multiple streams concatenated, Header values may
// change between calls to Read(). If that is the case, no data is actually read from
// the underlying io.Reader, to allow for potential input buffer resizing.
//
// Data is buffered if the input buffer is too small, and exhausted upon successive calls.
//
// If the buffer is large enough (typically in multiples of BlockMaxSize) and there is
// no block dependency, then the data will be decompressed concurrently based on the GOMAXPROCS value.
func (z *Reader) Read(buf []byte) (n int, err error) {
	if !z.Header.done {
		if err = z.readHeader(true); err != nil {
			return
		}
	}
	if len(buf) == 0 {
		return
	}
	// Exhaust remaining data from a previous Read() before touching the source.
	if len(z.data) > 0 {
		n = copy(buf, z.data)
		z.data = z.data[n:]
		if len(z.data) == 0 {
			z.data = nil
		}
		return
	}
	// Break up the input buffer into BlockMaxSize blocks with at least one block.
	// Then decompress into each of them concurrently if possible (no dependency).
	// In case of dependency, the first block will be missing the window (except on the
	// very first call), the rest will have it already since it comes from the previous block.
	wbuf := buf
	zn := (len(wbuf) + z.BlockMaxSize - 1) / z.BlockMaxSize
	zblocks := make([]block, zn)
	for zi, abort := 0, uint32(0); zi < zn && atomic.LoadUint32(&abort) == 0; zi++ {
		zb := &zblocks[zi]
		// The last block may be too small to hold a full block plus the
		// dependency window: decompress into a scratch buffer instead and
		// let the trimming loop below spill the excess into z.data.
		if len(wbuf) < z.BlockMaxSize+len(z.window) {
			wbuf = make([]byte, z.BlockMaxSize+len(z.window))
		}
		copy(wbuf, z.window)
		if zb.err = z.readBlock(wbuf, zb); zb.err != nil {
			break
		}
		wbuf = wbuf[z.BlockMaxSize:]
		if !z.BlockDependency {
			z.wg.Add(1)
			go z.decompressBlock(zb, &abort)
			continue
		}
		// Cannot decompress concurrently when dealing with block dependency:
		// each block needs the previous one decompressed as its window.
		z.decompressBlock(zb, nil)
		// Update the 64Kb window with the most recent decompressed data.
		if len(z.window) == 0 {
			z.window = make([]byte, winSize)
		}
		if len(zb.data) >= winSize {
			// The block alone fills the window: keep its last winSize bytes.
			copy(z.window, zb.data[len(zb.data)-winSize:])
		} else {
			// Slide the window left by len(zb.data) bytes and append the new
			// data at its tail, i.e. at offset len(z.window)-len(zb.data).
			// (The previous offset, len(zb.data)+1, left a stale byte in the
			// middle of the window and truncated the copied data.)
			copy(z.window, z.window[len(zb.data):])
			copy(z.window[len(z.window)-len(zb.data):], zb.data)
		}
	}
	z.wg.Wait()
	// Since a block size may be less than BlockMaxSize, trim the decompressed buffers.
	for _, zb := range zblocks {
		if zb.err != nil {
			if zb.err == errEndOfBlock {
				return n, z.close()
			}
			return n, zb.err
		}
		bLen := len(zb.data)
		if !z.NoChecksum {
			z.checksum.Write(zb.data)
		}
		m := copy(buf[n:], zb.data)
		// Buffer the remaining data (this is necessarily the last block).
		if m < bLen {
			z.data = zb.data[m:]
		}
		n += m
	}
	return
}
// readBlock reads an entire frame block from the frame.
// The input buffer is the one that will receive the decompressed data.
// If the end of the frame is detected, it returns the errEndOfBlock error.
func (z *Reader) readBlock(buf []byte, b *block) error {
	// Block header: 4-byte little-endian size. A zero size marks the end of
	// the frame; the high bit set means the block is stored uncompressed.
	var bLen uint32
	if err := binary.Read(z.src, binary.LittleEndian, &bLen); err != nil {
		return err
	}
	// Pos is updated atomically because decompressBlock may be adding to it
	// concurrently from other goroutines.
	// NOTE(review): the payload and checksum bytes read below do not bump Pos
	// here; decompressBlock adds the decompressed length instead — confirm
	// this mixed accounting is intentional.
	atomic.AddInt64(&z.Pos, 4)
	switch {
	case bLen == 0:
		return errEndOfBlock
	case bLen&(1<<31) == 0:
		// Compressed block: read the payload into a scratch buffer; buf will
		// receive the decompressed data in decompressBlock.
		b.compressed = true
		b.data = buf
		b.zdata = make([]byte, bLen)
	default:
		// Uncompressed block: clear the flag bit and read straight into buf.
		bLen = bLen & (1<<31 - 1)
		if int(bLen) > len(buf) {
			return fmt.Errorf("lz4.Read: invalid block size: %d", bLen)
		}
		b.data = buf[:bLen]
		b.zdata = buf[:bLen]
	}
	if _, err := io.ReadFull(z.src, b.zdata); err != nil {
		return err
	}
	if z.BlockChecksum {
		// The block checksum covers the raw payload as stored (compressed or not).
		if err := binary.Read(z.src, binary.LittleEndian, &b.checksum); err != nil {
			return err
		}
		xxh := hashPool.Get()
		defer hashPool.Put(xxh)
		xxh.Write(b.zdata)
		if h := xxh.Sum32(); h != b.checksum {
			return fmt.Errorf("lz4.Read: invalid block checksum: got %x expected %x", h, b.checksum)
		}
	}
	return nil
}
// decompressBlock decompresses a frame block.
// In case of an error, the block err is set with it and abort is set to 1.
func (z *Reader) decompressBlock(b *block, abort *uint32) {
	if abort != nil {
		// A non-nil abort flag means this block runs in its own goroutine.
		defer z.wg.Done()
	}
	if !b.compressed {
		// Stored block: the data is already in place.
		atomic.AddInt64(&z.Pos, int64(len(b.data)))
		return
	}
	winLen := len(z.window)
	decLen, err := UncompressBlock(b.zdata, b.data, winLen)
	if err != nil {
		b.err = err
		if abort != nil {
			atomic.StoreUint32(abort, 1)
		}
		return
	}
	// Trim off the window prefix, keeping only the freshly decompressed bytes.
	b.data = b.data[winLen : winLen+decLen]
	atomic.AddInt64(&z.Pos, int64(len(b.data)))
}
// close validates the frame checksum (if any) and checks the next frame (if any).
func (z *Reader) close() error {
	if !z.NoChecksum {
		// The frame trailer carries the expected checksum of the decompressed data.
		var want uint32
		if err := binary.Read(z.src, binary.LittleEndian, &want); err != nil {
			return err
		}
		got := z.checksum.Sum32()
		if want != got {
			return fmt.Errorf("lz4.Read: invalid frame checksum: got %x expected %x", got, want)
		}
	}
	// Get ready for the next concatenated frame, but do not change the position.
	pos := z.Pos
	z.Reset(z.src)
	z.Pos = pos
	// Since multiple frames can be concatenated, check for another one.
	return z.readHeader(false)
}
// Reset discards the Reader's state and makes it equivalent to the
// result of its original state from NewReader, but reading from r instead.
// This permits reusing a Reader rather than allocating a new one.
func (z *Reader) Reset(r io.Reader) {
	z.src = r
	z.Pos = 0
	z.Header = Header{}
	z.data = nil
	z.window = nil
	// The hash instance is kept and simply reset, not returned to the pool.
	z.checksum.Reset()
}
// WriteTo decompresses the data from the underlying io.Reader and writes it to the io.Writer.
// Returns the number of bytes written.
func (z *Reader) WriteTo(w io.Writer) (n int64, err error) {
	cpus := runtime.GOMAXPROCS(0)
	// The buffer starts out nil, so the first Read only parses the frame options.
	// The buffer can then be sized appropriately to support maximum concurrency
	// decompression. If multiple frames are concatenated, Read() returns with no
	// data decompressed but with potentially changed options; the buffer is then
	// resized accordingly, always trying to maximize concurrency.
	var buf []byte
	for {
		// The block max size can change if multiple streams are concatenated,
		// so recompute the desired buffer size after every Read().
		var want int
		if z.BlockDependency {
			// Dependent blocks cannot be decompressed concurrently:
			// allocate only the minimum buffer plus the window.
			want = len(z.window) + z.BlockMaxSize
		} else {
			// Independent blocks: size the buffer for concurrent decompression.
			want = cpus * z.BlockMaxSize
		}
		if len(buf) != want {
			buf = make([]byte, want)
		}
		m, rerr := z.Read(buf)
		if rerr != nil && rerr != io.EOF {
			return n, rerr
		}
		m, err = w.Write(buf[:m])
		n += int64(m)
		if err != nil || rerr == io.EOF {
			return
		}
	}
}