From c5d52b1d06a03f7abd4ac01c57a00141b5c0f080 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Mon, 9 Dec 2024 15:46:01 -0500
Subject: [PATCH 1/3] Garbage Collection improvements in MySQL and Postgres

1) Have GC lock so that it only runs on a single node at a time
2) Add a missing index in the Postgres datastore for GC

This should reduce datastore CPU pressure
---
 go.mod                                        |  3 +-
 internal/datastore/common/gc.go               | 23 +++++
 internal/datastore/common/gc_test.go          |  8 ++
 internal/datastore/mysql/datastore_test.go    | 62 ++++++++++++++
 internal/datastore/mysql/gc.go                |  8 ++
 internal/datastore/mysql/locks.go             | 33 ++++++++
 internal/datastore/postgres/gc.go             |  8 ++
 internal/datastore/postgres/locks.go          | 30 +++++++
 .../zz_migration.0022_add_missing_gc_index.go | 39 +++++++++
 .../postgres/postgres_shared_test.go          | 84 +++++++++++++++++++
 internal/testserver/datastore/postgres.go     |  5 +-
 11 files changed, 301 insertions(+), 2 deletions(-)
 create mode 100644 internal/datastore/mysql/locks.go
 create mode 100644 internal/datastore/postgres/locks.go
 create mode 100644 internal/datastore/postgres/migrations/zz_migration.0022_add_missing_gc_index.go

diff --git a/go.mod b/go.mod
index 5cd3b703e2..6f1a49be7b 100644
--- a/go.mod
+++ b/go.mod
@@ -117,6 +117,8 @@ require (
 	sigs.k8s.io/controller-runtime v0.19.2
 )
 
+require k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
+
 require (
 	4d63.com/gocheckcompilerdirectives v1.2.1 // indirect
 	4d63.com/gochecknoglobals v0.2.1 // indirect
@@ -404,7 +406,6 @@ require (
 	k8s.io/client-go v0.31.0 // indirect
 	k8s.io/klog/v2 v2.130.1 // indirect
 	k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
-	k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
 	mvdan.cc/gofumpt v0.7.0 // indirect
 	mvdan.cc/unparam v0.0.0-20240528143540-8a5130ca722f // indirect
 	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
diff --git a/internal/datastore/common/gc.go b/internal/datastore/common/gc.go
index d3c90647ab..1e79763441 100644
--- a/internal/datastore/common/gc.go
+++ b/internal/datastore/common/gc.go
@@ -84,6 +84,9 @@ type GarbageCollector interface {
 	MarkGCCompleted()
 	ResetGCCompleted()
 
+	LockForGCRun(ctx context.Context) (bool, error)
+	UnlockAfterGCRun(ctx context.Context) error
+
 	ReadyState(context.Context) (datastore.ReadyState, error)
 	Now(context.Context) (time.Time, error)
 	TxIDBefore(context.Context, time.Time) (datastore.Revision, error)
@@ -166,6 +169,26 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 
+	ok, err := gc.LockForGCRun(ctx)
+	if err != nil {
+		return fmt.Errorf("error locking for gc run: %w", err)
+	}
+
+	if !ok {
+		log.Info().
+			Msg("datastore garbage collection already in progress on another node")
+		return nil
+	}
+
+	defer func() {
+		err := gc.UnlockAfterGCRun(ctx)
+		if err != nil {
+			log.Error().
+				Err(err).
+ Msg("error unlocking after gc run") + } + }() + ctx, span := tracer.Start(ctx, "RunGarbageCollection") defer span.End() diff --git a/internal/datastore/common/gc_test.go b/internal/datastore/common/gc_test.go index aa42bc9062..60a1db203e 100644 --- a/internal/datastore/common/gc_test.go +++ b/internal/datastore/common/gc_test.go @@ -39,6 +39,14 @@ func newFakeGC(deleter gcDeleter) fakeGC { } } +func (*fakeGC) LockForGCRun(ctx context.Context) (bool, error) { + return true, nil +} + +func (*fakeGC) UnlockAfterGCRun(ctx context.Context) error { + return nil +} + func (*fakeGC) ReadyState(_ context.Context) (datastore.ReadyState, error) { return datastore.ReadyState{ Message: "Ready", diff --git a/internal/datastore/mysql/datastore_test.go b/internal/datastore/mysql/datastore_test.go index e7caadc6eb..0b35ef09f8 100644 --- a/internal/datastore/mysql/datastore_test.go +++ b/internal/datastore/mysql/datastore_test.go @@ -91,6 +91,29 @@ func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestF } } +type multiDatastoreTestFunc func(t *testing.T, ds1 datastore.Datastore, ds2 datastore.Datastore) + +func createMultiDatastoreTest(b testdatastore.RunningEngineForTest, tf multiDatastoreTestFunc, options ...Option) func(*testing.T) { + return func(t *testing.T) { + ctx := context.Background() + + var secondDS datastore.Datastore + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...) + require.NoError(t, err) + + ds2, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...) + require.NoError(t, err) + + secondDS = ds2 + return ds + }) + defer failOnError(t, ds.Close) + + tf(t, ds, secondDS) + } +} + func TestMySQLDatastoreDSNWithoutParseTime(t *testing.T) { _, err := NewMySQLDatastore(context.Background(), "root:password@(localhost:1234)/mysql") require.ErrorContains(t, err, "https://spicedb.dev/d/parse-time-mysql") @@ -127,6 +150,45 @@ func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) { t.Run("QuantizedRevisions", func(t *testing.T) { QuantizedRevisionTest(t, b) }) + t.Run("Locking", createMultiDatastoreTest(b, LockingTest, defaultOptions...)) +} + +func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) { + mds := ds.(*Datastore) + mds2 := ds2.(*Datastore) + + // Acquire a lock. + ctx := context.Background() + acquired, err := mds.tryAcquireLock(ctx, "testing123") + require.NoError(t, err) + require.True(t, acquired) + + // Try to acquire the lock again. + acquired, err = mds2.tryAcquireLock(ctx, "testing123") + require.NoError(t, err) + require.False(t, acquired) + + // Acquire another lock. + acquired, err = mds.tryAcquireLock(ctx, "testing456") + require.NoError(t, err) + require.True(t, acquired) + + // Release the other lock. + err = mds.releaseLock(ctx, "testing123") + require.NoError(t, err) + + // Release the lock. + err = mds.releaseLock(ctx, "testing123") + require.NoError(t, err) + + // Try to acquire the lock again. + acquired, err = mds2.tryAcquireLock(ctx, "testing123") + require.NoError(t, err) + require.True(t, acquired) + + // Release the lock. 
+	err = mds2.releaseLock(ctx, "testing123")
+	require.NoError(t, err)
 }
 
 func DatabaseSeedingTest(t *testing.T, ds datastore.Datastore) {
diff --git a/internal/datastore/mysql/gc.go b/internal/datastore/mysql/gc.go
index bf2c04e993..d4375eddbd 100644
--- a/internal/datastore/mysql/gc.go
+++ b/internal/datastore/mysql/gc.go
@@ -29,6 +29,14 @@ func (mds *Datastore) ResetGCCompleted() {
 	mds.gcHasRun.Store(false)
 }
 
+func (mds *Datastore) LockForGCRun(ctx context.Context) (bool, error) {
+	return mds.tryAcquireLock(ctx, gcRunLock)
+}
+
+func (mds *Datastore) UnlockAfterGCRun(ctx context.Context) error {
+	return mds.releaseLock(ctx, gcRunLock)
+}
+
 func (mds *Datastore) Now(ctx context.Context) (time.Time, error) {
 	// Retrieve the `now` time from the database.
 	nowSQL, nowArgs, err := getNow.ToSql()
diff --git a/internal/datastore/mysql/locks.go b/internal/datastore/mysql/locks.go
new file mode 100644
index 0000000000..310308c8ed
--- /dev/null
+++ b/internal/datastore/mysql/locks.go
@@ -0,0 +1,33 @@
+package mysql
+
+import "context"
+
+type lockName string
+
+const (
+	// gcRunLock is the lock name for the garbage collection run.
+	gcRunLock lockName = "gc_run"
+)
+
+func (mds *Datastore) tryAcquireLock(ctx context.Context, lockName lockName) (bool, error) {
+	// Acquire the lock, with max 1s timeout.
+	row := mds.db.QueryRowContext(ctx, `
+		SELECT GET_LOCK(?, 1)
+	`, lockName)
+
+	var acquired int
+	if err := row.Scan(&acquired); err != nil {
+		return false, err
+	}
+
+	return acquired == 1, nil
+}
+
+func (mds *Datastore) releaseLock(ctx context.Context, lockName lockName) error {
+	_, err := mds.db.ExecContext(ctx, `
+		SELECT RELEASE_LOCK(?)
+	`,
+		lockName,
+	)
+	return err
+}
diff --git a/internal/datastore/postgres/gc.go b/internal/datastore/postgres/gc.go
index 769859646e..674082171d 100644
--- a/internal/datastore/postgres/gc.go
+++ b/internal/datastore/postgres/gc.go
@@ -21,6 +21,14 @@ var (
 	gcPKCols = []string{"tableoid", "ctid"}
 )
 
+func (pgd *pgDatastore) LockForGCRun(ctx context.Context) (bool, error) {
+	return pgd.tryAcquireLock(ctx, gcRunLock)
+}
+
+func (pgd *pgDatastore) UnlockAfterGCRun(ctx context.Context) error {
+	return pgd.releaseLock(ctx, gcRunLock)
+}
+
 func (pgd *pgDatastore) HasGCRun() bool {
 	return pgd.gcHasRun.Load()
 }
diff --git a/internal/datastore/postgres/locks.go b/internal/datastore/postgres/locks.go
new file mode 100644
index 0000000000..86863e926c
--- /dev/null
+++ b/internal/datastore/postgres/locks.go
@@ -0,0 +1,30 @@
+package postgres
+
+import "context"
+
+type lockID uint32
+
+const (
+	// gcRunLock is the lock ID for the garbage collection run.
+	gcRunLock lockID = 1
+)
+
+func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, lockID lockID) (bool, error) {
+	// Acquire the lock.
+	row := pgd.writePool.QueryRow(ctx, `
+		SELECT pg_try_advisory_lock($1)
+	`, lockID)
+
+	var lockAcquired bool
+	if err := row.Scan(&lockAcquired); err != nil {
+		return false, err
+	}
+	return lockAcquired, nil
+}
+
+func (pgd *pgDatastore) releaseLock(ctx context.Context, lockID lockID) error {
+	_, err := pgd.writePool.Exec(ctx, `
+		SELECT pg_advisory_unlock($1)
+	`, lockID)
+	return err
+}
diff --git a/internal/datastore/postgres/migrations/zz_migration.0022_add_missing_gc_index.go b/internal/datastore/postgres/migrations/zz_migration.0022_add_missing_gc_index.go
new file mode 100644
index 0000000000..9384c3c3c9
--- /dev/null
+++ b/internal/datastore/postgres/migrations/zz_migration.0022_add_missing_gc_index.go
@@ -0,0 +1,39 @@
+package migrations
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/jackc/pgx/v5"
+)
+
+// addGCIndexForRelationTupleTransaction adds a missing index to relation_tuple_transaction table
+// to support garbage collection. This is in support of the query for selecting the most recent
+// transaction: `SELECT xid, snapshot FROM relation_tuple_transaction WHERE timestamp < $1 ORDER BY xid DESC LIMIT 1`
+//
+// EXPLAIN before the index:
+// Limit  (cost=0.56..1.78 rows=1 width=558) (actual time=5706.155..5706.156 rows=1 loops=1)
+//   ->  Index Scan Backward using pk_rttx on relation_tuple_transaction  (cost=0.56..30428800.04 rows=25023202 width=558) (actual time=5706.154..5706.155 rows=1 loops=1)
+//
+//	Filter: ("timestamp" < (now() - '04:00:00'::interval))
+//	Rows Removed by Filter: 6638121
+//
+// Planning Time: 0.098 ms
+// Execution Time: 5706.192 ms
+// (6 rows)
+const addGCIndexForRelationTupleTransaction = `CREATE INDEX CONCURRENTLY
+	IF NOT EXISTS ix_relation_tuple_transaction_xid_desc_timestamp
+	ON relation_tuple_transaction (xid DESC, timestamp);`
+
+func init() {
+	if err := DatabaseMigrations.Register("add-missing-gc-index", "add-expiration-support",
+		func(ctx context.Context, conn *pgx.Conn) error {
+			if _, err := conn.Exec(ctx, addGCIndexForRelationTupleTransaction); err != nil {
+				return fmt.Errorf("failed to add missing GC index: %w", err)
+			}
+			return nil
+		},
+		noTxMigration); err != nil {
+		panic("failed to register migration: " + err.Error())
+	}
+}
diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go
index 7e9573c77f..a343b5e6a8 100644
--- a/internal/datastore/postgres/postgres_shared_test.go
+++ b/internal/datastore/postgres/postgres_shared_test.go
@@ -130,6 +130,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			RevisionInversionTest,
 			RevisionQuantization(0),
 			GCWindow(1*time.Millisecond),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(1),
 			MigrationPhase(config.migrationPhase),
 		))
@@ -139,6 +140,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			ConcurrentRevisionHeadTest,
 			RevisionQuantization(0),
 			GCWindow(1*time.Millisecond),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(1),
 			MigrationPhase(config.migrationPhase),
 		))
@@ -148,6 +150,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			ConcurrentRevisionWatchTest,
 			RevisionQuantization(0),
 			GCWindow(1*time.Millisecond),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(50),
 			MigrationPhase(config.migrationPhase),
 		))
@@ -157,6 +160,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			OverlappingRevisionWatchTest,
 			RevisionQuantization(0),
 			GCWindow(1*time.Millisecond),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(50),
 			MigrationPhase(config.migrationPhase),
 		))
@@ -166,6 +170,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			RepairTransactionsTest,
 			RevisionQuantization(0),
 			GCWindow(1*time.Millisecond),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(1),
 			MigrationPhase(config.migrationPhase),
 		))
@@ -175,6 +180,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			NullCaveatWatchTest,
 			RevisionQuantization(0),
 			GCWindow(1*time.Millisecond),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(50),
 			MigrationPhase(config.migrationPhase),
 		))
@@ -184,6 +190,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			RevisionTimestampAndTransactionIDTest,
 			RevisionQuantization(0),
 			GCWindow(1*time.Millisecond),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(50),
 			MigrationPhase(config.migrationPhase),
 		))
@@ -193,6 +200,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			CheckpointsOnOutOfBandChangesTest,
 			RevisionQuantization(0),
 			GCWindow(1*time.Millisecond),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(50),
 			MigrationPhase(config.migrationPhase),
 		))
@@ -202,6 +210,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			SerializationErrorTest,
 			RevisionQuantization(0),
 			GCWindow(1*time.Millisecond),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(50),
 			MigrationPhase(config.migrationPhase),
 		))
@@ -211,10 +220,21 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 			StrictReadModeTest,
 			RevisionQuantization(0),
 			GCWindow(1000*time.Second),
+			GCInterval(1*time.Hour),
 			WatchBufferLength(50),
 			MigrationPhase(config.migrationPhase),
 			ReadStrictMode(true),
 		))
+
+		t.Run("TestLocking", createMultiDatastoreTest(
+			b,
+			LockingTest,
+			RevisionQuantization(0),
+			GCWindow(1000*time.Second),
+			GCInterval(1*time.Hour),
+			WatchBufferLength(50),
+			MigrationPhase(config.migrationPhase),
+		))
 	}
 
 	t.Run("OTelTracing", createDatastoreTest(
@@ -222,6 +242,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 		OTelTracingTest,
 		RevisionQuantization(0),
 		GCWindow(1*time.Millisecond),
+		GCInterval(1*time.Hour),
 		WatchBufferLength(1),
 		MigrationPhase(config.migrationPhase),
 	))
@@ -288,6 +309,28 @@ func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf datasto
 	}
 }
 
+type multiDatastoreTestFunc func(t *testing.T, ds1 datastore.Datastore, ds2 datastore.Datastore)
+
+func createMultiDatastoreTest(b testdatastore.RunningEngineForTest, tf multiDatastoreTestFunc, options ...Option) func(*testing.T) {
+	return func(t *testing.T) {
+		ctx := context.Background()
+		var secondDS datastore.Datastore
+		ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
+			ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, options...)
+			require.NoError(t, err)
+
+			ds2, err := newPostgresDatastore(ctx, uri, primaryInstanceID, options...)
+			require.NoError(t, err)
+
+			secondDS = ds2
+			return ds
+		})
+		defer ds.Close()
+
+		tf(t, ds, secondDS)
+	}
+}
+
 func SerializationErrorTest(t *testing.T, ds datastore.Datastore) {
 	require := require.New(t)
 
@@ -1163,6 +1206,7 @@ func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgV
 		primaryInstanceID,
 		RevisionQuantization(0),
 		GCWindow(time.Millisecond*1),
+		GCInterval(veryLargeGCInterval),
 		WatchBufferLength(1),
 	)
 	require.NoError(err)
@@ -1190,6 +1234,7 @@ func BenchmarkPostgresQuery(b *testing.B) {
 		primaryInstanceID,
 		RevisionQuantization(0),
 		GCWindow(time.Millisecond*1),
+		GCInterval(veryLargeGCInterval),
 		WatchBufferLength(1),
 	)
 	require.NoError(b, err)
@@ -1223,6 +1268,7 @@ func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.Quer
 		primaryInstanceID,
 		RevisionQuantization(0),
 		GCWindow(time.Millisecond*1),
+		GCInterval(veryLargeGCInterval),
 		WatchBufferLength(1),
 		WithQueryInterceptor(interceptor),
 	)
@@ -1444,6 +1490,44 @@ func RepairTransactionsTest(t *testing.T, ds datastore.Datastore) {
 	require.Greater(t, currentMaximumID, 12345)
 }
 
+func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) {
+	pds := ds.(*pgDatastore)
+	pds2 := ds2.(*pgDatastore)
+
+	// Acquire a lock.
+	ctx := context.Background()
+	acquired, err := pds.tryAcquireLock(ctx, 42)
+	require.NoError(t, err)
+	require.True(t, acquired)
+
+	// Try to acquire again. Must be on a different session, as these locks are reentrant.
+	acquired, err = pds2.tryAcquireLock(ctx, 42)
+	require.NoError(t, err)
+	require.False(t, acquired)
+
+	// Acquire another lock.
+	acquired, err = pds.tryAcquireLock(ctx, 43)
+	require.NoError(t, err)
+	require.True(t, acquired)
+
+	// Release the first lock.
+	err = pds.releaseLock(ctx, 42)
+	require.NoError(t, err)
+
+	// Try to acquire the first lock again.
+	acquired, err = pds.tryAcquireLock(ctx, 42)
+	require.NoError(t, err)
+	require.True(t, acquired)
+
+	// Release the second lock.
+	err = pds.releaseLock(ctx, 43)
+	require.NoError(t, err)
+
+	// Release the first lock.
+	err = pds.releaseLock(ctx, 42)
+	require.NoError(t, err)
+}
+
 func StrictReadModeTest(t *testing.T, ds datastore.Datastore) {
 	require := require.New(t)
 
diff --git a/internal/testserver/datastore/postgres.go b/internal/testserver/datastore/postgres.go
index 78574f914d..f5c08d7ea8 100644
--- a/internal/testserver/datastore/postgres.go
+++ b/internal/testserver/datastore/postgres.go
@@ -97,7 +97,10 @@ func RunPostgresForTestingWithCommitTimestamps(t testing.TB, bridgeNetworkName s
 	}
 
 	t.Cleanup(func() {
-		require.NoError(t, builder.hostConn.Close(context.Background()))
+		if builder.hostConn != nil {
+			require.NoError(t, builder.hostConn.Close(context.Background()))
+		}
+
 		require.NoError(t, pool.Purge(postgres))
 	})
 

From 4f1ed5f6d26ac805dd72b99d469dfe23ee69cfc6 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Tue, 10 Dec 2024 16:58:31 -0500
Subject: [PATCH 2/3] Move GC tests in PG out of parallel running

This is necessary because GC takes an exclusive lock now
---
 internal/datastore/common/gc.go               | 30 ++++----
 internal/datastore/mysql/locks.go             |  7 ++
 internal/datastore/postgres/locks.go          | 29 ++++++-
 ...tion.0022_add_index_for_transaction_gc.go} |  2 +-
 .../postgres/postgres_shared_test.go          | 77 +++++++++++++++----
 pkg/datastore/test/datastore.go               | 13 +++-
 6 files changed, 120 insertions(+), 38 deletions(-)
 rename internal/datastore/postgres/migrations/{zz_migration.0022_add_missing_gc_index.go => zz_migration.0022_add_index_for_transaction_gc.go} (93%)

diff --git a/internal/datastore/common/gc.go b/internal/datastore/common/gc.go
index 1e79763441..729739273e 100644
--- a/internal/datastore/common/gc.go
+++ b/internal/datastore/common/gc.go
@@ -169,6 +169,21 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 
+	ctx, span := tracer.Start(ctx, "RunGarbageCollection")
+	defer span.End()
+
+	// Before attempting anything, check if the datastore is ready.
+	startTime := time.Now()
+	ready, err := gc.ReadyState(ctx)
+	if err != nil {
+		return err
+	}
+	if !ready.IsReady {
+		log.Ctx(ctx).Warn().
+			Msgf("datastore wasn't ready when attempting garbage collection: %s", ready.Message)
+		return nil
+	}
+
 	ok, err := gc.LockForGCRun(ctx)
 	if err != nil {
 		return fmt.Errorf("error locking for gc run: %w", err)
 	}
@@ -189,21 +204,6 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
 		}
 	}()
 
-	ctx, span := tracer.Start(ctx, "RunGarbageCollection")
-	defer span.End()
-
-	// Before attempting anything, check if the datastore is ready.
-	startTime := time.Now()
-	ready, err := gc.ReadyState(ctx)
-	if err != nil {
-		return err
-	}
-	if !ready.IsReady {
-		log.Ctx(ctx).Warn().
-			Msgf("datastore wasn't ready when attempting garbage collection: %s", ready.Message)
-		return nil
-	}
-
 	now, err := gc.Now(ctx)
 	if err != nil {
 		return fmt.Errorf("error retrieving now: %w", err)
diff --git a/internal/datastore/mysql/locks.go b/internal/datastore/mysql/locks.go
index 310308c8ed..ff8d2b19a5 100644
--- a/internal/datastore/mysql/locks.go
+++ b/internal/datastore/mysql/locks.go
@@ -11,6 +11,12 @@ const (
 
 func (mds *Datastore) tryAcquireLock(ctx context.Context, lockName lockName) (bool, error) {
 	// Acquire the lock, with max 1s timeout.
+	// A lock obtained with GET_LOCK() is released explicitly by executing RELEASE_LOCK()
+	//
+	// NOTE: Lock is re-entrant, i.e. the same session can acquire the same lock multiple times.
+ // > It is even possible for a given session to acquire multiple locks for the same name. + // > Other sessions cannot acquire a lock with that name until the acquiring session releases all its locks for the name. + // See: https://dev.mysql.com/doc/refman/8.4/en/locking-functions.html#function_get-lock row := mds.db.QueryRowContext(ctx, ` SELECT GET_LOCK(?, 1) `, lockName) @@ -24,6 +30,7 @@ func (mds *Datastore) tryAcquireLock(ctx context.Context, lockName lockName) (bo } func (mds *Datastore) releaseLock(ctx context.Context, lockName lockName) error { + // See: https://dev.mysql.com/doc/refman/8.4/en/locking-functions.html#function_release-lock _, err := mds.db.ExecContext(ctx, ` SELECT RELEASE_LOCK(?) `, diff --git a/internal/datastore/postgres/locks.go b/internal/datastore/postgres/locks.go index 86863e926c..497385cf49 100644 --- a/internal/datastore/postgres/locks.go +++ b/internal/datastore/postgres/locks.go @@ -1,6 +1,10 @@ package postgres -import "context" +import ( + "context" + + log "github.com/authzed/spicedb/internal/logging" +) type lockID uint32 @@ -11,6 +15,14 @@ const ( func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, lockID lockID) (bool, error) { // Acquire the lock. + // + // NOTE: The lock is re-entrant, i.e. the same session can acquire the same lock multiple times. + // > A lock can be acquired multiple times by its owning process; for each completed lock request + // > there must be a corresponding unlock request before the lock is actually released + // > If a session already holds a given advisory lock, additional requests by it will always succeed, + // > even if other sessions are awaiting the lock; this statement is true regardless of whether the + // > existing lock hold and new request are at session level or transaction level. 
+	// See: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
 	row := pgd.writePool.QueryRow(ctx, `
 		SELECT pg_try_advisory_lock($1)
 	`, lockID)
@@ -23,8 +35,19 @@ func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, lockID lockID) (bool
 }
 
 func (pgd *pgDatastore) releaseLock(ctx context.Context, lockID lockID) error {
-	_, err := pgd.writePool.Exec(ctx, `
+	row := pgd.writePool.QueryRow(ctx, `
 		SELECT pg_advisory_unlock($1)
 	`, lockID)
-	return err
+
+	var lockReleased bool
+	if err := row.Scan(&lockReleased); err != nil {
+		return err
+	}
+
+	if !lockReleased {
+		log.Warn().Uint32("lock_id", uint32(lockID)).Msg("held lock not released; this likely indicates a bug")
+		return nil
+	}
+
+	return nil
 }
diff --git a/internal/datastore/postgres/migrations/zz_migration.0022_add_missing_gc_index.go b/internal/datastore/postgres/migrations/zz_migration.0022_add_index_for_transaction_gc.go
index 9384c3c3c9..0905f4b490 100644
--- a/internal/datastore/postgres/migrations/zz_migration.0022_add_missing_gc_index.go
+++ b/internal/datastore/postgres/migrations/zz_migration.0022_add_index_for_transaction_gc.go
@@ -26,7 +26,7 @@ const addGCIndexForRelationTupleTransaction = `CREATE INDEX CONCURRENTLY
 	ON relation_tuple_transaction (xid DESC, timestamp);`
 
 func init() {
-	if err := DatabaseMigrations.Register("add-missing-gc-index", "add-expiration-support",
+	if err := DatabaseMigrations.Register("add-index-for-transaction-gc", "add-expiration-support",
 		func(ctx context.Context, conn *pgx.Conn) error {
 			if _, err := conn.Exec(ctx, addGCIndexForRelationTupleTransaction); err != nil {
 				return fmt.Errorf("failed to add missing GC index: %w", err)
diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go
index a343b5e6a8..aaadbb52d4 100644
--- a/internal/datastore/postgres/postgres_shared_test.go
+++ b/internal/datastore/postgres/postgres_shared_test.go
@@ -81,12 +81,13 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 		if config.pgbouncer {
 			pgbouncerStr = "pgbouncer-"
 		}
-		t.Run(fmt.Sprintf("%spostgres-%s-%s-%s", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) {
-			t.Parallel()
+
+		t.Run(fmt.Sprintf("%spostgres-%s-%s-%s-gc", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) {
 			b := testdatastore.RunPostgresForTesting(t, "", config.targetMigration, config.pgVersion, config.pgbouncer)
 			ctx := context.Background()
 
-			test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
+			// NOTE: gc tests take exclusive locks, so they are run under non-parallel.
+			test.OnlyGCTests(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
 				ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
 					ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID,
 						RevisionQuantization(revisionQuantization),
@@ -101,6 +102,28 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				})
 				return ds, nil
 			}), false)
+		})
+
+		t.Run(fmt.Sprintf("%spostgres-%s-%s-%s", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) {
+			t.Parallel()
+			b := testdatastore.RunPostgresForTesting(t, "", config.targetMigration, config.pgVersion, config.pgbouncer)
+			ctx := context.Background()
+
+			test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
+				ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
+					ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID,
+						RevisionQuantization(revisionQuantization),
+						GCWindow(gcWindow),
+						GCInterval(veryLargeGCInterval),
+						WatchBufferLength(watchBufferLength),
+						DebugAnalyzeBeforeStatistics(),
+						MigrationPhase(config.migrationPhase),
+					)
+					require.NoError(t, err)
+					return ds
+				})
+				return ds, nil
+			}), test.WithCategories(test.GCCategory), false)
 
 			t.Run("TransactionTimestamps", createDatastoreTest(
 				b,
@@ -130,7 +153,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				RevisionInversionTest,
 				RevisionQuantization(0),
 				GCWindow(1*time.Millisecond),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(1),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -140,7 +163,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				ConcurrentRevisionHeadTest,
 				RevisionQuantization(0),
 				GCWindow(1*time.Millisecond),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(1),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -150,7 +173,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				ConcurrentRevisionWatchTest,
 				RevisionQuantization(0),
 				GCWindow(1*time.Millisecond),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(50),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -160,7 +183,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				OverlappingRevisionWatchTest,
 				RevisionQuantization(0),
 				GCWindow(1*time.Millisecond),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(50),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -170,7 +193,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				RepairTransactionsTest,
 				RevisionQuantization(0),
 				GCWindow(1*time.Millisecond),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(1),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -180,7 +203,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				NullCaveatWatchTest,
 				RevisionQuantization(0),
 				GCWindow(1*time.Millisecond),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(50),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -190,7 +213,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				RevisionTimestampAndTransactionIDTest,
 				RevisionQuantization(0),
 				GCWindow(1*time.Millisecond),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(50),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -200,7 +223,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				CheckpointsOnOutOfBandChangesTest,
 				RevisionQuantization(0),
 				GCWindow(1*time.Millisecond),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(50),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -210,7 +233,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				SerializationErrorTest,
 				RevisionQuantization(0),
 				GCWindow(1*time.Millisecond),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(50),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -220,7 +243,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				StrictReadModeTest,
 				RevisionQuantization(0),
 				GCWindow(1000*time.Second),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(50),
 				MigrationPhase(config.migrationPhase),
 				ReadStrictMode(true),
@@ -231,7 +254,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 				LockingTest,
 				RevisionQuantization(0),
 				GCWindow(1000*time.Second),
-				GCInterval(1*time.Hour),
+				GCInterval(veryLargeGCInterval),
 				WatchBufferLength(50),
 				MigrationPhase(config.migrationPhase),
 			))
@@ -242,7 +265,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 		OTelTracingTest,
 		RevisionQuantization(0),
 		GCWindow(1*time.Millisecond),
-		GCInterval(1*time.Hour),
+		GCInterval(veryLargeGCInterval),
 		WatchBufferLength(1),
 		MigrationPhase(config.migrationPhase),
 	))
@@ -261,7 +284,27 @@ func testPostgresDatastoreWithoutCommitTimestamps(t *testing.T, pc []postgresCon
 		b := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", "head", false, pgVersion, enablePgbouncer)
 
 		// NOTE: watch API requires the commit timestamps, so we skip those tests here.
-		test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
+		// NOTE: gc tests take exclusive locks, so they are run under non-parallel.
+		test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
+			ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
+				ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID,
+					RevisionQuantization(revisionQuantization),
+					GCWindow(gcWindow),
+					GCInterval(veryLargeGCInterval),
+					WatchBufferLength(watchBufferLength),
+					DebugAnalyzeBeforeStatistics(),
+				)
+				require.NoError(t, err)
+				return ds
+			})
+			return ds, nil
+		}), test.WithCategories(test.WatchCategory, test.GCCategory), false)
+	})
+
+	t.Run(fmt.Sprintf("postgres-%s-gc", pgVersion), func(t *testing.T) {
+		ctx := context.Background()
+		b := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", "head", false, pgVersion, enablePgbouncer)
+		test.OnlyGCTests(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
 			ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
 				ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID,
 					RevisionQuantization(revisionQuantization),
@@ -274,7 +317,7 @@ func testPostgresDatastoreWithoutCommitTimestamps(t *testing.T, pc []postgresCon
 				return ds
 			})
 			return ds, nil
-		}), test.WithCategories(test.WatchCategory), false)
+		}), false)
 	})
 	}
 }
diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go
index 40b34c01a3..fc3802bb86 100644
--- a/pkg/datastore/test/datastore.go
+++ b/pkg/datastore/test/datastore.go
@@ -152,8 +152,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories,
 	t.Run("TestCheckRevisions", runner(tester, CheckRevisionsTest))
 
 	if !except.GC() {
-		t.Run("TestRevisionGC", runner(tester, RevisionGCTest))
-		t.Run("TestInvalidReads", runner(tester, InvalidReadsTest))
+		OnlyGCTests(t, tester, concurrent)
 	}
 
 	t.Run("TestBulkUpload", runner(tester, BulkUploadTest))
@@ -201,6 +200,16 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories,
 	t.Run("TestRelationshipCounterOverExpired", runner(tester, RelationshipCounterOverExpiredTest))
 }
 
+func OnlyGCTests(t *testing.T, tester DatastoreTester, concurrent bool) {
+	runner := serial
+	if concurrent {
+		runner = parallel
+	}
+
+	t.Run("TestRevisionGC", runner(tester, RevisionGCTest))
+	t.Run("TestInvalidReads", runner(tester, InvalidReadsTest))
+}
+
 // All runs all generic datastore tests on a DatastoreTester.
 func All(t *testing.T, tester DatastoreTester, concurrent bool) {
 	AllWithExceptions(t, tester, noException, concurrent)

From 6e584aafcc98956badef6fd7ed33faede70cfe72 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Wed, 11 Dec 2024 16:40:47 -0500
Subject: [PATCH 3/3] Increase timeout on GC check in GC test

---
 pkg/datastore/test/revisions.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/datastore/test/revisions.go b/pkg/datastore/test/revisions.go
index 0354329997..ed21c944ab 100644
--- a/pkg/datastore/test/revisions.go
+++ b/pkg/datastore/test/revisions.go
@@ -131,7 +131,7 @@ func RevisionGCTest(t *testing.T, tester DatastoreTester) {
 	gcable, ok := ds.(common.GarbageCollector)
 	if ok {
 		gcable.ResetGCCompleted()
-		require.Eventually(func() bool { return gcable.HasGCRun() }, 5*time.Second, 50*time.Millisecond, "GC was never run as expected")
+		require.Eventually(func() bool { return gcable.HasGCRun() }, 10*time.Second, 150*time.Millisecond, "GC was never run as expected")
 	}
 
 	// FIXME currently the various datastores behave differently when a revision was requested and GC Window elapses.
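
Reviewer note: the snippet below is an illustrative, standalone sketch (not part of the patch series) of the single-runner pattern the MySQL changes above rely on: GET_LOCK with a short timeout, skip the run when another session already holds the lock, and RELEASE_LOCK on the way out. The helper name, DSN handling, and lock name here are hypothetical; the real implementation lives in internal/datastore/mysql/locks.go and internal/datastore/common/gc.go.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

// runExclusively runs job only if this session wins the named lock, mirroring
// the LockForGCRun/UnlockAfterGCRun flow added in the patches above.
func runExclusively(ctx context.Context, db *sql.DB, lockName string, job func(context.Context) error) error {
	var acquired int
	// GET_LOCK returns 1 on success and 0 if the 1-second wait times out.
	if err := db.QueryRowContext(ctx, `SELECT GET_LOCK(?, 1)`, lockName).Scan(&acquired); err != nil {
		return fmt.Errorf("error acquiring lock: %w", err)
	}
	if acquired != 1 {
		// Another node is already running the job; skip this run.
		return nil
	}
	defer func() {
		// RELEASE_LOCK frees the name so the next node's run can proceed.
		if _, err := db.ExecContext(ctx, `SELECT RELEASE_LOCK(?)`, lockName); err != nil {
			log.Printf("error releasing lock %q: %v", lockName, err)
		}
	}()
	return job(ctx)
}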