From 55d31f83650056def91fae99b0c6e0e3d38d7850 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Tue, 1 Oct 2024 12:22:09 +0100 Subject: [PATCH 1/2] emit memdb checkpoints after changes Clients may consider the revision has moved forward before relationship / the changes were emitted. --- internal/datastore/memdb/watch.go | 10 +++++----- pkg/datastore/test/watch.go | 25 ++++++++++++++++++++++--- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/internal/datastore/memdb/watch.go b/internal/datastore/memdb/watch.go index 72649e189e..7933e005b0 100644 --- a/internal/datastore/memdb/watch.go +++ b/internal/datastore/memdb/watch.go @@ -114,6 +114,11 @@ func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, opti changes = append(changes, &change.changes) } + if options.Content&datastore.WatchSchema == datastore.WatchSchema && + len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 { + changes = append(changes, &change.changes) + } + if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints && change.revisionNanos > lastRevision { changes = append(changes, &datastore.RevisionChanges{ Revision: revisions.NewForTimestamp(change.revisionNanos), @@ -121,11 +126,6 @@ func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, opti }) } - if options.Content&datastore.WatchSchema == datastore.WatchSchema && - len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 { - changes = append(changes, &change.changes) - } - lastRevision = change.revisionNanos } diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index a91ec8eefc..41f28f59b5 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -726,7 +726,7 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) { require.NoError(err) changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{ - Content: datastore.WatchCheckpoints | datastore.WatchRelationships, + Content: datastore.WatchCheckpoints | datastore.WatchRelationships | datastore.WatchSchema, CheckpointInterval: 100 * time.Millisecond, }) require.Zero(len(errchan)) @@ -735,6 +735,14 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) { tuple.Parse("document:firstdoc#viewer@user:tom"), ) require.NoError(err) + + verifyCheckpointUpdate(require, afterTouchRevision, changes) + + afterTouchRevision, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteNamespaces(ctx, &core.NamespaceDefinition{Name: "doesnotexist"}) + }) + require.NoError(err) + verifyCheckpointUpdate(require, afterTouchRevision, changes) } @@ -743,14 +751,25 @@ func verifyCheckpointUpdate( expectedRevision datastore.Revision, changes <-chan *datastore.RevisionChanges, ) { + var relChangeEmitted, schemaChangeEmitted bool changeWait := time.NewTimer(waitForChangesTimeout) for { select { case change, ok := <-changes: require.True(ok) + if len(change.ChangedDefinitions) > 0 { + schemaChangeEmitted = true + } + if len(change.RelationshipChanges) > 0 { + relChangeEmitted = true + } if change.IsCheckpoint { - require.True(change.Revision.Equal(change.Revision) || change.Revision.GreaterThan(expectedRevision)) - return + if change.Revision.Equal(expectedRevision) || change.Revision.GreaterThan(expectedRevision) { + require.True(relChangeEmitted || schemaChangeEmitted, "expected relationship/schema changes before checkpoint") + return + } + + // we received a past revision checkpoint, ignore } case <-changeWait.C: require.Fail("Timed out", "waited for checkpoint") From a00bf5882ca22932d4aa0b9009eff01a8cc5c4a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Tue, 1 Oct 2024 18:49:21 +0100 Subject: [PATCH 2/2] add longer backoff to postgres tests so they pass on limited compute in CI --- internal/testserver/datastore/postgres.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/testserver/datastore/postgres.go b/internal/testserver/datastore/postgres.go index 1c7e6a4477..119b330860 100644 --- a/internal/testserver/datastore/postgres.go +++ b/internal/testserver/datastore/postgres.go @@ -129,12 +129,12 @@ func (b *postgresTester) NewDatabase(t testing.TB) string { } const ( - retryCount = 3 - timeBetweenRetries = 100 * time.Millisecond + retryCount = 4 + timeBetweenRetries = 1 * time.Second ) func (b *postgresTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore.Datastore { - for i := 0; i < retryCount; i++ { + for i := 1; i <= retryCount; i++ { connectStr := b.NewDatabase(t) migrationDriver, err := pgmigrations.NewAlembicPostgresDriver(context.Background(), connectStr, datastore.NoCredentialsProvider) @@ -144,11 +144,11 @@ func (b *postgresTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore return initFunc("postgres", connectStr) } - if i == retryCount-1 { + if i == retryCount { require.NoError(t, err, "got error when trying to create migration driver") } - time.Sleep(timeBetweenRetries) + time.Sleep(time.Duration(i) * timeBetweenRetries) } require.Fail(t, "failed to create datastore for testing")