Skip to content

Commit

Permalink
Add missing index for GC in Postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
josephschorr committed Dec 9, 2024
1 parent 49d31c6 commit 62550af
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 5 deletions.
11 changes: 7 additions & 4 deletions internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"cirello.io/pglock"
sq "github.com/Masterminds/squirrel"
"github.com/rs/zerolog/log"

"github.com/authzed/spicedb/internal/datastore/common"
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
Expand All @@ -26,11 +25,16 @@ var (
)

const (
gcLockName = "pgdatastoregclock"
lockTimeoutDelay = 5 * time.Second
gcLockName = "pgdatastoregclock"
lockTimeoutDelay = 5 * time.Second
lockMinimumInterval = 30 * time.Second
)

func (pgd *pgDatastore) LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error) {
if pgd.gcInterval < lockMinimumInterval {
return true, gcRun(ctx)
}

var wasSkipped bool
err := pgxcommon.RunWithLocksClientOverPool(pgd.rawWritePool, timeout+lockTimeoutDelay, func(client *pglock.Client) error {
// Run the GC process under the lock.
Expand All @@ -45,7 +49,6 @@ func (pgd *pgDatastore) LockGCRun(ctx context.Context, timeout time.Duration, gc
})
if err != nil {
if errors.Is(err, pglock.ErrNotAcquired) {
log.Debug().Err(err).Msg("did not acquire lock for GC run; GC is likely being run by another node")
return false, nil
}
return false, err
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package migrations

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

const addGCIndexForRelationTupleTransaction = `CREATE INDEX CONCURRENTLY
IF NOT EXISTS ix_relation_tuple_transaction_xid_desc_timestamp
ON relation_tuple_transaction (xid DESC, timestamp);`

func init() {
if err := DatabaseMigrations.Register("add-missing-gc-index", "add-gc-lock-table",
func(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, addGCIndexForRelationTupleTransaction); err != nil {
return fmt.Errorf("failed to add missing GC index: %w", err)
}
return nil
},
noTxMigration); err != nil {
panic("failed to register migration: " + err.Error())
}
}
14 changes: 14 additions & 0 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
RevisionInversionTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -139,6 +140,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
ConcurrentRevisionHeadTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -148,6 +150,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
ConcurrentRevisionWatchTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -157,6 +160,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
OverlappingRevisionWatchTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -166,6 +170,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
RepairTransactionsTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -175,6 +180,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
NullCaveatWatchTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -184,6 +190,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
RevisionTimestampAndTransactionIDTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -193,6 +200,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
CheckpointsOnOutOfBandChangesTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -202,6 +210,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
SerializationErrorTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -211,6 +220,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
StrictReadModeTest,
RevisionQuantization(0),
GCWindow(1000*time.Second),
GCInterval(1*time.Hour),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
ReadStrictMode(true),
Expand All @@ -222,6 +232,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
OTelTracingTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))
Expand Down Expand Up @@ -1163,6 +1174,7 @@ func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgV
primaryInstanceID,
RevisionQuantization(0),
GCWindow(time.Millisecond*1),
GCInterval(veryLargeGCInterval),
WatchBufferLength(1),
)
require.NoError(err)
Expand Down Expand Up @@ -1190,6 +1202,7 @@ func BenchmarkPostgresQuery(b *testing.B) {
primaryInstanceID,
RevisionQuantization(0),
GCWindow(time.Millisecond*1),
GCInterval(veryLargeGCInterval),
WatchBufferLength(1),
)
require.NoError(b, err)
Expand Down Expand Up @@ -1223,6 +1236,7 @@ func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.Quer
primaryInstanceID,
RevisionQuantization(0),
GCWindow(time.Millisecond*1),
GCInterval(veryLargeGCInterval),
WatchBufferLength(1),
WithQueryInterceptor(interceptor),
)
Expand Down
5 changes: 4 additions & 1 deletion internal/testserver/datastore/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ func RunPostgresForTestingWithCommitTimestamps(t testing.TB, bridgeNetworkName s
}

t.Cleanup(func() {
require.NoError(t, builder.hostConn.Close(context.Background()))
if builder.hostConn != nil {
require.NoError(t, builder.hostConn.Close(context.Background()))
}

require.NoError(t, pool.Purge(postgres))
})

Expand Down

0 comments on commit 62550af

Please sign in to comment.