Skip to content

Commit

Permalink
Merge pull request #2159 from josephschorr/postgres-gc-lock
Browse files Browse the repository at this point in the history
GC improvements: GC only on a single node and add a missing index in PG
  • Loading branch information
josephschorr authored Dec 12, 2024
2 parents b3e567e + 6e584aa commit 5b4d416
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 10 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ require (
sigs.k8s.io/controller-runtime v0.19.2
)

require k8s.io/utils v0.0.0-20240711033017-18e509b52bc8

require (
4d63.com/gocheckcompilerdirectives v1.2.1 // indirect
4d63.com/gochecknoglobals v0.2.1 // indirect
Expand Down Expand Up @@ -404,7 +406,6 @@ require (
k8s.io/client-go v0.31.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
mvdan.cc/gofumpt v0.7.0 // indirect
mvdan.cc/unparam v0.0.0-20240528143540-8a5130ca722f // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
23 changes: 23 additions & 0 deletions internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type GarbageCollector interface {
MarkGCCompleted()
ResetGCCompleted()

LockForGCRun(ctx context.Context) (bool, error)
UnlockAfterGCRun(ctx context.Context) error

ReadyState(context.Context) (datastore.ReadyState, error)
Now(context.Context) (time.Time, error)
TxIDBefore(context.Context, time.Time) (datastore.Revision, error)
Expand Down Expand Up @@ -181,6 +184,26 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
return nil
}

ok, err := gc.LockForGCRun(ctx)
if err != nil {
return fmt.Errorf("error locking for gc run: %w", err)
}

if !ok {
log.Info().
Msg("datastore garbage collection already in progress on another node")
return nil
}

defer func() {
err := gc.UnlockAfterGCRun(ctx)
if err != nil {
log.Error().
Err(err).
Msg("error unlocking after gc run")
}
}()

now, err := gc.Now(ctx)
if err != nil {
return fmt.Errorf("error retrieving now: %w", err)
Expand Down
8 changes: 8 additions & 0 deletions internal/datastore/common/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ func newFakeGC(deleter gcDeleter) fakeGC {
}
}

// LockForGCRun is the fake's GC lock: it unconditionally reports the lock as
// acquired and never fails, so the fake never causes a GC run to be skipped.
func (*fakeGC) LockForGCRun(_ context.Context) (bool, error) {
	const acquired = true
	return acquired, nil
}

// UnlockAfterGCRun is a no-op for the fake: there is no real lock to release,
// so it always succeeds.
func (*fakeGC) UnlockAfterGCRun(_ context.Context) error {
	var noErr error
	return noErr
}

func (*fakeGC) ReadyState(_ context.Context) (datastore.ReadyState, error) {
return datastore.ReadyState{
Message: "Ready",
Expand Down
62 changes: 62 additions & 0 deletions internal/datastore/mysql/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestF
}
}

// multiDatastoreTestFunc is a test body that receives two datastores which were
// both opened against the same database URI.
type multiDatastoreTestFunc func(t *testing.T, ds1 datastore.Datastore, ds2 datastore.Datastore)

// createMultiDatastoreTest wraps tf into a standard test function. For each run
// it opens two MySQL datastores against the same URI (with the same options),
// hands both to tf, and closes both once tf returns.
//
// The first datastore is the one returned to (and by) b.NewDatastore; the second
// is captured from inside the initialization callback.
func createMultiDatastoreTest(b testdatastore.RunningEngineForTest, tf multiDatastoreTestFunc, options ...Option) func(*testing.T) {
	return func(t *testing.T) {
		ctx := context.Background()

		var secondDS datastore.Datastore
		ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
			ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...)
			require.NoError(t, err)

			ds2, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...)
			require.NoError(t, err)

			secondDS = ds2
			return ds
		})
		defer failOnError(t, ds.Close)

		// The second datastore holds its own connections and must be closed too.
		// Deferred after the first so it is closed before it (defers run LIFO).
		defer failOnError(t, secondDS.Close)

		tf(t, ds, secondDS)
	}
}

func TestMySQLDatastoreDSNWithoutParseTime(t *testing.T) {
_, err := NewMySQLDatastore(context.Background(), "root:password@(localhost:1234)/mysql")
require.ErrorContains(t, err, "https://spicedb.dev/d/parse-time-mysql")
Expand Down Expand Up @@ -127,6 +150,45 @@ func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) {
t.Run("QuantizedRevisions", func(t *testing.T) {
QuantizedRevisionTest(t, b)
})
t.Run("Locking", createMultiDatastoreTest(b, LockingTest, defaultOptions...))
}

