From 58dbccc8fb1fb06523f3a98d2c4e86fc7d8df271 Mon Sep 17 00:00:00 2001 From: Steve McCanne Date: Thu, 20 May 2021 15:02:07 -0700 Subject: [PATCH] simplify seek index writer and CLI arg (#2742) This commit simplifies the seek index writer by moving the logic for auto-stream-termination out of zngio and into segment.Writer. We also deleted the unused indexing code in zngio along with the "zed index seek" command and related support. As previously agreed among the team, the streammax argument to zed lake add and zed lake load was changed to seekstride and the units changed from records to bytes. Closes #2542 --- cli/outputflags/flags.go | 1 - cmd/zed/index/seek/command.go | 134 ------------- cmd/zed/lake/add/command.go | 18 +- cmd/zed/lake/load/command.go | 16 +- cmd/zed/main.go | 1 - index/ztests/seek.yaml | 25 --- index/ztests/tsindex.yaml | 23 --- lake/segment/writer.go | 116 +++++++----- lake/writer.go | 4 +- lake/ztests/consecutive-ts.yaml | 17 +- lake/ztests/seek-index-overlap.yaml | 8 +- lake/ztests/seek-index.yaml | 8 +- zio/zng_test.go | 6 +- zio/zngio/index.go | 230 ----------------------- zio/zngio/index_test.go | 141 -------------- zio/zngio/scanner_test.go | 3 +- zio/zngio/writer.go | 16 +- zio/zngio/ztests/streams-1.yaml | 62 ------ zio/zngio/ztests/streams-2.yaml | 41 ---- zio/zngio/ztests/type-reset.yaml | 10 - zio/zngio/ztests/zctx-alias-reset-2.yaml | 6 +- zio/zngio/ztests/zctx-alias-reset.yaml | 6 +- zio/zngio/ztests/zctx-reset.yaml | 7 +- 23 files changed, 130 insertions(+), 769 deletions(-) delete mode 100644 cmd/zed/index/seek/command.go delete mode 100644 index/ztests/seek.yaml delete mode 100644 index/ztests/tsindex.yaml delete mode 100644 zio/zngio/index.go delete mode 100644 zio/zngio/index_test.go delete mode 100644 zio/zngio/ztests/streams-1.yaml delete mode 100644 zio/zngio/ztests/streams-2.yaml delete mode 100644 zio/zngio/ztests/type-reset.yaml diff --git a/cli/outputflags/flags.go b/cli/outputflags/flags.go index 64b220a504..414b923699 
100644 --- a/cli/outputflags/flags.go +++ b/cli/outputflags/flags.go @@ -37,7 +37,6 @@ func (f *Flags) setFlags(fs *flag.FlagSet) { fs.BoolVar(&f.Text.ShowTypes, "T", false, "display field types in text output") fs.BoolVar(&f.Text.ShowFields, "F", false, "display field names in text output") fs.BoolVar(&f.color, "color", true, "enable/disable color formatting for -Z and lake text output") - fs.IntVar(&f.Zng.StreamRecordsMax, "b", 0, "limit for number of records in each ZNG stream (0 for no limit)") fs.IntVar(&f.Zng.LZ4BlockSize, "znglz4blocksize", zngio.DefaultLZ4BlockSize, "LZ4 block size in bytes for ZNG compression (nonpositive to disable)") fs.IntVar(&f.ZSON.Pretty, "pretty", 4, diff --git a/cmd/zed/index/seek/command.go b/cmd/zed/index/seek/command.go deleted file mode 100644 index 4dce9c24ee..0000000000 --- a/cmd/zed/index/seek/command.go +++ /dev/null @@ -1,134 +0,0 @@ -package seek - -import ( - "errors" - "flag" - "fmt" - "os" - - zedindex "github.com/brimdata/zed/cmd/zed/index" - "github.com/brimdata/zed/expr" - "github.com/brimdata/zed/field" - "github.com/brimdata/zed/index" - "github.com/brimdata/zed/pkg/charm" - "github.com/brimdata/zed/pkg/fs" - "github.com/brimdata/zed/pkg/storage" - "github.com/brimdata/zed/zio/zngio" - "github.com/brimdata/zed/zng" - "github.com/brimdata/zed/zson" -) - -var Seek = &charm.Spec{ - Name: "seek", - Usage: "seek [-f framethresh] [ -o file ] [-v field] -k field file", - Short: "generate a seek-style index file for a zng file", - Long: ` -The seek command creates an index for the seek offsets of each -start-of-stream (sos) in a zng file. The key field is specified by -k and all -values in this field must be in ascending order. The seek offset of each sos -is stored as the field "offset" in the base layer of the output search index -unless overridden by -v. -It is an error if the values of the key field are not of uniform type. 
- -This is command is useful for creating time indexes for large zng logs where the -zng records are sorted by time.`, - New: newCommand, -} - -func init() { - zedindex.Cmd.Add(Seek) -} - -type Command struct { - *zedindex.Command - frameThresh int - outputFile string - keyField string - offsetField string -} - -func newCommand(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { - c := &Command{Command: parent.(*zedindex.Command)} - f.IntVar(&c.frameThresh, "f", 32*1024, "minimum frame size used in index file") - f.StringVar(&c.outputFile, "o", "index.zng", "name of index output file") - f.StringVar(&c.keyField, "k", "", "name of search key field") - f.StringVar(&c.offsetField, "v", "offset", "field name for seek offset in output index") - return c, nil -} - -func (c *Command) Run(args []string) error { - _, cleanup, err := c.Init() - if err != nil { - return err - } - defer cleanup() - //XXX no reason to limit this... fix later - if len(args) != 1 { - return errors.New("must specify a single zng input file containing keys and optional values") - } - file := os.Stdin - path := args[0] - if path != "-" { - var err error - file, err = fs.Open(path) - if err != nil { - return err - } - defer file.Close() - } - zctx := zson.NewContext() - reader := zngio.NewReader(file, zctx) - keys := field.DottedList(c.keyField) - writer, err := index.NewWriter(zctx, storage.NewLocalEngine(), c.outputFile, index.KeyFields(keys...), index.FrameThresh(c.frameThresh)) - if err != nil { - return err - } - close := true - defer func() { - if close { - writer.Close() - } - }() - readKey := expr.NewDotExpr(field.Dotted(c.keyField)) - var builder *zng.Builder - var keyType zng.Type - var offset int64 - // to skip to each sos, we read the first rec normally - // then call SkipStream and the bottmo of the for-loop. 
- rec, err := reader.Read() - for err == nil && rec != nil { - k, err := readKey.Eval(rec) - if err != nil || k.Type == nil || k.Bytes == nil { - // if the key doesn't exist or is unset, fail here - // XXX we should check that key order is ascending - return fmt.Errorf("key field is missing: %s", rec) - } - if builder == nil { - keyType = k.Type - cols := []zng.Column{ - {c.keyField, k.Type}, - {c.offsetField, zng.TypeInt64}, - } - typ, err := zctx.LookupTypeRecord(cols) - if err != nil { - return err - } - builder = zng.NewBuilder(typ) - } else if keyType != k.Type { - return fmt.Errorf("key type changed from %q to %q", keyType.ZSON(), k.Type.ZSON()) - } - offBytes := zng.EncodeInt(offset) - out := builder.Build(k.Bytes, offBytes) - if err := writer.Write(out); err != nil { - return err - } - rec, offset, err = reader.SkipStream() - } - if err != nil { - return err - } - // We do this little song and dance so we can return error on close - // but don't call close twice if we make it here. - close = false - return writer.Close() -} diff --git a/cmd/zed/lake/add/command.go b/cmd/zed/lake/add/command.go index d50dcebe1d..3355e3b656 100644 --- a/cmd/zed/lake/add/command.go +++ b/cmd/zed/lake/add/command.go @@ -11,6 +11,7 @@ import ( "github.com/brimdata/zed/pkg/charm" "github.com/brimdata/zed/pkg/rlimit" "github.com/brimdata/zed/pkg/storage" + "github.com/brimdata/zed/pkg/units" "github.com/brimdata/zed/zbuf" "github.com/brimdata/zed/zio" "github.com/brimdata/zed/zson" @@ -45,17 +46,20 @@ func init() { // TBD: add option to apply Zed program on add path? 
type Command struct { - lake *zedlake.Command - importStreamRecordMax int - commit bool - inputFlags inputflags.Flags + lake *zedlake.Command + seekStride units.Bytes + commit bool + inputFlags inputflags.Flags zedlake.CommitFlags } func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { - c := &Command{lake: parent.(*zedlake.Command)} + c := &Command{ + lake: parent.(*zedlake.Command), + seekStride: units.Bytes(lake.SeekIndexStride), + } f.BoolVar(&c.commit, "commit", false, "commit added data if successfully written") - f.IntVar(&c.importStreamRecordMax, "streammax", lake.ImportStreamRecordsMax, "limit for number of records in each ZNG stream (0 for no limit)") + f.Var(&c.seekStride, "seekstride", "size of seek-index unit for ZNG data, as '32KB', '1MB', etc.") c.inputFlags.SetFlags(f) c.CommitFlags.SetFlags(f) return c, nil @@ -70,7 +74,7 @@ func (c *Command) Run(args []string) error { if len(args) == 0 { return errors.New("zed lake add: at least one input file must be specified (- for stdin)") } - lake.ImportStreamRecordsMax = c.importStreamRecordMax + lake.SeekIndexStride = int(c.seekStride) if _, err := rlimit.RaiseOpenFilesLimit(); err != nil { return err } diff --git a/cmd/zed/lake/load/command.go b/cmd/zed/lake/load/command.go index 7aec0ce334..4c1fdf0abb 100644 --- a/cmd/zed/lake/load/command.go +++ b/cmd/zed/lake/load/command.go @@ -12,6 +12,7 @@ import ( "github.com/brimdata/zed/pkg/charm" "github.com/brimdata/zed/pkg/rlimit" "github.com/brimdata/zed/pkg/storage" + "github.com/brimdata/zed/pkg/units" "github.com/brimdata/zed/zbuf" "github.com/brimdata/zed/zio" "github.com/brimdata/zed/zson" @@ -33,17 +34,20 @@ func init() { } type Command struct { - lake *zedlake.Command - importStreamRecordMax int - commit bool + lake *zedlake.Command + seekStride units.Bytes + commit bool zedlake.CommitFlags procFlags procflags.Flags inputFlags inputflags.Flags } func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { - c := 
&Command{lake: parent.(*zedlake.Command)} - f.IntVar(&c.importStreamRecordMax, "streammax", lake.ImportStreamRecordsMax, "limit for number of records in each ZNG stream (0 for no limit)") + c := &Command{ + lake: parent.(*zedlake.Command), + seekStride: units.Bytes(lake.SeekIndexStride), + } + f.Var(&c.seekStride, "seekstride", "size of seek-index unit for ZNG data, as '32KB', '1MB', etc.") c.CommitFlags.SetFlags(f) c.inputFlags.SetFlags(f) c.procFlags.SetFlags(f) @@ -59,7 +63,7 @@ func (c *Command) Run(args []string) error { if len(args) == 0 { return errors.New("zed lake load: at least one input file must be specified (- for stdin)") } - lake.ImportStreamRecordsMax = c.importStreamRecordMax + lake.SeekIndexStride = int(c.seekStride) if _, err := rlimit.RaiseOpenFilesLimit(); err != nil { return err } diff --git a/cmd/zed/main.go b/cmd/zed/main.go index 26c22415a2..4db6662cfd 100644 --- a/cmd/zed/main.go +++ b/cmd/zed/main.go @@ -20,7 +20,6 @@ import ( _ "github.com/brimdata/zed/cmd/zed/index/create" _ "github.com/brimdata/zed/cmd/zed/index/lookup" _ "github.com/brimdata/zed/cmd/zed/index/section" - _ "github.com/brimdata/zed/cmd/zed/index/seek" "github.com/brimdata/zed/cmd/zed/lake" _ "github.com/brimdata/zed/cmd/zed/lake/add" _ "github.com/brimdata/zed/cmd/zed/lake/commit" diff --git a/index/ztests/seek.yaml b/index/ztests/seek.yaml deleted file mode 100644 index 3ee14a6baf..0000000000 --- a/index/ztests/seek.yaml +++ /dev/null @@ -1,25 +0,0 @@ -# create a zng with sos every two records then index it as a time index -script: | - zq -b 2 - | zed index seek -f 100 -o index.zng -k ts - - zq -z index.zng - -inputs: - - name: stdin - data: | - {ts:1970-01-01T00:16:40Z,foo:"a"} - {ts:1970-01-01T00:16:41Z,foo:"b"} - {ts:1970-01-01T00:16:42Z,foo:"c"} - {ts:1970-01-01T00:16:43Z,foo:"d"} - {ts:1970-01-01T00:16:44Z,foo:"e"} - {ts:1970-01-01T00:16:45Z,foo:"f"} - {ts:1970-01-01T00:16:46Z,foo:"g"} - {ts:1970-01-01T00:16:47Z,foo:"h"} - -outputs: - - name: stdout - data: | - 
{ts:1970-01-01T00:16:40Z,offset:0} - {ts:1970-01-01T00:16:42Z,offset:34} - {ts:1970-01-01T00:16:44Z,offset:68} - {ts:1970-01-01T00:16:46Z,offset:102} - {magic:"zed_index",version:2 (int32),descending:false,child_field:"_child",frame_thresh:100 (int32),sections:[58],keys:null (0=({ts:time}))} (=1) diff --git a/index/ztests/tsindex.yaml b/index/ztests/tsindex.yaml deleted file mode 100644 index 8a8eb3cb9a..0000000000 --- a/index/ztests/tsindex.yaml +++ /dev/null @@ -1,23 +0,0 @@ -script: | - # index ts every 10 records - zq -b 10 babble.zson | zed index create -S -o index -k ts - - # exact lookup for this particular ts - zed index lookup -z -k 2020-04-21T23:42:11.06754599Z index - echo === - # exact lookup for an absent ts - zed index lookup -z -k 2020-04-21T23:42:11.06754600Z index - echo === - # closest lookup for the absent ts - zed index lookup -c -z -k 2020-04-21T23:42:11.06754600Z index - -inputs: - - name: babble.zson - source: ../../testdata/babble.zson - -outputs: - - name: stdout - data: | - {key:2020-04-21T23:42:11.06754599Z} - === - === - {key:2020-04-21T23:42:11.06754599Z} diff --git a/lake/segment/writer.go b/lake/segment/writer.go index 14e2c88a75..dc51c97e7b 100644 --- a/lake/segment/writer.go +++ b/lake/segment/writer.go @@ -17,82 +17,100 @@ import ( // Writer is a zbuf.Writer that writes a stream of sorted records into a // data segment. 
type Writer struct { - ref *Reference - byteCounter *writeCounter - count uint64 - rowObject *zngio.Writer - firstKey zng.Value - lastKey zng.Value - needSeekWrite bool - order order.Which - seekIndex *seekindex.Writer - seekIndexCloser io.Closer - first bool - poolKey field.Path + ref *Reference + byteCounter *writeCounter + count uint64 + rowObject *zngio.Writer + firstKey zng.Value + lastKey zng.Value + lastSOS int64 + order order.Which + seekIndex *seekindex.Writer + seekIndexCloser io.Closer + seekIndexStride int + seekIndexTrigger int + first bool + poolKey field.Path } -func (r *Reference) NewWriter(ctx context.Context, engine storage.Engine, path *storage.URI, o order.Which, poolKey field.Path, seekIndexFactor int) (*Writer, error) { +// NewWriter returns a writer for writing the data of a zng-row storage object as +// well as optionally creating a seek index for the row object when the +// seekIndexStride is non-zero. We assume all records are non-volatile until +// Close as zng.Values from the various record bodies are referenced across +// calls to Write. 
+func (r *Reference) NewWriter(ctx context.Context, engine storage.Engine, path *storage.URI, o order.Which, poolKey field.Path, seekIndexStride int) (*Writer, error) { out, err := engine.Put(ctx, r.RowObjectPath(path)) if err != nil { return nil, err } counter := &writeCounter{bufwriter.New(out), 0} writer := zngio.NewWriter(counter, zngio.WriterOpts{ - StreamRecordsMax: seekIndexFactor, - LZ4BlockSize: zngio.DefaultLZ4BlockSize, + LZ4BlockSize: zngio.DefaultLZ4BlockSize, }) - seekOut, err := engine.Put(ctx, r.SeekObjectPath(path)) - if err != nil { - return nil, err + w := &Writer{ + ref: r, + byteCounter: counter, + rowObject: writer, + order: o, + first: true, + poolKey: poolKey, } - opts := zngio.WriterOpts{ - //LZ4BlockSize: zngio.DefaultLZ4BlockSize, + if seekIndexStride != 0 { + w.seekIndexStride = seekIndexStride + seekOut, err := engine.Put(ctx, r.SeekObjectPath(path)) + if err != nil { + return nil, err + } + opts := zngio.WriterOpts{ + //LZ4BlockSize: zngio.DefaultLZ4BlockSize, + } + seekWriter := zngio.NewWriter(bufwriter.New(seekOut), opts) + w.seekIndex = seekindex.NewWriter(seekWriter) + w.seekIndexCloser = seekWriter } - seekWriter := zngio.NewWriter(bufwriter.New(seekOut), opts) - return &Writer{ - ref: r, - byteCounter: counter, - rowObject: writer, - seekIndex: seekindex.NewWriter(seekWriter), - seekIndexCloser: seekWriter, - order: o, - first: true, - poolKey: poolKey, - }, nil + return w, nil } func (w *Writer) Write(rec *zng.Record) error { - // We want to index the start of stream (SOS) position of the data file by - // record timestamps; we don't know when we've started a new stream until - // after we have written the first record in the stream. 
- sos := w.rowObject.LastSOS() - if err := w.rowObject.Write(rec); err != nil { - return err - } key, err := rec.Deref(w.poolKey) if err != nil { key = zng.Value{zng.TypeNull, nil} } - if w.first { - w.first = false - w.firstKey = key - if err := w.seekIndex.Write(key, sos); err != nil { + if w.seekIndex != nil { + if err := w.writeIndex(key); err != nil { return err } - } else if w.needSeekWrite && (w.lastKey.Bytes == nil || !bytes.Equal(key.Bytes, w.lastKey.Bytes)) { - if err := w.seekIndex.Write(key, sos); err != nil { - return err - } - w.needSeekWrite = false } - if w.rowObject.LastSOS() != sos { - w.needSeekWrite = true + if err := w.rowObject.Write(rec); err != nil { + return err } w.lastKey = key w.count++ return nil } +func (w *Writer) writeIndex(key zng.Value) error { + w.seekIndexTrigger += len(key.Bytes) + if w.first { + w.first = false + w.firstKey = key + w.lastKey = key + return w.seekIndex.Write(key, 0) + } + if w.seekIndexTrigger < w.seekIndexStride || bytes.Equal(key.Bytes, w.lastKey.Bytes) { + return nil + } + if err := w.rowObject.EndStream(); err != nil { + return err + } + pos := w.rowObject.Position() + if err := w.seekIndex.Write(key, pos); err != nil { + return err + } + w.seekIndexTrigger = 0 + return nil +} + // Abort is called when an error occurs during write. Errors are ignored // because the write error will be more informative and should be returned. func (w *Writer) Abort() { diff --git a/lake/writer.go b/lake/writer.go index 12c858dce8..57f2292c1a 100644 --- a/lake/writer.go +++ b/lake/writer.go @@ -17,7 +17,7 @@ import ( ) var ( - ImportStreamRecordsMax = zngio.DefaultStreamRecordsMax + SeekIndexStride = 64 * 1024 // For unit testing. 
importLZ4BlockSize = zngio.DefaultLZ4BlockSize @@ -140,7 +140,7 @@ func (w *Writer) writeObject(seg *segment.Reference, recs []*zng.Record) error { if err != nil { seg.Last = zng.Value{zng.TypeNull, nil} } - writer, err := seg.NewWriter(w.ctx, w.pool.engine, w.pool.DataPath, w.pool.Layout.Order, key, ImportStreamRecordsMax) + writer, err := seg.NewWriter(w.ctx, w.pool.engine, w.pool.DataPath, w.pool.Layout.Order, key, SeekIndexStride) if err != nil { return err } diff --git a/lake/ztests/consecutive-ts.yaml b/lake/ztests/consecutive-ts.yaml index 9a7c56764f..b131315207 100644 --- a/lake/ztests/consecutive-ts.yaml +++ b/lake/ztests/consecutive-ts.yaml @@ -1,16 +1,23 @@ script: | export ZED_LAKE_ROOT=test zed lake init -q - zed lake create -q -p logs - zed lake load -q -p logs -streammax 2 - + zed lake create -q -p logs -orderby ts:desc + zed lake load -q -p logs -seekstride 11B in.zson zq -z test/*/D/*-seek.zng inputs: - - name: stdin + - name: in.zson data: | {ts:1970-01-01T00:00:00Z} {ts:1970-01-01T00:00:02Z} {ts:1970-01-01T00:00:02Z} + {ts:1970-01-01T00:00:02Z} + {ts:1970-01-01T00:00:02Z} + {ts:1970-01-01T00:00:02Z} + {ts:1970-01-01T00:00:03Z} + {ts:1970-01-01T00:00:03Z} + {ts:1970-01-01T00:00:03Z} + {ts:1970-01-01T00:00:03Z} {ts:1970-01-01T00:00:03Z} {ts:1970-01-01T00:00:03Z} {ts:1970-01-01T00:00:03Z} @@ -29,5 +36,5 @@ outputs: data: | {key:1970-01-01T00:00:08Z,offset:0} {key:1970-01-01T00:00:06Z,offset:23} - {key:1970-01-01T00:00:02Z,offset:69} - {key:1970-01-01T00:00:00Z,offset:90} + {key:1970-01-01T00:00:02Z,offset:72} + {key:1970-01-01T00:00:00Z,offset:108} diff --git a/lake/ztests/seek-index-overlap.yaml b/lake/ztests/seek-index-overlap.yaml index 385e2db4f3..ed5f2cbf69 100644 --- a/lake/ztests/seek-index-overlap.yaml +++ b/lake/ztests/seek-index-overlap.yaml @@ -3,12 +3,12 @@ script: | zed lake init -q zed lake create -p asc -orderby ts:asc -q zed lake create -p desc -orderby ts:desc -q - zq "tail 900" babble.zson | zed lake load -p asc -streammax=100 -q - 
- zq "head 250" babble.zson | zed lake load -p asc -streammax=100 -q - + zq "tail 900" babble.zson | zed lake load -p asc -seekstride=2000B -q - + zq "head 250" babble.zson | zed lake load -p asc -seekstride=2000B -q - zed lake query -z -s "from asc | count()" echo === | tee /dev/stderr - zq "tail 900" babble.zson | zed lake load -p desc -streammax=100 -q - - zq "head 250" babble.zson | zed lake load -p desc -streammax=100 -q - + zq "tail 900" babble.zson | zed lake load -p desc -seekstride=2000B -q - + zq "head 250" babble.zson | zed lake load -p desc -seekstride=2000B -q - zed lake query -z -s "from desc | count()" inputs: diff --git a/lake/ztests/seek-index.yaml b/lake/ztests/seek-index.yaml index 8c23f3f9e4..8e292113a7 100644 --- a/lake/ztests/seek-index.yaml +++ b/lake/ztests/seek-index.yaml @@ -2,11 +2,11 @@ script: | export ZED_LAKE_ROOT=test zed lake init -q zed lake create -p asc -orderby ts:asc -q - zed lake load -q -streammax=100 -p asc babble.zson + zed lake load -q -seekstride=2KB -p asc babble.zson zed lake query -z -s "from asc over 2020-04-21T23:59:26.063Z to 2020-04-21T23:59:38.069Z" echo === | tee /dev/stderr zed lake create -p desc -orderby ts:desc -q - zed lake load -q -streammax=100 -p desc babble.zson + zed lake load -q -seekstride=2KB -p desc babble.zson zed lake query -z -s "from desc over 2020-04-21T23:59:26.063Z to 2020-04-21T23:59:38.069Z" inputs: @@ -25,8 +25,8 @@ outputs: {ts:2020-04-21T23:59:26.06326664Z,s:"potbellied-Dedanim",v:230} - name: stderr data: | - data opened: 6547 + data opened: 16401 data read: 87 === - data opened: 6547 + data opened: 16404 data read: 87 diff --git a/zio/zng_test.go b/zio/zng_test.go index 7b35dd1a33..411215ab8b 100644 --- a/zio/zng_test.go +++ b/zio/zng_test.go @@ -185,8 +185,7 @@ func TestStreams(t *testing.T) { r := zson.NewReader(strings.NewReader(in), zson.NewContext()) var out Output zw := zngio.NewWriter(&out, zngio.WriterOpts{ - StreamRecordsMax: 2, - LZ4BlockSize: zngio.DefaultLZ4BlockSize, + 
LZ4BlockSize: zngio.DefaultLZ4BlockSize, }) var recs []*zng.Record @@ -198,6 +197,9 @@ func TestStreams(t *testing.T) { } require.NoError(t, zw.Write(rec)) recs = append(recs, rec.Keep()) + if len(recs)%2 == 0 { + require.NoError(t, zw.EndStream()) + } } zr := zngio.NewReader(bytes.NewReader(out.Buffer.Bytes()), zson.NewContext()) diff --git a/zio/zngio/index.go b/zio/zngio/index.go deleted file mode 100644 index 2f1333bf72..0000000000 --- a/zio/zngio/index.go +++ /dev/null @@ -1,230 +0,0 @@ -package zngio - -import ( - "errors" - "io" - "os" - "sync" - - "github.com/brimdata/zed/pkg/nano" - "github.com/brimdata/zed/zio" - "github.com/brimdata/zed/zng" - "github.com/brimdata/zed/zson" -) - -type Ordering int - -const ( - OrderUnknown Ordering = iota - OrderAscending - OrderDescending - OrderUnsorted -) - -type TimeIndex struct { - mu sync.Mutex - order Ordering - index []mark - indexReady bool -} - -type mark struct { - Ts nano.Ts - Offset int64 -} - -// NewIndex creates a new Index object, which is the container that holds -// the in-memory index for a (b)zng file. The first call to NewReader() -// will return a reader that scans the entire file, building a time-based -// index in the process, subsequent readers can use this index to read -// only the relevant zng streams from the underlying file. -func NewTimeIndex() *TimeIndex { - return &TimeIndex{} -} - -// Create a new reader for the given zng file. Only records with timestamps -// that fall within the time range indicated by span will be emitted by -// the returned Reader object. -func (ti *TimeIndex) NewReader(f *os.File, zctx *zson.Context, span nano.Span) (zio.ReadCloser, error) { - ti.mu.Lock() - defer ti.mu.Unlock() - - if ti.indexReady { - return newRangeReader(f, zctx, ti.order, ti.index, span) - } - - return &indexReader{ - Reader: *NewReader(f, zctx), - Closer: f, - start: span.Ts, - end: span.End(), - parent: ti, - }, nil -} - -// indexReader is a zbuf.Reader that also builds an index as it reads. 
-type indexReader struct { - Reader - io.Closer - start nano.Ts - end nano.Ts - parent *TimeIndex - order Ordering - marks []mark - lastSOS int64 - lastTs nano.Ts - lastIndexedTs nano.Ts -} - -func (i *indexReader) Read() (*zng.Record, error) { - for { - rec, err := i.readOne() - if err != nil { - return nil, err - } - - if rec == nil { - i.parent.mu.Lock() - defer i.parent.mu.Unlock() - i.parent.order = i.order - i.parent.index = i.marks - i.parent.indexReady = true - return nil, nil - } - - if rec.Ts() < i.start { - continue - } - if rec.Ts() <= i.end { - return rec, nil - } - - // This record falls after the end of the requested time - // span. Spin through this loop until we hit EOF anyway - // to finish building the index. - // XXX this will be wasteful if small ranges near the - // start of the file is all that is ever read. revisit this... - } -} - -func (i *indexReader) readOne() (*zng.Record, error) { - rec, err := i.Reader.Read() - if err != nil || rec == nil { - return nil, err - } - - if i.lastTs != 0 { - switch i.order { - case OrderUnknown: - if rec.Ts() > i.lastTs { - i.order = OrderAscending - } else if rec.Ts() < i.lastTs { - i.order = OrderDescending - } - case OrderAscending: - if rec.Ts() < i.lastTs { - i.order = OrderUnsorted - } - case OrderDescending: - if rec.Ts() > i.lastTs { - i.order = OrderUnsorted - } - } - } - i.lastTs = rec.Ts() - - sos := i.Reader.LastSOS() - if sos != i.lastSOS { - i.lastSOS = sos - ts := rec.Ts() - if ts != i.lastIndexedTs { - i.lastIndexedTs = ts - i.marks = append(i.marks, mark{ts, sos}) - } - } - - return rec, nil -} - -// rangeReader is a wrapper around zngio.Reader that uses an in-memory -// index to reduce the I/O needed to get matching records when reading a -// large zng file that includes sub-streams and a nano.Span that refers -// to a smaller time range within the file. 
-type rangeReader struct { - Reader - io.Closer - order Ordering - start nano.Ts - end nano.Ts - nread uint64 -} - -func newRangeReader(f *os.File, zctx *zson.Context, order Ordering, index []mark, span nano.Span) (*rangeReader, error) { - var off int64 - - if order == OrderAscending || order == OrderDescending { - // Find the stream within the zng file that holds the - // start time. For a large index this could be optimized - // with a binary search. - for _, mark := range index { - if order == OrderAscending && mark.Ts > span.Ts { - break - } - if order == OrderDescending && mark.Ts < span.End() { - break - } - off = mark.Offset - } - } - - if off > 0 { - newoff, err := f.Seek(off, io.SeekStart) - if err != nil { - return nil, err - } - if newoff != int64(off) { - return nil, errors.New("file truncated") //XXX - } - } - return &rangeReader{ - Reader: *NewReader(f, zctx), - Closer: f, - order: order, - start: span.Ts, - end: span.End(), - }, nil -} - -func (r *rangeReader) Read() (*zng.Record, error) { - for { - rec, err := r.Reader.Read() - if err != nil { - return nil, err - } - r.nread++ - if rec != nil { - switch r.order { - case OrderAscending: - if rec.Ts() < r.start { - continue - } - if rec.Ts() > r.end { - rec = nil - } - case OrderDescending: - if rec.Ts() > r.end { - continue - } - if rec.Ts() < r.start { - rec = nil - } - } - } - return rec, nil - } -} - -// Used from tests -func (r *rangeReader) reads() uint64 { - return r.nread -} diff --git a/zio/zngio/index_test.go b/zio/zngio/index_test.go deleted file mode 100644 index 5144107ddd..0000000000 --- a/zio/zngio/index_test.go +++ /dev/null @@ -1,141 +0,0 @@ -package zngio - -import ( - "os" - "path/filepath" - "strings" - "testing" - - "github.com/brimdata/zed/pkg/fs" - "github.com/brimdata/zed/pkg/nano" - "github.com/brimdata/zed/zio" - "github.com/brimdata/zed/zson" - "github.com/stretchr/testify/require" -) - -var records = []string{ - "{ts:2020-04-14T17:42:40Z,value:0}", - 
"{ts:2020-04-14T17:42:41Z,value:1}", - "{ts:2020-04-14T17:42:42Z,value:2}", - "{ts:2020-04-14T17:42:43Z,value:3}", - "{ts:2020-04-14T17:42:44Z,value:4}", - "{ts:2020-04-14T17:42:45Z,value:5}", - "{ts:2020-04-14T17:42:46Z,value:6}", - "{ts:2020-04-14T17:42:47Z,value:7}", - "{ts:2020-04-14T17:42:48Z,value:8}", - "{ts:2020-04-14T17:42:49Z,value:9}", -} - -// Parameters used for testing. Note that in the zng data above, -// indexed with a stream size of 2 records, this time span will straddle -// 3 streams with only part of the first and last stream falling inside -// the time range. -const startTime = "1586886163" -const endTime = "1586886166" - -// The guts of the test. r must be a reader allocated from a -// TimeIndex with the contents above and a time span delimited by -// startTime and endTime as defined above. First verifies that calling Read() -// repeatedly gives just the records that fall within the requested time -// span. Then, if checkReads is true, verify that the total records read -// from disk is just enough to cover the time span, and not the entire -// file (with streams of 2 records each and parts of 3 streams being -// inside the time span, a total of 6 records should be read). -func checkReader(t *testing.T, r zio.Reader, expected []int, checkReads bool) { - for _, expect := range expected { - rec, err := r.Read() - require.NoError(t, err) - - require.NotNil(t, rec) - v, err := rec.AccessInt("value") - require.NoError(t, err) - - require.Equal(t, int64(expect), v, "Got expected record value") - } - - rec, err := r.Read() - require.NoError(t, err) - require.Nil(t, rec, "Reached eof after last record in time span") - - if checkReads { - rr, ok := r.(*rangeReader) - require.True(t, ok, "Can get read stats from index reader") - require.LessOrEqual(t, rr.reads(), uint64(6), "Indexed reader did not read the entire file") - } -} - -func TestZngIndex(t *testing.T) { - // Create a time span that hits parts of different streams - // from within the zng file. 
- start, err := nano.ParseTs(startTime) - require.NoError(t, err) - end, err := nano.ParseTs(endTime) - - require.NoError(t, err) - span := nano.NewSpanTs(start, end) - - dotest := func(input, fname string, expected []int) { - // create a test zng file - reader := zson.NewReader(strings.NewReader(input), zson.NewContext()) - fp, err := os.Create(fname) - require.NoError(t, err) - - writer := NewWriter(fp, WriterOpts{StreamRecordsMax: 2}) - - for { - rec, err := reader.Read() - require.NoError(t, err) - if rec == nil { - break - } - - err = writer.Write(rec) - require.NoError(t, err) - } - require.NoError(t, writer.Close()) // Also closes fp. - - index := NewTimeIndex() - - // First time we read the file we don't have an index, but a - // search with a time span should still only return results - // from the span. - fp, err = fs.Open(fname) - require.NoError(t, err) - ireader, err := index.NewReader(fp, zson.NewContext(), span) - require.NoError(t, err) - - checkReader(t, ireader, expected, false) - err = fp.Close() - require.NoError(t, err) - - // Second time through, should get the same results, this time - // verify that we didn't read the whole file. 
- fp, err = fs.Open(fname) - require.NoError(t, err) - ireader, err = index.NewReader(fp, zson.NewContext(), span) - require.NoError(t, err) - - checkReader(t, ireader, expected, true) - err = fp.Close() - require.NoError(t, err) - } - - dir := t.TempDir() - - // Test once with ascending timestamps - t.Run("IndexAscending", func(t *testing.T) { - fname := filepath.Join(dir, "ascend") - input := strings.Join(records, "\n") - dotest(input, fname, []int{3, 4, 5, 6}) - }) - - // And test again with descending timestamps - t.Run("IndexDescending", func(t *testing.T) { - fname := filepath.Join(dir, "descend") - var buf strings.Builder - for i := len(records) - 1; i >= 0; i-- { - buf.WriteString(records[i]) - } - dotest(buf.String(), fname, []int{6, 5, 4, 3}) - }) -} diff --git a/zio/zngio/scanner_test.go b/zio/zngio/scanner_test.go index 864c6f857d..b9c25107f1 100644 --- a/zio/zngio/scanner_test.go +++ b/zio/zngio/scanner_test.go @@ -30,9 +30,10 @@ func TestScannerContext(t *testing.T) { require.NoError(t, err) var buf bytes.Buffer w := NewWriter(zio.NopCloser(&buf), WriterOpts{}) - for j := 0; j < DefaultStreamRecordsMax; j++ { + for j := 0; j < 100; j++ { require.NoError(t, w.Write(rec)) } + require.NoError(t, w.EndStream()) require.NoError(t, w.Close()) bufs = append(bufs, buf.Bytes()) } diff --git a/zio/zngio/writer.go b/zio/zngio/writer.go index 8764f5a4c2..d2ed37e5ec 100644 --- a/zio/zngio/writer.go +++ b/zio/zngio/writer.go @@ -11,8 +11,6 @@ import ( const ( // DefaultLZ4BlockSize is a reasonable default for WriterOpts.LZ4BlockSize. DefaultLZ4BlockSize = 16 * 1024 - // DefaultStreamRecordsMax is a reasonable default for WriterOpts.StreamRecordsMax. 
- DefaultStreamRecordsMax = 5000 ) type Writer struct { @@ -23,13 +21,11 @@ type Writer struct { encoder *Encoder buffer []byte - lastSOS int64 streamRecords int } type WriterOpts struct { - StreamRecordsMax int - LZ4BlockSize int + LZ4BlockSize int } func NewWriter(w io.WriteCloser, opts WriterOpts) *Writer { @@ -93,14 +89,9 @@ func (w *Writer) EndStream() error { if err := w.writeUncompressed([]byte{zng.CtrlEOS}); err != nil { return err } - w.lastSOS = w.Position() return nil } -func (w *Writer) LastSOS() int64 { - return w.lastSOS -} - func (w *Writer) Write(r *zng.Record) error { // First send any typedefs for unsent types. typ := w.encoder.Lookup(r.Type) @@ -149,11 +140,6 @@ func (w *Writer) Write(r *zng.Record) error { return err } w.streamRecords++ - if max := w.opts.StreamRecordsMax; max > 0 && w.streamRecords >= max { - if err := w.EndStream(); err != nil { - return err - } - } return nil } diff --git a/zio/zngio/ztests/streams-1.yaml b/zio/zngio/ztests/streams-1.yaml deleted file mode 100644 index 4298245040..0000000000 --- a/zio/zngio/ztests/streams-1.yaml +++ /dev/null @@ -1,62 +0,0 @@ -# Test frame markers inserted into a zng frame (including testing -# that type definitions are repeated as needed). 
- -zed: '*' - -input: | - {_path:"a",ts:1970-01-01T00:00:10Z,d:1.} - {_path:"xyz",ts:1970-01-01T00:00:20Z,d:1.5} - -output-flags: -f zng -b 1 - -outputHex: | - # define a record with 3 columns - f6 03 - # first column name is _path (len 5) - 05 5f 70 61 74 68 - # first column type is string (16) - 10 - # second column name is ts (len 2) - 02 74 73 - # second column type is time (10) - 09 - # third column name is d (len 1) - 01 64 - # third column type is float64 (12) - 0c - # value using type id 23 (0x17), the record defined above - # total length of this recor is 17 bytes (0x11) - 17 11 - # first column is a primitive value, 2 total bytes - 04 - # value of the first column is the string "a" - 61 - # second column is a primitive value, 6 total bytes - 0c - # time value is encoded in nanoseconds shifted one bit left - # 2000000000 == 0x04a817c800 - 00 c8 17 a8 04 - # third column is a primitive value, 9 total bytes - 12 - # 8 bytes of float64 data representing 1.0 - 00 00 00 00 00 00 f0 3f - # end of stream - ff - # new frame: repeat the record type definition - f6 03 - 05 5f 70 61 74 68 - 10 - 02 74 73 - 09 - 01 64 - 0c - # another encoded value using the same record definition as before - 17 13 - # first column: primitive value of 4 total byte, values xyz - 08 78 79 7a - # second column: primitive value of 20 (converted to nanoseconds, encoded <<1) - 0c 00 90 2f 50 09 - # third column, primitive value of 9 total bytes, float64 1.5 - 12 00 00 00 00 00 00 f8 3f - # end of stream - ff diff --git a/zio/zngio/ztests/streams-2.yaml b/zio/zngio/ztests/streams-2.yaml deleted file mode 100644 index 54c8bd0b50..0000000000 --- a/zio/zngio/ztests/streams-2.yaml +++ /dev/null @@ -1,41 +0,0 @@ -# Test that type ids are not re-used across zng streams. 
- -zed: '*' - -input: | - {s:"a"} - {i:1 (int32)} (=0) - {i:2} (0) - -output-flags: -f zng -b 2 -znglz4blocksize=0 - -outputHex: | - # define the record corresponding to type 0 above: 1 col, name s, type string - f6 01 - 01 73 - 10 - # value using type id 23 (0x17), equivalent to type 0 in the tzng source - # total length of this record is 2 bytes - 17 02 - # first column is a primitive value, 2 total bytes, the string "a" - 04 61 - # define the record corresponding to type 1: 1 col, name i, type int32 - f6 01 - 01 69 - 06 - # value using type id 24 (0x18), corresponding to type 1 in tzng - 18 02 - # consists of one primitive value, representing the integer 1 - 04 02 - # end of stream - ff - # new frame: repeat the type definition for type 1 - f6 01 - 01 69 - 06 - # value using the new type definition, since this is a new stream, - # the value should use type id 23, not 24 as was used in the first stream - 17 02 - 04 04 - # end of stream - ff diff --git a/zio/zngio/ztests/type-reset.yaml b/zio/zngio/ztests/type-reset.yaml deleted file mode 100644 index df312fba33..0000000000 --- a/zio/zngio/ztests/type-reset.yaml +++ /dev/null @@ -1,10 +0,0 @@ -zed: '*' - -input: | - {_path:"a",ts:1970-01-01T00:00:10Z,d:1.} - {_path:"xyz",ts:1970-01-01T00:00:20Z,d:1.5} - -output-flags: -f zng -b 2 - -output: !!binary | - 9gMFX3BhdGgQAnRzCQFkDBcRBGEMAMgXqAQSAAAAAAAA8D8XEwh4eXoMAJAvUAkSAAAAAAAA+D// diff --git a/zio/zngio/ztests/zctx-alias-reset-2.yaml b/zio/zngio/ztests/zctx-alias-reset-2.yaml index 62287e4cc3..df40a446e8 100644 --- a/zio/zngio/ztests/zctx-alias-reset-2.yaml +++ b/zio/zngio/ztests/zctx-alias-reset-2.yaml @@ -1,10 +1,12 @@ # Test that type aliases are properly reset and reusable after stream boundaries script: | - zq -b 2 - | zq -z "count() by proto" - + zq "head 1" in.zson > t1.zng + zq "tail 2" in.zson > t2.zng + cat t1.zng t2.zng | zq -z "count() by proto" - inputs: - - name: stdin + - name: in.zson data: | {ts:2015-03-05T14:25:12.963801Z} 
{ts:2015-03-05T14:25:14.419939Z,proto:"udp" (=zenum)} (=0) diff --git a/zio/zngio/ztests/zctx-alias-reset.yaml b/zio/zngio/ztests/zctx-alias-reset.yaml index d725f99f72..bd8b19350b 100644 --- a/zio/zngio/ztests/zctx-alias-reset.yaml +++ b/zio/zngio/ztests/zctx-alias-reset.yaml @@ -1,10 +1,12 @@ # Test that type aliases are properly reset and reusable after stream boundaries script: | - zq -b 2 - | zq -z - + zq "head 1" in.zson > t1.zng + zq "tail 2" in.zson > t2.zng + cat t1.zng t2.zng | zq -z - inputs: - - name: stdin + - name: in.zson data: | {ts:2015-03-05T14:25:12.963801Z} {ts:2015-03-05T14:25:14.419939Z,proto:"udp" (=zenum)} (=0) diff --git a/zio/zngio/ztests/zctx-reset.yaml b/zio/zngio/ztests/zctx-reset.yaml index 96d01fb0e0..3f1e4b5524 100644 --- a/zio/zngio/ztests/zctx-reset.yaml +++ b/zio/zngio/ztests/zctx-reset.yaml @@ -1,11 +1,12 @@ # Test that type contexts are properly reset and reusable after stream boundaries script: | - zq -b 1 - > s.zng + zq in.zson > s.zng + zq in.zson >> s.zng zq -z s.zng inputs: - - name: stdin + - name: in.zson data: | {a:"hello"} {b:10} @@ -15,3 +16,5 @@ outputs: data: | {a:"hello"} {b:10} + {a:"hello"} + {b:10}