diff --git a/cli/outputflags/flags.go b/cli/outputflags/flags.go
index 64b220a504..414b923699 100644
--- a/cli/outputflags/flags.go
+++ b/cli/outputflags/flags.go
@@ -37,7 +37,6 @@ func (f *Flags) setFlags(fs *flag.FlagSet) {
 	fs.BoolVar(&f.Text.ShowTypes, "T", false, "display field types in text output")
 	fs.BoolVar(&f.Text.ShowFields, "F", false, "display field names in text output")
 	fs.BoolVar(&f.color, "color", true, "enable/disable color formatting for -Z and lake text output")
-	fs.IntVar(&f.Zng.StreamRecordsMax, "b", 0, "limit for number of records in each ZNG stream (0 for no limit)")
 	fs.IntVar(&f.Zng.LZ4BlockSize, "znglz4blocksize", zngio.DefaultLZ4BlockSize,
 		"LZ4 block size in bytes for ZNG compression (nonpositive to disable)")
 	fs.IntVar(&f.ZSON.Pretty, "pretty", 4,
diff --git a/cmd/zed/index/seek/command.go b/cmd/zed/index/seek/command.go
deleted file mode 100644
index 4dce9c24ee..0000000000
--- a/cmd/zed/index/seek/command.go
+++ /dev/null
@@ -1,134 +0,0 @@
-package seek
-
-import (
-	"errors"
-	"flag"
-	"fmt"
-	"os"
-
-	zedindex "github.com/brimdata/zed/cmd/zed/index"
-	"github.com/brimdata/zed/expr"
-	"github.com/brimdata/zed/field"
-	"github.com/brimdata/zed/index"
-	"github.com/brimdata/zed/pkg/charm"
-	"github.com/brimdata/zed/pkg/fs"
-	"github.com/brimdata/zed/pkg/storage"
-	"github.com/brimdata/zed/zio/zngio"
-	"github.com/brimdata/zed/zng"
-	"github.com/brimdata/zed/zson"
-)
-
-var Seek = &charm.Spec{
-	Name:  "seek",
-	Usage: "seek [-f framethresh] [ -o file ] [-v field] -k field file",
-	Short: "generate a seek-style index file for a zng file",
-	Long: `
-The seek command creates an index for the seek offsets of each
-start-of-stream (sos) in a zng file.  The key field is specified by -k and all
-values in this field must be in ascending order.  The seek offset of each sos
-is stored as the field "offset" in the base layer of the output search index
-unless overridden by -v.
-It is an error if the values of the key field are not of uniform type.
-
-This is command is useful for creating time indexes for large zng logs where the
-zng records are sorted by time.`,
-	New: newCommand,
-}
-
-func init() {
-	zedindex.Cmd.Add(Seek)
-}
-
-type Command struct {
-	*zedindex.Command
-	frameThresh int
-	outputFile  string
-	keyField    string
-	offsetField string
-}
-
-func newCommand(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
-	c := &Command{Command: parent.(*zedindex.Command)}
-	f.IntVar(&c.frameThresh, "f", 32*1024, "minimum frame size used in index file")
-	f.StringVar(&c.outputFile, "o", "index.zng", "name of index output file")
-	f.StringVar(&c.keyField, "k", "", "name of search key field")
-	f.StringVar(&c.offsetField, "v", "offset", "field name for seek offset in output index")
-	return c, nil
-}
-
-func (c *Command) Run(args []string) error {
-	_, cleanup, err := c.Init()
-	if err != nil {
-		return err
-	}
-	defer cleanup()
-	//XXX no reason to limit this... fix later
-	if len(args) != 1 {
-		return errors.New("must specify a single zng input file containing keys and optional values")
-	}
-	file := os.Stdin
-	path := args[0]
-	if path != "-" {
-		var err error
-		file, err = fs.Open(path)
-		if err != nil {
-			return err
-		}
-		defer file.Close()
-	}
-	zctx := zson.NewContext()
-	reader := zngio.NewReader(file, zctx)
-	keys := field.DottedList(c.keyField)
-	writer, err := index.NewWriter(zctx, storage.NewLocalEngine(), c.outputFile, index.KeyFields(keys...), index.FrameThresh(c.frameThresh))
-	if err != nil {
-		return err
-	}
-	close := true
-	defer func() {
-		if close {
-			writer.Close()
-		}
-	}()
-	readKey := expr.NewDotExpr(field.Dotted(c.keyField))
-	var builder *zng.Builder
-	var keyType zng.Type
-	var offset int64
-	// to skip to each sos, we read the first rec normally
-	// then call SkipStream and the bottmo of the for-loop.
-	rec, err := reader.Read()
-	for err == nil && rec != nil {
-		k, err := readKey.Eval(rec)
-		if err != nil || k.Type == nil || k.Bytes == nil {
-			// if the key doesn't exist or is unset, fail here
-			// XXX we should check that key order is ascending
-			return fmt.Errorf("key field is missing: %s", rec)
-		}
-		if builder == nil {
-			keyType = k.Type
-			cols := []zng.Column{
-				{c.keyField, k.Type},
-				{c.offsetField, zng.TypeInt64},
-			}
-			typ, err := zctx.LookupTypeRecord(cols)
-			if err != nil {
-				return err
-			}
-			builder = zng.NewBuilder(typ)
-		} else if keyType != k.Type {
-			return fmt.Errorf("key type changed from %q to %q", keyType.ZSON(), k.Type.ZSON())
-		}
-		offBytes := zng.EncodeInt(offset)
-		out := builder.Build(k.Bytes, offBytes)
-		if err := writer.Write(out); err != nil {
-			return err
-		}
-		rec, offset, err = reader.SkipStream()
-	}
-	if err != nil {
-		return err
-	}
-	// We do this little song and dance so we can return error on close
-	// but don't call close twice if we make it here.
-	close = false
-	return writer.Close()
-}
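Note: the start-of-stream walk this deleted command performed by hand is now done automatically during load by the segment writer changed below. For reference, a minimal sketch of that walk, distilled from the deleted code above (same zngio APIs it used; error handling trimmed to panics for brevity):

```go
package main

import (
	"fmt"
	"os"

	"github.com/brimdata/zed/zio/zngio"
	"github.com/brimdata/zed/zson"
)

func main() {
	f, err := os.Open(os.Args[1])
	if err != nil {
		panic(err)
	}
	defer f.Close()
	reader := zngio.NewReader(f, zson.NewContext())
	// Read the first record of each stream, then hop to the next
	// start-of-stream; offset is the seek position of the stream
	// that rec belongs to, exactly as the deleted loop used it.
	var offset int64
	rec, err := reader.Read()
	for err == nil && rec != nil {
		fmt.Printf("stream at offset %d starts with %s\n", offset, rec)
		rec, offset, err = reader.SkipStream()
	}
	if err != nil {
		panic(err)
	}
}
```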
diff --git a/cmd/zed/lake/add/command.go b/cmd/zed/lake/add/command.go
index d50dcebe1d..3355e3b656 100644
--- a/cmd/zed/lake/add/command.go
+++ b/cmd/zed/lake/add/command.go
@@ -11,6 +11,7 @@ import (
 	"github.com/brimdata/zed/pkg/charm"
 	"github.com/brimdata/zed/pkg/rlimit"
 	"github.com/brimdata/zed/pkg/storage"
+	"github.com/brimdata/zed/pkg/units"
 	"github.com/brimdata/zed/zbuf"
 	"github.com/brimdata/zed/zio"
 	"github.com/brimdata/zed/zson"
@@ -45,17 +46,20 @@ func init() {
 
 // TBD: add option to apply Zed program on add path?
 type Command struct {
-	lake                  *zedlake.Command
-	importStreamRecordMax int
-	commit                bool
-	inputFlags            inputflags.Flags
+	lake       *zedlake.Command
+	seekStride units.Bytes
+	commit     bool
+	inputFlags inputflags.Flags
 	zedlake.CommitFlags
 }
 
 func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
-	c := &Command{lake: parent.(*zedlake.Command)}
+	c := &Command{
+		lake:       parent.(*zedlake.Command),
+		seekStride: units.Bytes(lake.SeekIndexStride),
+	}
 	f.BoolVar(&c.commit, "commit", false, "commit added data if successfully written")
-	f.IntVar(&c.importStreamRecordMax, "streammax", lake.ImportStreamRecordsMax, "limit for number of records in each ZNG stream (0 for no limit)")
+	f.Var(&c.seekStride, "seekstride", "size of seek-index unit for ZNG data, as '32KB', '1MB', etc.")
 	c.inputFlags.SetFlags(f)
 	c.CommitFlags.SetFlags(f)
 	return c, nil
@@ -70,7 +74,7 @@ func (c *Command) Run(args []string) error {
 	if len(args) == 0 {
 		return errors.New("zed lake add: at least one input file must be specified (- for stdin)")
 	}
-	lake.ImportStreamRecordsMax = c.importStreamRecordMax
+	lake.SeekIndexStride = int(c.seekStride)
 	if _, err := rlimit.RaiseOpenFilesLimit(); err != nil {
 		return err
 	}
diff --git a/cmd/zed/lake/load/command.go b/cmd/zed/lake/load/command.go
index 7aec0ce334..4c1fdf0abb 100644
--- a/cmd/zed/lake/load/command.go
+++ b/cmd/zed/lake/load/command.go
@@ -12,6 +12,7 @@ import (
 	"github.com/brimdata/zed/pkg/charm"
 	"github.com/brimdata/zed/pkg/rlimit"
 	"github.com/brimdata/zed/pkg/storage"
+	"github.com/brimdata/zed/pkg/units"
 	"github.com/brimdata/zed/zbuf"
 	"github.com/brimdata/zed/zio"
 	"github.com/brimdata/zed/zson"
@@ -33,17 +34,20 @@ func init() {
 }
 
 type Command struct {
-	lake                  *zedlake.Command
-	importStreamRecordMax int
-	commit                bool
+	lake       *zedlake.Command
+	seekStride units.Bytes
+	commit     bool
 	zedlake.CommitFlags
 	procFlags  procflags.Flags
 	inputFlags inputflags.Flags
 }
 
 func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
-	c := &Command{lake: parent.(*zedlake.Command)}
-	f.IntVar(&c.importStreamRecordMax, "streammax", lake.ImportStreamRecordsMax, "limit for number of records in each ZNG stream (0 for no limit)")
+	c := &Command{
+		lake:       parent.(*zedlake.Command),
+		seekStride: units.Bytes(lake.SeekIndexStride),
+	}
+	f.Var(&c.seekStride, "seekstride", "size of seek-index unit for ZNG data, as '32KB', '1MB', etc.")
 	c.CommitFlags.SetFlags(f)
 	c.inputFlags.SetFlags(f)
 	c.procFlags.SetFlags(f)
@@ -59,7 +63,7 @@ func (c *Command) Run(args []string) error {
 	if len(args) == 0 {
 		return errors.New("zed lake load: at least one input file must be specified (- for stdin)")
 	}
-	lake.ImportStreamRecordsMax = c.importStreamRecordMax
+	lake.SeekIndexStride = int(c.seekStride)
 	if _, err := rlimit.RaiseOpenFilesLimit(); err != nil {
 		return err
 	}
diff --git a/cmd/zed/main.go b/cmd/zed/main.go
index 26c22415a2..4db6662cfd 100644
--- a/cmd/zed/main.go
+++ b/cmd/zed/main.go
@@ -20,7 +20,6 @@ import (
 	_ "github.com/brimdata/zed/cmd/zed/index/create"
 	_ "github.com/brimdata/zed/cmd/zed/index/lookup"
 	_ "github.com/brimdata/zed/cmd/zed/index/section"
-	_ "github.com/brimdata/zed/cmd/zed/index/seek"
 	"github.com/brimdata/zed/cmd/zed/lake"
 	_ "github.com/brimdata/zed/cmd/zed/lake/add"
 	_ "github.com/brimdata/zed/cmd/zed/lake/commit"
diff --git a/index/ztests/seek.yaml b/index/ztests/seek.yaml
deleted file mode 100644
index 3ee14a6baf..0000000000
--- a/index/ztests/seek.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-# create a zng with sos every two records then index it as a time index
-script: |
-  zq -b 2 - | zed index seek -f 100 -o index.zng -k ts -
-  zq -z index.zng
-
-inputs:
-  - name: stdin
-    data: |
-      {ts:1970-01-01T00:16:40Z,foo:"a"}
-      {ts:1970-01-01T00:16:41Z,foo:"b"}
-      {ts:1970-01-01T00:16:42Z,foo:"c"}
-      {ts:1970-01-01T00:16:43Z,foo:"d"}
-      {ts:1970-01-01T00:16:44Z,foo:"e"}
-      {ts:1970-01-01T00:16:45Z,foo:"f"}
-      {ts:1970-01-01T00:16:46Z,foo:"g"}
-      {ts:1970-01-01T00:16:47Z,foo:"h"}
-
-outputs:
-  - name: stdout
-    data: |
-      {ts:1970-01-01T00:16:40Z,offset:0}
-      {ts:1970-01-01T00:16:42Z,offset:34}
-      {ts:1970-01-01T00:16:44Z,offset:68}
-      {ts:1970-01-01T00:16:46Z,offset:102}
-      {magic:"zed_index",version:2 (int32),descending:false,child_field:"_child",frame_thresh:100 (int32),sections:[58],keys:null (0=({ts:time}))} (=1)
diff --git a/index/ztests/tsindex.yaml b/index/ztests/tsindex.yaml
deleted file mode 100644
index 8a8eb3cb9a..0000000000
--- a/index/ztests/tsindex.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-script: |
-  # index ts every 10 records
-  zq -b 10 babble.zson | zed index create -S -o index -k ts -
-  # exact lookup for this particular ts
-  zed index lookup -z -k 2020-04-21T23:42:11.06754599Z index
-  echo ===
-  # exact lookup for an absent ts
-  zed index lookup -z -k 2020-04-21T23:42:11.06754600Z index
-  echo ===
-  # closest lookup for the absent ts
-  zed index lookup -c -z -k 2020-04-21T23:42:11.06754600Z index
-
-inputs:
-  - name: babble.zson
-    source: ../../testdata/babble.zson
-
-outputs:
-  - name: stdout
-    data: |
-      {key:2020-04-21T23:42:11.06754599Z}
-      ===
-      ===
-      {key:2020-04-21T23:42:11.06754599Z}
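The `-streammax` record count is replaced above by a byte-denominated `-seekstride`. A sketch of the new flag wiring, assuming (as the `f.Var` calls above imply) that `units.Bytes` implements `flag.Value` and parses human-friendly sizes like "32KB":

```go
package main

import (
	"flag"
	"fmt"

	"github.com/brimdata/zed/lake"
	"github.com/brimdata/zed/pkg/units"
)

func main() {
	// Default to the package-level stride, exactly as New does above.
	stride := units.Bytes(lake.SeekIndexStride)
	fs := flag.NewFlagSet("load", flag.ExitOnError)
	fs.Var(&stride, "seekstride", "size of seek-index unit for ZNG data, as '32KB', '1MB', etc.")
	fs.Parse([]string{"-seekstride", "32KB"})
	// Run then plumbs the parsed value back into the lake package.
	lake.SeekIndexStride = int(stride)
	fmt.Println(lake.SeekIndexStride)
}
```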
diff --git a/lake/segment/writer.go b/lake/segment/writer.go
index 14e2c88a75..dc51c97e7b 100644
--- a/lake/segment/writer.go
+++ b/lake/segment/writer.go
@@ -17,82 +17,100 @@ import (
 // Writer is a zbuf.Writer that writes a stream of sorted records into a
 // data segment.
 type Writer struct {
-	ref             *Reference
-	byteCounter     *writeCounter
-	count           uint64
-	rowObject       *zngio.Writer
-	firstKey        zng.Value
-	lastKey         zng.Value
-	needSeekWrite   bool
-	order           order.Which
-	seekIndex       *seekindex.Writer
-	seekIndexCloser io.Closer
-	first           bool
-	poolKey         field.Path
+	ref              *Reference
+	byteCounter      *writeCounter
+	count            uint64
+	rowObject        *zngio.Writer
+	firstKey         zng.Value
+	lastKey          zng.Value
+	lastSOS          int64
+	order            order.Which
+	seekIndex        *seekindex.Writer
+	seekIndexCloser  io.Closer
+	seekIndexStride  int
+	seekIndexTrigger int
+	first            bool
+	poolKey          field.Path
 }
 
-func (r *Reference) NewWriter(ctx context.Context, engine storage.Engine, path *storage.URI, o order.Which, poolKey field.Path, seekIndexFactor int) (*Writer, error) {
+// NewWriter returns a writer for writing the data of a zng-row storage object as
+// well as optionally creating a seek index for the row object when the
+// seekIndexStride is non-zero.  We assume all records are non-volatile until
+// Close as zng.Values from the various record bodies are referenced across
+// calls to Write.
+func (r *Reference) NewWriter(ctx context.Context, engine storage.Engine, path *storage.URI, o order.Which, poolKey field.Path, seekIndexStride int) (*Writer, error) {
 	out, err := engine.Put(ctx, r.RowObjectPath(path))
 	if err != nil {
 		return nil, err
 	}
 	counter := &writeCounter{bufwriter.New(out), 0}
 	writer := zngio.NewWriter(counter, zngio.WriterOpts{
-		StreamRecordsMax: seekIndexFactor,
-		LZ4BlockSize:     zngio.DefaultLZ4BlockSize,
+		LZ4BlockSize: zngio.DefaultLZ4BlockSize,
 	})
-	seekOut, err := engine.Put(ctx, r.SeekObjectPath(path))
-	if err != nil {
-		return nil, err
+	w := &Writer{
+		ref:         r,
+		byteCounter: counter,
+		rowObject:   writer,
+		order:       o,
+		first:       true,
+		poolKey:     poolKey,
 	}
-	opts := zngio.WriterOpts{
-		//LZ4BlockSize: zngio.DefaultLZ4BlockSize,
+	if seekIndexStride != 0 {
+		w.seekIndexStride = seekIndexStride
+		seekOut, err := engine.Put(ctx, r.SeekObjectPath(path))
+		if err != nil {
+			return nil, err
+		}
+		opts := zngio.WriterOpts{
+			//LZ4BlockSize: zngio.DefaultLZ4BlockSize,
+		}
+		seekWriter := zngio.NewWriter(bufwriter.New(seekOut), opts)
+		w.seekIndex = seekindex.NewWriter(seekWriter)
+		w.seekIndexCloser = seekWriter
 	}
-	seekWriter := zngio.NewWriter(bufwriter.New(seekOut), opts)
-	return &Writer{
-		ref:             r,
-		byteCounter:     counter,
-		rowObject:       writer,
-		seekIndex:       seekindex.NewWriter(seekWriter),
-		seekIndexCloser: seekWriter,
-		order:           o,
-		first:           true,
-		poolKey:         poolKey,
-	}, nil
+	return w, nil
 }
 
 func (w *Writer) Write(rec *zng.Record) error {
-	// We want to index the start of stream (SOS) position of the data file by
-	// record timestamps; we don't know when we've started a new stream until
-	// after we have written the first record in the stream.
-	sos := w.rowObject.LastSOS()
-	if err := w.rowObject.Write(rec); err != nil {
-		return err
-	}
 	key, err := rec.Deref(w.poolKey)
 	if err != nil {
 		key = zng.Value{zng.TypeNull, nil}
 	}
-	if w.first {
-		w.first = false
-		w.firstKey = key
-		if err := w.seekIndex.Write(key, sos); err != nil {
+	if w.seekIndex != nil {
+		if err := w.writeIndex(key); err != nil {
 			return err
 		}
-	} else if w.needSeekWrite && (w.lastKey.Bytes == nil || !bytes.Equal(key.Bytes, w.lastKey.Bytes)) {
-		if err := w.seekIndex.Write(key, sos); err != nil {
-			return err
-		}
-		w.needSeekWrite = false
 	}
-	if w.rowObject.LastSOS() != sos {
-		w.needSeekWrite = true
+	if err := w.rowObject.Write(rec); err != nil {
+		return err
 	}
 	w.lastKey = key
 	w.count++
 	return nil
 }
 
+func (w *Writer) writeIndex(key zng.Value) error {
+	w.seekIndexTrigger += len(key.Bytes)
+	if w.first {
+		w.first = false
+		w.firstKey = key
+		w.lastKey = key
+		return w.seekIndex.Write(key, 0)
+	}
+	if w.seekIndexTrigger < w.seekIndexStride || bytes.Equal(key.Bytes, w.lastKey.Bytes) {
+		return nil
+	}
+	if err := w.rowObject.EndStream(); err != nil {
+		return err
+	}
+	pos := w.rowObject.Position()
+	if err := w.seekIndex.Write(key, pos); err != nil {
+		return err
+	}
+	w.seekIndexTrigger = 0
+	return nil
+}
+
 // Abort is called when an error occurs during write.  Errors are ignored
 // because the write error will be more informative and should be returned.
 func (w *Writer) Abort() {
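The writeIndex logic above cuts a stream and records a seek entry only after roughly seekIndexStride bytes of key data have accumulated *and* the key value has changed, so records with equal keys never straddle an index entry. A self-contained sketch of that trigger policy (illustrative only, not the zed API):

```go
package main

import (
	"bytes"
	"fmt"
)

// entry models one seek-index row: the first key of a stream and the
// byte position where that stream begins.
type entry struct {
	key []byte
	pos int64
}

// strideIndexer mimics writeIndex above: accumulate key bytes and cut a
// boundary only once the stride is crossed and the key has changed.
type strideIndexer struct {
	stride  int
	trigger int
	last    []byte
	index   []entry
}

// note reports whether the caller should end the current stream at pos
// before writing the record carrying this key.
func (s *strideIndexer) note(key []byte, pos int64) bool {
	s.trigger += len(key)
	defer func() { s.last = key }()
	if len(s.index) == 0 {
		s.index = append(s.index, entry{key, 0})
		return false
	}
	if s.trigger < s.stride || bytes.Equal(key, s.last) {
		return false
	}
	s.index = append(s.index, entry{key, pos})
	s.trigger = 0
	return true
}

func main() {
	s := &strideIndexer{stride: 4}
	var pos int64
	for _, k := range []string{"a", "a", "b", "b", "b", "c", "d"} {
		s.note([]byte(k), pos) // true means: EndStream here in the real writer
		pos += 16              // pretend each record occupies 16 bytes
	}
	for _, e := range s.index {
		fmt.Printf("key %q -> offset %d\n", e.key, e.pos)
	}
}
```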
diff --git a/lake/writer.go b/lake/writer.go
index 12c858dce8..57f2292c1a 100644
--- a/lake/writer.go
+++ b/lake/writer.go
@@ -17,7 +17,7 @@ import (
 )
 
 var (
-	ImportStreamRecordsMax = zngio.DefaultStreamRecordsMax
+	SeekIndexStride = 64 * 1024
 
 	// For unit testing.
 	importLZ4BlockSize = zngio.DefaultLZ4BlockSize
@@ -140,7 +140,7 @@ func (w *Writer) writeObject(seg *segment.Reference, recs []*zng.Record) error {
 	if err != nil {
 		seg.Last = zng.Value{zng.TypeNull, nil}
 	}
-	writer, err := seg.NewWriter(w.ctx, w.pool.engine, w.pool.DataPath, w.pool.Layout.Order, key, ImportStreamRecordsMax)
+	writer, err := seg.NewWriter(w.ctx, w.pool.engine, w.pool.DataPath, w.pool.Layout.Order, key, SeekIndexStride)
 	if err != nil {
 		return err
 	}
diff --git a/lake/ztests/consecutive-ts.yaml b/lake/ztests/consecutive-ts.yaml
index 9a7c56764f..b131315207 100644
--- a/lake/ztests/consecutive-ts.yaml
+++ b/lake/ztests/consecutive-ts.yaml
@@ -1,16 +1,23 @@
 script: |
   export ZED_LAKE_ROOT=test
   zed lake init -q
-  zed lake create -q -p logs
-  zed lake load -q -p logs -streammax 2 -
+  zed lake create -q -p logs -orderby ts:desc
+  zed lake load -q -p logs -seekstride 11B in.zson
   zq -z test/*/D/*-seek.zng
 
 inputs:
-  - name: stdin
+  - name: in.zson
     data: |
       {ts:1970-01-01T00:00:00Z}
       {ts:1970-01-01T00:00:02Z}
       {ts:1970-01-01T00:00:02Z}
+      {ts:1970-01-01T00:00:02Z}
+      {ts:1970-01-01T00:00:02Z}
+      {ts:1970-01-01T00:00:02Z}
+      {ts:1970-01-01T00:00:03Z}
+      {ts:1970-01-01T00:00:03Z}
+      {ts:1970-01-01T00:00:03Z}
+      {ts:1970-01-01T00:00:03Z}
       {ts:1970-01-01T00:00:03Z}
       {ts:1970-01-01T00:00:03Z}
       {ts:1970-01-01T00:00:03Z}
@@ -29,5 +36,5 @@ outputs:
     data: |
       {key:1970-01-01T00:00:08Z,offset:0}
       {key:1970-01-01T00:00:06Z,offset:23}
-      {key:1970-01-01T00:00:02Z,offset:69}
-      {key:1970-01-01T00:00:00Z,offset:90}
+      {key:1970-01-01T00:00:02Z,offset:72}
+      {key:1970-01-01T00:00:00Z,offset:108}
diff --git a/lake/ztests/seek-index-overlap.yaml b/lake/ztests/seek-index-overlap.yaml
index 385e2db4f3..ed5f2cbf69 100644
--- a/lake/ztests/seek-index-overlap.yaml
+++ b/lake/ztests/seek-index-overlap.yaml
@@ -3,12 +3,12 @@ script: |
   zed lake init -q
   zed lake create -p asc -orderby ts:asc -q
   zed lake create -p desc -orderby ts:desc -q
-  zq "tail 900" babble.zson | zed lake load -p asc -streammax=100 -q -
-  zq "head 250" babble.zson | zed lake load -p asc -streammax=100 -q -
+  zq "tail 900" babble.zson | zed lake load -p asc -seekstride=2000B -q -
+  zq "head 250" babble.zson | zed lake load -p asc -seekstride=2000B -q -
   zed lake query -z -s "from asc | count()"
   echo === | tee /dev/stderr
-  zq "tail 900" babble.zson | zed lake load -p desc -streammax=100 -q -
-  zq "head 250" babble.zson | zed lake load -p desc -streammax=100 -q -
+  zq "tail 900" babble.zson | zed lake load -p desc -seekstride=2000B -q -
+  zq "head 250" babble.zson | zed lake load -p desc -seekstride=2000B -q -
   zed lake query -z -s "from desc | count()"
 
 inputs:
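The `*-seek.zng` objects these tests dump are ordinary ZNG files of `{key,offset}` records, so they can be read back with the same reader APIs used elsewhere in this diff. A sketch (the path below is hypothetical; pass a real `*-seek.zng` produced by `zed lake load`):

```go
package main

import (
	"fmt"
	"os"

	"github.com/brimdata/zed/zio/zngio"
	"github.com/brimdata/zed/zson"
)

func main() {
	f, err := os.Open("test/POOL/D/SEGMENT-seek.zng") // hypothetical path
	if err != nil {
		panic(err)
	}
	defer f.Close()
	r := zngio.NewReader(f, zson.NewContext())
	for {
		rec, err := r.Read()
		if err != nil {
			panic(err)
		}
		if rec == nil {
			break
		}
		// Each row pairs a pool key with the stream's byte offset.
		off, err := rec.AccessInt("offset")
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> seek to %d\n", rec, off)
	}
}
```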
diff --git a/lake/ztests/seek-index.yaml b/lake/ztests/seek-index.yaml
index 8c23f3f9e4..8e292113a7 100644
--- a/lake/ztests/seek-index.yaml
+++ b/lake/ztests/seek-index.yaml
@@ -2,11 +2,11 @@ script: |
   export ZED_LAKE_ROOT=test
   zed lake init -q
   zed lake create -p asc -orderby ts:asc -q
-  zed lake load -q -streammax=100 -p asc babble.zson
+  zed lake load -q -seekstride=2KB -p asc babble.zson
   zed lake query -z -s "from asc over 2020-04-21T23:59:26.063Z to 2020-04-21T23:59:38.069Z"
   echo === | tee /dev/stderr
   zed lake create -p desc -orderby ts:desc -q
-  zed lake load -q -streammax=100 -p desc babble.zson
+  zed lake load -q -seekstride=2KB -p desc babble.zson
   zed lake query -z -s "from desc over 2020-04-21T23:59:26.063Z to 2020-04-21T23:59:38.069Z"
 
 inputs:
@@ -25,8 +25,8 @@ outputs:
       {ts:2020-04-21T23:59:26.06326664Z,s:"potbellied-Dedanim",v:230}
   - name: stderr
     data: |
-      data opened: 6547
+      data opened: 16401
       data read: 87
       ===
-      data opened: 6547
+      data opened: 16404
       data read: 87
diff --git a/zio/zng_test.go b/zio/zng_test.go
index 7b35dd1a33..411215ab8b 100644
--- a/zio/zng_test.go
+++ b/zio/zng_test.go
@@ -185,8 +185,7 @@ func TestStreams(t *testing.T) {
 	r := zson.NewReader(strings.NewReader(in), zson.NewContext())
 	var out Output
 	zw := zngio.NewWriter(&out, zngio.WriterOpts{
-		StreamRecordsMax: 2,
-		LZ4BlockSize:     zngio.DefaultLZ4BlockSize,
+		LZ4BlockSize: zngio.DefaultLZ4BlockSize,
 	})
 
 	var recs []*zng.Record
@@ -198,6 +197,9 @@ func TestStreams(t *testing.T) {
 		}
 		require.NoError(t, zw.Write(rec))
 		recs = append(recs, rec.Keep())
+		if len(recs)%2 == 0 {
+			require.NoError(t, zw.EndStream())
+		}
 	}
 
 	zr := zngio.NewReader(bytes.NewReader(out.Buffer.Bytes()), zson.NewContext())
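With `WriterOpts.StreamRecordsMax` gone, stream boundaries are the caller's job; the updated test above cuts one every two records via `EndStream`. A generalized sketch of that pattern, built from the same calls the tests use:

```go
package main

import (
	"bytes"
	"fmt"
	"strings"

	"github.com/brimdata/zed/zio"
	"github.com/brimdata/zed/zio/zngio"
	"github.com/brimdata/zed/zson"
)

// writeZNG encodes ZSON input as ZNG, ending a stream after every n
// records, as callers must now do for themselves.
func writeZNG(input string, n int) ([]byte, error) {
	r := zson.NewReader(strings.NewReader(input), zson.NewContext())
	var buf bytes.Buffer
	w := zngio.NewWriter(zio.NopCloser(&buf), zngio.WriterOpts{})
	var count int
	for {
		rec, err := r.Read()
		if err != nil {
			return nil, err
		}
		if rec == nil {
			break
		}
		if err := w.Write(rec); err != nil {
			return nil, err
		}
		count++
		if count%n == 0 {
			if err := w.EndStream(); err != nil {
				return nil, err
			}
		}
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	b, err := writeZNG("{a:1}\n{a:2}\n{a:3}\n{a:4}\n", 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(b), "bytes")
}
```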
-	}
-}
-
-func (i *indexReader) readOne() (*zng.Record, error) {
-	rec, err := i.Reader.Read()
-	if err != nil || rec == nil {
-		return nil, err
-	}
-
-	if i.lastTs != 0 {
-		switch i.order {
-		case OrderUnknown:
-			if rec.Ts() > i.lastTs {
-				i.order = OrderAscending
-			} else if rec.Ts() < i.lastTs {
-				i.order = OrderDescending
-			}
-		case OrderAscending:
-			if rec.Ts() < i.lastTs {
-				i.order = OrderUnsorted
-			}
-		case OrderDescending:
-			if rec.Ts() > i.lastTs {
-				i.order = OrderUnsorted
-			}
-		}
-	}
-	i.lastTs = rec.Ts()
-
-	sos := i.Reader.LastSOS()
-	if sos != i.lastSOS {
-		i.lastSOS = sos
-		ts := rec.Ts()
-		if ts != i.lastIndexedTs {
-			i.lastIndexedTs = ts
-			i.marks = append(i.marks, mark{ts, sos})
-		}
-	}
-
-	return rec, nil
-}
-
-// rangeReader is a wrapper around zngio.Reader that uses an in-memory
-// index to reduce the I/O needed to get matching records when reading a
-// large zng file that includes sub-streams and a nano.Span that refers
-// to a smaller time range within the file.
-type rangeReader struct {
-	Reader
-	io.Closer
-	order Ordering
-	start nano.Ts
-	end   nano.Ts
-	nread uint64
-}
-
-func newRangeReader(f *os.File, zctx *zson.Context, order Ordering, index []mark, span nano.Span) (*rangeReader, error) {
-	var off int64
-
-	if order == OrderAscending || order == OrderDescending {
-		// Find the stream within the zng file that holds the
-		// start time.  For a large index this could be optimized
-		// with a binary search.
-		for _, mark := range index {
-			if order == OrderAscending && mark.Ts > span.Ts {
-				break
-			}
-			if order == OrderDescending && mark.Ts < span.End() {
-				break
-			}
-			off = mark.Offset
-		}
-	}
-
-	if off > 0 {
-		newoff, err := f.Seek(off, io.SeekStart)
-		if err != nil {
-			return nil, err
-		}
-		if newoff != int64(off) {
-			return nil, errors.New("file truncated") //XXX
-		}
-	}
-	return &rangeReader{
-		Reader: *NewReader(f, zctx),
-		Closer: f,
-		order:  order,
-		start:  span.Ts,
-		end:    span.End(),
-	}, nil
-}
-
-func (r *rangeReader) Read() (*zng.Record, error) {
-	for {
-		rec, err := r.Reader.Read()
-		if err != nil {
-			return nil, err
-		}
-		r.nread++
-		if rec != nil {
-			switch r.order {
-			case OrderAscending:
-				if rec.Ts() < r.start {
-					continue
-				}
-				if rec.Ts() > r.end {
-					rec = nil
-				}
-			case OrderDescending:
-				if rec.Ts() > r.end {
-					continue
-				}
-				if rec.Ts() < r.start {
-					rec = nil
-				}
-			}
-		}
-		return rec, nil
-	}
-}
-
-// Used from tests
-func (r *rangeReader) reads() uint64 {
-	return r.nread
-}
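The deleted `newRangeReader` scanned its marks linearly and its comment noted that "for a large index this could be optimized with a binary search." A sketch of that optimization with `sort.Search`, using a local stand-in for the deleted `mark` struct (ascending order only):

```go
package main

import (
	"fmt"
	"sort"
)

// mark mirrors the deleted struct, with nano.Ts reduced to int64.
type mark struct {
	Ts     int64
	Offset int64
}

// seekOffset returns the offset of the last stream whose first
// timestamp is at or before start -- the same answer the deleted
// linear scan produced for ascending data, in O(log n).
func seekOffset(index []mark, start int64) int64 {
	i := sort.Search(len(index), func(i int) bool { return index[i].Ts > start })
	if i == 0 {
		return 0
	}
	return index[i-1].Offset
}

func main() {
	index := []mark{{10, 0}, {20, 100}, {30, 200}}
	fmt.Println(seekOffset(index, 25)) // 100: the stream holding ts=25
}
```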
diff --git a/zio/zngio/index_test.go b/zio/zngio/index_test.go
deleted file mode 100644
index 5144107ddd..0000000000
--- a/zio/zngio/index_test.go
+++ /dev/null
@@ -1,141 +0,0 @@
-package zngio
-
-import (
-	"os"
-	"path/filepath"
-	"strings"
-	"testing"
-
-	"github.com/brimdata/zed/pkg/fs"
-	"github.com/brimdata/zed/pkg/nano"
-	"github.com/brimdata/zed/zio"
-	"github.com/brimdata/zed/zson"
-	"github.com/stretchr/testify/require"
-)
-
-var records = []string{
-	"{ts:2020-04-14T17:42:40Z,value:0}",
-	"{ts:2020-04-14T17:42:41Z,value:1}",
-	"{ts:2020-04-14T17:42:42Z,value:2}",
-	"{ts:2020-04-14T17:42:43Z,value:3}",
-	"{ts:2020-04-14T17:42:44Z,value:4}",
-	"{ts:2020-04-14T17:42:45Z,value:5}",
-	"{ts:2020-04-14T17:42:46Z,value:6}",
-	"{ts:2020-04-14T17:42:47Z,value:7}",
-	"{ts:2020-04-14T17:42:48Z,value:8}",
-	"{ts:2020-04-14T17:42:49Z,value:9}",
-}
-
-// Parameters used for testing.  Note that in the zng data above,
-// indexed with a stream size of 2 records, this time span will straddle
-// 3 streams with only part of the first and last stream falling inside
-// the time range.
-const startTime = "1586886163"
-const endTime = "1586886166"
-
-// The guts of the test.  r must be a reader allocated from a
-// TimeIndex with the contents above and a time span delimited by
-// startTime and endTime as defined above.  First verifies that calling Read()
-// repeatedly gives just the records that fall within the requested time
-// span.  Then, if checkReads is true, verify that the total records read
-// from disk is just enough to cover the time span, and not the entire
-// file (with streams of 2 records each and parts of 3 streams being
-// inside the time span, a total of 6 records should be read).
-func checkReader(t *testing.T, r zio.Reader, expected []int, checkReads bool) {
-	for _, expect := range expected {
-		rec, err := r.Read()
-		require.NoError(t, err)
-
-		require.NotNil(t, rec)
-		v, err := rec.AccessInt("value")
-		require.NoError(t, err)
-
-		require.Equal(t, int64(expect), v, "Got expected record value")
-	}
-
-	rec, err := r.Read()
-	require.NoError(t, err)
-	require.Nil(t, rec, "Reached eof after last record in time span")
-
-	if checkReads {
-		rr, ok := r.(*rangeReader)
-		require.True(t, ok, "Can get read stats from index reader")
-		require.LessOrEqual(t, rr.reads(), uint64(6), "Indexed reader did not read the entire file")
-	}
-}
-
-func TestZngIndex(t *testing.T) {
-	// Create a time span that hits parts of different streams
-	// from within the zng file.
-	start, err := nano.ParseTs(startTime)
-	require.NoError(t, err)
-	end, err := nano.ParseTs(endTime)
-
-	require.NoError(t, err)
-	span := nano.NewSpanTs(start, end)
-
-	dotest := func(input, fname string, expected []int) {
-		// create a test zng file
-		reader := zson.NewReader(strings.NewReader(input), zson.NewContext())
-		fp, err := os.Create(fname)
-		require.NoError(t, err)
-
-		writer := NewWriter(fp, WriterOpts{StreamRecordsMax: 2})
-
-		for {
-			rec, err := reader.Read()
-			require.NoError(t, err)
-			if rec == nil {
-				break
-			}
-
-			err = writer.Write(rec)
-			require.NoError(t, err)
-		}
-		require.NoError(t, writer.Close()) // Also closes fp.
-
-		index := NewTimeIndex()
-
-		// First time we read the file we don't have an index, but a
-		// search with a time span should still only return results
-		// from the span.
-		fp, err = fs.Open(fname)
-		require.NoError(t, err)
-		ireader, err := index.NewReader(fp, zson.NewContext(), span)
-		require.NoError(t, err)
-
-		checkReader(t, ireader, expected, false)
-		err = fp.Close()
-		require.NoError(t, err)
-
-		// Second time through, should get the same results, this time
-		// verify that we didn't read the whole file.
-		fp, err = fs.Open(fname)
-		require.NoError(t, err)
-		ireader, err = index.NewReader(fp, zson.NewContext(), span)
-		require.NoError(t, err)
-
-		checkReader(t, ireader, expected, true)
-		err = fp.Close()
-		require.NoError(t, err)
-	}
-
-	dir := t.TempDir()
-
-	// Test once with ascending timestamps
-	t.Run("IndexAscending", func(t *testing.T) {
-		fname := filepath.Join(dir, "ascend")
-		input := strings.Join(records, "\n")
-		dotest(input, fname, []int{3, 4, 5, 6})
-	})
-
-	// And test again with descending timestamps
-	t.Run("IndexDescending", func(t *testing.T) {
-		fname := filepath.Join(dir, "descend")
-		var buf strings.Builder
-		for i := len(records) - 1; i >= 0; i-- {
-			buf.WriteString(records[i])
-		}
-		dotest(buf.String(), fname, []int{6, 5, 4, 3})
-	})
-}
diff --git a/zio/zngio/scanner_test.go b/zio/zngio/scanner_test.go
index 864c6f857d..b9c25107f1 100644
--- a/zio/zngio/scanner_test.go
+++ b/zio/zngio/scanner_test.go
@@ -30,9 +30,10 @@ func TestScannerContext(t *testing.T) {
 		require.NoError(t, err)
 		var buf bytes.Buffer
 		w := NewWriter(zio.NopCloser(&buf), WriterOpts{})
-		for j := 0; j < DefaultStreamRecordsMax; j++ {
+		for j := 0; j < 100; j++ {
 			require.NoError(t, w.Write(rec))
 		}
+		require.NoError(t, w.EndStream())
 		require.NoError(t, w.Close())
 		bufs = append(bufs, buf.Bytes())
 	}
diff --git a/zio/zngio/writer.go b/zio/zngio/writer.go
index 8764f5a4c2..d2ed37e5ec 100644
--- a/zio/zngio/writer.go
+++ b/zio/zngio/writer.go
@@ -11,8 +11,6 @@ import (
 const (
 	// DefaultLZ4BlockSize is a reasonable default for WriterOpts.LZ4BlockSize.
 	DefaultLZ4BlockSize = 16 * 1024
-	// DefaultStreamRecordsMax is a reasonable default for WriterOpts.StreamRecordsMax.
-	DefaultStreamRecordsMax = 5000
 )
 
 type Writer struct {
@@ -23,13 +21,11 @@ type Writer struct {
 	encoder       *Encoder
 	buffer        []byte
-	lastSOS       int64
 	streamRecords int
 }
 
 type WriterOpts struct {
-	StreamRecordsMax int
-	LZ4BlockSize     int
+	LZ4BlockSize int
 }
 
 func NewWriter(w io.WriteCloser, opts WriterOpts) *Writer {
@@ -93,14 +89,9 @@ func (w *Writer) EndStream() error {
 	if err := w.writeUncompressed([]byte{zng.CtrlEOS}); err != nil {
 		return err
 	}
-	w.lastSOS = w.Position()
 	return nil
 }
 
-func (w *Writer) LastSOS() int64 {
-	return w.lastSOS
-}
-
 func (w *Writer) Write(r *zng.Record) error {
 	// First send any typedefs for unsent types.
 	typ := w.encoder.Lookup(r.Type)
@@ -149,11 +140,6 @@ func (w *Writer) Write(r *zng.Record) error {
 		return err
 	}
 	w.streamRecords++
-	if max := w.opts.StreamRecordsMax; max > 0 && w.streamRecords >= max {
-		if err := w.EndStream(); err != nil {
-			return err
-		}
-	}
 	return nil
 }
diff --git a/zio/zngio/ztests/streams-1.yaml b/zio/zngio/ztests/streams-1.yaml
deleted file mode 100644
index 4298245040..0000000000
--- a/zio/zngio/ztests/streams-1.yaml
+++ /dev/null
@@ -1,62 +0,0 @@
-# Test frame markers inserted into a zng frame (including testing
-# that type definitions are repeated as needed).
-
-zed: '*'
-
-input: |
-  {_path:"a",ts:1970-01-01T00:00:10Z,d:1.}
-  {_path:"xyz",ts:1970-01-01T00:00:20Z,d:1.5}
-
-output-flags: -f zng -b 1
-
-outputHex: |
-  # define a record with 3 columns
-  f6 03
-  # first column name is _path (len 5)
-  05 5f 70 61 74 68
-  # first column type is string (16)
-  10
-  # second column name is ts (len 2)
-  02 74 73
-  # second column type is time (10)
-  09
-  # third column name is d (len 1)
-  01 64
-  # third column type is float64 (12)
-  0c
-  # value using type id 23 (0x17), the record defined above
-  # total length of this recor is 17 bytes (0x11)
-  17 11
-  # first column is a primitive value, 2 total bytes
-  04
-  # value of the first column is the string "a"
-  61
-  # second column is a primitive value, 6 total bytes
-  0c
-  # time value is encoded in nanoseconds shifted one bit left
-  # 2000000000 == 0x04a817c800
-  00 c8 17 a8 04
-  # third column is a primitive value, 9 total bytes
-  12
-  # 8 bytes of float64 data representing 1.0
-  00 00 00 00 00 00 f0 3f
-  # end of stream
-  ff
-  # new frame: repeat the record type definition
-  f6 03
-  05 5f 70 61 74 68
-  10
-  02 74 73
-  09
-  01 64
-  0c
-  # another encoded value using the same record definition as before
-  17 13
-  # first column: primitive value of 4 total byte, values xyz
-  08 78 79 7a
-  # second column: primitive value of 20 (converted to nanoseconds, encoded <<1)
-  0c 00 90 2f 50 09
-  # third column, primitive value of 9 total bytes, float64 1.5
-  12 00 00 00 00 00 00 f8 3f
-  # end of stream
-  ff
diff --git a/zio/zngio/ztests/streams-2.yaml b/zio/zngio/ztests/streams-2.yaml
deleted file mode 100644
index 54c8bd0b50..0000000000
--- a/zio/zngio/ztests/streams-2.yaml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Test that type ids are not re-used across zng streams.
-
-zed: '*'
-
-input: |
-  {s:"a"}
-  {i:1 (int32)} (=0)
-  {i:2} (0)
-
-output-flags: -f zng -b 2 -znglz4blocksize=0
-
-outputHex: |
-  # define the record corresponding to type 0 above: 1 col, name s, type string
-  f6 01
-  01 73
-  10
-  # value using type id 23 (0x17), equivalent to type 0 in the tzng source
-  # total length of this record is 2 bytes
-  17 02
-  # first column is a primitive value, 2 total bytes, the string "a"
-  04 61
-  # define the record corresponding to type 1: 1 col, name i, type int32
-  f6 01
-  01 69
-  06
-  # value using type id 24 (0x18), corresponding to type 1 in tzng
-  18 02
-  # consists of one primitive value, representing the integer 1
-  04 02
-  # end of stream
-  ff
-  # new frame: repeat the type definition for type 1
-  f6 01
-  01 69
-  06
-  # value using the new type definition, since this is a new stream,
-  # the value should use type id 23, not 24 as was used in the first stream
-  17 02
-  04 04
-  # end of stream
-  ff
diff --git a/zio/zngio/ztests/type-reset.yaml b/zio/zngio/ztests/type-reset.yaml
deleted file mode 100644
index df312fba33..0000000000
--- a/zio/zngio/ztests/type-reset.yaml
+++ /dev/null
@@ -1,10 +0,0 @@
-zed: '*'
-
-input: |
-  {_path:"a",ts:1970-01-01T00:00:10Z,d:1.}
-  {_path:"xyz",ts:1970-01-01T00:00:20Z,d:1.5}
-
-output-flags: -f zng -b 2
-
-output: !!binary |
-  9gMFX3BhdGgQAnRzCQFkDBcRBGEMAMgXqAQSAAAAAAAA8D8XEwh4eXoMAJAvUAkSAAAAAAAA+D//
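These deleted hex tests pinned down the property the remaining tests still rely on: each stream ends in an EOS marker and typedefs are re-sent per stream, so independently written ZNG buffers can simply be concatenated and read back with one reader, which is the trick the reworked zctx tests below use with `cat t1.zng t2.zng`. A sketch of the same round trip in Go (explicit `EndStream` before `Close`, mirroring the scanner test above):

```go
package main

import (
	"bytes"
	"fmt"
	"strings"

	"github.com/brimdata/zed/zio"
	"github.com/brimdata/zed/zio/zngio"
	"github.com/brimdata/zed/zson"
)

// encode writes one self-delimited ZNG stream from ZSON input.
func encode(input string) []byte {
	r := zson.NewReader(strings.NewReader(input), zson.NewContext())
	var buf bytes.Buffer
	w := zngio.NewWriter(zio.NopCloser(&buf), zngio.WriterOpts{})
	for {
		rec, err := r.Read()
		if err != nil {
			panic(err)
		}
		if rec == nil {
			break
		}
		if err := w.Write(rec); err != nil {
			panic(err)
		}
	}
	// End the stream explicitly, then close, as the scanner test does.
	if err := w.EndStream(); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	return buf.Bytes()
}

func main() {
	// Concatenated streams decode with a single reader; each stream
	// starts with a fresh type context.
	both := append(encode(`{a:"hello"}`), encode(`{b:10}`)...)
	zr := zngio.NewReader(bytes.NewReader(both), zson.NewContext())
	for {
		rec, err := zr.Read()
		if err != nil {
			panic(err)
		}
		if rec == nil {
			break
		}
		fmt.Println(rec)
	}
}
```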
"head 1" in.zson > t1.zng + zq "tail 2" in.zson > t2.zng + cat t1.zng t2.zng | zq -z "count() by proto" - inputs: - - name: stdin + - name: in.zson data: | {ts:2015-03-05T14:25:12.963801Z} {ts:2015-03-05T14:25:14.419939Z,proto:"udp" (=zenum)} (=0) diff --git a/zio/zngio/ztests/zctx-alias-reset.yaml b/zio/zngio/ztests/zctx-alias-reset.yaml index d725f99f72..bd8b19350b 100644 --- a/zio/zngio/ztests/zctx-alias-reset.yaml +++ b/zio/zngio/ztests/zctx-alias-reset.yaml @@ -1,10 +1,12 @@ # Test that type aliases are properly reset and reusable after stream boundaries script: | - zq -b 2 - | zq -z - + zq "head 1" in.zson > t1.zng + zq "tail 2" in.zson > t2.zng + cat t1.zng t2.zng | zq -z - inputs: - - name: stdin + - name: in.zson data: | {ts:2015-03-05T14:25:12.963801Z} {ts:2015-03-05T14:25:14.419939Z,proto:"udp" (=zenum)} (=0) diff --git a/zio/zngio/ztests/zctx-reset.yaml b/zio/zngio/ztests/zctx-reset.yaml index 96d01fb0e0..3f1e4b5524 100644 --- a/zio/zngio/ztests/zctx-reset.yaml +++ b/zio/zngio/ztests/zctx-reset.yaml @@ -1,11 +1,12 @@ # Test that type contexts are properly reset and reusable after stream boundaries script: | - zq -b 1 - > s.zng + zq in.zson > s.zng + zq in.zson >> s.zng zq -z s.zng inputs: - - name: stdin + - name: in.zson data: | {a:"hello"} {b:10} @@ -15,3 +16,5 @@ outputs: data: | {a:"hello"} {b:10} + {a:"hello"} + {b:10}