Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move unlock call to a background context in GC #2198

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,38 @@ func RegisterGCMetrics() error {
// GarbageCollector represents any datastore that supports external garbage
// collection.
type GarbageCollector interface {
// HasGCRun returns true if a garbage collection run has been completed.
HasGCRun() bool

// MarkGCCompleted marks that a garbage collection run has been completed.
MarkGCCompleted()

// ResetGCCompleted resets the state of the garbage collection run.
ResetGCCompleted()

// LockForGCRun attempts to acquire a lock for garbage collection. This lock
// is typically done at the datastore level, to ensure that no other nodes are
// running garbage collection at the same time.
LockForGCRun(ctx context.Context) (bool, error)
UnlockAfterGCRun(ctx context.Context) error

// UnlockAfterGCRun releases the lock after a garbage collection run.
// NOTE: this method does not take a context, as the context used for the
// reset of the GC run can be canceled/timed out and the unlock will still need to happen.
UnlockAfterGCRun() error

// ReadyState returns the current state of the datastore.
ReadyState(context.Context) (datastore.ReadyState, error)

// Now returns the current time from the datastore.
Now(context.Context) (time.Time, error)

// TxIDBefore returns the highest transaction ID before the provided time.
TxIDBefore(context.Context, time.Time) (datastore.Revision, error)

// DeleteBeforeTx deletes all data before the provided transaction ID.
DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (DeletionCounts, error)

// DeleteExpiredRels deletes all relationships that have expired.
DeleteExpiredRels(ctx context.Context) (int64, error)
}

Expand Down Expand Up @@ -196,7 +217,7 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
}

defer func() {
err := gc.UnlockAfterGCRun(ctx)
err := gc.UnlockAfterGCRun()
if err != nil {
log.Error().
Err(err).
Expand Down
38 changes: 36 additions & 2 deletions internal/datastore/common/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type fakeGC struct {
deleter gcDeleter
metrics gcMetrics
lock sync.RWMutex
wasLocked bool
wasUnlocked bool
}

type gcMetrics struct {
Expand All @@ -39,11 +41,19 @@ func newFakeGC(deleter gcDeleter) fakeGC {
}
}

func (*fakeGC) LockForGCRun(ctx context.Context) (bool, error) {
func (gc *fakeGC) LockForGCRun(ctx context.Context) (bool, error) {
gc.lock.Lock()
defer gc.lock.Unlock()

gc.wasLocked = true
return true, nil
}

func (*fakeGC) UnlockAfterGCRun(ctx context.Context) error {
func (gc *fakeGC) UnlockAfterGCRun() error {
gc.lock.Lock()
defer gc.lock.Unlock()

gc.wasUnlocked = true
return nil
}

Expand Down Expand Up @@ -227,3 +237,27 @@ func TestGCFailureBackoffReset(t *testing.T) {
// the GC enough time to run.
require.Greater(t, gc.GetMetrics().markedCompleteCount, 20, "Next interval was not reset with backoff")
}

func TestGCUnlockOnTimeout(t *testing.T) {
gc := newFakeGC(alwaysErrorDeleter{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
interval := 10 * time.Millisecond
window := 10 * time.Second
timeout := 1 * time.Millisecond

require.Error(t, StartGarbageCollector(ctx, &gc, interval, window, timeout))
}()

time.Sleep(30 * time.Millisecond)
require.False(t, gc.HasGCRun(), "GC should not have run")

gc.lock.Lock()
defer gc.lock.Unlock()

require.True(t, gc.wasLocked, "GC should have been locked")
require.True(t, gc.wasUnlocked, "GC should have been unlocked")
}
4 changes: 2 additions & 2 deletions internal/datastore/mysql/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (mds *Datastore) LockForGCRun(ctx context.Context) (bool, error) {
return mds.tryAcquireLock(ctx, gcRunLock)
}

func (mds *Datastore) UnlockAfterGCRun(ctx context.Context) error {
return mds.releaseLock(ctx, gcRunLock)
func (mds *Datastore) UnlockAfterGCRun() error {
return mds.releaseLock(context.Background(), gcRunLock)
}

func (mds *Datastore) Now(ctx context.Context) (time.Time, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func (pgd *pgDatastore) LockForGCRun(ctx context.Context) (bool, error) {
return pgd.tryAcquireLock(ctx, gcRunLock)
}

func (pgd *pgDatastore) UnlockAfterGCRun(ctx context.Context) error {
return pgd.releaseLock(ctx, gcRunLock)
func (pgd *pgDatastore) UnlockAfterGCRun() error {
return pgd.releaseLock(context.Background(), gcRunLock)
}

func (pgd *pgDatastore) HasGCRun() bool {
Expand Down
Loading