From 357a72fc60ff77ac2cbd8d067d8df571cecff486 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 11 Oct 2023 12:41:12 -0400 Subject: [PATCH] Reenable -race on unit tests and fix tests Was accidentally removed when we switched to mage for running tests Fixes #1569 --- internal/datastore/memdb/memdb_test.go | 2 +- .../dispatch/graph/reachableresources_test.go | 30 ++++++++++++------- internal/graph/cursors.go | 3 ++ magefiles/test.go | 2 +- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/internal/datastore/memdb/memdb_test.go b/internal/datastore/memdb/memdb_test.go index 339aa9eb66..20d9ea4176 100644 --- a/internal/datastore/memdb/memdb_test.go +++ b/internal/datastore/memdb/memdb_test.go @@ -94,7 +94,7 @@ func TestConcurrentWriteRelsError(t *testing.T) { for i := 0; i < 50; i++ { i := i g.Go(func() error { - _, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { + _, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error { updates := []*corev1.RelationTupleUpdate{} for j := 0; j < 500; j++ { updates = append(updates, &corev1.RelationTupleUpdate{ diff --git a/internal/dispatch/graph/reachableresources_test.go b/internal/dispatch/graph/reachableresources_test.go index 30d8d0ca31..712ca66feb 100644 --- a/internal/dispatch/graph/reachableresources_test.go +++ b/internal/dispatch/graph/reachableresources_test.go @@ -7,6 +7,7 @@ import ( "slices" "sort" "strconv" + "sync" "testing" "github.com/stretchr/testify/require" @@ -978,12 +979,13 @@ type breakingDatastore struct { func (bds breakingDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { delegate := bds.Datastore.SnapshotReader(rev) - return &breakingReader{delegate, 0} + return &breakingReader{Reader: delegate, counter: 0, lock: sync.Mutex{}} } type breakingReader struct { datastore.Reader counter int + lock sync.Mutex } func (br *breakingReader) ReverseQueryRelationships( @@ -991,8 +993,11 @@ func (br *breakingReader) ReverseQueryRelationships( subjectsFilter datastore.SubjectsFilter, options ...options.ReverseQueryOptionsOption, ) (datastore.RelationshipIterator, error) { + br.lock.Lock() br.counter++ - if br.counter > 1 { + current := br.counter + br.lock.Unlock() + if current > 1 { return nil, fmt.Errorf("some sort of error") } return br.Reader.ReverseQueryRelationships(ctx, subjectsFilter, options...) @@ -1392,12 +1397,13 @@ type cancelingDatastore struct { func (cds cancelingDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { delegate := cds.Datastore.SnapshotReader(rev) - return &cancelingReader{delegate, 0} + return &cancelingReader{delegate, 0, sync.Mutex{}} } type cancelingReader struct { datastore.Reader counter int + lock sync.Mutex } func (cr *cancelingReader) ReverseQueryRelationships( @@ -1405,8 +1411,12 @@ func (cr *cancelingReader) ReverseQueryRelationships( subjectsFilter datastore.SubjectsFilter, options ...options.ReverseQueryOptionsOption, ) (datastore.RelationshipIterator, error) { + cr.lock.Lock() cr.counter++ - if cr.counter > 1 { + current := cr.counter + cr.lock.Unlock() + + if current > 1 { return nil, context.Canceled } return cr.Reader.ReverseQueryRelationships(ctx, subjectsFilter, options...) @@ -1449,18 +1459,18 @@ func TestReachableResourcesWithCachingInParallelTest(t *testing.T) { require.New(t), ) - dispatcher := NewLocalOnlyDispatcher(50) - cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{}) - require.NoError(t, err) - - cachingDispatcher.SetDelegate(dispatcher) - g := errgroup.Group{} for i := 0; i < 100; i++ { g.Go(func() error { ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background())) require.NoError(t, datastoremw.SetInContext(ctx, ds)) + dispatcher := NewLocalOnlyDispatcher(50) + cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{}) + require.NoError(t, err) + + cachingDispatcher.SetDelegate(dispatcher) + stream := dispatch.NewCollectingDispatchStream[*v1.DispatchReachableResourcesResponse](ctx) err = cachingDispatcher.DispatchReachableResources(&v1.DispatchReachableResourcesRequest{ ResourceRelation: RR("resource", "view"), diff --git a/internal/graph/cursors.go b/internal/graph/cursors.go index 347d50ac74..8b6560e28c 100644 --- a/internal/graph/cursors.go +++ b/internal/graph/cursors.go @@ -358,9 +358,12 @@ func withInternalParallelizedStreamingIterableInCursor[T any, Q any]( taskIndex := taskIndex item := item tr.Add(func(ctx context.Context) error { + stream.lock.Lock() if ci.limits.hasExhaustedLimit() { + stream.lock.Unlock() return nil } + stream.lock.Unlock() ici, err := getItemCursor(taskIndex) if err != nil { diff --git a/magefiles/test.go b/magefiles/test.go index c3a4525f5e..6f8c9e7f52 100644 --- a/magefiles/test.go +++ b/magefiles/test.go @@ -25,7 +25,7 @@ func (t Test) All() error { // Unit Runs the unit tests func (Test) Unit() error { fmt.Println("running unit tests") - return goTest("./...", "-tags", "ci,skipintegrationtests", "-timeout", "10m") + return goTest("./...", "-tags", "ci,skipintegrationtests", "-race", "-timeout", "10m") } // Image Run tests that run the built image