From 62550af9584d7a376a5cef36ea15ffcde3a5fe77 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 9 Dec 2024 16:11:03 -0500 Subject: [PATCH] Add missing index for GC in Postgres --- internal/datastore/postgres/gc.go | 11 +++++--- .../zz_migration.0023_add_missing_gc_index.go | 25 +++++++++++++++++++ .../postgres/postgres_shared_test.go | 14 +++++++++++ internal/testserver/datastore/postgres.go | 5 +++- 4 files changed, 50 insertions(+), 5 deletions(-) create mode 100644 internal/datastore/postgres/migrations/zz_migration.0023_add_missing_gc_index.go diff --git a/internal/datastore/postgres/gc.go b/internal/datastore/postgres/gc.go index 4be068c23c..1703ad0fc2 100644 --- a/internal/datastore/postgres/gc.go +++ b/internal/datastore/postgres/gc.go @@ -9,7 +9,6 @@ import ( "cirello.io/pglock" sq "github.com/Masterminds/squirrel" - "github.com/rs/zerolog/log" "github.com/authzed/spicedb/internal/datastore/common" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" @@ -26,11 +25,16 @@ var ( ) const ( - gcLockName = "pgdatastoregclock" - lockTimeoutDelay = 5 * time.Second + gcLockName = "pgdatastoregclock" + lockTimeoutDelay = 5 * time.Second + lockMinimumInterval = 30 * time.Second ) func (pgd *pgDatastore) LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error) { + if pgd.gcInterval < lockMinimumInterval { + return true, gcRun(ctx) + } + var wasSkipped bool err := pgxcommon.RunWithLocksClientOverPool(pgd.rawWritePool, timeout+lockTimeoutDelay, func(client *pglock.Client) error { // Run the GC process under the lock. @@ -45,7 +49,6 @@ func (pgd *pgDatastore) LockGCRun(ctx context.Context, timeout time.Duration, gc }) if err != nil { if errors.Is(err, pglock.ErrNotAcquired) { - log.Debug().Err(err).Msg("did not acquire lock for GC run; GC is likely being run by another node") return false, nil } return false, err diff --git a/internal/datastore/postgres/migrations/zz_migration.0023_add_missing_gc_index.go b/internal/datastore/postgres/migrations/zz_migration.0023_add_missing_gc_index.go new file mode 100644 index 0000000000..952572301e --- /dev/null +++ b/internal/datastore/postgres/migrations/zz_migration.0023_add_missing_gc_index.go @@ -0,0 +1,25 @@ +package migrations + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +const addGCIndexForRelationTupleTransaction = `CREATE INDEX CONCURRENTLY + IF NOT EXISTS ix_relation_tuple_transaction_xid_desc_timestamp + ON relation_tuple_transaction (xid DESC, timestamp);` + +func init() { + if err := DatabaseMigrations.Register("add-missing-gc-index", "add-gc-lock-table", + func(ctx context.Context, conn *pgx.Conn) error { + if _, err := conn.Exec(ctx, addGCIndexForRelationTupleTransaction); err != nil { + return fmt.Errorf("failed to add missing GC index: %w", err) + } + return nil + }, + noTxMigration); err != nil { + panic("failed to register migration: " + err.Error()) + } +} diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index 7e9573c77f..1359027eac 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -130,6 +130,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { RevisionInversionTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -139,6 +140,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { ConcurrentRevisionHeadTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -148,6 +150,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { ConcurrentRevisionWatchTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -157,6 +160,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { OverlappingRevisionWatchTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -166,6 +170,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { RepairTransactionsTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -175,6 +180,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { NullCaveatWatchTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -184,6 +190,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { RevisionTimestampAndTransactionIDTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -193,6 +200,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { CheckpointsOnOutOfBandChangesTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -202,6 +210,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { SerializationErrorTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -211,6 +220,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { StrictReadModeTest, RevisionQuantization(0), GCWindow(1000*time.Second), + GCInterval(1*time.Hour), WatchBufferLength(50), MigrationPhase(config.migrationPhase), ReadStrictMode(true), @@ -222,6 +232,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { OTelTracingTest, RevisionQuantization(0), GCWindow(1*time.Millisecond), + GCInterval(1*time.Hour), WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -1163,6 +1174,7 @@ func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgV primaryInstanceID, RevisionQuantization(0), GCWindow(time.Millisecond*1), + GCInterval(veryLargeGCInterval), WatchBufferLength(1), ) require.NoError(err) @@ -1190,6 +1202,7 @@ func BenchmarkPostgresQuery(b *testing.B) { primaryInstanceID, RevisionQuantization(0), GCWindow(time.Millisecond*1), + GCInterval(veryLargeGCInterval), WatchBufferLength(1), ) require.NoError(b, err) @@ -1223,6 +1236,7 @@ func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.Quer primaryInstanceID, RevisionQuantization(0), GCWindow(time.Millisecond*1), + GCInterval(veryLargeGCInterval), WatchBufferLength(1), WithQueryInterceptor(interceptor), ) diff --git a/internal/testserver/datastore/postgres.go b/internal/testserver/datastore/postgres.go index 78574f914d..f5c08d7ea8 100644 --- a/internal/testserver/datastore/postgres.go +++ b/internal/testserver/datastore/postgres.go @@ -97,7 +97,10 @@ func RunPostgresForTestingWithCommitTimestamps(t testing.TB, bridgeNetworkName s } t.Cleanup(func() { - require.NoError(t, builder.hostConn.Close(context.Background())) + if builder.hostConn != nil { + require.NoError(t, builder.hostConn.Close(context.Background())) + } + require.NoError(t, pool.Purge(postgres)) })