Skip to content

Commit

Permalink
Have Postgres GC lock so that it only runs on a single node at a time
Browse files Browse the repository at this point in the history
This should reduce Postgres datastore CPU pressure
  • Loading branch information
josephschorr committed Dec 9, 2024
1 parent 4d0a80d commit 49d31c6
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 43 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ require (
4d63.com/gochecknoglobals v0.2.1 // indirect
buf.build/gen/go/gogo/protobuf/protocolbuffers/go v1.35.2-20210810001428-4df00b267f94.1 // indirect
cel.dev/expr v0.16.1 // indirect
cirello.io/pglock v1.16.0 // indirect
cloud.google.com/go v0.116.0 // indirect
cloud.google.com/go/auth v0.10.2 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.5 // indirect
Expand Down Expand Up @@ -282,6 +283,7 @@ require (
github.com/ldez/gomoddirectives v0.2.4 // indirect
github.com/ldez/tagliatelle v0.5.0 // indirect
github.com/leonklingele/grouper v1.1.2 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/macabu/inamedparam v0.1.3 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.35.2-20240802094132
buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.35.2-20240802094132-5b212ab78fb7.1/go.mod h1:u8eJSaoWukKSr1K0wPUthshFAnUgwUjhRc+yelsMCmk=
cel.dev/expr v0.16.1 h1:NR0+oFYzR1CqLFhTAqg3ql59G9VfN8fKq1TCHJ6gq1g=
cel.dev/expr v0.16.1/go.mod h1:AsGA5zb3WruAEQeQng1RZdGEXmBj0jvMWh6l5SnNuC8=
cirello.io/pglock v1.16.0 h1:/fJBBIWzKttgVHLdESxIjFXWFXnXtnWXP3KD+XdON2M=
cirello.io/pglock v1.16.0/go.mod h1:Xb3xYP3kj2vqGq4IyLSTYVEKaPWE7TbrTK30SUNM0Uk=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down
103 changes: 60 additions & 43 deletions internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ type GarbageCollector interface {
MarkGCCompleted()
ResetGCCompleted()

// LockGCRun invokes the GC function with a lock to ensure only one GC run.
// If the implementation does not support locking, it should just execute
// the function and return true.
// If GC was run within the last interval, the function should return false.
LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error)

ReadyState(context.Context) (datastore.ReadyState, error)
Now(context.Context) (time.Time, error)
TxIDBefore(context.Context, time.Time) (datastore.Revision, error)
Expand Down Expand Up @@ -166,60 +172,71 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

ctx, span := tracer.Start(ctx, "RunGarbageCollection")
defer span.End()
wasRun, err := gc.LockGCRun(ctx, timeout, func(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "RunGarbageCollection")
defer span.End()

// Before attempting anything, check if the datastore is ready.
startTime := time.Now()
ready, err := gc.ReadyState(ctx)
if err != nil {
return err
}
if !ready.IsReady {
log.Ctx(ctx).Warn().
Msgf("datastore wasn't ready when attempting garbage collection: %s", ready.Message)
return nil
}
// Before attempting anything, check if the datastore is ready.
startTime := time.Now()
ready, err := gc.ReadyState(ctx)
if err != nil {
return err
}
if !ready.IsReady {
log.Ctx(ctx).Warn().
Msgf("datastore wasn't ready when attempting garbage collection: %s", ready.Message)
return nil
}

now, err := gc.Now(ctx)
if err != nil {
return fmt.Errorf("error retrieving now: %w", err)
}
now, err := gc.Now(ctx)
if err != nil {
return fmt.Errorf("error retrieving now: %w", err)
}

watermark, err := gc.TxIDBefore(ctx, now.Add(-1*window))
if err != nil {
return fmt.Errorf("error retrieving watermark: %w", err)
}
watermark, err := gc.TxIDBefore(ctx, now.Add(-1*window))
if err != nil {
return fmt.Errorf("error retrieving watermark: %w", err)
}

collected, err := gc.DeleteBeforeTx(ctx, watermark)
collected, err := gc.DeleteBeforeTx(ctx, watermark)

expiredRelationshipsCount, eerr := gc.DeleteExpiredRels(ctx)
expiredRelationshipsCount, eerr := gc.DeleteExpiredRels(ctx)

// even if an error happened, garbage would have been collected. This makes sure these are reflected even if the
// worker eventually fails or times out.
gcRelationshipsCounter.Add(float64(collected.Relationships))
gcTransactionsCounter.Add(float64(collected.Transactions))
gcNamespacesCounter.Add(float64(collected.Namespaces))
gcExpiredRelationshipsCounter.Add(float64(expiredRelationshipsCount))
collectionDuration := time.Since(startTime)
gcDurationHistogram.Observe(collectionDuration.Seconds())
// even if an error happened, garbage would have been collected. This makes sure these are reflected even if the
// worker eventually fails or times out.
gcRelationshipsCounter.Add(float64(collected.Relationships))
gcTransactionsCounter.Add(float64(collected.Transactions))
gcNamespacesCounter.Add(float64(collected.Namespaces))
gcExpiredRelationshipsCounter.Add(float64(expiredRelationshipsCount))
collectionDuration := time.Since(startTime)
gcDurationHistogram.Observe(collectionDuration.Seconds())

if err != nil {
return fmt.Errorf("error deleting in gc: %w", err)
}

if eerr != nil {
return fmt.Errorf("error deleting expired relationships in gc: %w", eerr)
}

log.Ctx(ctx).Info().
Stringer("highestTxID", watermark).
Dur("duration", collectionDuration).
Time("nowTime", now).
Interface("collected", collected).
Int64("expiredRelationships", expiredRelationshipsCount).
Msg("datastore garbage collection completed successfully")

gc.MarkGCCompleted()
return nil
})
if err != nil {
return fmt.Errorf("error deleting in gc: %w", err)
return fmt.Errorf("error running garbage collection: %w", err)
}

if eerr != nil {
return fmt.Errorf("error deleting expired relationships in gc: %w", eerr)
if !wasRun {
log.Ctx(ctx).Info().Msg("garbage collection run skipped due to another run in progress on another node")
}

log.Ctx(ctx).Info().
Stringer("highestTxID", watermark).
Dur("duration", collectionDuration).
Time("nowTime", now).
Interface("collected", collected).
Int64("expiredRelationships", expiredRelationshipsCount).
Msg("datastore garbage collection completed successfully")

gc.MarkGCCompleted()
return nil
}
4 changes: 4 additions & 0 deletions internal/datastore/common/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func newFakeGC(deleter gcDeleter) fakeGC {
}
}

