This repository has been archived by the owner on Feb 11, 2022. It is now read-only.
forked from fraugster/parquet-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
page_dict.go
119 lines (94 loc) · 3.09 KB
/
page_dict.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package goparquet
import (
"bytes"
"io"
"github.com/fraugster/parquet-go/parquet"
"github.com/pkg/errors"
)
// dictPageReader reads and decodes a single DICTIONARY_PAGE.
//
// A dictionary page is not a real data page, so there is no need for this
// type to implement the page interface.
type dictPageReader struct {
// ph is the header of the page most recently passed to read.
ph *parquet.PageHeader
// numValues is the NumValues field of that page's DictionaryPageHeader.
numValues int32
// enc decodes the dictionary values; it is installed by init.
enc valuesDecoder
// values receives the decoded dictionary entries. read resizes it to
// numValues, reusing the existing backing array when its capacity suffices.
values []interface{}
}
// init installs dict as the decoder that read uses for the dictionary
// values. A nil decoder is rejected with an error and leaves dp unchanged.
func (dp *dictPageReader) init(dict valuesDecoder) error {
	if dict != nil {
		dp.enc = dict
		return nil
	}
	return errors.New("dictionary page without dictionary value encoder")
}
// read parses the DICTIONARY_PAGE described by ph from r, decompressing the
// payload with codec, and decodes NumValues entries into dp.values using the
// decoder installed by init.
//
// It returns an error (instead of panicking) when ph or its
// DictionaryPageHeader is nil, when NumValues is negative, when the declared
// encoding is neither PLAIN nor PLAIN_DICTIONARY, when no decoder has been
// installed, or when creating the data reader, initializing the decoder, or
// decoding the values fails.
func (dp *dictPageReader) read(r io.Reader, ph *parquet.PageHeader, codec parquet.CompressionCodec) error {
	if ph == nil {
		return errors.New("nil page header for DICTIONARY_PAGE")
	}
	if ph.DictionaryPageHeader == nil {
		return errors.Errorf("null DictionaryPageHeader in %+v", ph)
	}
	if dp.numValues = ph.DictionaryPageHeader.NumValues; dp.numValues < 0 {
		return errors.Errorf("negative NumValues in DICTIONARY_PAGE: %d", dp.numValues)
	}
	if enc := ph.DictionaryPageHeader.Encoding; enc != parquet.Encoding_PLAIN && enc != parquet.Encoding_PLAIN_DICTIONARY {
		return errors.Errorf("only Encoding_PLAIN and Encoding_PLAIN_DICTIONARY is supported for dict values encoder")
	}
	// Calling a method on the nil interface would panic; check before any
	// bytes are consumed from r.
	if dp.enc == nil {
		return errors.New("dictionary page reader has no values decoder (init not called or failed)")
	}
	dp.ph = ph

	reader, err := createDataReader(r, codec, ph.GetCompressedPageSize(), ph.GetUncompressedPageSize())
	if err != nil {
		return err
	}

	// Reuse the previous backing array when it is already large enough.
	if cap(dp.values) < int(dp.numValues) {
		dp.values = make([]interface{}, 0, dp.numValues)
	}
	dp.values = dp.values[:int(dp.numValues)]

	if err := dp.enc.init(reader); err != nil {
		return err
	}

	// No error is tolerated here, not even io.EOF: all NumValues entries must
	// be decoded.
	if n, err := dp.enc.decodeValues(dp.values); err != nil {
		return errors.Wrapf(err, "expected %d value read %d value", dp.numValues, n)
	}
	return nil
}
// dictPageWriter writes the dictionary page for a single column.
type dictPageWriter struct {
// col is the column whose dictionary values (col.data.values) are written.
col *Column
// codec is the compression codec applied to the encoded page payload.
codec parquet.CompressionCodec
}
// init binds the writer to col, whose dictionary it will emit, and to codec,
// which compresses the page payload. The schema argument is currently unused.
// It always returns nil.
func (dp *dictPageWriter) init(schema SchemaWriter, col *Column, codec parquet.CompressionCodec) error {
	dp.col, dp.codec = col, codec
	return nil
}
// getHeader builds the Thrift header for a DICTIONARY_PAGE whose compressed
// and uncompressed payloads are comp and unComp bytes long. NumValues is the
// column's distinct-value count; no CRC or sort flag is set.
func (dp *dictPageWriter) getHeader(comp, unComp int) *parquet.PageHeader {
	// PLAIN_DICTIONARY is deprecated in the Parquet 2.0 specification, so the
	// dictionary entries are always declared as PLAIN.
	dictHeader := &parquet.DictionaryPageHeader{
		NumValues: dp.col.data.values.numDistinctValues(),
		Encoding:  parquet.Encoding_PLAIN,
		IsSorted:  nil,
	}
	return &parquet.PageHeader{
		Type:                 parquet.PageType_DICTIONARY_PAGE,
		UncompressedPageSize: int32(unComp),
		CompressedPageSize:   int32(comp),
		Crc:                  nil,
		DictionaryPageHeader: dictHeader,
	}
}
// write encodes the column's dictionary values with the dictionary values
// encoder for the column's element, compresses them with dp.codec, and writes
// a DICTIONARY_PAGE header followed by the compressed payload to w.
//
// It returns the compressed and uncompressed payload sizes in bytes. The
// header stores both sizes as int32, so a payload that does not fit is
// rejected with an error instead of being written with a truncated size.
func (dp *dictPageWriter) write(w io.Writer) (int, int, error) {
	// The dictionary is encoded fully into memory first because the header,
	// which precedes the payload, must carry both payload sizes.
	dataBuf := &bytes.Buffer{}
	encoder, err := getDictValuesEncoder(dp.col.Element())
	if err != nil {
		return 0, 0, err
	}
	if err := encodeValue(dataBuf, encoder, dp.col.data.values.values); err != nil {
		return 0, 0, err
	}

	comp, err := compressBlock(dataBuf.Bytes(), dp.codec)
	if err != nil {
		return 0, 0, errors.Wrapf(err, "compressing data failed with %s method", dp.codec)
	}

	// Largest size representable in the int32 page-size header fields.
	const maxPageSize = 1<<31 - 1
	compSize, unCompSize := len(comp), dataBuf.Len()
	if compSize > maxPageSize || unCompSize > maxPageSize {
		return 0, 0, errors.Errorf("dictionary page too large: %d bytes compressed, %d bytes uncompressed (max %d)", compSize, unCompSize, maxPageSize)
	}

	header := dp.getHeader(compSize, unCompSize)
	if err := writeThrift(header, w); err != nil {
		return 0, 0, err
	}
	return compSize, unCompSize, writeFull(w, comp)
}