From 6f6756000ca989d8005df161e7085ac37af0c37a Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Fri, 5 Jul 2024 11:43:20 -0700 Subject: [PATCH] merge: Batches not getting Unreferenced This commit fixes an issue where the merge operator was not properly unreferencing batches. --- runtime/sam/op/merge/merge.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/runtime/sam/op/merge/merge.go b/runtime/sam/op/merge/merge.go index ba4dcd6907..dfa81b55b4 100644 --- a/runtime/sam/op/merge/merge.go +++ b/runtime/sam/op/merge/merge.go @@ -30,7 +30,8 @@ type Op struct { // The head-of-line (hol) queue is maintained as a min-heap on cmp of // hol.vals[0] (see Less) so that the next Read always returns // hol[0].vals[0]. - hol []*puller + hol []*puller + unref zbuf.Batch } var _ zbuf.Puller = (*Op)(nil) @@ -55,6 +56,10 @@ func (o *Op) Pull(done bool) (zbuf.Batch, error) { if err != nil { return nil, err } + if o.unref != nil { + o.unref.Unref() + o.unref = nil + } if done { return nil, o.propagateDone() } @@ -70,6 +75,7 @@ func (o *Op) Pull(done bool) (zbuf.Batch, error) { // way, it's safe to return min's remaining values as a batch. batch := min.batch if len(min.vals) < len(batch.Values()) { + defer batch.Unref() batch = zbuf.WrapBatch(batch, min.vals) } ok, err := min.replenish() @@ -86,6 +92,10 @@ func (o *Op) Pull(done bool) (zbuf.Batch, error) { } func (o *Op) Read() (*zed.Value, error) { + if o.unref != nil { + o.unref.Unref() + o.unref = nil + } if o.Len() == 0 { return nil, nil } @@ -93,6 +103,8 @@ func (o *Op) Read() (*zed.Value, error) { val := &u.vals[0] u.vals = u.vals[1:] if len(u.vals) == 0 { + // Need to unref on next call to Read (or Pull) so keep this around. + o.unref = u.batch ok, err := u.replenish() if err != nil { return nil, err