Skip to content

Commit

Permalink
Merge pull request #1668 from josephschorr/pg-null-caveat
Browse files Browse the repository at this point in the history
Fix handling of NULL caveats in Postgres watch
  • Loading branch information
vroldanbet authored Dec 1, 2023
2 parents 9415fff + a454097 commit ada2eca
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 3 deletions.
134 changes: 134 additions & 0 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package postgres

import (
"context"
"errors"
"fmt"
"math/rand"
"strings"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/samber/lo"
"github.com/scylladb/go-set/strset"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -196,6 +198,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))

t.Run("TestNullCaveatWatch", createDatastoreTest(
b,
NullCaveatWatchTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
}

t.Run("OTelTracing", createDatastoreTest(
Expand Down Expand Up @@ -1417,3 +1428,126 @@ func RepairTransactionsTest(t *testing.T, ds datastore.Datastore) {
require.NoError(t, err)
require.Greater(t, currentMaximumID, 12345)
}

func NullCaveatWatchTest(t *testing.T, ds datastore.Datastore) {
require := require.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lowestRevision, err := ds.HeadRevision(ctx)
require.NoError(err)

// Run the watch API.
changes, errchan := ds.Watch(ctx, lowestRevision)
require.Zero(len(errchan))

// Manually insert a relationship with a NULL caveat. This is allowed, but can only happen due to
// bulk import (normal write rels will make it empty instead)
pds := ds.(*pgDatastore)
_, err = pds.ReadWriteTx(ctx, func(ctx context.Context, drwt datastore.ReadWriteTransaction) error {
rwt := drwt.(*pgReadWriteTXN)

createInserts := writeTuple
valuesToWrite := []interface{}{
"resource",
"someresourceid",
"somerelation",
"subject",
"somesubject",
"...",
nil, // set explicitly to null
nil, // set explicitly to null
}

query := createInserts.Values(valuesToWrite...)
sql, args, err := query.ToSql()
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}

_, err = rwt.tx.Exec(ctx, sql, args...)
return err
})
require.NoError(err)

// Verify the relationship create was tracked by the watch.
verifyUpdates(require, [][]*core.RelationTupleUpdate{
{
tuple.Touch(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")),
},
},
changes,
errchan,
false,
)

// Delete the relationship and ensure it does not raise an error in watch.
deleteUpdate := tuple.Delete(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject"))
_, err = common.UpdateTuplesInDatastore(ctx, ds, deleteUpdate)
require.NoError(err)

// Verify the delete.
verifyUpdates(require, [][]*core.RelationTupleUpdate{
{
tuple.Delete(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")),
},
},
changes,
errchan,
false,
)
}

const waitForChangesTimeout = 5 * time.Second

// TODO(jschorr): Combine with the same impl in the datastore shared tests
func verifyUpdates(
require *require.Assertions,
testUpdates [][]*core.RelationTupleUpdate,
changes <-chan *datastore.RevisionChanges,
errchan <-chan error,
expectDisconnect bool,
) {
for _, expected := range testUpdates {
changeWait := time.NewTimer(waitForChangesTimeout)
select {
case change, ok := <-changes:
if !ok {
require.True(expectDisconnect, "unexpected disconnect")
errWait := time.NewTimer(waitForChangesTimeout)
select {
case err := <-errchan:
require.True(errors.As(err, &datastore.ErrWatchDisconnected{}))
return
case <-errWait.C:
require.Fail("Timed out waiting for ErrWatchDisconnected")
}
return
}

expectedChangeSet := setOfChanges(expected)
actualChangeSet := setOfChanges(change.Changes)

missingExpected := strset.Difference(expectedChangeSet, actualChangeSet)
unexpected := strset.Difference(actualChangeSet, expectedChangeSet)

require.True(missingExpected.IsEmpty(), "expected changes missing: %s", missingExpected)
require.True(unexpected.IsEmpty(), "unexpected changes: %s", unexpected)

time.Sleep(1 * time.Millisecond)
case <-changeWait.C:
require.Fail("Timed out", "waiting for changes: %s", expected)
}
}

require.False(expectDisconnect, "all changes verified without expected disconnect")
}

func setOfChanges(changes []*core.RelationTupleUpdate) *strset.Set {
changeSet := strset.NewWithSize(len(changes))
for _, change := range changes {
changeSet.Add(fmt.Sprintf("OPERATION_%s(%s)", change.Operation, tuple.StringWithoutCaveat(change.Tuple)))
}
return changeSet
}
6 changes: 3 additions & 3 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWit
}

var createdXID, deletedXID xid8
var caveatName string
var caveatName *string
var caveatContext map[string]any
if err := changes.Scan(
&nextTuple.ResourceAndRelation.Namespace,
Expand All @@ -223,13 +223,13 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWit
return nil, fmt.Errorf("unable to parse changed tuple: %w", err)
}

if caveatName != "" {
if caveatName != nil && *caveatName != "" {
contextStruct, err := structpb.NewStruct(caveatContext)
if err != nil {
return nil, fmt.Errorf("failed to read caveat context from update: %w", err)
}
nextTuple.Caveat = &core.ContextualizedCaveat{
CaveatName: caveatName,
CaveatName: *caveatName,
Context: contextStruct,
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/datastore/test/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories)
t.Run("TestWatchCancel", func(t *testing.T) { WatchCancelTest(t, tester) })
t.Run("TestCaveatedRelationshipWatch", func(t *testing.T) { CaveatedRelationshipWatchTest(t, tester) })
t.Run("TestWatchWithTouch", func(t *testing.T) { WatchWithTouchTest(t, tester) })
t.Run("TestWatchWithDelete", func(t *testing.T) { WatchWithDeleteTest(t, tester) })
}
}

Expand Down
64 changes: 64 additions & 0 deletions pkg/datastore/test/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,70 @@ func WatchWithTouchTest(t *testing.T, tester DatastoreTester) {
)
}

func WatchWithDeleteTest(t *testing.T, tester DatastoreTester) {
require := require.New(t)

ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16)
require.NoError(err)

setupDatastore(ds, require)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lowestRevision, err := ds.HeadRevision(ctx)
require.NoError(err)

// TOUCH a relationship and ensure watch sees it.
changes, errchan := ds.Watch(ctx, lowestRevision)
require.Zero(len(errchan))

afterTouchRevision, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH,
tuple.Parse("document:firstdoc#viewer@user:tom"),
tuple.Parse("document:firstdoc#viewer@user:sarah"),
tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"),
)
require.NoError(err)

ensureTuples(ctx, require, ds,
tuple.Parse("document:firstdoc#viewer@user:tom"),
tuple.Parse("document:firstdoc#viewer@user:sarah"),
tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"),
)

verifyUpdates(require, [][]*core.RelationTupleUpdate{
{
tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:tom")),
tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:sarah")),
tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]")),
},
},
changes,
errchan,
false,
)

// DELETE the relationship
changes, errchan = ds.Watch(ctx, afterTouchRevision)
require.Zero(len(errchan))

_, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tuple.Parse("document:firstdoc#viewer@user:tom"))
require.NoError(err)

ensureTuples(ctx, require, ds,
tuple.Parse("document:firstdoc#viewer@user:sarah"),
tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"),
)

verifyUpdates(require, [][]*core.RelationTupleUpdate{
{tuple.Delete(tuple.Parse("document:firstdoc#viewer@user:tom"))},
},
changes,
errchan,
false,
)
}

func verifyNoUpdates(
require *require.Assertions,
changes <-chan *datastore.RevisionChanges,
Expand Down

0 comments on commit ada2eca

Please sign in to comment.