From 429e0b4f874f27b49df44a25307f063169dd647a Mon Sep 17 00:00:00 2001 From: x5a17ed <0x5a17ed@tuta.io> Date: Tue, 21 May 2024 23:56:39 +0200 Subject: [PATCH] itlib: add tee iterator --- itlib/tee.go | 143 +++++++++++++++++++++++++++++++++++ itlib/tee_test.go | 185 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 328 insertions(+) create mode 100644 itlib/tee.go create mode 100644 itlib/tee_test.go diff --git a/itlib/tee.go b/itlib/tee.go new file mode 100644 index 0000000..21dd741 --- /dev/null +++ b/itlib/tee.go @@ -0,0 +1,143 @@ +// Copyright (c) 2024 individual contributors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package itlib
+
+import (
+	"sync"
+
+	"github.com/0x5a17ed/itkit"
+)
+
+// teeNode is one element of the singly linked buffer that is shared by
+// all sibling tee iterators.
+type teeNode[T any] struct {
+	data T           // item pulled from the source iterator
+	next *teeNode[T] // following node; nil until that item has been pulled
+}
+
+// teeState is the state shared by all sibling tee iterators created
+// from the same source iterator.
+type teeState[T any] struct {
+	mx  sync.RWMutex      // guards src and the next link of every node
+	src itkit.Iterator[T] // source iterator the siblings read from
+}
+
+// nextLocked returns the node following cur and whether such a node
+// exists. The caller must hold st.mx for reading or writing.
+func (st *teeState[T]) nextLocked(cur *teeNode[T]) (*teeNode[T], bool) {
+	return cur.next, cur.next != nil
+}
+
+// nextRW returns the node following cur while holding the write lock.
+// When no such node is buffered yet, it advances the source iterator
+// and, if the source yields an item, appends a new node behind cur.
+func (st *teeState[T]) nextRW(cur *teeNode[T]) (*teeNode[T], bool) {
+	st.mx.Lock()
+	defer st.mx.Unlock()
+
+	// A sibling may have appended the node between the caller's
+	// read-locked check and the write lock being acquired here.
+	if n, ok := st.nextLocked(cur); ok {
+		return n, true
+	}
+
+	if st.src.Next() {
+		cur.next = &teeNode[T]{data: st.src.Value()}
+	}
+	return st.nextLocked(cur)
+}
+
+// nextR returns the node following cur while holding only the read
+// lock. It never touches the source iterator.
+func (st *teeState[T]) nextR(cur *teeNode[T]) (*teeNode[T], bool) {
+	st.mx.RLock()
+	defer st.mx.RUnlock()
+
+	return st.nextLocked(cur)
+}
+
+// next returns the node following cur and whether one is available.
+// It first looks for an already buffered node under the read lock and
+// only falls back to the write-locked path, which may pull from the
+// source iterator, when nothing is buffered.
+func (st *teeState[T]) next(cur *teeNode[T]) (*teeNode[T], bool) {
+	if n, ok := st.nextR(cur); ok {
+		return n, true
+	}
+	return st.nextRW(cur)
+}
+
+// TeeIterator is an iterator yielding the same items as its siblings
+// created from a given source iterator.
+//
+// A [TeeIterator] can be copied at any time to create more [TeeIterator]
+// instances to the same underlying iterator. Once copied, the two
+// [TeeIterator] instances are identical, but separate: they will yield
+// the same items but advance individually. The same item retrieved
+// from one [TeeIterator] instance will also be yielded by its sibling
+// [TeeIterator] instances.
+//
+// Once a [TeeIterator] has been created, the source iterator should
+// not be used elsewhere. Advancing the source iterator elsewhere
+// will prevent the [TeeIterator] from seeing items retrieved
+// elsewhere. Use copies of the [TeeIterator] instead.
+//
+// Separate [TeeIterator] instances may be used from different
+// goroutines at the same time; access to the shared source is
+// serialized internally. A single [TeeIterator] instance keeps its own
+// position without any locking and must not be used by more than one
+// goroutine at a time.
+type TeeIterator[T any] struct {
+	st  *teeState[T] // state shared with all sibling iterators
+	cur *teeNode[T]  // node holding the item most recently yielded
+}
+
+// Ensure TeeIterator conforms to the Iterator protocol.
+var _ itkit.Iterator[struct{}] = (*TeeIterator[struct{}])(nil)
+
+// Next implements the [itkit.Iterator.Next] interface.
+//
+// It moves the iterator to the item following the current one and
+// reports whether such an item exists. When no following item has
+// been buffered yet, the shared source iterator is advanced.
+func (it *TeeIterator[T]) Next() bool {
+	n, ok := it.st.next(it.cur)
+	if ok {
+		it.cur = n
+	}
+	return ok
+}
+
+// Value implements the [itkit.Iterator.Value] interface.
+//
+// It returns the item the iterator currently points at. An iterator
+// obtained from [Tee] or [TeeN] that has not been advanced yet yields
+// the zero value of T.
+func (it *TeeIterator[T]) Value() T {
+	return it.cur.data
+}
+
+// Iter returns the [TeeIterator] as an [itkit.Iterator] value.
+func (it *TeeIterator[T]) Iter() itkit.Iterator[T] {
+	return it
+}
+
+// Copy copies the [TeeIterator] instance.
+//
+// The copy starts at the current position of it and shares the source
+// iterator with it, but advances independently from then on.
+func (it *TeeIterator[T]) Copy() *TeeIterator[T] {
+	c := *it
+	return &c
+}
+
+// newTee returns the first [TeeIterator] reading from src. Its current
+// node is an empty placeholder, so the first call to Next yields the
+// first item of src.
+func newTee[T any](src itkit.Iterator[T]) *TeeIterator[T] {
+	return &TeeIterator[T]{
+		st: &teeState[T]{
+			src: src,
+		},
+		cur: &teeNode[T]{},
+	}
+}
+
+// TeeN returns n new [TeeIterator] values that all read from src.
+//
+// It returns nil when n is zero or negative.
+func TeeN[T any](src itkit.Iterator[T], n int) []*TeeIterator[T] {
+	// A negative length would make the allocation below panic.
+	if n <= 0 {
+		return nil
+	}
+
+	its := make([]*TeeIterator[T], n)
+	its[0] = newTee(src)
+	for i := 1; i < n; i++ {
+		its[i] = its[0].Copy()
+	}
+	return its
+}
+
+// Tee returns 2 new [TeeIterator] values that both read from src.
+func Tee[T any](src itkit.Iterator[T]) (*TeeIterator[T], *TeeIterator[T]) {
+	t := newTee(src)
+	return t, t.Copy()
+}
diff --git a/itlib/tee_test.go b/itlib/tee_test.go
new file mode 100644
index 0000000..c7b782d
--- /dev/null
+++ b/itlib/tee_test.go
@@ -0,0 +1,185 @@
+// Copyright (c) 2024 individual contributors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package itlib_test
+
+import (
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/0x5a17ed/itkit/iters/funcit"
+	"github.com/0x5a17ed/itkit/iters/rangeit"
+	"github.com/0x5a17ed/itkit/iters/sliceit"
+	"github.com/0x5a17ed/itkit/itlib"
+	"github.com/0x5a17ed/itkit/ittuple"
+	"github.com/stretchr/testify/assert"
+)
+
+// refCounter hands out heap allocated integers and keeps track of how
+// many of them have not been finalized yet.
+type refCounter struct {
+	finalized chan int // receives the value of every finalized integer
+	n         uint32   // number of live integers; accessed atomically
+}
+
+// new allocates an integer, counts it as live and registers a
+// finalizer that uncounts it and reports its value on c.finalized.
+func (c *refCounter) new() *int {
+	n := new(int)
+	*n = int(atomic.AddUint32(&c.n, 1))
+
+	runtime.SetFinalizer(n, func(r *int) {
+		// Adding the all-ones value decrements the counter by one.
+		atomic.AddUint32(&c.n, ^uint32(0))
+		c.finalized <- *r
+	})
+
+	return n
+}
+
+// live returns the number of integers that have not been finalized.
+func (c *refCounter) live() uint32 {
+	return atomic.LoadUint32(&c.n)
+}
+
+// newRefCounter returns a refCounter meant to track up to capacity
+// integers. The notification channel is buffered for that many values
+// so that a finalizer never blocks on the send, even when nobody
+// receives: all finalizers of a program run on one goroutine, and a
+// blocked finalizer would stall every finalizer queued behind it.
+func newRefCounter(capacity int) *refCounter {
+	return &refCounter{
+		finalized: make(chan int, capacity),
+	}
+}
+
+func TestTee(t *testing.T) {
+	t.Run("n", func(t *testing.T) {
+		assert.Len(t, itlib.TeeN(itlib.Empty[struct{}](), 0), 0)
+		assert.Len(t, itlib.TeeN(itlib.Empty[struct{}](), 1), 1)
+		assert.Len(t, itlib.TeeN(itlib.Empty[struct{}](), 12), 12)
+	})
+
+	t.Run("empty", func(t *testing.T) {
+		asserter := assert.New(t)
+
+		l, r := itlib.Tee(itlib.Empty[int]())
+
+		asserter.False(l.Next())
+		asserter.False(r.Next())
+	})
+
+	t.Run("interleaved", func(t *testing.T) {
+		l, r := itlib.Tee(rangeit.Range(5))
+		s := sliceit.To(itlib.Zip(l.Iter(), r.Iter()))
+
+		assert.Equal(t, []itlib.Pair[int, int]{
+			ittuple.T2[int, int]{Left: 0, Right: 0},
+			ittuple.T2[int, int]{Left: 1, Right: 1},
+			ittuple.T2[int, int]{Left: 2, Right: 2},
+			ittuple.T2[int, int]{Left: 3, Right: 3},
+			ittuple.T2[int, int]{Left: 4, Right: 4},
+		}, s)
+	})
+
+	t.Run("progressive", func(t *testing.T) {
+		n, m := 100, 10
+
+		asserter := assert.New(t)
+
+		expected := sliceit.To(rangeit.Range(n))
+
+		its := itlib.TeeN(rangeit.Range(n), m)
+		for i := 0; i < m; i++ {
+			actual := sliceit.To(its[i].Iter())
+			asserter.Equal(expected, actual)
+		}
+	})
+
+	t.Run("concurrent", func(t *testing.T) {
+		n, m := 1000, 100
+		its := itlib.TeeN(rangeit.Range(n), m)
+
+		// Buffered for one result per goroutine so that no sender
+		// blocks before the results are collected below.
+		c := make(chan []int, m)
+
+		var wg sync.WaitGroup
+		for i := 0; i < m; i++ {
+			wg.Add(1)
+			go func(i int) {
+				defer wg.Done()
+				c <- sliceit.To(its[i].Iter())
+			}(i)
+		}
+		wg.Wait()
+
+		expected := sliceit.To(rangeit.Range(n))
+		asserter := assert.New(t)
+		for i := 0; i < m; i++ {
+			asserter.Equal(expected, <-c)
+		}
+	})
+
+	t.Run("copy", func(t *testing.T) {
+		asserter := assert.New(t)
+
+		l, _ := itlib.Tee(rangeit.Range(5))
+
+		// Advance the tee iterator once.
+		asserter.Equal(0, itlib.HeadOrElse(l.Iter(), -1))
+
+		// Create a copy of the tee iterator.
+		c := l.Copy()
+		asserter.NotSame(c, l)
+
+		// Assert retrieving the next item from the clone
+		// yields the next value.
+		asserter.Equal(1, itlib.HeadOrElse(c.Iter(), -1))
+
+		// Assert retrieving the next item from the original
+		// tee iterator yields the same value.
+		asserter.Equal(1, itlib.HeadOrElse(l.Iter(), -1))
+	})
+
+	t.Run("leak", func(t *testing.T) {
+		// Number of objects produced by the source iterator.
+		const items = 10
+
+		// Force garbage collection to start with a clean slate.
+		runtime.GC()
+
+		asserter := assert.New(t)
+
+		c := newRefCounter(items)
+
+		it := funcit.PullFn(func() (*int, bool) {
+			return c.new(), true
+		})
+		it = itlib.Limit(it, items)
+
+		l, r := itlib.Tee(it)
+
+		// Consume left side first, filling up the buffers.
+		asserter.Len(sliceit.To(l.Iter()), items)
+
+		// Assert that all objects are currently alive.
+		asserter.Equal(uint32(items), c.live())
+
+		// Consume half on the right side. s must not be referenced
+		// after the length check, otherwise it would keep the first
+		// five objects reachable.
+		s := sliceit.To(itlib.Limit(r.Iter(), 5))
+		asserter.Len(s, 5)
+
+		// Force garbage collection to clean up references.
+		for i := 0; i < 3; i++ {
+			runtime.GC()
+		}
+
+		// Assert some previously loaded items have been freed up.
+		// The fifth item is still referenced by the current node of
+		// r, so only the first four can have been released.
+		for i := 0; i < 4; i++ {
+			select {
+			case <-c.finalized:
+			case <-time.After(2 * time.Second):
+				asserter.FailNow("timeout")
+			}
+		}
+		asserter.Equal(uint32(6), c.live())
+
+		runtime.KeepAlive(r)
+	})
+}