Skip to content

Commit

Permalink
Merge pull request #1575 from josephschorr/reenable-race
Browse files Browse the repository at this point in the history
Reenable -race on unit tests and fix tests
  • Loading branch information
josephschorr authored Oct 11, 2023
2 parents 3dce83c + 8cd3d5b commit a4d7a98
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 23 deletions.
6 changes: 3 additions & 3 deletions internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ var MaxGCInterval = 60 * time.Minute
// StartGarbageCollector loops forever until the context is canceled and
// performs garbage collection on the provided interval.
func StartGarbageCollector(ctx context.Context, gc GarbageCollector, interval, window, timeout time.Duration) error {
return startGarbageCollectorWithMaxElapsedTime(ctx, gc, interval, window, 0, timeout)
return startGarbageCollectorWithMaxElapsedTime(ctx, gc, interval, window, 0, timeout, gcFailureCounter)
}

func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, gc GarbageCollector, interval, window, maxElapsedTime, timeout time.Duration) error {
func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, gc GarbageCollector, interval, window, maxElapsedTime, timeout time.Duration, failureCounter prometheus.Counter) error {
backoffInterval := backoff.NewExponentialBackOff()
backoffInterval.InitialInterval = interval
backoffInterval.MaxInterval = max(MaxGCInterval, interval)
Expand Down Expand Up @@ -130,7 +130,7 @@ func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, gc GarbageColl

err := RunGarbageCollection(gc, window, timeout)
if err != nil {
gcFailureCounter.Inc()
failureCounter.Inc()
nextInterval = backoffInterval.NextBackOff()
log.Ctx(ctx).Warn().Err(err).
Dur("next-attempt-in", nextInterval).
Expand Down
14 changes: 6 additions & 8 deletions internal/datastore/common/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,14 @@ func (t testGC) DeleteBeforeTx(_ context.Context, _ datastore.Revision) (Deletio
}

func TestGCFailureBackoff(t *testing.T) {
defer func() {
gcFailureCounter = prometheus.NewCounter(gcFailureCounterConfig)
}()
localCounter := prometheus.NewCounter(gcFailureCounterConfig)
reg := prometheus.NewRegistry()
require.NoError(t, reg.Register(gcFailureCounter))
require.NoError(t, reg.Register(localCounter))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, testGC{}, 100*time.Millisecond, 1*time.Second, 1*time.Nanosecond, 1*time.Minute))
require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, testGC{}, 100*time.Millisecond, 1*time.Second, 1*time.Nanosecond, 1*time.Minute, localCounter))
}()
time.Sleep(200 * time.Millisecond)
cancel()
Expand All @@ -56,13 +54,13 @@ func TestGCFailureBackoff(t *testing.T) {
}
require.Greater(t, *(mf.GetMetric()[0].Counter.Value), 100.0, "MaxElapsedTime=1ns did not cause backoff to get ignored")

gcFailureCounter = prometheus.NewCounter(gcFailureCounterConfig)
localCounter = prometheus.NewCounter(gcFailureCounterConfig)
reg = prometheus.NewRegistry()
require.NoError(t, reg.Register(gcFailureCounter))
require.NoError(t, reg.Register(localCounter))
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
go func() {
require.Error(t, StartGarbageCollector(ctx, testGC{}, 100*time.Millisecond, 1*time.Second, 1*time.Minute))
require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, testGC{}, 100*time.Millisecond, 0, 1*time.Second, 1*time.Minute, localCounter))
}()
time.Sleep(200 * time.Millisecond)
cancel()
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/memdb/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestConcurrentWriteRelsError(t *testing.T) {
for i := 0; i < 50; i++ {
i := i
g.Go(func() error {
_, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error {
_, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error {
updates := []*corev1.RelationTupleUpdate{}
for j := 0; j < 500; j++ {
updates = append(updates, &corev1.RelationTupleUpdate{
Expand Down
30 changes: 20 additions & 10 deletions internal/dispatch/graph/reachableresources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"slices"
"sort"
"strconv"
"sync"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -978,21 +979,25 @@ type breakingDatastore struct {

func (bds breakingDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
delegate := bds.Datastore.SnapshotReader(rev)
return &breakingReader{delegate, 0}
return &breakingReader{Reader: delegate, counter: 0, lock: sync.Mutex{}}
}

type breakingReader struct {
datastore.Reader
counter int
lock sync.Mutex
}

func (br *breakingReader) ReverseQueryRelationships(
ctx context.Context,
subjectsFilter datastore.SubjectsFilter,
options ...options.ReverseQueryOptionsOption,
) (datastore.RelationshipIterator, error) {
br.lock.Lock()
br.counter++
if br.counter > 1 {
current := br.counter
br.lock.Unlock()
if current > 1 {
return nil, fmt.Errorf("some sort of error")
}
return br.Reader.ReverseQueryRelationships(ctx, subjectsFilter, options...)
Expand Down Expand Up @@ -1392,21 +1397,26 @@ type cancelingDatastore struct {

func (cds cancelingDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
delegate := cds.Datastore.SnapshotReader(rev)
return &cancelingReader{delegate, 0}
return &cancelingReader{delegate, 0, sync.Mutex{}}
}

type cancelingReader struct {
datastore.Reader
counter int
lock sync.Mutex
}

func (cr *cancelingReader) ReverseQueryRelationships(
ctx context.Context,
subjectsFilter datastore.SubjectsFilter,
options ...options.ReverseQueryOptionsOption,
) (datastore.RelationshipIterator, error) {
cr.lock.Lock()
cr.counter++
if cr.counter > 1 {
current := cr.counter
cr.lock.Unlock()

if current > 1 {
return nil, context.Canceled
}
return cr.Reader.ReverseQueryRelationships(ctx, subjectsFilter, options...)
Expand Down Expand Up @@ -1449,18 +1459,18 @@ func TestReachableResourcesWithCachingInParallelTest(t *testing.T) {
require.New(t),
)

dispatcher := NewLocalOnlyDispatcher(50)
cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{})
require.NoError(t, err)

cachingDispatcher.SetDelegate(dispatcher)

g := errgroup.Group{}
for i := 0; i < 100; i++ {
g.Go(func() error {
ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background()))
require.NoError(t, datastoremw.SetInContext(ctx, ds))

dispatcher := NewLocalOnlyDispatcher(50)
cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{})
require.NoError(t, err)

cachingDispatcher.SetDelegate(dispatcher)

stream := dispatch.NewCollectingDispatchStream[*v1.DispatchReachableResourcesResponse](ctx)
err = cachingDispatcher.DispatchReachableResources(&v1.DispatchReachableResourcesRequest{
ResourceRelation: RR("resource", "view"),
Expand Down
3 changes: 3 additions & 0 deletions internal/graph/cursors.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,12 @@ func withInternalParallelizedStreamingIterableInCursor[T any, Q any](
taskIndex := taskIndex
item := item
tr.Add(func(ctx context.Context) error {
stream.lock.Lock()
if ci.limits.hasExhaustedLimit() {
stream.lock.Unlock()
return nil
}
stream.lock.Unlock()

ici, err := getItemCursor(taskIndex)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion magefiles/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (t Test) All() error {
// Unit Runs the unit tests
func (Test) Unit() error {
fmt.Println("running unit tests")
return goTest("./...", "-tags", "ci,skipintegrationtests", "-timeout", "10m")
return goTest("./...", "-tags", "ci,skipintegrationtests", "-race", "-timeout", "10m")
}

// Image Run tests that run the built image
Expand Down

0 comments on commit a4d7a98

Please sign in to comment.