Skip to content

Commit

Permalink
itlib: add tee iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
0x5a17ed committed May 21, 2024
1 parent 397554f commit 429e0b4
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 0 deletions.
143 changes: 143 additions & 0 deletions itlib/tee.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (c) 2024 individual contributors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// <https://www.apache.org/licenses/LICENSE-2.0>
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package itlib

import (
"sync"

"github.com/0x5a17ed/itkit"
)

// teeNode is one element of the singly linked buffer that sibling
// [TeeIterator] instances walk over.
type teeNode[T any] struct {
	// data is the item stored in this node.
	data T
	// next points to the following node, or is nil while no further
	// item has been buffered.
	next *teeNode[T]
}

// teeState holds what sibling [TeeIterator] instances share: the
// source iterator and the lock that serializes access to it.
type teeState[T any] struct {
	// mx is held while reading or extending the chain of buffered
	// nodes and while pulling from src.
	mx sync.RWMutex
	// src is the underlying iterator new items are pulled from.
	src itkit.Iterator[T]
}

// nextLocked returns the node buffered after cur and true, or nil and
// false when nothing has been buffered behind cur yet. The callers in
// this file hold st.mx while calling it.
func (st *teeState[T]) nextLocked(cur *teeNode[T]) (*teeNode[T], bool) {
	if succ := cur.next; succ != nil {
		return succ, true
	}
	return nil, false
}

// nextRW returns the node following cur, pulling one more item from
// the source under the write lock when none is buffered yet. It
// reports false when cur is the last node and the source yields
// nothing further.
func (st *teeState[T]) nextRW(cur *teeNode[T]) (n *teeNode[T], ok bool) {
	st.mx.Lock()
	defer st.mx.Unlock()

	// A sibling may have appended a node while this call waited for the
	// write lock; only touch the source when the chain still ends here.
	if cur.next == nil && st.src.Next() {
		cur.next = &teeNode[T]{data: st.src.Value()}
	}
	return cur.next, cur.next != nil
}

// nextR looks up the node following cur while holding only the read
// lock. It never touches the source; ok is false when nothing has been
// buffered behind cur.
func (st *teeState[T]) nextR(cur *teeNode[T]) (n *teeNode[T], ok bool) {
	st.mx.RLock()
	defer st.mx.RUnlock()

	n = cur.next
	ok = n != nil
	return n, ok
}

// next returns the node following cur. It first tries the read-locked
// lookup and only falls back to the write-locked path, which may pull
// from the source, when no node is buffered yet.
func (st *teeState[T]) next(cur *teeNode[T]) (n *teeNode[T], ok bool) {
	n, ok = st.nextR(cur)
	if !ok {
		n, ok = st.nextRW(cur)
	}
	return n, ok
}

// TeeIterator is an iterator yielding the same items as its siblings
// created from a given source iterator.
//
// A [TeeIterator] can be copied at any time to create more [TeeIterator]
// instances reading from the same underlying iterator. Once copied, the
// two [TeeIterator] instances are identical, but separate: they will
// yield the same items but advance individually. The same item retrieved
// from one [TeeIterator] instance will also be yielded by its sibling
// [TeeIterator] instances.
//
// Once a [TeeIterator] has been created, the source iterator should
// not be used elsewhere. Items retrieved by advancing the source
// iterator elsewhere will not be seen by any [TeeIterator]. Use copies
// of the [TeeIterator] instead.
//
// Sibling [TeeIterator] instances may be advanced from different
// goroutines at the same time; access to the shared source and buffer
// is guarded by a lock. The position of an individual instance is not
// guarded, so a single instance should only be used by one goroutine
// at a time.
type TeeIterator[T any] struct {
	// st is the state shared with all sibling iterators.
	st *teeState[T]
	// cur is the node whose item Value returns; Next moves it forward.
	cur *teeNode[T]
}

// Compile-time check that *TeeIterator satisfies the Iterator protocol.
var _ itkit.Iterator[struct{}] = (*TeeIterator[struct{}])(nil)

// Next implements the [itkit.Iterator.Next] interface.
//
// It moves the iterator to the following item and reports true, or
// leaves the position untouched and reports false when no further item
// is available.
func (it *TeeIterator[T]) Next() bool {
	succ, ok := it.st.next(it.cur)
	if !ok {
		return false
	}

	it.cur = succ
	return true
}

