-
Notifications
You must be signed in to change notification settings - Fork 0
/
dsvreader.go
269 lines (236 loc) · 5.33 KB
/
dsvreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
package dsvreader
import (
"bytes"
"errors"
"fmt"
"io"
)
// NewCSV creates a Reader that treats ',' as the column delimiter
// and pulls its input from r.
func NewCSV(r io.Reader) *Reader {
	tr := new(Reader)
	tr.sep = ','
	tr.Reset(r)
	return tr
}
// NewTSV creates a Reader that treats '\t' as the column delimiter
// and pulls its input from r.
func NewTSV(r io.Reader) *Reader {
	tr := new(Reader)
	tr.sep = '\t'
	tr.Reset(r)
	return tr
}
// NewPSV creates a Reader that treats '|' as the column delimiter
// and pulls its input from r.
func NewPSV(r io.Reader) *Reader {
	tr := new(Reader)
	tr.sep = '|'
	tr.Reset(r)
	return tr
}
// NewCustom creates a Reader that splits columns on the single byte sep
// and pulls its input from r.
func NewCustom(sep byte, r io.Reader) *Reader {
	tr := new(Reader)
	tr.sep = sep
	tr.Reset(r)
	return tr
}
// Reader reads delimiter-separated data.
//
// Create a Reader with NewCSV, NewTSV, NewPSV or NewCustom.
// Call Next before reading the columns of each row.
//
// Columns are expected to be separated by the delimiter byte,
// while rows are expected to be separated by newlines.
type Reader struct {
	// r is the underlying data source.
	r io.Reader

	// rb is the part of rBuf that has been read but not yet consumed.
	rb []byte

	// rErr is the error returned by the most recent call to r.Read.
	rErr error

	// rBuf is the fixed-size buffer passed to r.Read.
	rBuf [4 << 10]byte

	// col is the number of columns requested from the current row so far.
	col int

	// row is the 1-based number of the current row; 0 means Next
	// has not been called yet.
	row int

	// rowBuf holds the whole current row without the trailing newline.
	rowBuf []byte

	// b is the unread tail of rowBuf; nil once the last column is consumed.
	b []byte

	// scratch accumulates a row that spans more than one read.
	scratch []byte

	// err is the sticky error; readers become no-ops while it is non-nil.
	err error

	// sep is the column delimiter.
	sep byte

	// needUnescape is set by Next when a backslash was seen in the input;
	// Bytes skips unescaping entirely while it is false.
	needUnescape bool
}
// Reset prepares the reader for reading from r, discarding all state
// left over from the previous source.
//
// The delimiter is kept, and the scratch buffer keeps its capacity so
// it can be reused.
func (tr *Reader) Reset(r io.Reader) {
	// Input side: new source, nothing buffered, no pending read error.
	tr.r, tr.rb, tr.rErr = r, nil, nil

	// Position and current-row state.
	tr.row, tr.col = 0, 0
	tr.rowBuf, tr.b = nil, nil
	tr.needUnescape = false

	// Drop the scratch contents but keep the allocation.
	tr.scratch = tr.scratch[:0]
	tr.err = nil
}
// Error returns the last error encountered by the reader.
//
// Reaching the end of input (io.EOF) is not reported as an error:
// nil is returned in that case.
func (tr *Reader) Error() error {
	if err := tr.err; err != io.EOF {
		return err
	}
	return nil
}
// ResetError clears the stored error so that subsequent calls on the
// reader are no longer short-circuited by it.
//
// Only the error is cleared; buffered data and the position inside
// the current row are left as they are.
func (tr *Reader) ResetError() {
	tr.err = nil
}
// HasCols reports whether the current row still has unread columns.
//
// An empty row contains no columns.
//
// This is useful when the stream contains rows with a varying
// number of columns.
func (tr *Reader) HasCols() bool {
	if tr.b == nil {
		// Either no row has been loaded or its last column was consumed.
		return false
	}
	return len(tr.rowBuf) != 0
}
// Next advances to the next row.
//
// It returns true if the next row exists. It returns false at the end
// of input or on error; check Error after Next returns false.
//
// Next must be called only after all the columns of the previous row
// have been read, otherwise an error is recorded and false is returned.
// HasCols may be used for reading rows with a variable number of columns.
//
// Every row must be terminated by a newline: data left after the last
// newline at the end of input is reported as an error.
func (tr *Reader) Next() bool {
	if tr.err != nil {
		return false
	}
	if tr.HasCols() {
		tr.err = fmt.Errorf("row #%d %q contains unread columns: %q", tr.row, tr.rowBuf, tr.b)
		return false
	}

	tr.row++
	tr.col = 0
	tr.rowBuf = nil

	for {
		if len(tr.rb) == 0 {
			// Read buffer is empty. Attempt to fill it.
			if tr.rErr != nil {
				tr.err = tr.rErr
				if tr.err != io.EOF {
					tr.err = fmt.Errorf("cannot read row #%d: %w", tr.row, tr.err)
				} else if len(tr.scratch) > 0 {
					// Input ended in the middle of a row.
					tr.err = fmt.Errorf("cannot find newline at the end of row #%d; row: %q", tr.row, tr.scratch)
				}
				return false
			}
			// The read error is stored rather than acted upon immediately,
			// since Read may return data together with an error.
			n, err := tr.r.Read(tr.rBuf[:])
			tr.rb = tr.rBuf[:n]
			tr.rErr = err
		}

		// Search for the end of the current row.
		n := bytes.IndexByte(tr.rb, '\n')
		if n >= 0 {
			// Fast path: the row has been found.
			b := tr.rb[:n]
			tr.rb = tr.rb[n+1:]
			if len(tr.scratch) > 0 {
				// The row started in a previous read; glue the parts together.
				tr.scratch = append(tr.scratch, b...)
				b = tr.scratch
				tr.scratch = tr.scratch[:0]
			}
			tr.rowBuf = b
			tr.b = tr.rowBuf

			// Decide on unescaping per assembled row. Deriving the flag from
			// the latest read chunk alone loses backslashes that arrived in
			// an earlier chunk of a row spanning several reads.
			tr.needUnescape = bytes.IndexByte(b, '\\') >= 0
			return true
		}

		// Slow path: cannot find the end of row.
		// Append tr.rb to tr.scratch and repeat.
		tr.scratch = append(tr.scratch, tr.rb...)
		tr.rb = nil
	}
}
// SkipCol discards the next column of the current row.
//
// It does nothing if the reader already holds an error; a failure to
// advance is recorded as the reader's error.
func (tr *Reader) SkipCol() {
	if tr.err != nil {
		return
	}
	_, err := tr.nextCol()
	if err == nil {
		return
	}
	tr.setColError("cannot skip column", err)
}
// Bytes returns the next column of the current row as a byte slice.
//
// Backslash escape sequences are decoded in place: \b, \f, \r, \n, \t
// and \0 become the corresponding control bytes, and any other escaped
// byte (including \' and \\) stands for itself.
//
// The returned slice points into the reader's internal buffer, so it is
// valid only until the next call on the Reader. Nil is returned if the
// reader holds an error or the column cannot be read.
func (tr *Reader) Bytes() []byte {
	if tr.err != nil {
		return nil
	}
	col, err := tr.nextCol()
	if err != nil {
		tr.setColError("cannot read `bytes`", err)
		return nil
	}
	if !tr.needUnescape {
		// Fast path - nothing to unescape.
		return col
	}
	pos := bytes.IndexByte(col, '\\')
	if pos < 0 {
		// Nothing to unescape in the current column.
		return col
	}

	// Slow path - in-place unescaping compatible with ClickHouse.
	//
	// dst always ends with the backslash that is about to be overwritten,
	// and src starts at the byte following that backslash. dst never grows
	// past the start of src, so appending to dst cannot clobber unread input.
	dst, src := col[:pos+1], col[pos+1:]
	for len(src) > 0 {
		c := src[0]
		switch c {
		case 'b':
			c = '\b'
		case 'f':
			c = '\f'
		case 'r':
			c = '\r'
		case 'n':
			c = '\n'
		case 't':
			c = '\t'
		case '0':
			c = 0
		}
		// Any other escaped byte, including '\'' and '\\', is kept verbatim.
		dst[len(dst)-1] = c
		src = src[1:]

		pos = bytes.IndexByte(src, '\\')
		if pos < 0 {
			// No more escapes: move the remaining tail and finish.
			return append(dst, src...)
		}
		dst = append(dst, src[:pos+1]...)
		src = src[pos+1:]
	}
	return dst
}
// String returns the next column of the current row as a string.
//
// The conversion copies the data and therefore allocates.
// Use Bytes to avoid memory allocations.
func (tr *Reader) String() string {
	b := tr.Bytes()
	return string(b)
}
// nextCol cuts the next raw (still escaped) column off the unread part
// of the current row and advances the column counter.
//
// It returns an error if Next has not been called yet or if the current
// row has no more columns.
func (tr *Reader) nextCol() ([]byte, error) {
	if tr.row == 0 {
		return nil, errors.New("missing Next call")
	}
	tr.col++

	rest := tr.b
	if rest == nil {
		return nil, errors.New("no more columns")
	}

	end := bytes.IndexByte(rest, tr.sep)
	if end >= 0 {
		// More columns follow; keep everything after the separator.
		tr.b = rest[end+1:]
		return rest[:end], nil
	}

	// No separator left: this is the last column of the row.
	tr.b = nil
	return rest, nil
}
// setColError stores err as the reader's error, wrapped with msg, the
// current row and column numbers, and the contents of the current row.
func (tr *Reader) setColError(msg string, err error) {
	row, col := tr.row, tr.col
	tr.err = fmt.Errorf("%s at row #%d, col #%d %q: %w", msg, row, col, tr.rowBuf, err)
}