Skip to content

Commit

Permalink
pcap index: Compress offsets that exceed threshold
Browse files Browse the repository at this point in the history
Introduce ranger.Envelope.Merge that merges two Envelopes into a
single Envelope.

This fixes bug where indexing a large pcap causes the system to
oom panic.

When constructing the time index for a pcap, compress the array of
offset points to an Envelope when the size of the array reaches a
certain threshold. Subsequent compressions will be merged into the
section's Envelope keeping the memory footprint low.

The downside to this approach is for the indexes of large pcap files
the difference between adjacent X values starts out very wide then
narrows as one iterate through the Bins. This will result in larger
pcap scans (i.e. slow searches) for hits at the beginning of the file
and smaller scans (i.e. faster searches) towards the end. Consensus
was that the difference in search times probably won't be noticeable
enough to warrant introducing a fancier algorithm. Filed #1095 to
revisit.

Closes #1039
  • Loading branch information
mattnibs committed Aug 13, 2020
1 parent 07fc77b commit e8b7e26
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 30 deletions.
30 changes: 26 additions & 4 deletions pcap/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"io/ioutil"
"unsafe"

"github.com/brimsec/zq/pcap/pcapio"
"github.com/brimsec/zq/pkg/nano"
Expand Down Expand Up @@ -38,6 +39,15 @@ type Section struct {
Index ranger.Envelope
}

const (
pointSize = unsafe.Sizeof(ranger.Point{})
offsetMaxMem = 1024 * 1024 * 64 // 64MB
)

// offsetThresh is the max number of pcap offsets collected before offset array
// in CreateIndex is condensed with ranger.NewEvelope.
var offsetThresh = offsetMaxMem / int(pointSize)

