-
Notifications
You must be signed in to change notification settings - Fork 286
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2159 from josephschorr/postgres-gc-lock
GC improvements: GC only on a single node and add a missing index in PG
- Loading branch information
Showing
12 changed files
with
389 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package mysql | ||
|
||
import "context" | ||
|
||
type lockName string | ||
|
||
const ( | ||
// gcRunLock is the lock name for the garbage collection run. | ||
gcRunLock lockName = "gc_run" | ||
) | ||
|
||
func (mds *Datastore) tryAcquireLock(ctx context.Context, lockName lockName) (bool, error) { | ||
// Acquire the lock, with max 1s timeout. | ||
// A lock obtained with GET_LOCK() is released explicitly by executing RELEASE_LOCK() | ||
// | ||
// NOTE: Lock is re-entrant, i.e. the same session can acquire the same lock multiple times. | ||
// > It is even possible for a given session to acquire multiple locks for the same name. | ||
// > Other sessions cannot acquire a lock with that name until the acquiring session releases all its locks for the name. | ||
// See: https://dev.mysql.com/doc/refman/8.4/en/locking-functions.html#function_get-lock | ||
row := mds.db.QueryRowContext(ctx, ` | ||
SELECT GET_LOCK(?, 1) | ||
`, lockName) | ||
|
||
var acquired int | ||
if err := row.Scan(&acquired); err != nil { | ||
return false, err | ||
} | ||
|
||
return acquired == 1, nil | ||
} | ||
|
||
func (mds *Datastore) releaseLock(ctx context.Context, lockName lockName) error { | ||
// See: https://dev.mysql.com/doc/refman/8.4/en/locking-functions.html#function_release-lock | ||
_, err := mds.db.ExecContext(ctx, ` | ||
SELECT RELEASE_LOCK(?) | ||
`, | ||
lockName, | ||
) | ||
return err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package postgres | ||
|
||
import ( | ||
"context" | ||
|
||
log "github.com/authzed/spicedb/internal/logging" | ||
) | ||
|
||
type lockID uint32 | ||
|
||
const ( | ||
// gcRunLock is the lock ID for the garbage collection run. | ||
gcRunLock lockID = 1 | ||
) | ||
|
||
func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, lockID lockID) (bool, error) { | ||
// Acquire the lock. | ||
// | ||
// NOTE: The lock is re-entrant, i.e. the same session can acquire the same lock multiple times. | ||
// > A lock can be acquired multiple times by its owning process; for each completed lock request | ||
// > there must be a corresponding unlock request before the lock is actually released | ||
// > If a session already holds a given advisory lock, additional requests by it will always succeed, | ||
// > even if other sessions are awaiting the lock; this statement is true regardless of whether the | ||
// > existing lock hold and new request are at session level or transaction level. | ||
// See: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS | ||
row := pgd.writePool.QueryRow(ctx, ` | ||
SELECT pg_try_advisory_lock($1) | ||
`, lockID) | ||
|
||
var lockAcquired bool | ||
if err := row.Scan(&lockAcquired); err != nil { | ||
return false, err | ||
} | ||
return lockAcquired, nil | ||
} | ||
|
||
func (pgd *pgDatastore) releaseLock(ctx context.Context, lockID lockID) error { | ||
row := pgd.writePool.QueryRow(ctx, ` | ||
SELECT pg_advisory_unlock($1) | ||
`, lockID) | ||
|
||
var lockReleased bool | ||
if err := row.Scan(&lockReleased); err != nil { | ||
return err | ||
} | ||
|
||
if !lockReleased { | ||
log.Warn().Uint32("lock_id", uint32(lockID)).Msg("held lock not released; this likely indicates a bug") | ||
return nil | ||
} | ||
|
||
return nil | ||
} |
39 changes: 39 additions & 0 deletions
39
internal/datastore/postgres/migrations/zz_migration.0022_add_index_for_transaction_gc.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package migrations | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/jackc/pgx/v5" | ||
) | ||
|
||
// addGCIndexForRelationTupleTransaction adds a missing index to relation_tuple_transaction table | ||
// to support garbage collection. This is in support of the query for selecting the most recent | ||
// transaction: `SELECT xid, snapshot FROM relation_tuple_transaction WHERE timestamp < $1 ORDER BY xid DESC LIMIT 1` | ||
// | ||
// EXPLAIN before the index: | ||
// Limit (cost=0.56..1.78 rows=1 width=558) (actual time=5706.155..5706.156 rows=1 loops=1) | ||
// -> Index Scan Backward using pk_rttx on relation_tuple_transaction (cost=0.56..30428800.04 rows=25023202 width=558) (actual time=5706.154..5706.155 rows=1 loops=1) | ||
// | ||
// Filter: ("timestamp" < (now() - '04:00:00'::interval)) | ||
// Rows Removed by Filter: 6638121 | ||
// | ||
// Planning Time: 0.098 ms | ||
// Execution Time: 5706.192 ms | ||
// (6 rows) | ||
const addGCIndexForRelationTupleTransaction = `CREATE INDEX CONCURRENTLY | ||
IF NOT EXISTS ix_relation_tuple_transaction_xid_desc_timestamp | ||
ON relation_tuple_transaction (xid DESC, timestamp);` | ||
|
||
func init() { | ||
if err := DatabaseMigrations.Register("add-index-for-transaction-gc", "add-expiration-support", | ||
func(ctx context.Context, conn *pgx.Conn) error { | ||
if _, err := conn.Exec(ctx, addGCIndexForRelationTupleTransaction); err != nil { | ||
return fmt.Errorf("failed to add missing GC index: %w", err) | ||
} | ||
return nil | ||
}, | ||
noTxMigration); err != nil { | ||
panic("failed to register migration: " + err.Error()) | ||
} | ||
} |
Oops, something went wrong.