From 49d31c67f408ba1a8e6bb72f009b929ba59ac6f3 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 9 Dec 2024 15:46:01 -0500 Subject: [PATCH] Have Postgres GC lock so that it only runs on a single node at a time This should reduce Postgres datastore CPU pressure --- go.mod | 2 + go.sum | 2 + internal/datastore/common/gc.go | 103 ++++++++++-------- internal/datastore/common/gc_test.go | 4 + internal/datastore/mysql/gc.go | 4 + internal/datastore/postgres/common/locks.go | 54 +++++++++ internal/datastore/postgres/gc.go | 32 ++++++ .../zz_migration.0022_add_gc_lock_table.go | 22 ++++ internal/datastore/postgres/postgres.go | 3 + 9 files changed, 183 insertions(+), 43 deletions(-) create mode 100644 internal/datastore/postgres/common/locks.go create mode 100644 internal/datastore/postgres/migrations/zz_migration.0022_add_gc_lock_table.go diff --git a/go.mod b/go.mod index 5cd3b703e2..e41e91ae78 100644 --- a/go.mod +++ b/go.mod @@ -122,6 +122,7 @@ require ( 4d63.com/gochecknoglobals v0.2.1 // indirect buf.build/gen/go/gogo/protobuf/protocolbuffers/go v1.35.2-20210810001428-4df00b267f94.1 // indirect cel.dev/expr v0.16.1 // indirect + cirello.io/pglock v1.16.0 // indirect cloud.google.com/go v0.116.0 // indirect cloud.google.com/go/auth v0.10.2 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.5 // indirect @@ -282,6 +283,7 @@ require ( github.com/ldez/gomoddirectives v0.2.4 // indirect github.com/ldez/tagliatelle v0.5.0 // indirect github.com/leonklingele/grouper v1.1.2 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/macabu/inamedparam v0.1.3 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect diff --git a/go.sum b/go.sum index 24d1671733..28cd123e3b 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.35.2-20240802094132 buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.35.2-20240802094132-5b212ab78fb7.1/go.mod h1:u8eJSaoWukKSr1K0wPUthshFAnUgwUjhRc+yelsMCmk= cel.dev/expr v0.16.1 h1:NR0+oFYzR1CqLFhTAqg3ql59G9VfN8fKq1TCHJ6gq1g= cel.dev/expr v0.16.1/go.mod h1:AsGA5zb3WruAEQeQng1RZdGEXmBj0jvMWh6l5SnNuC8= +cirello.io/pglock v1.16.0 h1:/fJBBIWzKttgVHLdESxIjFXWFXnXtnWXP3KD+XdON2M= +cirello.io/pglock v1.16.0/go.mod h1:Xb3xYP3kj2vqGq4IyLSTYVEKaPWE7TbrTK30SUNM0Uk= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= diff --git a/internal/datastore/common/gc.go b/internal/datastore/common/gc.go index d3c90647ab..1dbdb15cf8 100644 --- a/internal/datastore/common/gc.go +++ b/internal/datastore/common/gc.go @@ -84,6 +84,12 @@ type GarbageCollector interface { MarkGCCompleted() ResetGCCompleted() + // LockGCRun invokes the GC function with a lock to ensure only one GC run. + // If the implementation does not support locking, it should just execute + // the function and return true. + // If GC was run within the last interval, the function should return false. + LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error) + ReadyState(context.Context) (datastore.ReadyState, error) Now(context.Context) (time.Time, error) TxIDBefore(context.Context, time.Time) (datastore.Revision, error) @@ -166,60 +172,71 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - ctx, span := tracer.Start(ctx, "RunGarbageCollection") - defer span.End() + wasRun, err := gc.LockGCRun(ctx, timeout, func(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "RunGarbageCollection") + defer span.End() - // Before attempting anything, check if the datastore is ready. - startTime := time.Now() - ready, err := gc.ReadyState(ctx) - if err != nil { - return err - } - if !ready.IsReady { - log.Ctx(ctx).Warn(). - Msgf("datastore wasn't ready when attempting garbage collection: %s", ready.Message) - return nil - } + // Before attempting anything, check if the datastore is ready. + startTime := time.Now() + ready, err := gc.ReadyState(ctx) + if err != nil { + return err + } + if !ready.IsReady { + log.Ctx(ctx).Warn(). + Msgf("datastore wasn't ready when attempting garbage collection: %s", ready.Message) + return nil + } - now, err := gc.Now(ctx) - if err != nil { - return fmt.Errorf("error retrieving now: %w", err) - } + now, err := gc.Now(ctx) + if err != nil { + return fmt.Errorf("error retrieving now: %w", err) + } - watermark, err := gc.TxIDBefore(ctx, now.Add(-1*window)) - if err != nil { - return fmt.Errorf("error retrieving watermark: %w", err) - } + watermark, err := gc.TxIDBefore(ctx, now.Add(-1*window)) + if err != nil { + return fmt.Errorf("error retrieving watermark: %w", err) + } - collected, err := gc.DeleteBeforeTx(ctx, watermark) + collected, err := gc.DeleteBeforeTx(ctx, watermark) - expiredRelationshipsCount, eerr := gc.DeleteExpiredRels(ctx) + expiredRelationshipsCount, eerr := gc.DeleteExpiredRels(ctx) - // even if an error happened, garbage would have been collected. This makes sure these are reflected even if the - // worker eventually fails or times out. - gcRelationshipsCounter.Add(float64(collected.Relationships)) - gcTransactionsCounter.Add(float64(collected.Transactions)) - gcNamespacesCounter.Add(float64(collected.Namespaces)) - gcExpiredRelationshipsCounter.Add(float64(expiredRelationshipsCount)) - collectionDuration := time.Since(startTime) - gcDurationHistogram.Observe(collectionDuration.Seconds()) + // even if an error happened, garbage would have been collected. This makes sure these are reflected even if the + // worker eventually fails or times out. + gcRelationshipsCounter.Add(float64(collected.Relationships)) + gcTransactionsCounter.Add(float64(collected.Transactions)) + gcNamespacesCounter.Add(float64(collected.Namespaces)) + gcExpiredRelationshipsCounter.Add(float64(expiredRelationshipsCount)) + collectionDuration := time.Since(startTime) + gcDurationHistogram.Observe(collectionDuration.Seconds()) + if err != nil { + return fmt.Errorf("error deleting in gc: %w", err) + } + + if eerr != nil { + return fmt.Errorf("error deleting expired relationships in gc: %w", eerr) + } + + log.Ctx(ctx).Info(). + Stringer("highestTxID", watermark). + Dur("duration", collectionDuration). + Time("nowTime", now). + Interface("collected", collected). + Int64("expiredRelationships", expiredRelationshipsCount). + Msg("datastore garbage collection completed successfully") + + gc.MarkGCCompleted() + return nil + }) if err != nil { - return fmt.Errorf("error deleting in gc: %w", err) + return fmt.Errorf("error running garbage collection: %w", err) } - if eerr != nil { - return fmt.Errorf("error deleting expired relationships in gc: %w", eerr) + if !wasRun { + log.Ctx(ctx).Info().Msg("garbage collection run skipped due to another run in progress on another node") } - log.Ctx(ctx).Info(). - Stringer("highestTxID", watermark). - Dur("duration", collectionDuration). - Time("nowTime", now). - Interface("collected", collected). - Int64("expiredRelationships", expiredRelationshipsCount). - Msg("datastore garbage collection completed successfully") - - gc.MarkGCCompleted() return nil } diff --git a/internal/datastore/common/gc_test.go b/internal/datastore/common/gc_test.go index aa42bc9062..77fe261f5d 100644 --- a/internal/datastore/common/gc_test.go +++ b/internal/datastore/common/gc_test.go @@ -39,6 +39,10 @@ func newFakeGC(deleter gcDeleter) fakeGC { } } +func (*fakeGC) LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error) { + return true, gcRun(ctx) +} + func (*fakeGC) ReadyState(_ context.Context) (datastore.ReadyState, error) { return datastore.ReadyState{ Message: "Ready", diff --git a/internal/datastore/mysql/gc.go b/internal/datastore/mysql/gc.go index bf2c04e993..25a74c1c89 100644 --- a/internal/datastore/mysql/gc.go +++ b/internal/datastore/mysql/gc.go @@ -29,6 +29,10 @@ func (mds *Datastore) ResetGCCompleted() { mds.gcHasRun.Store(false) } +func (mds *Datastore) LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error) { + return true, gcRun(ctx) +} + func (mds *Datastore) Now(ctx context.Context) (time.Time, error) { // Retrieve the `now` time from the database. nowSQL, nowArgs, err := getNow.ToSql() diff --git a/internal/datastore/postgres/common/locks.go b/internal/datastore/postgres/common/locks.go new file mode 100644 index 0000000000..1240491198 --- /dev/null +++ b/internal/datastore/postgres/common/locks.go @@ -0,0 +1,54 @@ +package common + +import ( + "time" + + "cirello.io/pglock" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/stdlib" +) + +const ( + locksTableName = "locks" + heartbeatFrequency = 2 * time.Second +) + +// RunWithLocksClient runs the provided function with a pglock.Client. Should only be used +// for migrations. +func RunWithLocksClient(conn *pgx.Conn, runner func(client *pglock.Client) error) error { + db := stdlib.OpenDB(*conn.Config()) + defer db.Close() + + client, err := pglock.UnsafeNew(db, pglock.WithCustomTable(locksTableName)) + if err != nil { + return err + } + + if err := runner(client); err != nil { + return err + } + + return nil +} + +// RunWithLocksClientOverPool runs the provided function with a pglock.Client. +func RunWithLocksClientOverPool(pool *pgxpool.Pool, timeout time.Duration, runner func(client *pglock.Client) error) error { + db := stdlib.OpenDBFromPool(pool) + defer db.Close() + + client, err := pglock.UnsafeNew(db, + pglock.WithCustomTable(locksTableName), + pglock.WithLeaseDuration(timeout), + pglock.WithHeartbeatFrequency(heartbeatFrequency), + ) + if err != nil { + return err + } + + if err := runner(client); err != nil { + return err + } + + return nil +} diff --git a/internal/datastore/postgres/gc.go b/internal/datastore/postgres/gc.go index 769859646e..4be068c23c 100644 --- a/internal/datastore/postgres/gc.go +++ b/internal/datastore/postgres/gc.go @@ -2,13 +2,17 @@ package postgres import ( "context" + "errors" "fmt" "strings" "time" + "cirello.io/pglock" sq "github.com/Masterminds/squirrel" + "github.com/rs/zerolog/log" "github.com/authzed/spicedb/internal/datastore/common" + pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/pkg/datastore" ) @@ -21,6 +25,34 @@ var ( gcPKCols = []string{"tableoid", "ctid"} ) +const ( + gcLockName = "pgdatastoregclock" + lockTimeoutDelay = 5 * time.Second +) + +func (pgd *pgDatastore) LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error) { + var wasSkipped bool + err := pgxcommon.RunWithLocksClientOverPool(pgd.rawWritePool, timeout+lockTimeoutDelay, func(client *pglock.Client) error { + // Run the GC process under the lock. + currentTimestampData, err := time.Now().UTC().MarshalBinary() + if err != nil { + return fmt.Errorf("failed to marshal current timestamp: %w", err) + } + + return client.Do(ctx, gcLockName, func(ctx context.Context, lock *pglock.Lock) error { + return gcRun(ctx) + }, pglock.WithData(currentTimestampData), pglock.FailIfLocked(), pglock.KeepOnRelease()) + }) + if err != nil { + if errors.Is(err, pglock.ErrNotAcquired) { + log.Debug().Err(err).Msg("did not acquire lock for GC run; GC is likely being run by another node") + return false, nil + } + return false, err + } + return !wasSkipped, nil +} + func (pgd *pgDatastore) HasGCRun() bool { return pgd.gcHasRun.Load() } diff --git a/internal/datastore/postgres/migrations/zz_migration.0022_add_gc_lock_table.go b/internal/datastore/postgres/migrations/zz_migration.0022_add_gc_lock_table.go new file mode 100644 index 0000000000..6da06e88b2 --- /dev/null +++ b/internal/datastore/postgres/migrations/zz_migration.0022_add_gc_lock_table.go @@ -0,0 +1,22 @@ +package migrations + +import ( + "context" + + "cirello.io/pglock" + "github.com/jackc/pgx/v5" + + "github.com/authzed/spicedb/internal/datastore/postgres/common" +) + +func init() { + if err := DatabaseMigrations.Register("add-gc-lock-table", "add-expiration-support", + func(ctx context.Context, conn *pgx.Conn) error { + return common.RunWithLocksClient(conn, func(client *pglock.Client) error { + return client.TryCreateTable() + }) + }, + noTxMigration); err != nil { + panic("failed to register migration: " + err.Error()) + } +} diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index dec442c9b8..863c5f694a 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -323,6 +323,7 @@ func newPostgresDatastore( dburl: pgURL, readPool: pgxcommon.MustNewInterceptorPooler(readPool, config.queryInterceptor), writePool: nil, /* disabled by default */ + rawWritePool: nil, /* disabled by default */ watchBufferLength: config.watchBufferLength, watchBufferWriteTimeout: config.watchBufferWriteTimeout, optimizedRevisionQuery: revisionQuery, @@ -348,6 +349,7 @@ func newPostgresDatastore( if isPrimary { datastore.writePool = pgxcommon.MustNewInterceptorPooler(writePool, config.queryInterceptor) + datastore.rawWritePool = writePool } datastore.SetOptimizedRevisionFunc(datastore.optimizedRevisionFunc) @@ -379,6 +381,7 @@ type pgDatastore struct { dburl string readPool, writePool pgxcommon.ConnPooler + rawWritePool *pgxpool.Pool watchBufferLength uint16 watchBufferWriteTimeout time.Duration optimizedRevisionQuery string