// LockingTest exercises tryAcquireLock/releaseLock across two datastores that
// point at the same database:
//
//   - a lock held through ds cannot be acquired through ds2;
//   - ds can hold a second, differently named lock at the same time;
//   - once ds releases its locks, ds2 can acquire the originally contended one.
//
// Both arguments must be *Datastore values; the type assertions panic otherwise.
func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) {
	mds := ds.(*Datastore)
	mds2 := ds2.(*Datastore)

	// Acquire a lock.
	ctx := context.Background()
	acquired, err := mds.tryAcquireLock(ctx, "testing123")
	require.NoError(t, err)
	require.True(t, acquired)

	// Try to acquire the lock again from the second datastore; it must be refused.
	acquired, err = mds2.tryAcquireLock(ctx, "testing123")
	require.NoError(t, err)
	require.False(t, acquired)

	// Acquire another lock.
	acquired, err = mds.tryAcquireLock(ctx, "testing456")
	require.NoError(t, err)
	require.True(t, acquired)

	// Release the other lock. (Previously this released "testing123" a second
	// time and left "testing456" held for the lifetime of the session.)
	err = mds.releaseLock(ctx, "testing456")
	require.NoError(t, err)

	// Release the lock.
	err = mds.releaseLock(ctx, "testing123")
	require.NoError(t, err)

	// Try to acquire the lock again; now that it is free the second datastore succeeds.
	acquired, err = mds2.tryAcquireLock(ctx, "testing123")
	require.NoError(t, err)
	require.True(t, acquired)

	// Release the lock.
	err = mds2.releaseLock(ctx, "testing123")
	require.NoError(t, err)
}

