Skip to content

Commit

Permalink
Garbage Collection improvements in MySQL and Postgres
Browse files Browse the repository at this point in the history
1) Have GC lock so that it only runs on a single node at a time
2) Add a missing index in the Postgres datastore for GC

This should reduce datastore CPU pressure
  • Loading branch information
josephschorr committed Dec 10, 2024
1 parent 4d0a80d commit c5d52b1
Show file tree
Hide file tree
Showing 11 changed files with 301 additions and 2 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ require (
sigs.k8s.io/controller-runtime v0.19.2
)

require k8s.io/utils v0.0.0-20240711033017-18e509b52bc8

require (
4d63.com/gocheckcompilerdirectives v1.2.1 // indirect
4d63.com/gochecknoglobals v0.2.1 // indirect
Expand Down Expand Up @@ -404,7 +406,6 @@ require (
k8s.io/client-go v0.31.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
mvdan.cc/gofumpt v0.7.0 // indirect
mvdan.cc/unparam v0.0.0-20240528143540-8a5130ca722f // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
23 changes: 23 additions & 0 deletions internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type GarbageCollector interface {
MarkGCCompleted()
ResetGCCompleted()

LockForGCRun(ctx context.Context) (bool, error)
UnlockAfterGCRun(ctx context.Context) error

ReadyState(context.Context) (datastore.ReadyState, error)
Now(context.Context) (time.Time, error)
TxIDBefore(context.Context, time.Time) (datastore.Revision, error)
Expand Down Expand Up @@ -166,6 +169,26 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

ok, err := gc.LockForGCRun(ctx)
if err != nil {
return fmt.Errorf("error locking for gc run: %w", err)
}

if !ok {
log.Info().
Msg("datastore garbage collection already in progress on another node")
return nil
}

defer func() {
err := gc.UnlockAfterGCRun(ctx)
if err != nil {
log.Error().
Err(err).
Msg("error unlocking after gc run")
}
}()

ctx, span := tracer.Start(ctx, "RunGarbageCollection")
defer span.End()

Expand Down
8 changes: 8 additions & 0 deletions internal/datastore/common/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ func newFakeGC(deleter gcDeleter) fakeGC {
}
}

func (*fakeGC) LockForGCRun(ctx context.Context) (bool, error) {
return true, nil
}

func (*fakeGC) UnlockAfterGCRun(ctx context.Context) error {
return nil
}

func (*fakeGC) ReadyState(_ context.Context) (datastore.ReadyState, error) {
return datastore.ReadyState{
Message: "Ready",
Expand Down
62 changes: 62 additions & 0 deletions internal/datastore/mysql/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestF
}
}

type multiDatastoreTestFunc func(t *testing.T, ds1 datastore.Datastore, ds2 datastore.Datastore)

func createMultiDatastoreTest(b testdatastore.RunningEngineForTest, tf multiDatastoreTestFunc, options ...Option) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()

var secondDS datastore.Datastore
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...)
require.NoError(t, err)

ds2, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...)
require.NoError(t, err)

secondDS = ds2
return ds
})
defer failOnError(t, ds.Close)

tf(t, ds, secondDS)
}
}

func TestMySQLDatastoreDSNWithoutParseTime(t *testing.T) {
_, err := NewMySQLDatastore(context.Background(), "root:password@(localhost:1234)/mysql")
require.ErrorContains(t, err, "https://spicedb.dev/d/parse-time-mysql")
Expand Down Expand Up @@ -127,6 +150,45 @@ func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) {
t.Run("QuantizedRevisions", func(t *testing.T) {
QuantizedRevisionTest(t, b)
})
t.Run("Locking", createMultiDatastoreTest(b, LockingTest, defaultOptions...))
}

func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) {
mds := ds.(*Datastore)
mds2 := ds2.(*Datastore)

// Acquire a lock.
ctx := context.Background()
acquired, err := mds.tryAcquireLock(ctx, "testing123")
require.NoError(t, err)
require.True(t, acquired)

// Try to acquire the lock again.
acquired, err = mds2.tryAcquireLock(ctx, "testing123")
require.NoError(t, err)
require.False(t, acquired)

// Acquire another lock.
acquired, err = mds.tryAcquireLock(ctx, "testing456")
require.NoError(t, err)
require.True(t, acquired)

// Release the other lock.
err = mds.releaseLock(ctx, "testing123")
require.NoError(t, err)

// Release the lock.
err = mds.releaseLock(ctx, "testing123")
require.NoError(t, err)

// Try to acquire the lock again.
acquired, err = mds2.tryAcquireLock(ctx, "testing123")
require.NoError(t, err)
require.True(t, acquired)

// Release the lock.
err = mds2.releaseLock(ctx, "testing123")
require.NoError(t, err)
}

