diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index ddc3a51313..e556bfd1c1 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -5,6 +5,7 @@ package postgres import ( "context" + "errors" "fmt" "math/rand" "strings" @@ -27,6 +28,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/samber/lo" + "github.com/scylladb/go-set/strset" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/trace" @@ -196,6 +198,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) + + t.Run("TestNullCaveatWatch", createDatastoreTest( + b, + NullCaveatWatchTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(50), + MigrationPhase(config.migrationPhase), + )) } t.Run("OTelTracing", createDatastoreTest( @@ -1417,3 +1428,126 @@ func RepairTransactionsTest(t *testing.T, ds datastore.Datastore) { require.NoError(t, err) require.Greater(t, currentMaximumID, 12345) } + +func NullCaveatWatchTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lowestRevision, err := ds.HeadRevision(ctx) + require.NoError(err) + + // Run the watch API. + changes, errchan := ds.Watch(ctx, lowestRevision) + require.Zero(len(errchan)) + + // Manually insert a relationship with a NULL caveat. This is allowed, but can only happen due to + // bulk import (normal write rels will make it empty instead) + pds := ds.(*pgDatastore) + _, err = pds.ReadWriteTx(ctx, func(ctx context.Context, drwt datastore.ReadWriteTransaction) error { + rwt := drwt.(*pgReadWriteTXN) + + createInserts := writeTuple + valuesToWrite := []interface{}{ + "resource", + "someresourceid", + "somerelation", + "subject", + "somesubject", + "...", + nil, // set explicitly to null + nil, // set explicitly to null + } + + query := createInserts.Values(valuesToWrite...) + sql, args, err := query.ToSql() + if err != nil { + return fmt.Errorf(errUnableToWriteRelationships, err) + } + + _, err = rwt.tx.Exec(ctx, sql, args...) + return err + }) + require.NoError(err) + + // Verify the relationship create was tracked by the watch. + verifyUpdates(require, [][]*core.RelationTupleUpdate{ + { + tuple.Touch(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")), + }, + }, + changes, + errchan, + false, + ) + + // Delete the relationship and ensure it does not raise an error in watch. + deleteUpdate := tuple.Delete(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")) + _, err = common.UpdateTuplesInDatastore(ctx, ds, deleteUpdate) + require.NoError(err) + + // Verify the delete. + verifyUpdates(require, [][]*core.RelationTupleUpdate{ + { + tuple.Delete(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")), + }, + }, + changes, + errchan, + false, + ) +} + +const waitForChangesTimeout = 5 * time.Second + +// TODO(jschorr): Combine with the same impl in the datastore shared tests +func verifyUpdates( + require *require.Assertions, + testUpdates [][]*core.RelationTupleUpdate, + changes <-chan *datastore.RevisionChanges, + errchan <-chan error, + expectDisconnect bool, +) { + for _, expected := range testUpdates { + changeWait := time.NewTimer(waitForChangesTimeout) + select { + case change, ok := <-changes: + if !ok { + require.True(expectDisconnect, "unexpected disconnect") + errWait := time.NewTimer(waitForChangesTimeout) + select { + case err := <-errchan: + require.True(errors.As(err, &datastore.ErrWatchDisconnected{})) + return + case <-errWait.C: + require.Fail("Timed out waiting for ErrWatchDisconnected") + } + return + } + + expectedChangeSet := setOfChanges(expected) + actualChangeSet := setOfChanges(change.Changes) + + missingExpected := strset.Difference(expectedChangeSet, actualChangeSet) + unexpected := strset.Difference(actualChangeSet, expectedChangeSet) + + require.True(missingExpected.IsEmpty(), "expected changes missing: %s", missingExpected) + require.True(unexpected.IsEmpty(), "unexpected changes: %s", unexpected) + + time.Sleep(1 * time.Millisecond) + case <-changeWait.C: + require.Fail("Timed out", "waiting for changes: %s", expected) + } + } + + require.False(expectDisconnect, "all changes verified without expected disconnect") +} + +func setOfChanges(changes []*core.RelationTupleUpdate) *strset.Set { + changeSet := strset.NewWithSize(len(changes)) + for _, change := range changes { + changeSet.Add(fmt.Sprintf("OPERATION_%s(%s)", change.Operation, tuple.StringWithoutCaveat(change.Tuple))) + } + return changeSet +} diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index 0926c0df96..5bfa4b09da 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -206,7 +206,7 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWit } var createdXID, deletedXID xid8 - var caveatName string + var caveatName *string var caveatContext map[string]any if err := changes.Scan( &nextTuple.ResourceAndRelation.Namespace, @@ -223,13 +223,13 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWit return nil, fmt.Errorf("unable to parse changed tuple: %w", err) } - if caveatName != "" { + if caveatName != nil && *caveatName != "" { contextStruct, err := structpb.NewStruct(caveatContext) if err != nil { return nil, fmt.Errorf("failed to read caveat context from update: %w", err) } nextTuple.Caveat = &core.ContextualizedCaveat{ - CaveatName: caveatName, + CaveatName: *caveatName, Context: contextStruct, } } diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 520fb24206..0ac6fe4a80 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -125,6 +125,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories) t.Run("TestWatchCancel", func(t *testing.T) { WatchCancelTest(t, tester) }) t.Run("TestCaveatedRelationshipWatch", func(t *testing.T) { CaveatedRelationshipWatchTest(t, tester) }) t.Run("TestWatchWithTouch", func(t *testing.T) { WatchWithTouchTest(t, tester) }) + t.Run("TestWatchWithDelete", func(t *testing.T) { WatchWithDeleteTest(t, tester) }) } } diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index cdb8bdbfbe..890fbb14c0 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -321,6 +321,70 @@ func WatchWithTouchTest(t *testing.T, tester DatastoreTester) { ) } +func WatchWithDeleteTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) + require.NoError(err) + + setupDatastore(ds, require) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lowestRevision, err := ds.HeadRevision(ctx) + require.NoError(err) + + // TOUCH a relationship and ensure watch sees it. + changes, errchan := ds.Watch(ctx, lowestRevision) + require.Zero(len(errchan)) + + afterTouchRevision, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH, + tuple.Parse("document:firstdoc#viewer@user:tom"), + tuple.Parse("document:firstdoc#viewer@user:sarah"), + tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"), + ) + require.NoError(err) + + ensureTuples(ctx, require, ds, + tuple.Parse("document:firstdoc#viewer@user:tom"), + tuple.Parse("document:firstdoc#viewer@user:sarah"), + tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"), + ) + + verifyUpdates(require, [][]*core.RelationTupleUpdate{ + { + tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:tom")), + tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:sarah")), + tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]")), + }, + }, + changes, + errchan, + false, + ) + + // DELETE the relationship + changes, errchan = ds.Watch(ctx, afterTouchRevision) + require.Zero(len(errchan)) + + _, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tuple.Parse("document:firstdoc#viewer@user:tom")) + require.NoError(err) + + ensureTuples(ctx, require, ds, + tuple.Parse("document:firstdoc#viewer@user:sarah"), + tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"), + ) + + verifyUpdates(require, [][]*core.RelationTupleUpdate{ + {tuple.Delete(tuple.Parse("document:firstdoc#viewer@user:tom"))}, + }, + changes, + errchan, + false, + ) +} + func verifyNoUpdates( require *require.Assertions, changes <-chan *datastore.RevisionChanges,