Skip to content

Commit

Permalink
Add watch streaming test to CRDB tests
Browse files Browse the repository at this point in the history
  • Loading branch information
josephschorr committed Dec 24, 2024
1 parent ba30bf1 commit f96c9f8
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 1 deletion.
118 changes: 118 additions & 0 deletions internal/datastore/crdb/crdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
testdatastore "github.com/authzed/spicedb/internal/testserver/datastore"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/test"
"github.com/authzed/spicedb/pkg/genutil/mapz"
"github.com/authzed/spicedb/pkg/migrate"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
Expand Down Expand Up @@ -74,6 +75,29 @@ func TestCRDBDatastoreWithoutIntegrity(t *testing.T) {

return ds, nil
}), false)

t.Run("TestWatchStreaming", createDatastoreTest(
b,
StreamingWatchTest,
RevisionQuantization(0),
GCWindow(veryLargeGCWindow),
))
}

type datastoreTestFunc func(t *testing.T, ds datastore.Datastore)

func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := NewCRDBDatastore(ctx, uri, options...)
require.NoError(t, err)
return ds
})
defer ds.Close()

tf(t, ds)
}
}

func TestCRDBDatastoreWithFollowerReads(t *testing.T) {
Expand Down Expand Up @@ -593,3 +617,97 @@ func RelationshipIntegrityWatchTest(t *testing.T, tester test.DatastoreTester) {
require.Fail("Timed out")
}
}

func StreamingWatchTest(t *testing.T, rawDS datastore.Datastore) {
require := require.New(t)

ds, rev := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, `
caveat somecaveat(somecondition int) {
somecondition == 42
}
caveat somecaveat2(somecondition int) {
somecondition == 42
}
definition user {}
definition user2 {}
definition resource {
relation viewer: user
}
definition resource2 {
relation viewer: user2
}
`, []tuple.Relationship{
tuple.MustParse("resource:foo#viewer@user:tom"),
tuple.MustParse("resource:foo#viewer@user:fred"),
}, require)
ctx := context.Background()

// Touch and delete some relationships, add a namespace and caveat and delete a namespace and caveat.
_, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
err := rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{
tuple.Touch(tuple.MustParse("resource:foo#viewer@user:tom")),
tuple.Delete(tuple.MustParse("resource:foo#viewer@user:fred")),
})
require.NoError(err)

err = rwt.DeleteNamespaces(ctx, "resource2")
require.NoError(err)

err = rwt.DeleteCaveats(ctx, []string{"somecaveat2"})
require.NoError(err)

err = rwt.WriteNamespaces(ctx, &core.NamespaceDefinition{
Name: "somenewnamespace",
})
require.NoError(err)

err = rwt.WriteCaveats(ctx, []*core.CaveatDefinition{{
Name: "somenewcaveat",
}})
require.NoError(err)

return nil
})
require.NoError(err)

// Ensure the watch API returns the integrity information.
opts := datastore.WatchOptions{
Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints,
WatchBufferLength: 128,
WatchBufferWriteTimeout: 1 * time.Minute,
EmissionStrategy: datastore.EmitImmediatelyStrategy,
}

expectedChanges := mapz.NewSet[string]()
expectedChanges.Add("DELETE(resource:foo#viewer@user:fred)\n")
expectedChanges.Add("DeletedCaveat: somecaveat2\n")
expectedChanges.Add("DeletedNamespace: resource2\n")
expectedChanges.Add("Definition: *corev1.NamespaceDefinition:somenewnamespace\n")
expectedChanges.Add("Definition: *corev1.CaveatDefinition:somenewcaveat\n")

changes, errchan := ds.Watch(ctx, rev, opts)
for {
select {
case change, ok := <-changes:
if !ok {
require.Fail("Timed out waiting for WatchDisconnectedError")
}

debugString := change.DebugString()
require.True(expectedChanges.Has(debugString), "unexpected change: %s", debugString)
expectedChanges.Delete(change.DebugString())
if expectedChanges.IsEmpty() {
return
}
case err := <-errchan:
require.Failf("Failed waiting for changes with error", "error: %v", err)
case <-time.NewTimer(10 * time.Second).C:
require.Fail("Timed out")
}
}
}
2 changes: 1 addition & 1 deletion internal/datastore/memdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (rwt *memdbReadWriteTx) DeleteNamespaces(_ context.Context, nsNames ...stri
}

if foundRaw == nil {
return fmt.Errorf("unable to find namespace to delete")
return fmt.Errorf("namespace not found")
}

if err := tx.Delete(tableNamespace, foundRaw); err != nil {
Expand Down
26 changes: 26 additions & 0 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ type RevisionChanges struct {
Metadata *structpb.Struct
}

func (rc *RevisionChanges) DebugString() string {
if rc.IsCheckpoint {
return "[checkpoint]"
}

debugString := ""

for _, relChange := range rc.RelationshipChanges {
debugString += relChange.DebugString() + "\n"
}

for _, def := range rc.ChangedDefinitions {
debugString += fmt.Sprintf("Definition: %T:%s\n", def, def.GetName())
}

for _, ns := range rc.DeletedNamespaces {
debugString += fmt.Sprintf("DeletedNamespace: %s\n", ns)
}

for _, caveat := range rc.DeletedCaveats {
debugString += fmt.Sprintf("DeletedCaveat: %s\n", caveat)
}

return debugString
}

func (rc *RevisionChanges) MarshalZerologObject(e *zerolog.Event) {
e.Str("revision", rc.Revision.String())
e.Bool("is-checkpoint", rc.IsCheckpoint)
Expand Down

0 comments on commit f96c9f8

Please sign in to comment.