func DatabaseSeedingTest(t *testing.T, ds datastore.Datastore) {
Expand Down
8 changes: 8 additions & 0 deletions internal/datastore/mysql/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ func (mds *Datastore) ResetGCCompleted() {
mds.gcHasRun.Store(false)
}

// LockForGCRun tries to take the garbage collection lock (gcRunLock) and
// reports whether it was acquired. A false result with a nil error means the
// lock could not be obtained.
func (mds *Datastore) LockForGCRun(ctx context.Context) (bool, error) {
	acquired, err := mds.tryAcquireLock(ctx, gcRunLock)
	return acquired, err
}

// UnlockAfterGCRun releases the garbage collection lock (gcRunLock), returning
// any error produced by the release statement.
func (mds *Datastore) UnlockAfterGCRun(ctx context.Context) error {
	if err := mds.releaseLock(ctx, gcRunLock); err != nil {
		return err
	}
	return nil
}

func (mds *Datastore) Now(ctx context.Context) (time.Time, error) {
// Retrieve the `now` time from the database.
nowSQL, nowArgs, err := getNow.ToSql()
Expand Down
40 changes: 40 additions & 0 deletions internal/datastore/mysql/locks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package mysql

import "context"

// lockName is the name of a MySQL named lock; values of this type are passed
// as the name argument of GET_LOCK() and RELEASE_LOCK().
type lockName string

const (
// gcRunLock is the lock name for the garbage collection run.
gcRunLock lockName = "gc_run"
)

// tryAcquireLock attempts to take the MySQL named lock lockName via GET_LOCK(),
// waiting at most 1 second. It returns true only when GET_LOCK() yields 1; any
// failure to run or scan the statement is returned as an error.
//
// A lock obtained with GET_LOCK() is released explicitly by executing RELEASE_LOCK().
//
// NOTE: The lock is re-entrant, i.e. the same session can acquire the same lock multiple times.
// > It is even possible for a given session to acquire multiple locks for the same name.
// > Other sessions cannot acquire a lock with that name until the acquiring session releases all its locks for the name.
// See: https://dev.mysql.com/doc/refman/8.4/en/locking-functions.html#function_get-lock
//
// NOTE(review): these locks are session-scoped. If mds.db is a pooled handle,
// the acquire and a later release may run on different connections — confirm.
func (mds *Datastore) tryAcquireLock(ctx context.Context, lockName lockName) (bool, error) {
	var result int
	err := mds.db.QueryRowContext(ctx, `
SELECT GET_LOCK(?, 1)
`, lockName).Scan(&result)
	if err != nil {
		return false, err
	}

	gotLock := result == 1
	return gotLock, nil
}

// releaseLock issues RELEASE_LOCK() for the MySQL named lock lockName.
//
// Only an error from executing the statement is returned; the value produced
// by RELEASE_LOCK() itself is discarded, so releasing a lock that is not held
// is not reported as a failure here.
// See: https://dev.mysql.com/doc/refman/8.4/en/locking-functions.html#function_release-lock
func (mds *Datastore) releaseLock(ctx context.Context, lockName lockName) error {
	if _, err := mds.db.ExecContext(ctx, `
SELECT RELEASE_LOCK(?)
`,
		lockName,
	); err != nil {
		return err
	}
	return nil
}
8 changes: 8 additions & 0 deletions internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ var (
gcPKCols = []string{"tableoid", "ctid"}
)

// LockForGCRun tries to take the garbage collection advisory lock (gcRunLock)
// and reports whether it was acquired. A false result with a nil error means
// the lock could not be obtained.
func (pgd *pgDatastore) LockForGCRun(ctx context.Context) (bool, error) {
	acquired, err := pgd.tryAcquireLock(ctx, gcRunLock)
	return acquired, err
}

// UnlockAfterGCRun releases the garbage collection advisory lock (gcRunLock),
// returning any error produced by the release query.
func (pgd *pgDatastore) UnlockAfterGCRun(ctx context.Context) error {
	if err := pgd.releaseLock(ctx, gcRunLock); err != nil {
		return err
	}
	return nil
}

// HasGCRun returns the current value of the datastore's gcHasRun flag.
func (pgd *pgDatastore) HasGCRun() bool {
	hasRun := pgd.gcHasRun.Load()
	return hasRun
}
Expand Down
53 changes: 53 additions & 0 deletions internal/datastore/postgres/locks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package postgres

import (
"context"

log "github.com/authzed/spicedb/internal/logging"
)

// lockID is the numeric key of a PostgreSQL advisory lock; values of this type
// are passed to pg_try_advisory_lock() and pg_advisory_unlock().
type lockID uint32

const (
// gcRunLock is the lock ID for the garbage collection run.
gcRunLock lockID = 1
)

// tryAcquireLock attempts to take the advisory lock identified by lockID using
// pg_try_advisory_lock() on the write pool, and returns the boolean the
// database reports. A failure to run or scan the query is returned as an error.
//
// NOTE: The lock is re-entrant, i.e. the same session can acquire the same lock multiple times.
// > A lock can be acquired multiple times by its owning process; for each completed lock request
// > there must be a corresponding unlock request before the lock is actually released
// > If a session already holds a given advisory lock, additional requests by it will always succeed,
// > even if other sessions are awaiting the lock; this statement is true regardless of whether the
// > existing lock hold and new request are at session level or transaction level.
// See: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
//
// NOTE(review): session-level advisory locks belong to a single connection. If
// writePool hands out different connections per query, the matching unlock may
// run on another session — confirm.
func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, lockID lockID) (bool, error) {
	var gotLock bool
	err := pgd.writePool.QueryRow(ctx, `
SELECT pg_try_advisory_lock($1)
`, lockID).Scan(&gotLock)
	if err != nil {
		return false, err
	}

	return gotLock, nil
}

// releaseLock releases the advisory lock identified by lockID using
// pg_advisory_unlock() on the write pool.
//
// An error is returned only when the query cannot be run or scanned. When the
// database reports that the lock was not released, a warning is logged and nil
// is still returned.
func (pgd *pgDatastore) releaseLock(ctx context.Context, lockID lockID) error {
	var released bool
	err := pgd.writePool.QueryRow(ctx, `
SELECT pg_advisory_unlock($1)
`, lockID).Scan(&released)
	if err != nil {
		return err
	}

	// Not releasing is deliberately non-fatal: it is surfaced in the logs only.
	if !released {
		log.Warn().Uint32("lock_id", uint32(lockID)).Msg("held lock not released; this likely indicates a bug")
	}

	return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package migrations

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

// addGCIndexForRelationTupleTransaction adds a missing index to the
// relation_tuple_transaction table to support garbage collection. The index
// covers (xid DESC, timestamp) and is in support of the query that selects the
// most recent transaction before a cutoff:
//
//	SELECT xid, snapshot FROM relation_tuple_transaction WHERE timestamp < $1 ORDER BY xid DESC LIMIT 1
//
// The statement uses CREATE INDEX CONCURRENTLY together with IF NOT EXISTS.
//
// EXPLAIN before the index (the backward scan of pk_rttx discarded 6,638,121
// rows via the timestamp filter and took about 5.7 seconds):
//
//	Limit (cost=0.56..1.78 rows=1 width=558) (actual time=5706.155..5706.156 rows=1 loops=1)
//	  -> Index Scan Backward using pk_rttx on relation_tuple_transaction (cost=0.56..30428800.04 rows=25023202 width=558) (actual time=5706.154..5706.155 rows=1 loops=1)
//	       Filter: ("timestamp" < (now() - '04:00:00'::interval))
//	       Rows Removed by Filter: 6638121
//	Planning Time: 0.098 ms
//	Execution Time: 5706.192 ms
//	(6 rows)
const addGCIndexForRelationTupleTransaction = `CREATE INDEX CONCURRENTLY
IF NOT EXISTS ix_relation_tuple_transaction_xid_desc_timestamp
ON relation_tuple_transaction (xid DESC, timestamp);`

func init() {
if err := DatabaseMigrations.Register("add-index-for-transaction-gc", "add-expiration-support",
func(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, addGCIndexForRelationTupleTransaction); err != nil {
return fmt.Errorf("failed to add missing GC index: %w", err)
}
return nil
},
noTxMigration); err != nil {
panic("failed to register migration: " + err.Error())
}
}
Loading

0 comments on commit 5b4d416

Please sign in to comment.