// Value implements the [itkit.Iterator.Value] interface.
//
// It returns the item stored in the node the iterator currently
// points at.
func (it *TeeIterator[T]) Value() T {
	node := it.cur
	return node.data
}

// Iter exposes the [TeeIterator] itself through the [itkit.Iterator]
// interface; no new iterator is created.
func (it *TeeIterator[T]) Iter() itkit.Iterator[T] {
	var iter itkit.Iterator[T] = it
	return iter
}

// Copy returns a new [TeeIterator] that shares the source with it and
// starts at the position it currently has. Afterwards the two
// iterators advance independently.
func (it *TeeIterator[T]) Copy() *TeeIterator[T] {
	dup := *it
	return &dup
}

// newTee builds the first [TeeIterator] for src. It starts on an empty
// head node, so the first call to Next moves it onto the first item
// pulled from src.
func newTee[T any](src itkit.Iterator[T]) *TeeIterator[T] {
	state := &teeState[T]{src: src}
	head := new(teeNode[T])

	return &TeeIterator[T]{st: state, cur: head}
}

// TeeN returns n new [TeeIterator] values that all read from src.
//
// Every returned iterator starts before the first item of src and
// advances independently of the others. After calling TeeN, src should
// only be consumed through the returned iterators.
//
// TeeN returns nil when n is zero or negative.
func TeeN[T any](src itkit.Iterator[T], n int) []*TeeIterator[T] {
	// A negative n would make the make call below panic; treat it like
	// zero and hand back no iterators.
	if n <= 0 {
		return nil
	}

	its := make([]*TeeIterator[T], n)
	its[0] = newTee(src)
	for i := 1; i < n; i++ {
		// Nothing has been advanced yet, so every copy starts at the
		// same position as the first iterator.
		its[i] = its[0].Copy()
	}
	return its
}

// Tee splits src into two [TeeIterator] values that yield the same
// items and advance independently of each other.
func Tee[T any](src itkit.Iterator[T]) (*TeeIterator[T], *TeeIterator[T]) {
	left := newTee(src)
	right := left.Copy()
	return left, right
}
185 changes: 185 additions & 0 deletions itlib/tee_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright (c) 2024 individual contributors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// <https://www.apache.org/licenses/LICENSE-2.0>
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package itlib_test

import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/0x5a17ed/itkit/iters/funcit"
"github.com/0x5a17ed/itkit/iters/rangeit"
"github.com/0x5a17ed/itkit/iters/sliceit"
"github.com/0x5a17ed/itkit/itlib"
"github.com/0x5a17ed/itkit/ittuple"
"github.com/stretchr/testify/assert"
)

// refCounter hands out heap-allocated ints and keeps track of how many
// of them have not been finalized yet.
type refCounter struct {
	// finalized receives the value of an int when its finalizer runs.
	finalized chan int
	// n counts the ints handed out and not yet finalized; new and the
	// finalizers update it atomically.
	n uint32
}

// new allocates an int holding the incremented counter value and
// attaches a finalizer to it. When the finalizer runs it decrements
// the counter again and sends the int's value on c.finalized.
func (c *refCounter) new() *int {
	id := int(atomic.AddUint32(&c.n, 1))

	obj := new(int)
	*obj = id

	onFinalize := func(r *int) {
		// Adding ^uint32(0) wraps around, i.e. subtracts one.
		atomic.AddUint32(&c.n, ^uint32(0))
		c.finalized <- *r
	}
	runtime.SetFinalizer(obj, onFinalize)

	return obj
}

// newRefCounter returns a refCounter with a zero count and a
// finalized channel that buffers a single value.
func newRefCounter() *refCounter {
	c := new(refCounter)
	c.finalized = make(chan int, 1)
	return c
}

