From f96c9f8c212952317954c8772f0be5b0bf9d2550 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 24 Dec 2024 16:55:02 -0500 Subject: [PATCH] Add watch streaming test to CRDB tests --- internal/datastore/crdb/crdb_test.go | 118 ++++++++++++++++++++++++++ internal/datastore/memdb/readwrite.go | 2 +- pkg/datastore/datastore.go | 26 ++++++ 3 files changed, 145 insertions(+), 1 deletion(-) diff --git a/internal/datastore/crdb/crdb_test.go b/internal/datastore/crdb/crdb_test.go index 11e7991e0f..e874cced01 100644 --- a/internal/datastore/crdb/crdb_test.go +++ b/internal/datastore/crdb/crdb_test.go @@ -36,6 +36,7 @@ import ( testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/test" + "github.com/authzed/spicedb/pkg/genutil/mapz" "github.com/authzed/spicedb/pkg/migrate" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/tuple" @@ -74,6 +75,29 @@ func TestCRDBDatastoreWithoutIntegrity(t *testing.T) { return ds, nil }), false) + + t.Run("TestWatchStreaming", createDatastoreTest( + b, + StreamingWatchTest, + RevisionQuantization(0), + GCWindow(veryLargeGCWindow), + )) +} + +type datastoreTestFunc func(t *testing.T, ds datastore.Datastore) + +func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) { + return func(t *testing.T) { + ctx := context.Background() + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := NewCRDBDatastore(ctx, uri, options...) + require.NoError(t, err) + return ds + }) + defer ds.Close() + + tf(t, ds) + } } func TestCRDBDatastoreWithFollowerReads(t *testing.T) { @@ -593,3 +617,97 @@ func RelationshipIntegrityWatchTest(t *testing.T, tester test.DatastoreTester) { require.Fail("Timed out") } } + +func StreamingWatchTest(t *testing.T, rawDS datastore.Datastore) { + require := require.New(t) + + ds, rev := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, ` + caveat somecaveat(somecondition int) { + somecondition == 42 + } + + caveat somecaveat2(somecondition int) { + somecondition == 42 + } + + definition user {} + + definition user2 {} + + definition resource { + relation viewer: user + } + + definition resource2 { + relation viewer: user2 + } + `, []tuple.Relationship{ + tuple.MustParse("resource:foo#viewer@user:tom"), + tuple.MustParse("resource:foo#viewer@user:fred"), + }, require) + ctx := context.Background() + + // Touch and delete some relationships, add a namespace and caveat and delete a namespace and caveat. + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + err := rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{ + tuple.Touch(tuple.MustParse("resource:foo#viewer@user:tom")), + tuple.Delete(tuple.MustParse("resource:foo#viewer@user:fred")), + }) + require.NoError(err) + + err = rwt.DeleteNamespaces(ctx, "resource2") + require.NoError(err) + + err = rwt.DeleteCaveats(ctx, []string{"somecaveat2"}) + require.NoError(err) + + err = rwt.WriteNamespaces(ctx, &core.NamespaceDefinition{ + Name: "somenewnamespace", + }) + require.NoError(err) + + err = rwt.WriteCaveats(ctx, []*core.CaveatDefinition{{ + Name: "somenewcaveat", + }}) + require.NoError(err) + + return nil + }) + require.NoError(err) + + // Ensure the watch API returns the integrity information. + opts := datastore.WatchOptions{ + Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints, + WatchBufferLength: 128, + WatchBufferWriteTimeout: 1 * time.Minute, + EmissionStrategy: datastore.EmitImmediatelyStrategy, + } + + expectedChanges := mapz.NewSet[string]() + expectedChanges.Add("DELETE(resource:foo#viewer@user:fred)\n") + expectedChanges.Add("DeletedCaveat: somecaveat2\n") + expectedChanges.Add("DeletedNamespace: resource2\n") + expectedChanges.Add("Definition: *corev1.NamespaceDefinition:somenewnamespace\n") + expectedChanges.Add("Definition: *corev1.CaveatDefinition:somenewcaveat\n") + + changes, errchan := ds.Watch(ctx, rev, opts) + for { + select { + case change, ok := <-changes: + if !ok { + require.Fail("Timed out waiting for WatchDisconnectedError") + } + + debugString := change.DebugString() + require.True(expectedChanges.Has(debugString), "unexpected change: %s", debugString) + expectedChanges.Delete(change.DebugString()) + if expectedChanges.IsEmpty() { + return + } + case err := <-errchan: + require.Failf("Failed waiting for changes with error", "error: %v", err) + case <-time.NewTimer(10 * time.Second).C: + require.Fail("Timed out") + } + } +} diff --git a/internal/datastore/memdb/readwrite.go b/internal/datastore/memdb/readwrite.go index 885a406553..4add605c2a 100644 --- a/internal/datastore/memdb/readwrite.go +++ b/internal/datastore/memdb/readwrite.go @@ -312,7 +312,7 @@ func (rwt *memdbReadWriteTx) DeleteNamespaces(_ context.Context, nsNames ...stri } if foundRaw == nil { - return fmt.Errorf("unable to find namespace to delete") + return fmt.Errorf("namespace not found") } if err := tx.Delete(tableNamespace, foundRaw); err != nil { diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index c3fcbf63bf..6505db7bd4 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -68,6 +68,32 @@ type RevisionChanges struct { Metadata *structpb.Struct } +func (rc *RevisionChanges) DebugString() string { + if rc.IsCheckpoint { + return "[checkpoint]" + } + + debugString := "" + + for _, relChange := range rc.RelationshipChanges { + debugString += relChange.DebugString() + "\n" + } + + for _, def := range rc.ChangedDefinitions { + debugString += fmt.Sprintf("Definition: %T:%s\n", def, def.GetName()) + } + + for _, ns := range rc.DeletedNamespaces { + debugString += fmt.Sprintf("DeletedNamespace: %s\n", ns) + } + + for _, caveat := range rc.DeletedCaveats { + debugString += fmt.Sprintf("DeletedCaveat: %s\n", caveat) + } + + return debugString +} + func (rc *RevisionChanges) MarshalZerologObject(e *zerolog.Event) { e.Str("revision", rc.Revision.String()) e.Bool("is-checkpoint", rc.IsCheckpoint)