// CreateIndex creates an index for a pcap presented as an io.Reader.
// The size parameter indicates how many bins the index should contain.
func CreateIndex(r io.Reader, size int) (Index, error) {
Expand Down Expand Up @@ -79,15 +89,26 @@ func CreateIndex(r io.Reader, size int) (Index, error) {
}
y := uint64(ts)
offsets = append(offsets, ranger.Point{X: off, Y: y})
// In order to avoid running out of memory for large pcap sections,
// condense offsets with envelope once offsetThresh has been
// reached.
if len(offsets) > offsetThresh {
env := ranger.NewEnvelope(offsets, size)
section.Index = env.Merge(section.Index)
offsets = offsets[:0]
}

case pcapio.TypeSection:
// end previous section and start a new one
if section == nil && offsets != nil {
err := errors.New("missing section header")
return nil, pcapio.NewErrInvalidPcap(err)
}
if section != nil && offsets != nil {
section.Index = ranger.NewEnvelope(offsets, size)
if section != nil {
if offsets != nil {
env := ranger.NewEnvelope(offsets, size)
section.Index = env.Merge(section.Index)
}
index = append(index, *section)
}
slice := slicer.Slice{
Expand All @@ -97,12 +118,13 @@ func CreateIndex(r io.Reader, size int) (Index, error) {
section = &Section{
Blocks: []slicer.Slice{slice},
}
offsets = nil
offsets = offsets[:0]
}
}
// end last section
if section != nil && offsets != nil {
section.Index = ranger.NewEnvelope(offsets, size)
env := ranger.NewEnvelope(offsets, size)
section.Index = env.Merge(section.Index)
index = append(index, *section)
}
if len(index) == 0 {
Expand Down
83 changes: 82 additions & 1 deletion pkg/ranger/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ func rangeOf(offsets []Point) Range {
return r
}

func rangeOfBins(bins []Bin) Range {
r := Range{Y0: math.MaxUint64, Y1: 0}
for _, b := range bins {
y0, y1 := b.Range.Y0, b.Range.Y1
if r.Y0 > y0 {
r.Y0 = y0
}
if r.Y1 < y1 {
r.Y1 = y1
}
}
return r
}

// NewEnvelope creates a range envelope structure used by FindSmallestDomain.
// The X field of the Points must be in non-decreasing order.
func NewEnvelope(offsets []Point, nbin int) Envelope {
Expand All @@ -79,8 +93,8 @@ func NewEnvelope(offsets []Point, nbin int) Envelope {
nout := (n + stride - 1) / stride
bins := make([]Bin, nout)
for k := 0; k < nout; k++ {
bins[k].X = offsets[k*stride].X
m := k * stride
bins[k].X = offsets[m].X
end := m + stride
if end > len(offsets) {
end = len(offsets)
Expand Down Expand Up @@ -113,3 +127,70 @@ func (e Envelope) FindSmallestDomain(r Range) Domain {
}
return Domain{x0, x1}
}

type combiner struct {
envelopes []Envelope
}

func (c *combiner) next() (Bin, bool) {
idx := -1
for i := range c.envelopes {
if len(c.envelopes[i]) == 0 {
continue
}
if idx == -1 || c.envelopes[i][0].X < c.envelopes[idx][0].X {
idx = i
}
}
if idx == -1 {
return Bin{}, true
}
b := c.envelopes[idx][0]
c.envelopes[idx] = c.envelopes[idx][1:]
return b, false
}

func (c *combiner) nextN(n int) []Bin {
var bins []Bin
for i := 0; i < n; i++ {
b, done := c.next()
if done {
return bins
}
bins = append(bins, b)
}
return bins
}

func (c *combiner) size() (size int) {
for _, e := range c.envelopes {
size += len(e)
}
return
}

func (c *combiner) maxlen() (l int) {
for _, e := range c.envelopes {
if len(e) > l {
l = len(e)
}
}
return
}

// Merge squashes the two envelopes together returning a new Envelope the size
// of the longest Envelope provided.
func (e Envelope) Merge(u Envelope) Envelope {
c := &combiner{[]Envelope{e, u}}
n := c.size()
nbin := c.maxlen()
stride := strideSize(n, nbin)
nout := (n + stride - 1) / stride
out := make([]Bin, nout)
for k := 0; k < nout; k++ {
bins := c.nextN(stride)
out[k].X = bins[0].X
out[k].Range = rangeOfBins(bins)
}
return out
}
84 changes: 59 additions & 25 deletions pkg/ranger/ranger_test.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,81 @@
package ranger_test
package ranger

import (
"math"
"testing"

"github.com/brimsec/zq/pkg/ranger"
"github.com/stretchr/testify/assert"
)

func find(pts []ranger.Point, nbin int, r ranger.Range) ranger.Domain {
e := ranger.NewEnvelope(pts, nbin)
func find(pts []Point, nbin int, r Range) Domain {
e := NewEnvelope(pts, nbin)
return e.FindSmallestDomain(r)
}

// func TestEnvelope(t *testing.T) {
// t.Parallel()
// pts := []Point{
// {1, 0x100},
// {2, 0x120},
// {3, 0x110},
// {4, 0x130},
// {5, 0x150},
// {6, 0x150},
// }
// d := find(pts, 2, Range{0x100, 0x120})
// assert.Exactly(t, Domain{1, 5}, d)
// }

func TestEnvelope(t *testing.T) {
t.Parallel()
pts := []ranger.Point{
pts := []Point{
{1, 0x100},
{2, 0x120},
{3, 0x110},
{4, 0x130},
{5, 0x150},
{6, 0x150},
}
d := find(pts, 0, ranger.Range{0x151, 0x151})
assert.Exactly(t, ranger.Domain{}, d)
d = find(pts, 0, ranger.Range{0, 0x90})
assert.Exactly(t, ranger.Domain{}, d)
d = find(pts, 0, ranger.Range{0x90, 0x111})
assert.Exactly(t, ranger.Domain{1, 4}, d)
d = find(pts, 0, ranger.Range{0x115, 0x135})
assert.Exactly(t, ranger.Domain{2, 5}, d)
d = find(pts, 0, ranger.Range{0x150, 0x150})
assert.Exactly(t, ranger.Domain{5, math.MaxUint64}, d)
d = find(pts, 0, ranger.Range{0x151, 0x151})
assert.Exactly(t, ranger.Domain{}, d)
d = find(pts, 3, ranger.Range{0x100, 0x109})
assert.Exactly(t, ranger.Domain{1, 3}, d)
d = find(pts, 3, ranger.Range{0x100, 0x120})
assert.Exactly(t, ranger.Domain{1, 5}, d)
d = find(pts, 3, ranger.Range{0x100, 0x130})
assert.Exactly(t, ranger.Domain{1, 5}, d)
d := find(pts, 0, Range{0x151, 0x151})
assert.Exactly(t, Domain{}, d)
d = find(pts, 0, Range{0, 0x90})
assert.Exactly(t, Domain{}, d)
d = find(pts, 0, Range{0x90, 0x111})
assert.Exactly(t, Domain{1, 4}, d)
d = find(pts, 0, Range{0x115, 0x135})
assert.Exactly(t, Domain{2, 5}, d)
d = find(pts, 0, Range{0x150, 0x150})
assert.Exactly(t, Domain{5, math.MaxUint64}, d)
d = find(pts, 0, Range{0x151, 0x151})
assert.Exactly(t, Domain{}, d)
d = find(pts, 3, Range{0x100, 0x109})
assert.Exactly(t, Domain{1, 3}, d)
d = find(pts, 3, Range{0x100, 0x120})
assert.Exactly(t, Domain{1, 5}, d)
d = find(pts, 3, Range{0x100, 0x130})
assert.Exactly(t, Domain{1, 5}, d)
pts[5].Y = 0x149
d = find(pts, 3, ranger.Range{0x100, 0x149})
assert.Exactly(t, ranger.Domain{1, math.MaxUint64}, d)
d = find(pts, 3, Range{0x100, 0x149})
assert.Exactly(t, Domain{1, math.MaxUint64}, d)
}

func TestUnion(t *testing.T) {
env1 := Envelope{
{1, Range{0x100, 0x120}},
{3, Range{0x110, 0x130}},
{5, Range{0x150, 0x150}},
}
env2 := Envelope{
{7, Range{0x110, 0x160}},
{9, Range{0x170, 0x180}},
{11, Range{0x190, 0x190}},
{13, Range{0x180, 0x230}},
}
assert.Exactly(t, env1.Merge(env2), Envelope{
{1, Range{0x100, 0x130}},
{5, Range{0x110, 0x160}},
{9, Range{0x170, 0x190}},
{13, Range{0x180, 0x230}},
})
assert.Exactly(t, Envelope{}.Merge(env1), env1)
}

0 comments on commit e8b7e26

Please sign in to comment.