From b947d5723d8d5cbb23fe33f4d19778da3d0dd410 Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Fri, 29 Sep 2023 17:16:08 -0400 Subject: [PATCH] Avoid recorder for io.ReadSeeker in anyio.NewReaderWithOpts NewReaderWithOpts always creates a Recorder, which buffers its input and can cause format detection to fail if the buffer is too small. Avoid that problem when the passed reader implements io.ReadSeeker by modifying Track to create a Recorder only when the reader cannot seek. Closes #4586. --- zio/anyio/gzip.go | 10 ++++------ zio/anyio/reader.go | 17 ++++++++--------- zio/anyio/track.go | 31 +++++++++++++++++++++++++++++-- zio/anyio/ztests/huge.yaml | 23 +++++++++++++++++++++++ 4 files changed, 64 insertions(+), 17 deletions(-) create mode 100644 zio/anyio/ztests/huge.yaml diff --git a/zio/anyio/gzip.go b/zio/anyio/gzip.go index e5b09ece1b..2af21fb9b8 100644 --- a/zio/anyio/gzip.go +++ b/zio/anyio/gzip.go @@ -17,20 +17,18 @@ func GzipReader(r io.Reader) (io.Reader, error) { return rs, nil } } - recorder := NewRecorder(r) - track := NewTrack(recorder) + track := NewTrack(r) // gzip.NewReader blocks until it reads ten bytes. readGzipID only // reads two bytes. if !readGzipID(track) { - return recorder, nil + return track.Reader(), nil } track.Reset() _, err := gzip.NewReader(track) if err == nil { - // create a new reader from recorder (track keeps a copy of read data) - return gzip.NewReader(recorder) + return gzip.NewReader(track.Reader()) } - return recorder, nil + return track.Reader(), nil } // RFC 1952, Section 2.3.1 diff --git a/zio/anyio/reader.go b/zio/anyio/reader.go index dfd948f4bb..c2bc85b839 100644 --- a/zio/anyio/reader.go +++ b/zio/anyio/reader.go @@ -65,26 +65,25 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea vngErr = errors.New("vng: auto-detection requires seekable input") } - recorder := NewRecorder(r) - track := NewTrack(recorder) + track := NewTrack(r) arrowsErr := isArrowStream(track) if arrowsErr == nil { - return arrowio.NewReader(zctx, recorder) + return arrowio.NewReader(zctx, track.Reader()) } arrowsErr = fmt.Errorf("arrows: %w", arrowsErr) track.Reset() zeekErr := match(zeekio.NewReader(zed.NewContext(), track), "zeek", 1) if zeekErr == nil { - return zio.NopReadCloser(zeekio.NewReader(zctx, recorder)), nil + return zio.NopReadCloser(zeekio.NewReader(zctx, track.Reader())), nil } track.Reset() // ZJSON must come before JSON and ZSON since it is a subset of both. zjsonErr := match(zjsonio.NewReader(zed.NewContext(), track), "zjson", 1) if zjsonErr == nil { - return zio.NopReadCloser(zjsonio.NewReader(zctx, recorder)), nil + return zio.NopReadCloser(zjsonio.NewReader(zctx, track.Reader())), nil } track.Reset() @@ -93,13 +92,13 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea // sake of tests. jsonErr := match(jsonio.NewReader(zed.NewContext(), track), "json", 10) if jsonErr == nil { - return zio.NopReadCloser(jsonio.NewReader(zctx, recorder)), nil + return zio.NopReadCloser(jsonio.NewReader(zctx, track.Reader())), nil } track.Reset() zsonErr := match(zsonio.NewReader(zed.NewContext(), track), "zson", 1) if zsonErr == nil { - return zio.NopReadCloser(zsonio.NewReader(zctx, recorder)), nil + return zio.NopReadCloser(zsonio.NewReader(zctx, track.Reader())), nil } track.Reset() @@ -113,7 +112,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea // Close zngReader to ensure that it does not continue to call track.Read. zngReader.Close() if zngErr == nil { - return zngio.NewReaderWithOpts(zctx, recorder, opts.ZNG), nil + return zngio.NewReaderWithOpts(zctx, track.Reader(), opts.ZNG), nil } track.Reset() @@ -126,7 +125,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea track.Reset() csvErr = match(csvio.NewReader(zed.NewContext(), track, opts.CSV), "csv", 1) if csvErr == nil { - return zio.NopReadCloser(csvio.NewReader(zctx, recorder, opts.CSV)), nil + return zio.NopReadCloser(csvio.NewReader(zctx, track.Reader(), opts.CSV)), nil } } track.Reset() diff --git a/zio/anyio/track.go b/zio/anyio/track.go index 547d9d2fa2..aacdd09b1d 100644 --- a/zio/anyio/track.go +++ b/zio/anyio/track.go @@ -1,24 +1,51 @@ package anyio +import "io" + const TrackSize = InitBufferSize type Track struct { + rs io.ReadSeeker + initial int64 + recorder *Recorder off int } -func NewTrack(r *Recorder) *Track { +func NewTrack(r io.Reader) *Track { + if rs, ok := r.(io.ReadSeeker); ok { + if n, err := rs.Seek(0, io.SeekCurrent); err == nil { + return &Track{rs: rs, initial: n} + } + } return &Track{ - recorder: r, + recorder: NewRecorder(r), } } func (t *Track) Reset() { + if t.rs != nil { + // We ignore errors here under the assumption that a subsequent + // call to Read will also fail. + t.rs.Seek(t.initial, io.SeekStart) + return + } t.off = 0 } func (t *Track) Read(b []byte) (int, error) { + if t.rs != nil { + return t.rs.Read(b) + } n, err := t.recorder.ReadAt(t.off, b) t.off += n return n, err } + +func (t *Track) Reader() io.Reader { + if t.rs != nil { + t.Reset() + return t.rs + } + return t.recorder +} diff --git a/zio/anyio/ztests/huge.yaml b/zio/anyio/ztests/huge.yaml new file mode 100644 index 0000000000..de809b288b --- /dev/null +++ b/zio/anyio/ztests/huge.yaml @@ -0,0 +1,23 @@ +script: | + ! yes ' ' | head -c $((11 * 1024 * 1024)) > huge.zson + echo 0 >> huge.zson + zq -z huge.zson + ! cat huge.zson | zq -z - + +outputs: + - name: stdout + data: | + 0 + - name: stderr + data: | + stdio:stdin: format detection error + arrows: schema message length exceeds 1 MiB + csv: line 1: no comma found + json: buffer exceeded max size trying to infer input format + line: auto-detection not supported + parquet: auto-detection requires seekable input + vng: auto-detection requires seekable input + zeek: line 1: bad types/fields definition in zeek header + zjson: line 1: malformed ZJSON: bad type object: "": unpacker error parsing JSON: unexpected end of JSON input + zng: buffer exceeded max size trying to infer input format + zson: buffer exceeded max size trying to infer input format