Skip to content

Commit

Permalink
merge: Batches not getting Unreferenced
Browse files Browse the repository at this point in the history
This commit fixes an issue where the merge operator was not properly
unreferencing batches.
  • Loading branch information
mattnibs committed Jul 5, 2024
1 parent f497079 commit 6f67560
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion runtime/sam/op/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type Op struct {
// The head-of-line (hol) queue is maintained as a min-heap on cmp of
// hol.vals[0] (see Less) so that the next Read always returns
// hol[0].vals[0].
hol []*puller
hol []*puller
unref zbuf.Batch
}

var _ zbuf.Puller = (*Op)(nil)
Expand All @@ -55,6 +56,10 @@ func (o *Op) Pull(done bool) (zbuf.Batch, error) {
if err != nil {
return nil, err
}
if o.unref != nil {
o.unref.Unref()
o.unref = nil
}
if done {
return nil, o.propagateDone()
}
Expand All @@ -70,6 +75,7 @@ func (o *Op) Pull(done bool) (zbuf.Batch, error) {
// way, it's safe to return min's remaining values as a batch.
batch := min.batch
if len(min.vals) < len(batch.Values()) {
defer batch.Unref()
batch = zbuf.WrapBatch(batch, min.vals)
}
ok, err := min.replenish()
Expand All @@ -86,13 +92,19 @@ func (o *Op) Pull(done bool) (zbuf.Batch, error) {
}

func (o *Op) Read() (*zed.Value, error) {
if o.unref != nil {
o.unref.Unref()
o.unref = nil
}
if o.Len() == 0 {
return nil, nil
}
u := o.hol[0]
val := &u.vals[0]
u.vals = u.vals[1:]
if len(u.vals) == 0 {
// Need to unref on next call to Read (or Pull) so keep this around.
o.unref = u.batch
ok, err := u.replenish()
if err != nil {
return nil, err
Expand Down

0 comments on commit 6f67560

Please sign in to comment.