func (*fakeGC) LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error) {
return true, gcRun(ctx)
}

func (*fakeGC) ReadyState(_ context.Context) (datastore.ReadyState, error) {
return datastore.ReadyState{
Message: "Ready",
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/mysql/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (mds *Datastore) ResetGCCompleted() {
mds.gcHasRun.Store(false)
}

func (mds *Datastore) LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error) {
return true, gcRun(ctx)
}

func (mds *Datastore) Now(ctx context.Context) (time.Time, error) {
// Retrieve the `now` time from the database.
nowSQL, nowArgs, err := getNow.ToSql()
Expand Down
54 changes: 54 additions & 0 deletions internal/datastore/postgres/common/locks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package common

import (
"time"

"cirello.io/pglock"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
)

const (
locksTableName = "locks"
heartbeatFrequency = 2 * time.Second
)

// RunWithLocksClient runs the provided function with a pglock.Client. Should only be used
// for migrations.
func RunWithLocksClient(conn *pgx.Conn, runner func(client *pglock.Client) error) error {
db := stdlib.OpenDB(*conn.Config())
defer db.Close()

client, err := pglock.UnsafeNew(db, pglock.WithCustomTable(locksTableName))
if err != nil {
return err
}

if err := runner(client); err != nil {
return err
}

return nil
}

// RunWithLocksClientOverPool runs the provided function with a pglock.Client.
func RunWithLocksClientOverPool(pool *pgxpool.Pool, timeout time.Duration, runner func(client *pglock.Client) error) error {
db := stdlib.OpenDBFromPool(pool)
defer db.Close()

client, err := pglock.UnsafeNew(db,
pglock.WithCustomTable(locksTableName),
pglock.WithLeaseDuration(timeout),
pglock.WithHeartbeatFrequency(heartbeatFrequency),
)
if err != nil {
return err
}

if err := runner(client); err != nil {
return err
}

return nil
}
32 changes: 32 additions & 0 deletions internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package postgres

import (
"context"
"errors"
"fmt"
"strings"
"time"

"cirello.io/pglock"
sq "github.com/Masterminds/squirrel"
"github.com/rs/zerolog/log"

"github.com/authzed/spicedb/internal/datastore/common"
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/pkg/datastore"
)

Expand All @@ -21,6 +25,34 @@ var (
gcPKCols = []string{"tableoid", "ctid"}
)

const (
gcLockName = "pgdatastoregclock"
lockTimeoutDelay = 5 * time.Second
)

func (pgd *pgDatastore) LockGCRun(ctx context.Context, timeout time.Duration, gcRun func(context.Context) error) (bool, error) {
var wasSkipped bool
err := pgxcommon.RunWithLocksClientOverPool(pgd.rawWritePool, timeout+lockTimeoutDelay, func(client *pglock.Client) error {
// Run the GC process under the lock.
currentTimestampData, err := time.Now().UTC().MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal current timestamp: %w", err)
}

return client.Do(ctx, gcLockName, func(ctx context.Context, lock *pglock.Lock) error {
return gcRun(ctx)
}, pglock.WithData(currentTimestampData), pglock.FailIfLocked(), pglock.KeepOnRelease())
})
if err != nil {
if errors.Is(err, pglock.ErrNotAcquired) {
log.Debug().Err(err).Msg("did not acquire lock for GC run; GC is likely being run by another node")
return false, nil
}
return false, err
}
return !wasSkipped, nil
}

func (pgd *pgDatastore) HasGCRun() bool {
return pgd.gcHasRun.Load()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package migrations

import (
"context"

"cirello.io/pglock"
"github.com/jackc/pgx/v5"

"github.com/authzed/spicedb/internal/datastore/postgres/common"
)

func init() {
if err := DatabaseMigrations.Register("add-gc-lock-table", "add-expiration-support",
func(ctx context.Context, conn *pgx.Conn) error {
return common.RunWithLocksClient(conn, func(client *pglock.Client) error {
return client.TryCreateTable()
})
},
noTxMigration); err != nil {
panic("failed to register migration: " + err.Error())
}
}
3 changes: 3 additions & 0 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ func newPostgresDatastore(
dburl: pgURL,
readPool: pgxcommon.MustNewInterceptorPooler(readPool, config.queryInterceptor),
writePool: nil, /* disabled by default */
rawWritePool: nil, /* disabled by default */
watchBufferLength: config.watchBufferLength,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
optimizedRevisionQuery: revisionQuery,
Expand All @@ -348,6 +349,7 @@ func newPostgresDatastore(

if isPrimary {
datastore.writePool = pgxcommon.MustNewInterceptorPooler(writePool, config.queryInterceptor)
datastore.rawWritePool = writePool
}

datastore.SetOptimizedRevisionFunc(datastore.optimizedRevisionFunc)
Expand Down Expand Up @@ -379,6 +381,7 @@ type pgDatastore struct {

dburl string
readPool, writePool pgxcommon.ConnPooler
rawWritePool *pgxpool.Pool
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
optimizedRevisionQuery string
Expand Down

0 comments on commit 49d31c6

Please sign in to comment.