diff --git a/pcap/index.go b/pcap/index.go index af4d381f13..cc5876f98a 100644 --- a/pcap/index.go +++ b/pcap/index.go @@ -5,6 +5,7 @@ import ( "errors" "io" "io/ioutil" + "unsafe" "github.com/brimsec/zq/pcap/pcapio" "github.com/brimsec/zq/pkg/nano" @@ -38,6 +39,15 @@ type Section struct { Index ranger.Envelope } +const ( + pointSize = unsafe.Sizeof(ranger.Point{}) + offsetMaxMem = 1024 * 1024 * 64 // 64MB +) + +// offsetThresh is the max number of pcap offsets collected before the offset array +// in CreateIndex is condensed with ranger.NewEnvelope. +var offsetThresh = offsetMaxMem / int(pointSize) + // CreateIndex creates an index for a pcap presented as an io.Reader. // The size parameter indicates how many bins the index should contain. func CreateIndex(r io.Reader, size int) (Index, error) { @@ -79,6 +89,14 @@ func CreateIndex(r io.Reader, size int) (Index, error) { } y := uint64(ts) offsets = append(offsets, ranger.Point{X: off, Y: y}) + // In order to avoid running out of memory for large pcap sections, + // condense offsets with envelope once offsetThresh has been + // reached. 
+ if len(offsets) > offsetThresh { + env := ranger.NewEnvelope(offsets, size) + section.Index = env.Merge(section.Index) + offsets = offsets[:0] + } case pcapio.TypeSection: // end previous section and start a new one @@ -86,8 +104,11 @@ func CreateIndex(r io.Reader, size int) (Index, error) { err := errors.New("missing section header") return nil, pcapio.NewErrInvalidPcap(err) } - if section != nil && offsets != nil { - section.Index = ranger.NewEnvelope(offsets, size) + if section != nil { + if offsets != nil { + env := ranger.NewEnvelope(offsets, size) + section.Index = env.Merge(section.Index) + } index = append(index, *section) } slice := slicer.Slice{ @@ -97,12 +118,13 @@ func CreateIndex(r io.Reader, size int) (Index, error) { section = &Section{ Blocks: []slicer.Slice{slice}, } - offsets = nil + offsets = offsets[:0] } } // end last section if section != nil && offsets != nil { - section.Index = ranger.NewEnvelope(offsets, size) + env := ranger.NewEnvelope(offsets, size) + section.Index = env.Merge(section.Index) index = append(index, *section) } if len(index) == 0 { diff --git a/pkg/ranger/envelope.go b/pkg/ranger/envelope.go index c047249d3b..6e6db69c66 100644 --- a/pkg/ranger/envelope.go +++ b/pkg/ranger/envelope.go @@ -68,6 +68,20 @@ func rangeOf(offsets []Point) Range { return r } +func rangeOfBins(bins []Bin) Range { + r := Range{Y0: math.MaxUint64, Y1: 0} + for _, b := range bins { + y0, y1 := b.Range.Y0, b.Range.Y1 + if r.Y0 > y0 { + r.Y0 = y0 + } + if r.Y1 < y1 { + r.Y1 = y1 + } + } + return r +} + // NewEnvelope creates a range envelope structure used by FindSmallestDomain. // The X field of the Points must be in non-decreasing order. 
func NewEnvelope(offsets []Point, nbin int) Envelope { @@ -79,8 +93,8 @@ func NewEnvelope(offsets []Point, nbin int) Envelope { nout := (n + stride - 1) / stride bins := make([]Bin, nout) for k := 0; k < nout; k++ { - bins[k].X = offsets[k*stride].X m := k * stride + bins[k].X = offsets[m].X end := m + stride if end > len(offsets) { end = len(offsets) @@ -113,3 +127,70 @@ func (e Envelope) FindSmallestDomain(r Range) Domain { } return Domain{x0, x1} } + +type combiner struct { + envelopes []Envelope +} + +func (c *combiner) next() (Bin, bool) { + idx := -1 + for i := range c.envelopes { + if len(c.envelopes[i]) == 0 { + continue + } + if idx == -1 || c.envelopes[i][0].X < c.envelopes[idx][0].X { + idx = i + } + } + if idx == -1 { + return Bin{}, true + } + b := c.envelopes[idx][0] + c.envelopes[idx] = c.envelopes[idx][1:] + return b, false +} + +func (c *combiner) nextN(n int) []Bin { + var bins []Bin + for i := 0; i < n; i++ { + b, done := c.next() + if done { + return bins + } + bins = append(bins, b) + } + return bins +} + +func (c *combiner) size() (size int) { + for _, e := range c.envelopes { + size += len(e) + } + return +} + +func (c *combiner) maxlen() (l int) { + for _, e := range c.envelopes { + if len(e) > l { + l = len(e) + } + } + return +} + +// Merge squashes the two envelopes together returning a new Envelope the size +// of the longest Envelope provided. 
+func (e Envelope) Merge(u Envelope) Envelope { + c := &combiner{[]Envelope{e, u}} + n := c.size() + nbin := c.maxlen() + stride := strideSize(n, nbin) + nout := (n + stride - 1) / stride + out := make([]Bin, nout) + for k := 0; k < nout; k++ { + bins := c.nextN(stride) + out[k].X = bins[0].X + out[k].Range = rangeOfBins(bins) + } + return out +} diff --git a/pkg/ranger/ranger_test.go b/pkg/ranger/ranger_test.go index b4615a5c19..2af90dcdc6 100644 --- a/pkg/ranger/ranger_test.go +++ b/pkg/ranger/ranger_test.go @@ -1,21 +1,34 @@ -package ranger_test +package ranger import ( "math" "testing" - "github.com/brimsec/zq/pkg/ranger" "github.com/stretchr/testify/assert" ) -func find(pts []ranger.Point, nbin int, r ranger.Range) ranger.Domain { - e := ranger.NewEnvelope(pts, nbin) +func find(pts []Point, nbin int, r Range) Domain { + e := NewEnvelope(pts, nbin) return e.FindSmallestDomain(r) } +// func TestEnvelope(t *testing.T) { +// t.Parallel() +// pts := []Point{ +// {1, 0x100}, +// {2, 0x120}, +// {3, 0x110}, +// {4, 0x130}, +// {5, 0x150}, +// {6, 0x150}, +// } +// d := find(pts, 2, Range{0x100, 0x120}) +// assert.Exactly(t, Domain{1, 5}, d) +// } + func TestEnvelope(t *testing.T) { t.Parallel() - pts := []ranger.Point{ + pts := []Point{ {1, 0x100}, {2, 0x120}, {3, 0x110}, @@ -23,25 +36,46 @@ func TestEnvelope(t *testing.T) { {5, 0x150}, {6, 0x150}, } - d := find(pts, 0, ranger.Range{0x151, 0x151}) - assert.Exactly(t, ranger.Domain{}, d) - d = find(pts, 0, ranger.Range{0, 0x90}) - assert.Exactly(t, ranger.Domain{}, d) - d = find(pts, 0, ranger.Range{0x90, 0x111}) - assert.Exactly(t, ranger.Domain{1, 4}, d) - d = find(pts, 0, ranger.Range{0x115, 0x135}) - assert.Exactly(t, ranger.Domain{2, 5}, d) - d = find(pts, 0, ranger.Range{0x150, 0x150}) - assert.Exactly(t, ranger.Domain{5, math.MaxUint64}, d) - d = find(pts, 0, ranger.Range{0x151, 0x151}) - assert.Exactly(t, ranger.Domain{}, d) - d = find(pts, 3, ranger.Range{0x100, 0x109}) - assert.Exactly(t, ranger.Domain{1, 
3}, d) - d = find(pts, 3, ranger.Range{0x100, 0x120}) - assert.Exactly(t, ranger.Domain{1, 5}, d) - d = find(pts, 3, ranger.Range{0x100, 0x130}) - assert.Exactly(t, ranger.Domain{1, 5}, d) + d := find(pts, 0, Range{0x151, 0x151}) + assert.Exactly(t, Domain{}, d) + d = find(pts, 0, Range{0, 0x90}) + assert.Exactly(t, Domain{}, d) + d = find(pts, 0, Range{0x90, 0x111}) + assert.Exactly(t, Domain{1, 4}, d) + d = find(pts, 0, Range{0x115, 0x135}) + assert.Exactly(t, Domain{2, 5}, d) + d = find(pts, 0, Range{0x150, 0x150}) + assert.Exactly(t, Domain{5, math.MaxUint64}, d) + d = find(pts, 0, Range{0x151, 0x151}) + assert.Exactly(t, Domain{}, d) + d = find(pts, 3, Range{0x100, 0x109}) + assert.Exactly(t, Domain{1, 3}, d) + d = find(pts, 3, Range{0x100, 0x120}) + assert.Exactly(t, Domain{1, 5}, d) + d = find(pts, 3, Range{0x100, 0x130}) + assert.Exactly(t, Domain{1, 5}, d) pts[5].Y = 0x149 - d = find(pts, 3, ranger.Range{0x100, 0x149}) - assert.Exactly(t, ranger.Domain{1, math.MaxUint64}, d) + d = find(pts, 3, Range{0x100, 0x149}) + assert.Exactly(t, Domain{1, math.MaxUint64}, d) +} + +func TestUnion(t *testing.T) { + env1 := Envelope{ + {1, Range{0x100, 0x120}}, + {3, Range{0x110, 0x130}}, + {5, Range{0x150, 0x150}}, + } + env2 := Envelope{ + {7, Range{0x110, 0x160}}, + {9, Range{0x170, 0x180}}, + {11, Range{0x190, 0x190}}, + {13, Range{0x180, 0x230}}, + } + assert.Exactly(t, env1.Merge(env2), Envelope{ + {1, Range{0x100, 0x130}}, + {5, Range{0x110, 0x160}}, + {9, Range{0x170, 0x190}}, + {13, Range{0x180, 0x230}}, + }) + assert.Exactly(t, Envelope{}.Merge(env1), env1) }