forked from colinmarc/sequencefile
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathheader.go
152 lines (127 loc) · 3.44 KB
/
header.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package sequencefile
import (
"encoding/binary"
"fmt"
)
// A Header represents the information contained in the header of the
// SequenceFile. Its fields are filled in by (*Reader).ReadHeader.
type Header struct {
// Version is the format version, taken from the byte that follows the
// "SEQ" magic at the start of the stream.
Version int
// Compression is the compression mode derived from the two flag bytes in
// the header: BlockCompression, RecordCompression or NoCompression.
Compression Compression
// CompressionCodec is the codec selected from CompressionCodecClassName.
// ReadHeader only assigns it when Compression is not NoCompression.
CompressionCodec CompressionCodec
// CompressionCodecClassName is the codec class name as read from the
// header. ReadHeader only reads it when Compression is not NoCompression.
CompressionCodecClassName string
// KeyClassName is the first string read after the version byte.
KeyClassName string
// ValueClassName is the string read immediately after KeyClassName.
ValueClassName string
// Metadata holds the key/value string pairs read from the header's
// metadata section.
Metadata map[string]string
// SyncMarker is the SyncSize-byte marker read at the end of the header,
// stored as a string of raw bytes.
SyncMarker string
}
// ReadHeader parses the SequenceFile header from the input stream, and fills
// in the Header struct with the values. This should be called when the reader
// is positioned at the start of the file or input stream, before any records
// are read.
//
// ReadHeader will also validate that the settings of the SequenceFile
// (version, compression, key/value serialization, etc) are compatible.
//
// The header is consumed in this order: the 3-byte magic "SEQ" plus a 1-byte
// version, the key and value class names, two compression flag bytes, the
// compression codec class name (only when compression is enabled), the
// metadata pairs (only for version 6 and later), and finally the sync marker
// of SyncSize bytes.
//
// An error is returned if any read fails, if the magic number is not "SEQ",
// if the version is older than 5, or if the codec class name is neither the
// Hadoop GzipCodec nor SnappyCodec. r.Header may be partially filled in when
// an error is returned.
func (r *Reader) ReadHeader() error {
	const (
		// minVersion is the oldest header version this reader accepts.
		minVersion = 5
		// metadataVersion is the first format version whose header contains
		// a metadata section. Version 5 headers go straight from the
		// compression codec to the sync marker.
		metadataVersion = 6
	)

	magic, err := r.consume(4)
	if err != nil {
		return fmt.Errorf("sequencefile: reading magic number: %s", err)
	}
	if string(magic[:3]) != "SEQ" {
		return fmt.Errorf("sequencefile: invalid magic number: %s", magic)
	}

	r.Header.Version = int(magic[3])
	if r.Header.Version < minVersion {
		return fmt.Errorf("sequencefile: unsupported version: %d", r.Header.Version)
	}

	keyClassName, err := r.readString()
	if err != nil {
		return err
	}

	valueClassName, err := r.readString()
	if err != nil {
		return err
	}

	r.Header.KeyClassName = keyClassName
	r.Header.ValueClassName = valueClassName

	r.clear()
	flags, err := r.consume(2)
	if err != nil {
		return err
	}

	// flags[0] is the value (record) compression flag, flags[1] the block
	// compression flag. Block compression takes precedence when both are set.
	valueCompression, blockCompression := flags[0], flags[1]
	switch {
	case blockCompression > 0:
		r.Header.Compression = BlockCompression
	case valueCompression > 0:
		r.Header.Compression = RecordCompression
	default:
		r.Header.Compression = NoCompression
	}

	// The codec class name is only present when some compression is enabled.
	if r.Header.Compression != NoCompression {
		compressionCodecClassName, err := r.readString()
		if err != nil {
			return err
		}

		r.Header.CompressionCodecClassName = compressionCodecClassName
		switch r.Header.CompressionCodecClassName {
		case "org.apache.hadoop.io.compress.GzipCodec":
			r.Header.CompressionCodec = GzipCompression
		case "org.apache.hadoop.io.compress.SnappyCodec":
			r.Header.CompressionCodec = SnappyCompression
		default:
			return fmt.Errorf("sequencefile: unsupported compression codec: %s", r.Header.CompressionCodecClassName)
		}
	}

	r.compression = r.Header.Compression
	r.codec = r.Header.CompressionCodec

	// Only version 6+ headers carry metadata. Reading it from a version 5
	// header would misinterpret the first bytes of the sync marker as the
	// metadata pair count.
	if r.Header.Version >= metadataVersion {
		if err := r.readMetadata(); err != nil {
			return err
		}
	} else {
		r.Header.Metadata = map[string]string{}
	}

	r.clear()
	marker, err := r.consume(SyncSize)
	if err != nil {
		return err
	}

	// Keep a private copy of the marker bytes; marker is the slice returned
	// by consume and is not retained directly.
	r.Header.SyncMarker = string(marker)
	r.syncMarkerBytes = make([]byte, SyncSize)
	copy(r.syncMarkerBytes, marker)

	return nil
}
// readMetadata reads the metadata section of the header: a 4-byte big-endian
// pair count followed by that many key/value pairs, each read with
// readString. On success the pairs are stored in r.Header.Metadata; on any
// error r.Header.Metadata is left untouched. A count outside the range
// [0, 1024] is rejected as invalid.
func (r *Reader) readMetadata() error {
	const maxPairs = 1024

	r.clear()

	raw, err := r.consume(4)
	if err != nil {
		return err
	}

	count := int(binary.BigEndian.Uint32(raw))
	if count < 0 || count > maxPairs {
		return fmt.Errorf("sequencefile: invalid metadata pair count: %d", count)
	}

	entries := make(map[string]string, count)
	for remaining := count; remaining > 0; remaining-- {
		var k, v string

		if k, err = r.readString(); err != nil {
			return err
		}
		if v, err = r.readString(); err != nil {
			return err
		}

		// A repeated key overwrites the earlier value.
		entries[k] = v
	}

	r.Header.Metadata = entries
	return nil
}
// readString reads a length-prefixed string from the input. The length is
// decoded from r.reader with ReadVInt, and that many bytes are then consumed
// and returned as a string (the bytes are converted as-is, with no encoding
// validation).
//
// An error is returned if the length cannot be decoded, if the decoded length
// is negative, or if consuming the bytes fails.
func (r *Reader) readString() (string, error) {
	length, err := ReadVInt(r.reader)
	if err != nil {
		return "", err
	}

	// The length comes straight from the input stream. A negative value can
	// never be a valid byte count, so reject it here instead of handing a
	// negative size to consume.
	if length < 0 {
		return "", fmt.Errorf("sequencefile: invalid string length: %d", length)
	}

	r.clear()
	b, err := r.consume(int(length))
	if err != nil {
		return "", err
	}

	return string(b), nil
}