Skip to content

Commit 4fe9aba

Browse files
jsonl_reader: fix to handle long lines
1 parent 98b2b1e commit 4fe9aba

File tree

1 file changed

+20
-12
lines changed

1 file changed

+20
-12
lines changed

jsonl_reader.go

+20-12
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
// JSONLReader reads newline-delimited JSON (JSON Lines) from an underlying
// reader. It wraps a bufio.Reader rather than a bufio.Scanner so that lines
// longer than a scanner's buffer limit can still be handled.
type JSONLReader struct {
	reader *bufio.Reader
}
1717

1818
// Assert JSONLReader satisfies the interface processors.DataProcessor
@@ -21,26 +21,34 @@ var _ processors.DataProcessor = &JSONLReader{}
2121
// NewJSONLReader returns a new JSONLReader wrapping the given io.Reader object
2222
func NewJSONLReader(r io.Reader) *JSONLReader {
2323
return &JSONLReader{
24-
scanner: bufio.NewScanner(r),
24+
reader: bufio.NewReader(r),
2525
}
2626
}
2727

2828
func (r *JSONLReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) {
29-
var line []byte
30-
for r.scanner.Scan() {
31-
line = r.scanner.Bytes()
29+
// TODO: This allocates more than is necessary but at least it can handle large lines
30+
Outer:
31+
for {
32+
var line []byte
33+
for {
34+
chunk, isPrefix, err := r.reader.ReadLine()
35+
if err != nil {
36+
if err == io.EOF {
37+
break Outer
38+
}
39+
util.KillPipelineIfErr(err, killChan)
40+
}
41+
line = append(line, chunk...)
42+
if !isPrefix {
43+
break
44+
}
45+
}
3246

3347
if !json.Valid(line) {
3448
util.KillPipelineIfErr(errors.New("Not valid JSON"), killChan)
3549
}
3650

37-
// scanner.Bytes will overwrite our slice on the next iteration so we send a copy
38-
// to the output channel
39-
outputChan <- append([]byte(nil), line...)
40-
}
41-
42-
if err := r.scanner.Err(); err != nil {
43-
util.KillPipelineIfErr(err, killChan)
51+
outputChan <- line
4452
}
4553
}
4654

0 commit comments

Comments
 (0)