Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pcap index: Compress offsets that exceed threshold #1096

Merged
merged 1 commit into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions pcap/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"io/ioutil"
"unsafe"

"github.com/brimsec/zq/pcap/pcapio"
"github.com/brimsec/zq/pkg/nano"
Expand Down Expand Up @@ -38,6 +39,15 @@ type Section struct {
Index ranger.Envelope
}

const (
// pointSize is the size in bytes of one ranger.Point value, as reported by
// unsafe.Sizeof.
pointSize = unsafe.Sizeof(ranger.Point{})
// offsetMaxMem is the byte budget from which offsetThresh is derived.
offsetMaxMem = 1024 * 1024 * 64 // 64MB
)

// offsetThresh is the maximum number of pcap offsets collected before the
// offsets slice in CreateIndex is condensed with ranger.NewEnvelope. It is
// offsetMaxMem divided by the size of a single ranger.Point.
var offsetThresh = offsetMaxMem / int(pointSize)

// CreateIndex creates an index for a pcap presented as an io.Reader.
// The size parameter indicates how many bins the index should contain.
func CreateIndex(r io.Reader, size int) (Index, error) {
Expand Down Expand Up @@ -79,15 +89,26 @@ func CreateIndex(r io.Reader, size int) (Index, error) {
}
y := uint64(ts)
offsets = append(offsets, ranger.Point{X: off, Y: y})
// In order to avoid running out of memory for large pcap sections,
// condense offsets with ranger.NewEnvelope once offsetThresh has
// been reached.
if len(offsets) > offsetThresh {
env := ranger.NewEnvelope(offsets, size)
section.Index = env.Merge(section.Index)
offsets = offsets[:0]
}

case pcapio.TypeSection:
// end previous section and start a new one
if section == nil && offsets != nil {
err := errors.New("missing section header")
return nil, pcapio.NewErrInvalidPcap(err)
}
if section != nil && offsets != nil {
section.Index = ranger.NewEnvelope(offsets, size)
if section != nil {
if offsets != nil {
env := ranger.NewEnvelope(offsets, size)
section.Index = env.Merge(section.Index)
}
index = append(index, *section)
}
slice := slicer.Slice{
Expand All @@ -97,12 +118,13 @@ func CreateIndex(r io.Reader, size int) (Index, error) {
section = &Section{
Blocks: []slicer.Slice{slice},
}
offsets = nil
offsets = offsets[:0]
}
}
// end last section
if section != nil && offsets != nil {
section.Index = ranger.NewEnvelope(offsets, size)
env := ranger.NewEnvelope(offsets, size)
section.Index = env.Merge(section.Index)
index = append(index, *section)
}
if len(index) == 0 {
Expand Down
83 changes: 82 additions & 1 deletion pkg/ranger/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ func rangeOf(offsets []Point) Range {
return r
}

// rangeOfBins returns the smallest Range that covers the Range of every Bin in
// bins. For an empty slice nothing narrows the starting value, so the result
// is Range{Y0: math.MaxUint64, Y1: 0}.
func rangeOfBins(bins []Bin) Range {
	span := Range{Y0: math.MaxUint64}
	for i := range bins {
		cur := bins[i].Range
		// Widen the lower bound downwards and the upper bound upwards.
		if cur.Y0 < span.Y0 {
			span.Y0 = cur.Y0
		}
		if cur.Y1 > span.Y1 {
			span.Y1 = cur.Y1
		}
	}
	return span
}

// NewEnvelope creates a range envelope structure used by FindSmallestDomain.
// The X field of the Points must be in non-decreasing order.
func NewEnvelope(offsets []Point, nbin int) Envelope {
Expand All @@ -79,8 +93,8 @@ func NewEnvelope(offsets []Point, nbin int) Envelope {
nout := (n + stride - 1) / stride
bins := make([]Bin, nout)
for k := 0; k < nout; k++ {
bins[k].X = offsets[k*stride].X
m := k * stride
bins[k].X = offsets[m].X
end := m + stride
if end > len(offsets) {
end = len(offsets)
Expand Down Expand Up @@ -113,3 +127,70 @@ func (e Envelope) FindSmallestDomain(r Range) Domain {
}
return Domain{x0, x1}
}

// combiner walks several Envelopes at once and hands out their Bins one at a
// time, always choosing the smallest leading X. Bins are consumed by
// re-slicing the entries of envelopes; the Bins themselves are not modified.
type combiner struct {
	envelopes []Envelope
}

// next removes and returns the Bin with the smallest X among the first Bins of
// the non-empty envelopes. On a tie the envelope listed first wins. The
// boolean result is true only when every envelope is exhausted, in which case
// the returned Bin is the zero value.
func (c *combiner) next() (Bin, bool) {
	best := -1
	for i, env := range c.envelopes {
		switch {
		case len(env) == 0:
			// Nothing left to offer from this envelope.
		case best < 0 || env[0].X < c.envelopes[best][0].X:
			best = i
		}
	}
	if best < 0 {
		return Bin{}, true
	}
	head, rest := c.envelopes[best][0], c.envelopes[best][1:]
	c.envelopes[best] = rest
	return head, false
}

// nextN returns up to n Bins obtained from successive calls to next. Fewer
// than n Bins are returned when the envelopes run out, and nil is returned
// when no Bin could be taken at all.
func (c *combiner) nextN(n int) []Bin {
	var taken []Bin
	for len(taken) < n {
		bin, exhausted := c.next()
		if exhausted {
			break
		}
		taken = append(taken, bin)
	}
	return taken
}

// size reports the total number of Bins remaining across all envelopes.
func (c *combiner) size() int {
	total := 0
	for i := range c.envelopes {
		total += len(c.envelopes[i])
	}
	return total
}

// maxlen reports the length of the longest remaining envelope, or zero when
// there are no envelopes or all of them are empty.
func (c *combiner) maxlen() int {
	longest := 0
	for i := range c.envelopes {
		if n := len(c.envelopes[i]); n > longest {
			longest = n
		}
	}
	return longest
}

// Merge squashes e and u together and returns the result as a new Envelope;
// neither input is modified.
//
// Bins from both envelopes are consumed in ascending X order (e wins ties) and
// grouped stride at a time, where stride is strideSize(n, nbin) with n the
// total number of input Bins and nbin the length of the longer input. Each
// output Bin takes the X of the first Bin in its group and the Range that
// covers every Bin in the group. Both inputs are expected to be ordered by X.
//
// When both envelopes are empty an empty Envelope is returned.
func (e Envelope) Merge(u Envelope) Envelope {
	c := &combiner{[]Envelope{e, u}}
	n := c.size()
	if n == 0 {
		// With no Bins both n and nbin are zero; return the empty result
		// directly rather than relying on strideSize(0, 0) producing a
		// non-zero stride for the division below.
		return Envelope{}
	}
	nbin := c.maxlen()
	stride := strideSize(n, nbin)
	nout := (n + stride - 1) / stride
	out := make([]Bin, nout)
	for k := 0; k < nout; k++ {
		// k*stride < n for every k < nout, so each group has at least one Bin.
		bins := c.nextN(stride)
		out[k].X = bins[0].X
		out[k].Range = rangeOfBins(bins)
	}
	return out
}
70 changes: 45 additions & 25 deletions pkg/ranger/ranger_test.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,67 @@
package ranger_test
package ranger

import (
"math"
"testing"

"github.com/brimsec/zq/pkg/ranger"
"github.com/stretchr/testify/assert"
)

func find(pts []ranger.Point, nbin int, r ranger.Range) ranger.Domain {
e := ranger.NewEnvelope(pts, nbin)
func find(pts []Point, nbin int, r Range) Domain {
e := NewEnvelope(pts, nbin)
return e.FindSmallestDomain(r)
}

func TestEnvelope(t *testing.T) {
t.Parallel()
pts := []ranger.Point{
pts := []Point{
{1, 0x100},
{2, 0x120},
{3, 0x110},
{4, 0x130},
{5, 0x150},
{6, 0x150},
}
d := find(pts, 0, ranger.Range{0x151, 0x151})
assert.Exactly(t, ranger.Domain{}, d)
d = find(pts, 0, ranger.Range{0, 0x90})
assert.Exactly(t, ranger.Domain{}, d)
d = find(pts, 0, ranger.Range{0x90, 0x111})
assert.Exactly(t, ranger.Domain{1, 4}, d)
d = find(pts, 0, ranger.Range{0x115, 0x135})
assert.Exactly(t, ranger.Domain{2, 5}, d)
d = find(pts, 0, ranger.Range{0x150, 0x150})
assert.Exactly(t, ranger.Domain{5, math.MaxUint64}, d)
d = find(pts, 0, ranger.Range{0x151, 0x151})
assert.Exactly(t, ranger.Domain{}, d)
d = find(pts, 3, ranger.Range{0x100, 0x109})
assert.Exactly(t, ranger.Domain{1, 3}, d)
d = find(pts, 3, ranger.Range{0x100, 0x120})
assert.Exactly(t, ranger.Domain{1, 5}, d)
d = find(pts, 3, ranger.Range{0x100, 0x130})
assert.Exactly(t, ranger.Domain{1, 5}, d)
d := find(pts, 0, Range{0x151, 0x151})
assert.Exactly(t, Domain{}, d)
d = find(pts, 0, Range{0, 0x90})
assert.Exactly(t, Domain{}, d)
d = find(pts, 0, Range{0x90, 0x111})
assert.Exactly(t, Domain{1, 4}, d)
d = find(pts, 0, Range{0x115, 0x135})
assert.Exactly(t, Domain{2, 5}, d)
d = find(pts, 0, Range{0x150, 0x150})
assert.Exactly(t, Domain{5, math.MaxUint64}, d)
d = find(pts, 0, Range{0x151, 0x151})
assert.Exactly(t, Domain{}, d)
d = find(pts, 3, Range{0x100, 0x109})
assert.Exactly(t, Domain{1, 3}, d)
d = find(pts, 3, Range{0x100, 0x120})
assert.Exactly(t, Domain{1, 5}, d)
d = find(pts, 3, Range{0x100, 0x130})
assert.Exactly(t, Domain{1, 5}, d)
pts[5].Y = 0x149
d = find(pts, 3, ranger.Range{0x100, 0x149})
assert.Exactly(t, ranger.Domain{1, math.MaxUint64}, d)
d = find(pts, 3, Range{0x100, 0x149})
assert.Exactly(t, Domain{1, math.MaxUint64}, d)
}

// TestUnion checks Envelope.Merge: merging a three-bin envelope with a
// four-bin envelope yields four bins whose ranges cover the grouped inputs,
// and merging an empty envelope with env1 yields env1 unchanged.
func TestUnion(t *testing.T) {
	env1 := Envelope{
		{1, Range{0x100, 0x120}},
		{3, Range{0x110, 0x130}},
		{5, Range{0x150, 0x150}},
	}
	env2 := Envelope{
		{7, Range{0x110, 0x160}},
		{9, Range{0x170, 0x180}},
		{11, Range{0x190, 0x190}},
		{13, Range{0x180, 0x230}},
	}
	// assert.Exactly takes the expected value first and the actual value
	// second, so that failure output labels them correctly.
	expected := Envelope{
		{1, Range{0x100, 0x130}},
		{5, Range{0x110, 0x160}},
		{9, Range{0x170, 0x190}},
		{13, Range{0x180, 0x230}},
	}
	assert.Exactly(t, expected, env1.Merge(env2))
	assert.Exactly(t, env1, Envelope{}.Merge(env1))
}