// TestTee exercises Tee and TeeN: the number of iterators returned,
// empty sources, interleaved, sequential and concurrent consumption,
// copying a partially consumed iterator, and the release of buffered
// items that no iterator can reach anymore.
func TestTee(t *testing.T) {
	t.Run("n", func(t *testing.T) {
		assert.Len(t, itlib.TeeN(itlib.Empty[struct{}](), 0), 0)
		assert.Len(t, itlib.TeeN(itlib.Empty[struct{}](), 1), 1)
		assert.Len(t, itlib.TeeN(itlib.Empty[struct{}](), 12), 12)
	})

	t.Run("empty", func(t *testing.T) {
		asserter := assert.New(t)

		l, r := itlib.Tee(itlib.Empty[int]())

		asserter.False(l.Next())
		asserter.False(r.Next())
	})

	t.Run("interleaved", func(t *testing.T) {
		l, r := itlib.Tee(rangeit.Range(5))
		s := sliceit.To(itlib.Zip(l.Iter(), r.Iter()))

		assert.Equal(t, []itlib.Pair[int, int]{
			ittuple.T2[int, int]{Left: 0, Right: 0},
			ittuple.T2[int, int]{Left: 1, Right: 1},
			ittuple.T2[int, int]{Left: 2, Right: 2},
			ittuple.T2[int, int]{Left: 3, Right: 3},
			ittuple.T2[int, int]{Left: 4, Right: 4},
		}, s)
	})

	t.Run("progressive", func(t *testing.T) {
		n, m := 100, 10

		asserter := assert.New(t)

		expected := sliceit.To(rangeit.Range(n))

		// Drain the iterators one after the other; each must still
		// yield the complete sequence.
		its := itlib.TeeN(rangeit.Range(n), m)
		for i := 0; i < m; i++ {
			actual := sliceit.To(its[i].Iter())
			asserter.Equal(expected, actual)
		}
	})

	t.Run("concurrent", func(t *testing.T) {
		n, m := 1000, 100
		its := itlib.TeeN(rangeit.Range(n), m)

		// Buffered for all m results so no goroutine blocks on send
		// before wg.Wait returns.
		c := make(chan []int, m)

		var wg sync.WaitGroup
		for i := 0; i < m; i++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				c <- sliceit.To(its[i].Iter())
			}(i)
		}
		wg.Wait()

		expected := sliceit.To(rangeit.Range(n))
		asserter := assert.New(t)
		for i := 0; i < m; i++ {
			asserter.Equal(expected, <-c)
		}
	})

	t.Run("copy", func(t *testing.T) {
		asserter := assert.New(t)

		l, _ := itlib.Tee(rangeit.Range(5))

		// Advance the tee iterator once.
		asserter.Equal(0, itlib.HeadOrElse(l.Iter(), -1))

		// Create a copy of the tee iterator.
		c := l.Copy()
		asserter.NotSame(c, l)

		// Assert retrieving the next item from the clone
		// yields the next value.
		asserter.Equal(1, itlib.HeadOrElse(c.Iter(), -1))

		// Assert retrieving the next item from the original
		// tee reader yields the same value.
		asserter.Equal(1, itlib.HeadOrElse(l.Iter(), -1))
	})

	t.Run("leak", func(t *testing.T) {
		// Force garbage collection to start with a clean slate.
		runtime.GC()

		asserter := assert.New(t)

		c := newRefCounter()

		it := funcit.PullFn(func() (*int, bool) {
			return c.new(), true
		})
		it = itlib.Limit(it, 10)

		l, r := itlib.Tee(it)

		// Consume left side first, filling up the buffers.
		asserter.Len(sliceit.To(l.Iter()), 10)

		// Assert that there are currently 10 objects alive.
		// The counter is written atomically, so read it atomically too.
		asserter.Equal(uint32(10), atomic.LoadUint32(&c.n))

		// Consume half on the right side.
		s := sliceit.To(itlib.Limit(r.Iter(), 5))
		asserter.Len(s, 5)

		// Force garbage collection to clean up references.
		for i := 0; i < 3; i++ {
			runtime.GC()
		}

		// Assert some previously loaded items have
		// been freed up.
		for i := 0; i < 4; i++ {
			select {
			case <-c.finalized:
			case <-time.After(2 * time.Second):
				asserter.FailNow("timeout")
			}
		}
		asserter.Equal(uint32(6), atomic.LoadUint32(&c.n))

		// Keep the right iterator, and with it the items it can still
		// reach, alive until the assertions above have run.
		runtime.KeepAlive(r)
	})
}

0 comments on commit 429e0b4

Please sign in to comment.