From e8b7e26ad432124c232c9135f54d536685b5c641 Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Thu, 13 Aug 2020 10:15:50 -0700 Subject: [PATCH] pcap index: Compress offsets that exceed threshold Introduce ranger.Envelope.Merge that merges two Envelopes into a single Envelope. This fixes a bug where indexing a large pcap causes the system to run out of memory (OOM) and panic. When constructing the time index for a pcap, compress the array of offset points to an Envelope when the size of the array reaches a certain threshold. Subsequent compressions will be merged into the section's Envelope, keeping the memory footprint low. The downside to this approach is that, for the indexes of large pcap files, the difference between adjacent X values starts out very wide and then narrows as one iterates through the Bins. This will result in larger pcap scans (i.e. slow searches) for hits at the beginning of the file and smaller scans (i.e. faster searches) towards the end. Consensus was that the difference in search times probably won't be noticeable enough to warrant introducing a fancier algorithm. Filed #1095 to revisit. Closes #1039 --- pcap/index.go | 30 ++++++++++++-- pkg/ranger/envelope.go | 83 +++++++++++++++++++++++++++++++++++++- pkg/ranger/ranger_test.go | 84 +++++++++++++++++++++++++++------------ 3 files changed, 167 insertions(+), 30 deletions(-) diff --git a/pcap/index.go b/pcap/index.go index af4d381f13..cc5876f98a 100644 --- a/pcap/index.go +++ b/pcap/index.go @@ -5,6 +5,7 @@ import ( "errors" "io" "io/ioutil" + "unsafe" "github.com/brimsec/zq/pcap/pcapio" "github.com/brimsec/zq/pkg/nano" @@ -38,6 +39,15 @@ type Section struct { Index ranger.Envelope } +const ( + pointSize = unsafe.Sizeof(ranger.Point{}) + offsetMaxMem = 1024 * 1024 * 64 // 64MB +) + +// offsetThresh is the max number of pcap offsets collected before the offset array +// in CreateIndex is condensed with ranger.NewEnvelope. 
+var offsetThresh = offsetMaxMem / int(pointSize) + // CreateIndex creates an index for a pcap presented as an io.Reader. // The size parameter indicates how many bins the index should contain. func CreateIndex(r io.Reader, size int) (Index, error) { @@ -79,6 +89,14 @@ func CreateIndex(r io.Reader, size int) (Index, error) { } y := uint64(ts) offsets = append(offsets, ranger.Point{X: off, Y: y}) + // In order to avoid running out of memory for large pcap sections, + // condense offsets with envelope once offsetThresh has been + // reached. + if len(offsets) > offsetThresh { + env := ranger.NewEnvelope(offsets, size) + section.Index = env.Merge(section.Index) + offsets = offsets[:0] + } case pcapio.TypeSection: // end previous section and start a new one @@ -86,8 +104,11 @@ func CreateIndex(r io.Reader, size int) (Index, error) { err := errors.New("missing section header") return nil, pcapio.NewErrInvalidPcap(err) } - if section != nil && offsets != nil { - section.Index = ranger.NewEnvelope(offsets, size) + if section != nil { + if offsets != nil { + env := ranger.NewEnvelope(offsets, size) + section.Index = env.Merge(section.Index) + } index = append(index, *section) } slice := slicer.Slice{ @@ -97,12 +118,13 @@ func CreateIndex(r io.Reader, size int) (Index, error) { section = &Section{ Blocks: []slicer.Slice{slice}, } - offsets = nil + offsets = offsets[:0] } } // end last section if section != nil && offsets != nil { - section.Index = ranger.NewEnvelope(offsets, size) + env := ranger.NewEnvelope(offsets, size) + section.Index = env.Merge(section.Index) index = append(index, *section) } if len(index) == 0 { diff --git a/pkg/ranger/envelope.go b/pkg/ranger/envelope.go index c047249d3b..6e6db69c66 100644 --- a/pkg/ranger/envelope.go +++ b/pkg/ranger/envelope.go @@ -68,6 +68,20 @@ func rangeOf(offsets []Point) Range { return r } +func rangeOfBins(bins []Bin) Range { + r := Range{Y0: math.MaxUint64, Y1: 0} + for _, b := range bins { + y0, y1 := b.Range.Y0, 
b.Range.Y1 + if r.Y0 > y0 { + r.Y0 = y0 + } + if r.Y1 < y1 { + r.Y1 = y1 + } + } + return r +} + // NewEnvelope creates a range envelope structure used by FindSmallestDomain. // The X field of the Points must be in non-decreasing order. func NewEnvelope(offsets []Point, nbin int) Envelope { @@ -79,8 +93,8 @@ func NewEnvelope(offsets []Point, nbin int) Envelope { nout := (n + stride - 1) / stride bins := make([]Bin, nout) for k := 0; k < nout; k++ { - bins[k].X = offsets[k*stride].X m := k * stride + bins[k].X = offsets[m].X end := m + stride if end > len(offsets) { end = len(offsets) @@ -113,3 +127,70 @@ func (e Envelope) FindSmallestDomain(r Range) Domain { } return Domain{x0, x1} } + +type combiner struct { + envelopes []Envelope +} + +func (c *combiner) next() (Bin, bool) { + idx := -1 + for i := range c.envelopes { + if len(c.envelopes[i]) == 0 { + continue + } + if idx == -1 || c.envelopes[i][0].X < c.envelopes[idx][0].X { + idx = i + } + } + if idx == -1 { + return Bin{}, true + } + b := c.envelopes[idx][0] + c.envelopes[idx] = c.envelopes[idx][1:] + return b, false +} + +func (c *combiner) nextN(n int) []Bin { + var bins []Bin + for i := 0; i < n; i++ { + b, done := c.next() + if done { + return bins + } + bins = append(bins, b) + } + return bins +} + +func (c *combiner) size() (size int) { + for _, e := range c.envelopes { + size += len(e) + } + return +} + +func (c *combiner) maxlen() (l int) { + for _, e := range c.envelopes { + if len(e) > l { + l = len(e) + } + } + return +} + +// Merge squashes the two envelopes together returning a new Envelope the size +// of the longest Envelope provided. 
+func (e Envelope) Merge(u Envelope) Envelope { + c := &combiner{[]Envelope{e, u}} + n := c.size() + nbin := c.maxlen() + stride := strideSize(n, nbin) + nout := (n + stride - 1) / stride + out := make([]Bin, nout) + for k := 0; k < nout; k++ { + bins := c.nextN(stride) + out[k].X = bins[0].X + out[k].Range = rangeOfBins(bins) + } + return out +} diff --git a/pkg/ranger/ranger_test.go b/pkg/ranger/ranger_test.go index b4615a5c19..2af90dcdc6 100644 --- a/pkg/ranger/ranger_test.go +++ b/pkg/ranger/ranger_test.go @@ -1,21 +1,34 @@ -package ranger_test +package ranger import ( "math" "testing" - "github.com/brimsec/zq/pkg/ranger" "github.com/stretchr/testify/assert" ) -func find(pts []ranger.Point, nbin int, r ranger.Range) ranger.Domain { - e := ranger.NewEnvelope(pts, nbin) +func find(pts []Point, nbin int, r Range) Domain { + e := NewEnvelope(pts, nbin) return e.FindSmallestDomain(r) } +// func TestEnvelope(t *testing.T) { +// t.Parallel() +// pts := []Point{ +// {1, 0x100}, +// {2, 0x120}, +// {3, 0x110}, +// {4, 0x130}, +// {5, 0x150}, +// {6, 0x150}, +// } +// d := find(pts, 2, Range{0x100, 0x120}) +// assert.Exactly(t, Domain{1, 5}, d) +// } + func TestEnvelope(t *testing.T) { t.Parallel() - pts := []ranger.Point{ + pts := []Point{ {1, 0x100}, {2, 0x120}, {3, 0x110}, @@ -23,25 +36,46 @@ func TestEnvelope(t *testing.T) { {5, 0x150}, {6, 0x150}, } - d := find(pts, 0, ranger.Range{0x151, 0x151}) - assert.Exactly(t, ranger.Domain{}, d) - d = find(pts, 0, ranger.Range{0, 0x90}) - assert.Exactly(t, ranger.Domain{}, d) - d = find(pts, 0, ranger.Range{0x90, 0x111}) - assert.Exactly(t, ranger.Domain{1, 4}, d) - d = find(pts, 0, ranger.Range{0x115, 0x135}) - assert.Exactly(t, ranger.Domain{2, 5}, d) - d = find(pts, 0, ranger.Range{0x150, 0x150}) - assert.Exactly(t, ranger.Domain{5, math.MaxUint64}, d) - d = find(pts, 0, ranger.Range{0x151, 0x151}) - assert.Exactly(t, ranger.Domain{}, d) - d = find(pts, 3, ranger.Range{0x100, 0x109}) - assert.Exactly(t, ranger.Domain{1, 
3}, d) - d = find(pts, 3, ranger.Range{0x100, 0x120}) - assert.Exactly(t, ranger.Domain{1, 5}, d) - d = find(pts, 3, ranger.Range{0x100, 0x130}) - assert.Exactly(t, ranger.Domain{1, 5}, d) + d := find(pts, 0, Range{0x151, 0x151}) + assert.Exactly(t, Domain{}, d) + d = find(pts, 0, Range{0, 0x90}) + assert.Exactly(t, Domain{}, d) + d = find(pts, 0, Range{0x90, 0x111}) + assert.Exactly(t, Domain{1, 4}, d) + d = find(pts, 0, Range{0x115, 0x135}) + assert.Exactly(t, Domain{2, 5}, d) + d = find(pts, 0, Range{0x150, 0x150}) + assert.Exactly(t, Domain{5, math.MaxUint64}, d) + d = find(pts, 0, Range{0x151, 0x151}) + assert.Exactly(t, Domain{}, d) + d = find(pts, 3, Range{0x100, 0x109}) + assert.Exactly(t, Domain{1, 3}, d) + d = find(pts, 3, Range{0x100, 0x120}) + assert.Exactly(t, Domain{1, 5}, d) + d = find(pts, 3, Range{0x100, 0x130}) + assert.Exactly(t, Domain{1, 5}, d) pts[5].Y = 0x149 - d = find(pts, 3, ranger.Range{0x100, 0x149}) - assert.Exactly(t, ranger.Domain{1, math.MaxUint64}, d) + d = find(pts, 3, Range{0x100, 0x149}) + assert.Exactly(t, Domain{1, math.MaxUint64}, d) +} + +func TestUnion(t *testing.T) { + env1 := Envelope{ + {1, Range{0x100, 0x120}}, + {3, Range{0x110, 0x130}}, + {5, Range{0x150, 0x150}}, + } + env2 := Envelope{ + {7, Range{0x110, 0x160}}, + {9, Range{0x170, 0x180}}, + {11, Range{0x190, 0x190}}, + {13, Range{0x180, 0x230}}, + } + assert.Exactly(t, env1.Merge(env2), Envelope{ + {1, Range{0x100, 0x130}}, + {5, Range{0x110, 0x160}}, + {9, Range{0x170, 0x190}}, + {13, Range{0x180, 0x230}}, + }) + assert.Exactly(t, Envelope{}.Merge(env1), env1) }