From d2a0659337b09af5406c03428c9543d9e04265a2 Mon Sep 17 00:00:00 2001
From: Noah Treuhaft
Date: Wed, 29 Sep 2021 10:37:04 -0400
Subject: [PATCH] decode top-level array incrementally in zio/jsonio

Reading a large, top-level JSON array with zio/jsonio.Reader is
impractical because it decodes the full input on the first call to its
Read method. Decode top-level arrays incrementally instead.
---
 zio/jsonio/reader.go | 64 +++++++++++++-------------------------------
 1 file changed, 19 insertions(+), 45 deletions(-)

diff --git a/zio/jsonio/reader.go b/zio/jsonio/reader.go
index 24c6d65e52..5754cb665d 100644
--- a/zio/jsonio/reader.go
+++ b/zio/jsonio/reader.go
@@ -3,70 +3,44 @@ package jsonio
 import (
 	"bytes"
 	"encoding/json"
-	"errors"
-	"fmt"
 	"io"
 
 	"github.com/brimdata/zed/zng"
 	"github.com/brimdata/zed/zson"
 )
 
-const MaxReadBuffer = 25 * 1024 * 1024
-
 type Reader struct {
 	zctx    *zson.Context
-	reader  io.Reader
-	objects []interface{}
+	decoder *json.Decoder
 }
 
 func NewReader(r io.Reader, zctx *zson.Context) (*Reader, error) {
+	d := json.NewDecoder(r)
+	// Prime d's buffer so we can check for an array.
+	d.More()
+	var b [1]byte
+	if n, _ := d.Buffered().Read(b[:]); n > 0 && b[0] == '[' {
+		// We have an array. Discard its opening "[" delimiter.
+		d.Token()
+	}
 	return &Reader{
-		zctx:   zctx,
-		reader: r,
+		zctx:    zctx,
+		decoder: d,
 	}, nil
 }
 
 func (r *Reader) Read() (*zng.Record, error) {
-	if r.objects == nil {
-		b, err := io.ReadAll(io.LimitReader(r.reader, MaxReadBuffer))
-		if err != nil {
-			if err == io.EOF {
-				err = nil
-			}
-			return nil, err
-		}
-		if len(b) == MaxReadBuffer {
-			return nil, fmt.Errorf("JSON input buffer size exceeded: %d", MaxReadBuffer)
-		}
-		var v interface{}
-		if err := json.Unmarshal(b, &v); err != nil {
-			return nil, err
-		}
-		if object, ok := v.(map[string]interface{}); ok {
-			r.objects = make([]interface{}, 0)
-			return r.parse(object)
-		}
-		a, ok := v.([]interface{})
-		if !ok {
-			return nil, errors.New("JSON input is neither an object or array")
-		}
-		r.objects = a
-	}
-	if len(r.objects) == 0 {
+	if !r.decoder.More() {
 		return nil, nil
 	}
-	object := r.objects[0]
-	r.objects = r.objects[1:]
-	return r.parse(object)
-}
-
-func (r *Reader) parse(v interface{}) (*zng.Record, error) {
-	object, ok := v.(map[string]interface{})
-	if !ok {
-		object = make(map[string]interface{})
-		object["value"] = v
+	var v interface{}
+	if err := r.decoder.Decode(&v); err != nil {
+		return nil, err
+	}
+	if _, ok := v.(map[string]interface{}); !ok {
+		v = map[string]interface{}{"value": v}
 	}
-	b, err := json.Marshal(object)
+	b, err := json.Marshal(v)
 	if err != nil {
 		return nil, err
 	}