From a454097e74fc55515e111a042094c81c52c4411e Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 30 Nov 2023 16:05:35 -0500 Subject: [PATCH] Fix handling of NULL caveats in Postgres watch Normally, the Postgres driver sets the caveat name column to empty string when no caveat is present. However, the field IS nullable and bulk import can set it to a NULL instead. The watch API code therefore needs to support this scenario --- .../postgres/postgres_shared_test.go | 134 ++++++++++++++++++ internal/datastore/postgres/watch.go | 6 +- pkg/datastore/test/datastore.go | 1 + pkg/datastore/test/watch.go | 64 +++++++++ 4 files changed, 202 insertions(+), 3 deletions(-) diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index ddc3a51313..e556bfd1c1 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -5,6 +5,7 @@ package postgres import ( "context" + "errors" "fmt" "math/rand" "strings" @@ -27,6 +28,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/samber/lo" + "github.com/scylladb/go-set/strset" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/trace" @@ -196,6 +198,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) + + t.Run("TestNullCaveatWatch", createDatastoreTest( + b, + NullCaveatWatchTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(50), + MigrationPhase(config.migrationPhase), + )) } t.Run("OTelTracing", createDatastoreTest( @@ -1417,3 +1428,126 @@ func RepairTransactionsTest(t *testing.T, ds datastore.Datastore) { require.NoError(t, err) require.Greater(t, currentMaximumID, 12345) } + +func NullCaveatWatchTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lowestRevision, err := ds.HeadRevision(ctx) + require.NoError(err) + + // Run the watch API. + changes, errchan := ds.Watch(ctx, lowestRevision) + require.Zero(len(errchan)) + + // Manually insert a relationship with a NULL caveat. This is allowed, but can only happen due to + // bulk import (normal write rels will make it empty instead) + pds := ds.(*pgDatastore) + _, err = pds.ReadWriteTx(ctx, func(ctx context.Context, drwt datastore.ReadWriteTransaction) error { + rwt := drwt.(*pgReadWriteTXN) + + createInserts := writeTuple + valuesToWrite := []interface{}{ + "resource", + "someresourceid", + "somerelation", + "subject", + "somesubject", + "...", + nil, // set explicitly to null + nil, // set explicitly to null + } + + query := createInserts.Values(valuesToWrite...) + sql, args, err := query.ToSql() + if err != nil { + return fmt.Errorf(errUnableToWriteRelationships, err) + } + + _, err = rwt.tx.Exec(ctx, sql, args...) + return err + }) + require.NoError(err) + + // Verify the relationship create was tracked by the watch. + verifyUpdates(require, [][]*core.RelationTupleUpdate{ + { + tuple.Touch(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")), + }, + }, + changes, + errchan, + false, + ) + + // Delete the relationship and ensure it does not raise an error in watch. + deleteUpdate := tuple.Delete(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")) + _, err = common.UpdateTuplesInDatastore(ctx, ds, deleteUpdate) + require.NoError(err) + + // Verify the delete. + verifyUpdates(require, [][]*core.RelationTupleUpdate{ + { + tuple.Delete(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")), + }, + }, + changes, + errchan, + false, + ) +} + +const waitForChangesTimeout = 5 * time.Second + +// TODO(jschorr): Combine with the same impl in the datastore shared tests +func verifyUpdates( + require *require.Assertions, + testUpdates [][]*core.RelationTupleUpdate, + changes <-chan *datastore.RevisionChanges, + errchan <-chan error, + expectDisconnect bool, +) { + for _, expected := range testUpdates { + changeWait := time.NewTimer(waitForChangesTimeout) + select { + case change, ok := <-changes: + if !ok { + require.True(expectDisconnect, "unexpected disconnect") + errWait := time.NewTimer(waitForChangesTimeout) + select { + case err := <-errchan: + require.True(errors.As(err, &datastore.ErrWatchDisconnected{})) + return + case <-errWait.C: + require.Fail("Timed out waiting for ErrWatchDisconnected") + } + return + } + + expectedChangeSet := setOfChanges(expected) + actualChangeSet := setOfChanges(change.Changes) + + missingExpected := strset.Difference(expectedChangeSet, actualChangeSet) + unexpected := strset.Difference(actualChangeSet, expectedChangeSet) + + require.True(missingExpected.IsEmpty(), "expected changes missing: %s", missingExpected) + require.True(unexpected.IsEmpty(), "unexpected changes: %s", unexpected) + + time.Sleep(1 * time.Millisecond) + case <-changeWait.C: + require.Fail("Timed out", "waiting for changes: %s", expected) + } + } + + require.False(expectDisconnect, "all changes verified without expected disconnect") +} + +func setOfChanges(changes []*core.RelationTupleUpdate) *strset.Set { + changeSet := strset.NewWithSize(len(changes)) + for _, change := range changes { + changeSet.Add(fmt.Sprintf("OPERATION_%s(%s)", change.Operation, tuple.StringWithoutCaveat(change.Tuple))) + } + return changeSet +} diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index 0926c0df96..5bfa4b09da 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -206,7 +206,7 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWit } var createdXID, deletedXID xid8 - var caveatName string + var caveatName *string var caveatContext map[string]any if err := changes.Scan( &nextTuple.ResourceAndRelation.Namespace, @@ -223,13 +223,13 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWit return nil, fmt.Errorf("unable to parse changed tuple: %w", err) } - if caveatName != "" { + if caveatName != nil && *caveatName != "" { contextStruct, err := structpb.NewStruct(caveatContext) if err != nil { return nil, fmt.Errorf("failed to read caveat context from update: %w", err) } nextTuple.Caveat = &core.ContextualizedCaveat{ - CaveatName: caveatName, + CaveatName: *caveatName, Context: contextStruct, } } diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 520fb24206..0ac6fe4a80 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -125,6 +125,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories) t.Run("TestWatchCancel", func(t *testing.T) { WatchCancelTest(t, tester) }) t.Run("TestCaveatedRelationshipWatch", func(t *testing.T) { CaveatedRelationshipWatchTest(t, tester) }) t.Run("TestWatchWithTouch", func(t *testing.T) { WatchWithTouchTest(t, tester) }) + t.Run("TestWatchWithDelete", func(t *testing.T) { WatchWithDeleteTest(t, tester) }) } } diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index cdb8bdbfbe..890fbb14c0 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -321,6 +321,70 @@ func WatchWithTouchTest(t *testing.T, tester DatastoreTester) { ) } +func WatchWithDeleteTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) + require.NoError(err) + + setupDatastore(ds, require) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lowestRevision, err := ds.HeadRevision(ctx) + require.NoError(err) + + // TOUCH a relationship and ensure watch sees it. + changes, errchan := ds.Watch(ctx, lowestRevision) + require.Zero(len(errchan)) + + afterTouchRevision, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH, + tuple.Parse("document:firstdoc#viewer@user:tom"), + tuple.Parse("document:firstdoc#viewer@user:sarah"), + tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"), + ) + require.NoError(err) + + ensureTuples(ctx, require, ds, + tuple.Parse("document:firstdoc#viewer@user:tom"), + tuple.Parse("document:firstdoc#viewer@user:sarah"), + tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"), + ) + + verifyUpdates(require, [][]*core.RelationTupleUpdate{ + { + tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:tom")), + tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:sarah")), + tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]")), + }, + }, + changes, + errchan, + false, + ) + + // DELETE the relationship + changes, errchan = ds.Watch(ctx, afterTouchRevision) + require.Zero(len(errchan)) + + _, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tuple.Parse("document:firstdoc#viewer@user:tom")) + require.NoError(err) + + ensureTuples(ctx, require, ds, + tuple.Parse("document:firstdoc#viewer@user:sarah"), + tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"), + ) + + verifyUpdates(require, [][]*core.RelationTupleUpdate{ + {tuple.Delete(tuple.Parse("document:firstdoc#viewer@user:tom"))}, + }, + changes, + errchan, + false, + ) +} + func verifyNoUpdates( require *require.Assertions, changes <-chan *datastore.RevisionChanges,