diff --git a/go.mod b/go.mod index 5cd3b703e2..6f1a49be7b 100644 --- a/go.mod +++ b/go.mod @@ -117,6 +117,8 @@ require ( sigs.k8s.io/controller-runtime v0.19.2 ) +require k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 + require ( 4d63.com/gocheckcompilerdirectives v1.2.1 // indirect 4d63.com/gochecknoglobals v0.2.1 // indirect @@ -404,7 +406,6 @@ require ( k8s.io/client-go v0.31.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect - k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect mvdan.cc/gofumpt v0.7.0 // indirect mvdan.cc/unparam v0.0.0-20240528143540-8a5130ca722f // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/internal/datastore/common/gc.go b/internal/datastore/common/gc.go index d3c90647ab..1e79763441 100644 --- a/internal/datastore/common/gc.go +++ b/internal/datastore/common/gc.go @@ -84,6 +84,9 @@ type GarbageCollector interface { MarkGCCompleted() ResetGCCompleted() + LockForGCRun(ctx context.Context) (bool, error) + UnlockAfterGCRun(ctx context.Context) error + ReadyState(context.Context) (datastore.ReadyState, error) Now(context.Context) (time.Time, error) TxIDBefore(context.Context, time.Time) (datastore.Revision, error) @@ -166,6 +169,26 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + ok, err := gc.LockForGCRun(ctx) + if err != nil { + return fmt.Errorf("error locking for gc run: %w", err) + } + + if !ok { + log.Info(). + Msg("datastore garbage collection already in progress on another node") + return nil + } + + defer func() { + err := gc.UnlockAfterGCRun(ctx) + if err != nil { + log.Error(). + Err(err). + Msg("error unlocking after gc run") + } + }() + ctx, span := tracer.Start(ctx, "RunGarbageCollection") defer span.End() diff --git a/internal/datastore/common/gc_test.go b/internal/datastore/common/gc_test.go index aa42bc9062..60a1db203e 100644 --- a/internal/datastore/common/gc_test.go +++ b/internal/datastore/common/gc_test.go @@ -39,6 +39,14 @@ func newFakeGC(deleter gcDeleter) fakeGC { } } +func (*fakeGC) LockForGCRun(ctx context.Context) (bool, error) { + return true, nil +} + +func (*fakeGC) UnlockAfterGCRun(ctx context.Context) error { + return nil +} + func (*fakeGC) ReadyState(_ context.Context) (datastore.ReadyState, error) { return datastore.ReadyState{ Message: "Ready", diff --git a/internal/datastore/mysql/datastore_test.go b/internal/datastore/mysql/datastore_test.go index e7caadc6eb..0b35ef09f8 100644 --- a/internal/datastore/mysql/datastore_test.go +++ b/internal/datastore/mysql/datastore_test.go @@ -91,6 +91,29 @@ func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestF } } +type multiDatastoreTestFunc func(t *testing.T, ds1 datastore.Datastore, ds2 datastore.Datastore) + +func createMultiDatastoreTest(b testdatastore.RunningEngineForTest, tf multiDatastoreTestFunc, options ...Option) func(*testing.T) { + return func(t *testing.T) { + ctx := context.Background() + + var secondDS datastore.Datastore + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...) + require.NoError(t, err) + + ds2, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...) 
+ require.NoError(t, err)
+
+ secondDS = ds2
+ return ds
+ })
+ defer failOnError(t, ds.Close)
+
+ tf(t, ds, secondDS)
+ }
+}
+
 func TestMySQLDatastoreDSNWithoutParseTime(t *testing.T) {
 _, err := NewMySQLDatastore(context.Background(), "root:password@(localhost:1234)/mysql")
 require.ErrorContains(t, err, "https://spicedb.dev/d/parse-time-mysql")
@@ -127,6 +150,45 @@ func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) {
 t.Run("QuantizedRevisions", func(t *testing.T) {
 QuantizedRevisionTest(t, b)
 })
+ t.Run("Locking", createMultiDatastoreTest(b, LockingTest, defaultOptions...))
+}
+
+func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) {
+ mds := ds.(*Datastore)
+ mds2 := ds2.(*Datastore)
+
+ // Acquire a lock.
+ ctx := context.Background()
+ acquired, err := mds.tryAcquireLock(ctx, "testing123")
+ require.NoError(t, err)
+ require.True(t, acquired)
+
+ // Try to acquire the lock again. Must be on a different session, as these locks are reentrant.
+ acquired, err = mds2.tryAcquireLock(ctx, "testing123")
+ require.NoError(t, err)
+ require.False(t, acquired)
+
+ // Acquire another lock.
+ acquired, err = mds.tryAcquireLock(ctx, "testing456")
+ require.NoError(t, err)
+ require.True(t, acquired)
+
+ // Release the other lock.
+ err = mds.releaseLock(ctx, "testing456")
+ require.NoError(t, err)
+
+ // Release the lock.
+ err = mds.releaseLock(ctx, "testing123")
+ require.NoError(t, err)
+
+ // Try to acquire the lock again.
+ acquired, err = mds2.tryAcquireLock(ctx, "testing123")
+ require.NoError(t, err)
+ require.True(t, acquired)
+
+ // Release the lock.
+ err = mds2.releaseLock(ctx, "testing123")
+ require.NoError(t, err)
+}
 
 func DatabaseSeedingTest(t *testing.T, ds datastore.Datastore) {
diff --git a/internal/datastore/mysql/gc.go b/internal/datastore/mysql/gc.go
index bf2c04e993..d4375eddbd 100644
--- a/internal/datastore/mysql/gc.go
+++ b/internal/datastore/mysql/gc.go
@@ -29,6 +29,14 @@ func (mds *Datastore) ResetGCCompleted() {
 mds.gcHasRun.Store(false)
 }
 
+func (mds *Datastore) LockForGCRun(ctx context.Context) (bool, error) {
+ return mds.tryAcquireLock(ctx, gcRunLock)
+}
+
+func (mds *Datastore) UnlockAfterGCRun(ctx context.Context) error {
+ return mds.releaseLock(ctx, gcRunLock)
+}
+
 func (mds *Datastore) Now(ctx context.Context) (time.Time, error) {
 // Retrieve the `now` time from the database.
 nowSQL, nowArgs, err := getNow.ToSql()
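Review note: the `LockForGCRun` / `UnlockAfterGCRun` pair above plugs into the guard added to `RunGarbageCollection` in `internal/datastore/common/gc.go` earlier in this diff: try the lock, skip the pass entirely if another node holds it, and release in a `defer` whose failure is logged rather than returned. The following is a minimal standalone sketch of that control flow, not SpiceDB's actual code; `tryLocker`, `runGuarded`, and `memLock` are hypothetical names for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
)

// tryLocker is a stand-in for the two methods this change adds to the
// GarbageCollector interface.
type tryLocker interface {
	LockForGCRun(ctx context.Context) (bool, error)
	UnlockAfterGCRun(ctx context.Context) error
}

// runGuarded mirrors the shape of the new RunGarbageCollection code: try the
// lock, skip if another node is collecting, always release on the way out.
func runGuarded(ctx context.Context, l tryLocker, run func(context.Context) error) error {
	ok, err := l.LockForGCRun(ctx)
	if err != nil {
		return fmt.Errorf("error locking for gc run: %w", err)
	}
	if !ok {
		log.Println("datastore garbage collection already in progress on another node")
		return nil // not an error: another node is doing the work
	}
	defer func() {
		// Logged, not returned, so an unlock failure cannot mask an error
		// from the GC pass itself.
		if err := l.UnlockAfterGCRun(ctx); err != nil {
			log.Println("error unlocking after gc run:", err)
		}
	}()
	return run(ctx)
}

// memLock is an in-memory fake, in the spirit of the fakeGC used in gc_test.go.
type memLock struct{ held bool }

func (m *memLock) LockForGCRun(context.Context) (bool, error) {
	if m.held {
		return false, nil
	}
	m.held = true
	return true, nil
}

func (m *memLock) UnlockAfterGCRun(context.Context) error {
	if !m.held {
		return errors.New("lock not held")
	}
	m.held = false
	return nil
}

func main() {
	l := &memLock{}
	_ = runGuarded(context.Background(), l, func(context.Context) error {
		fmt.Println("collecting garbage")
		return nil
	})
}
```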
diff --git a/internal/datastore/mysql/locks.go b/internal/datastore/mysql/locks.go
new file mode 100644
index 0000000000..310308c8ed
--- /dev/null
+++ b/internal/datastore/mysql/locks.go
@@ -0,0 +1,33 @@
+package mysql
+
+import "context"
+
+type lockName string
+
+const (
+ // gcRunLock is the lock name for the garbage collection run.
+ gcRunLock lockName = "gc_run"
+)
+
+func (mds *Datastore) tryAcquireLock(ctx context.Context, lockName lockName) (bool, error) {
+ // Acquire the lock, with max 1s timeout.
+ row := mds.db.QueryRowContext(ctx, `
+ SELECT GET_LOCK(?, 1)
+ `, lockName)
+
+ var acquired int
+ if err := row.Scan(&acquired); err != nil {
+ return false, err
+ }
+
+ return acquired == 1, nil
+}
+
+func (mds *Datastore) releaseLock(ctx context.Context, lockName lockName) error {
+ _, err := mds.db.ExecContext(ctx, `
+ SELECT RELEASE_LOCK(?)
+ `,
+ lockName,
+ )
+ return err
+}
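One subtlety in `locks.go` worth flagging: MySQL named locks obtained via `GET_LOCK` are owned by the *connection* that ran the statement (`RELEASE_LOCK` returns 0 when called from a session that does not hold the lock), while `mds.db` is a connection pool, so the acquire and release statements are not guaranteed to hit the same session. Below is a hedged sketch of a variant that pins a single `*sql.Conn` for the lifetime of the lock; the `acquireOnConn` helper and the DSN are illustrative assumptions, not part of this PR.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

// acquireOnConn pins one pooled connection so that GET_LOCK and RELEASE_LOCK
// run on the same MySQL session. GET_LOCK returns 1 on success, 0 on timeout,
// and NULL on error; the server also releases the lock automatically when the
// owning session terminates.
func acquireOnConn(ctx context.Context, db *sql.DB, name string) (release func() error, ok bool, err error) {
	conn, err := db.Conn(ctx)
	if err != nil {
		return nil, false, err
	}

	var acquired sql.NullInt64
	if err := conn.QueryRowContext(ctx, `SELECT GET_LOCK(?, 1)`, name).Scan(&acquired); err != nil {
		conn.Close()
		return nil, false, err
	}
	if !acquired.Valid || acquired.Int64 != 1 {
		conn.Close()
		return nil, false, nil
	}

	release = func() error {
		defer conn.Close() // hand the connection back to the pool after releasing
		_, err := conn.ExecContext(ctx, `SELECT RELEASE_LOCK(?)`, name)
		return err
	}
	return release, true, nil
}

func main() {
	// Assumed DSN, for illustration only.
	db, err := sql.Open("mysql", "root:password@tcp(localhost:3306)/mysql?parseTime=true")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	ctx := context.Background()
	release, ok, err := acquireOnConn(ctx, db, "gc_run")
	fmt.Println("acquired:", ok, "err:", err)
	if ok {
		_ = release()
	}
}
```

Whether this matters in practice depends on how the pool hands out connections for the GC path; the sketch just makes lock ownership explicit.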
diff --git a/internal/datastore/postgres/gc.go b/internal/datastore/postgres/gc.go
index 769859646e..674082171d 100644
--- a/internal/datastore/postgres/gc.go
+++ b/internal/datastore/postgres/gc.go
@@ -21,6 +21,14 @@ var (
 gcPKCols = []string{"tableoid", "ctid"}
 )
 
+func (pgd *pgDatastore) LockForGCRun(ctx context.Context) (bool, error) {
+ return pgd.tryAcquireLock(ctx, gcRunLock)
+}
+
+func (pgd *pgDatastore) UnlockAfterGCRun(ctx context.Context) error {
+ return pgd.releaseLock(ctx, gcRunLock)
+}
+
 func (pgd *pgDatastore) HasGCRun() bool {
 return pgd.gcHasRun.Load()
 }
diff --git a/internal/datastore/postgres/locks.go b/internal/datastore/postgres/locks.go
new file mode 100644
index 0000000000..86863e926c
--- /dev/null
+++ b/internal/datastore/postgres/locks.go
@@ -0,0 +1,30 @@
+package postgres
+
+import "context"
+
+type lockID uint32
+
+const (
+ // gcRunLock is the lock ID for the garbage collection run.
+ gcRunLock lockID = 1
+)
+
+func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, lockID lockID) (bool, error) {
+ // Acquire the lock.
+ row := pgd.writePool.QueryRow(ctx, `
+ SELECT pg_try_advisory_lock($1)
+ `, lockID)
+
+ var lockAcquired bool
+ if err := row.Scan(&lockAcquired); err != nil {
+ return false, err
+ }
+ return lockAcquired, nil
+}
+
+func (pgd *pgDatastore) releaseLock(ctx context.Context, lockID lockID) error {
+ _, err := pgd.writePool.Exec(ctx, `
+ SELECT pg_advisory_unlock($1)
+ `, lockID)
+ return err
+}
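For reference on the Postgres side: `pg_try_advisory_lock` never blocks, it reports immediately whether the session-level lock was obtained, and it is reentrant within the same session — which is why the Postgres `LockingTest` later in this diff acquires through two separate datastores. The same session-affinity caveat as MySQL applies, since `writePool` may run the lock and unlock statements on different pooled connections. A minimal two-pool demonstration follows; the connection string is an assumption.

```go
package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// gcRunLockID mirrors the lock ID reserved in locks.go above.
const gcRunLockID = 1

// tryLock reports immediately whether the advisory lock was obtained.
func tryLock(ctx context.Context, pool *pgxpool.Pool, id uint32) (bool, error) {
	var ok bool
	err := pool.QueryRow(ctx, `SELECT pg_try_advisory_lock($1)`, id).Scan(&ok)
	return ok, err
}

func main() {
	ctx := context.Background()

	// Two pools stand in for two SpiceDB nodes.
	nodeA, err := pgxpool.New(ctx, "postgres://postgres:secret@localhost:5432/spicedb")
	if err != nil {
		panic(err)
	}
	defer nodeA.Close()

	nodeB, err := pgxpool.New(ctx, "postgres://postgres:secret@localhost:5432/spicedb")
	if err != nil {
		panic(err)
	}
	defer nodeB.Close()

	got, err := tryLock(ctx, nodeA, gcRunLockID)
	fmt.Println("node A acquired:", got, err) // expected: true
	got, err = tryLock(ctx, nodeB, gcRunLockID)
	fmt.Println("node B acquired:", got, err) // expected: false while A holds it

	// Session-level advisory locks persist until unlocked or the session ends.
	_, _ = nodeA.Exec(ctx, `SELECT pg_advisory_unlock($1)`, gcRunLockID)
}
```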
diff --git a/internal/datastore/postgres/migrations/zz_migration.0022_add_missing_gc_index.go b/internal/datastore/postgres/migrations/zz_migration.0022_add_missing_gc_index.go
new file mode 100644
index 0000000000..9384c3c3c9
--- /dev/null
+++ b/internal/datastore/postgres/migrations/zz_migration.0022_add_missing_gc_index.go
@@ -0,0 +1,39 @@
+package migrations
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/jackc/pgx/v5"
+)
+
+// addGCIndexForRelationTupleTransaction adds a missing index to the relation_tuple_transaction table
+// to support garbage collection. This is in support of the query for selecting the most recent
+// transaction: `SELECT xid, snapshot FROM relation_tuple_transaction WHERE timestamp < $1 ORDER BY xid DESC LIMIT 1`
+//
+// EXPLAIN before the index:
+// Limit (cost=0.56..1.78 rows=1 width=558) (actual time=5706.155..5706.156 rows=1 loops=1)
+// -> Index Scan Backward using pk_rttx on relation_tuple_transaction (cost=0.56..30428800.04 rows=25023202 width=558) (actual time=5706.154..5706.155 rows=1 loops=1)
+//
+// Filter: ("timestamp" < (now() - '04:00:00'::interval))
+// Rows Removed by Filter: 6638121
+//
+// Planning Time: 0.098 ms
+// Execution Time: 5706.192 ms
+// (6 rows)
+const addGCIndexForRelationTupleTransaction = `CREATE INDEX CONCURRENTLY
+ IF NOT EXISTS ix_relation_tuple_transaction_xid_desc_timestamp
+ ON relation_tuple_transaction (xid DESC, timestamp);`
+
+func init() {
+ if err := DatabaseMigrations.Register("add-missing-gc-index", "add-expiration-support",
+ func(ctx context.Context, conn *pgx.Conn) error {
+ if _, err := conn.Exec(ctx, addGCIndexForRelationTupleTransaction); err != nil {
+ return fmt.Errorf("failed to add missing GC index: %w", err)
+ }
+ return nil
+ },
+ noTxMigration); err != nil {
+ panic("failed to register migration: " + err.Error())
+ }
+}
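The plan quoted in the comment shows the cost being fixed: without the index, finding the newest transaction older than the GC window walks `pk_rttx` backward and discards millions of too-recent rows; with an index leading on `xid DESC` and including `timestamp`, the scan can stop after the first few entries. Note also that `CREATE INDEX CONCURRENTLY` cannot run inside a transaction block, which is why the migration registers with `noTxMigration`. A quick way to spot-check the plan after migrating is sketched below; the DSN is an assumption.

```go
package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://postgres:secret@localhost:5432/spicedb")
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// The GC query the new index is meant to serve: the most recent
	// transaction older than the GC window (4h here, as in the comment).
	rows, err := conn.Query(ctx, `
		EXPLAIN
		SELECT xid, snapshot FROM relation_tuple_transaction
		WHERE timestamp < now() - interval '4 hours'
		ORDER BY xid DESC
		LIMIT 1`)
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	// The plan should now reference ix_relation_tuple_transaction_xid_desc_timestamp
	// rather than a backward scan over pk_rttx.
	for rows.Next() {
		var line string
		if err := rows.Scan(&line); err != nil {
			panic(err)
		}
		fmt.Println(line)
	}
}
```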
diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go
index 7e9573c77f..a343b5e6a8 100644
--- a/internal/datastore/postgres/postgres_shared_test.go
+++ b/internal/datastore/postgres/postgres_shared_test.go
@@ -130,6 +130,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 RevisionInversionTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(1),
 MigrationPhase(config.migrationPhase),
 ))
@@ -139,6 +140,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 ConcurrentRevisionHeadTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(1),
 MigrationPhase(config.migrationPhase),
 ))
@@ -148,6 +150,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 ConcurrentRevisionWatchTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(50),
 MigrationPhase(config.migrationPhase),
 ))
@@ -157,6 +160,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 OverlappingRevisionWatchTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(50),
 MigrationPhase(config.migrationPhase),
 ))
@@ -166,6 +170,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 RepairTransactionsTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(1),
 MigrationPhase(config.migrationPhase),
 ))
@@ -175,6 +180,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 NullCaveatWatchTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(50),
 MigrationPhase(config.migrationPhase),
 ))
@@ -184,6 +190,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 RevisionTimestampAndTransactionIDTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(50),
 MigrationPhase(config.migrationPhase),
 ))
@@ -193,6 +200,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 CheckpointsOnOutOfBandChangesTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(50),
 MigrationPhase(config.migrationPhase),
 ))
@@ -202,6 +210,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 SerializationErrorTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(50),
 MigrationPhase(config.migrationPhase),
 ))
@@ -211,10 +220,21 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 StrictReadModeTest,
 RevisionQuantization(0),
 GCWindow(1000*time.Second),
+ GCInterval(1*time.Hour),
 WatchBufferLength(50),
 MigrationPhase(config.migrationPhase),
 ReadStrictMode(true),
 ))
+
+ t.Run("TestLocking", createMultiDatastoreTest(
+ b,
+ LockingTest,
+ RevisionQuantization(0),
+ GCWindow(1000*time.Second),
+ GCInterval(1*time.Hour),
+ WatchBufferLength(50),
+ MigrationPhase(config.migrationPhase),
+ ))
 }
 
 t.Run("OTelTracing", createDatastoreTest(
@@ -222,6 +242,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
 OTelTracingTest,
 RevisionQuantization(0),
 GCWindow(1*time.Millisecond),
+ GCInterval(1*time.Hour),
 WatchBufferLength(1),
 MigrationPhase(config.migrationPhase),
 ))
@@ -288,6 +309,28 @@ func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf datasto
 }
 }
 
+type multiDatastoreTestFunc func(t *testing.T, ds1 datastore.Datastore, ds2 datastore.Datastore)
+
+func createMultiDatastoreTest(b testdatastore.RunningEngineForTest, tf multiDatastoreTestFunc, options ...Option) func(*testing.T) {
+ return func(t *testing.T) {
+ ctx := context.Background()
+ var secondDS datastore.Datastore
+ ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
+ ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, options...)
+ require.NoError(t, err)
+
+ ds2, err := newPostgresDatastore(ctx, uri, primaryInstanceID, options...)
+ require.NoError(t, err)
+
+ secondDS = ds2
+ return ds
+ })
+ defer ds.Close()
+
+ tf(t, ds, secondDS)
+ }
+}
+
 func SerializationErrorTest(t *testing.T, ds datastore.Datastore) {
 require := require.New(t)
@@ -1163,6 +1206,7 @@ func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgV
 primaryInstanceID,
 RevisionQuantization(0),
 GCWindow(time.Millisecond*1),
+ GCInterval(veryLargeGCInterval),
 WatchBufferLength(1),
 )
 require.NoError(err)
@@ -1190,6 +1234,7 @@ func BenchmarkPostgresQuery(b *testing.B) {
 primaryInstanceID,
 RevisionQuantization(0),
 GCWindow(time.Millisecond*1),
+ GCInterval(veryLargeGCInterval),
 WatchBufferLength(1),
 )
 require.NoError(b, err)
@@ -1223,6 +1268,7 @@ func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.Quer
 primaryInstanceID,
 RevisionQuantization(0),
 GCWindow(time.Millisecond*1),
+ GCInterval(veryLargeGCInterval),
 WatchBufferLength(1),
 WithQueryInterceptor(interceptor),
 )
@@ -1444,6 +1490,44 @@ func RepairTransactionsTest(t *testing.T, ds datastore.Datastore) {
 require.Greater(t, currentMaximumID, 12345)
 }
 
+func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) {
+ pds := ds.(*pgDatastore)
+ pds2 := ds2.(*pgDatastore)
+
+ // Acquire a lock.
+ ctx := context.Background()
+ acquired, err := pds.tryAcquireLock(ctx, 42)
+ require.NoError(t, err)
+ require.True(t, acquired)
+
+ // Try to acquire again. Must be on a different session, as these locks are reentrant.
+ acquired, err = pds2.tryAcquireLock(ctx, 42)
+ require.NoError(t, err)
+ require.False(t, acquired)
+
+ // Acquire another lock.
+ acquired, err = pds.tryAcquireLock(ctx, 43)
+ require.NoError(t, err)
+ require.True(t, acquired)
+
+ // Release the first lock.
+ err = pds.releaseLock(ctx, 42)
+ require.NoError(t, err)
+
+ // Try to acquire the first lock again.
+ acquired, err = pds.tryAcquireLock(ctx, 42)
+ require.NoError(t, err)
+ require.True(t, acquired)
+
+ // Release the second lock.
+ err = pds.releaseLock(ctx, 43)
+ require.NoError(t, err)
+
+ // Release the first lock.
+ err = pds.releaseLock(ctx, 42)
+ require.NoError(t, err)
+}
+
 func StrictReadModeTest(t *testing.T, ds datastore.Datastore) {
 require := require.New(t)
diff --git a/internal/testserver/datastore/postgres.go b/internal/testserver/datastore/postgres.go
index 78574f914d..f5c08d7ea8 100644
--- a/internal/testserver/datastore/postgres.go
+++ b/internal/testserver/datastore/postgres.go
@@ -97,7 +97,10 @@ func RunPostgresForTestingWithCommitTimestamps(t testing.TB, bridgeNetworkName s
 }
 
 t.Cleanup(func() {
- require.NoError(t, builder.hostConn.Close(context.Background()))
+ if builder.hostConn != nil {
+ require.NoError(t, builder.hostConn.Close(context.Background()))
+ }
+
 require.NoError(t, pool.Purge(postgres))
 })
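Finally, a note on the `GCInterval(1*time.Hour)` / `GCInterval(veryLargeGCInterval)` options threaded through the tests above: presumably they push the datastore's periodic GC worker far enough into the future that a background pass cannot fire mid-test, grab the newly introduced GC lock, and turn `LockingTest` (or the 1ms-GC-window revision tests) flaky. Below is a self-contained sketch of the scheduling behavior being avoided, using `sync.Mutex.TryLock` as a stand-in for the database advisory lock; all names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// gcLoop models one node's periodic GC: every interval it attempts the shared
// lock and runs a pass only if it wins. With a short interval, this loop is
// constantly contending for the same lock a test may want to hold by hand.
func gcLoop(ctx context.Context, interval time.Duration, lock *sync.Mutex, node string) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if !lock.TryLock() { // stand-in for LockForGCRun
				fmt.Println(node, "skipping: GC already running elsewhere")
				continue
			}
			fmt.Println(node, "running GC pass")
			time.Sleep(50 * time.Millisecond) // pretend to collect
			lock.Unlock() // stand-in for UnlockAfterGCRun
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	var lock sync.Mutex // stands in for GET_LOCK / pg_try_advisory_lock
	var wg sync.WaitGroup
	for _, node := range []string{"node-a", "node-b"} {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			gcLoop(ctx, 100*time.Millisecond, &lock, n)
		}(node)
	}
	wg.Wait()
}
```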