func DatabaseSeedingTest(t *testing.T, ds datastore.Datastore) {
Expand Down
8 changes: 8 additions & 0 deletions internal/datastore/mysql/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ func (mds *Datastore) ResetGCCompleted() {
mds.gcHasRun.Store(false)
}

func (mds *Datastore) LockForGCRun(ctx context.Context) (bool, error) {
return mds.tryAcquireLock(ctx, gcRunLock)
}

func (mds *Datastore) UnlockAfterGCRun(ctx context.Context) error {
return mds.releaseLock(ctx, gcRunLock)
}

func (mds *Datastore) Now(ctx context.Context) (time.Time, error) {
// Retrieve the `now` time from the database.
nowSQL, nowArgs, err := getNow.ToSql()
Expand Down
33 changes: 33 additions & 0 deletions internal/datastore/mysql/locks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package mysql

import "context"

type lockName string

const (
// gcRunLock is the lock name for the garbage collection run.
gcRunLock lockName = "gc_run"
)

func (mds *Datastore) tryAcquireLock(ctx context.Context, lockName lockName) (bool, error) {
// Acquire the lock, with max 1s timeout.
row := mds.db.QueryRowContext(ctx, `
SELECT GET_LOCK(?, 1)
`, lockName)

var acquired int
if err := row.Scan(&acquired); err != nil {
return false, err
}

return acquired == 1, nil
}

func (mds *Datastore) releaseLock(ctx context.Context, lockName lockName) error {
_, err := mds.db.ExecContext(ctx, `
SELECT RELEASE_LOCK(?)
`,
lockName,
)
return err
}
8 changes: 8 additions & 0 deletions internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ var (
gcPKCols = []string{"tableoid", "ctid"}
)

func (pgd *pgDatastore) LockForGCRun(ctx context.Context) (bool, error) {
return pgd.tryAcquireLock(ctx, gcRunLock)
}

func (pgd *pgDatastore) UnlockAfterGCRun(ctx context.Context) error {
return pgd.releaseLock(ctx, gcRunLock)
}

func (pgd *pgDatastore) HasGCRun() bool {
return pgd.gcHasRun.Load()
}
Expand Down
30 changes: 30 additions & 0 deletions internal/datastore/postgres/locks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package postgres

import "context"

type lockID uint32

const (
// gcRunLock is the lock ID for the garbage collection run.
gcRunLock lockID = 1
)

func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, lockID lockID) (bool, error) {
// Acquire the lock.
row := pgd.writePool.QueryRow(ctx, `
SELECT pg_try_advisory_lock($1)
`, lockID)

var lockAcquired bool
if err := row.Scan(&lockAcquired); err != nil {
return false, err
}
return lockAcquired, nil
}

func (pgd *pgDatastore) releaseLock(ctx context.Context, lockID lockID) error {
_, err := pgd.writePool.Exec(ctx, `
SELECT pg_advisory_unlock($1)
`, lockID)
return err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package migrations

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

// addGCIndexForRelationTupleTransaction adds a missing index to relation_tuple_transaction table
// to support garbage collection. This is in support of the query for selecting the most recent
// transaction: `SELECT xid, snapshot FROM relation_tuple_transaction WHERE timestamp < $1 ORDER BY xid DESC LIMIT 1`
//
// EXPLAIN before the index:
// Limit (cost=0.56..1.78 rows=1 width=558) (actual time=5706.155..5706.156 rows=1 loops=1)
// -> Index Scan Backward using pk_rttx on relation_tuple_transaction (cost=0.56..30428800.04 rows=25023202 width=558) (actual time=5706.154..5706.155 rows=1 loops=1)
//
// Filter: ("timestamp" < (now() - '04:00:00'::interval))
// Rows Removed by Filter: 6638121
//
// Planning Time: 0.098 ms
// Execution Time: 5706.192 ms
// (6 rows)
const addGCIndexForRelationTupleTransaction = `CREATE INDEX CONCURRENTLY
IF NOT EXISTS ix_relation_tuple_transaction_xid_desc_timestamp
ON relation_tuple_transaction (xid DESC, timestamp);`

func init() {
if err := DatabaseMigrations.Register("add-missing-gc-index", "add-expiration-support",
func(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, addGCIndexForRelationTupleTransaction); err != nil {
return fmt.Errorf("failed to add missing GC index: %w", err)
}
return nil
},
noTxMigration); err != nil {
panic("failed to register migration: " + err.Error())
}
}
Loading

0 comments on commit c5d52b1

Please sign in to comment.