diff --git a/internal/datastore/common/gc.go b/internal/datastore/common/gc.go index 729739273e..5788134a5d 100644 --- a/internal/datastore/common/gc.go +++ b/internal/datastore/common/gc.go @@ -80,17 +80,38 @@ func RegisterGCMetrics() error { // GarbageCollector represents any datastore that supports external garbage // collection. type GarbageCollector interface { + // HasGCRun returns true if a garbage collection run has been completed. HasGCRun() bool + + // MarkGCCompleted marks that a garbage collection run has been completed. MarkGCCompleted() + + // ResetGCCompleted resets the state of the garbage collection run. ResetGCCompleted() + // LockForGCRun attempts to acquire a lock for garbage collection. This lock + // is typically done at the datastore level, to ensure that no other nodes are + // running garbage collection at the same time. LockForGCRun(ctx context.Context) (bool, error) - UnlockAfterGCRun(ctx context.Context) error + // UnlockAfterGCRun releases the lock after a garbage collection run. + // NOTE: this method does not take a context, as the context used for the + // reset of the GC run can be canceled/timed out and the unlock will still need to happen. + UnlockAfterGCRun() error + + // ReadyState returns the current state of the datastore. ReadyState(context.Context) (datastore.ReadyState, error) + + // Now returns the current time from the datastore. Now(context.Context) (time.Time, error) + + // TxIDBefore returns the highest transaction ID before the provided time. TxIDBefore(context.Context, time.Time) (datastore.Revision, error) + + // DeleteBeforeTx deletes all data before the provided transaction ID. DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (DeletionCounts, error) + + // DeleteExpiredRels deletes all relationships that have expired. DeleteExpiredRels(ctx context.Context) (int64, error) } @@ -196,7 +217,7 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er } defer func() { - err := gc.UnlockAfterGCRun(ctx) + err := gc.UnlockAfterGCRun() if err != nil { log.Error(). Err(err). diff --git a/internal/datastore/common/gc_test.go b/internal/datastore/common/gc_test.go index 60a1db203e..1f6fbd2714 100644 --- a/internal/datastore/common/gc_test.go +++ b/internal/datastore/common/gc_test.go @@ -23,6 +23,8 @@ type fakeGC struct { deleter gcDeleter metrics gcMetrics lock sync.RWMutex + wasLocked bool + wasUnlocked bool } type gcMetrics struct { @@ -39,11 +41,19 @@ func newFakeGC(deleter gcDeleter) fakeGC { } } -func (*fakeGC) LockForGCRun(ctx context.Context) (bool, error) { +func (gc *fakeGC) LockForGCRun(ctx context.Context) (bool, error) { + gc.lock.Lock() + defer gc.lock.Unlock() + + gc.wasLocked = true return true, nil } -func (*fakeGC) UnlockAfterGCRun(ctx context.Context) error { +func (gc *fakeGC) UnlockAfterGCRun() error { + gc.lock.Lock() + defer gc.lock.Unlock() + + gc.wasUnlocked = true return nil } @@ -227,3 +237,27 @@ func TestGCFailureBackoffReset(t *testing.T) { // the GC enough time to run. require.Greater(t, gc.GetMetrics().markedCompleteCount, 20, "Next interval was not reset with backoff") } + +func TestGCUnlockOnTimeout(t *testing.T) { + gc := newFakeGC(alwaysErrorDeleter{}) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + interval := 10 * time.Millisecond + window := 10 * time.Second + timeout := 1 * time.Millisecond + + require.Error(t, StartGarbageCollector(ctx, &gc, interval, window, timeout)) + }() + + time.Sleep(30 * time.Millisecond) + require.False(t, gc.HasGCRun(), "GC should not have run") + + gc.lock.Lock() + defer gc.lock.Unlock() + + require.True(t, gc.wasLocked, "GC should have been locked") + require.True(t, gc.wasUnlocked, "GC should have been unlocked") +} diff --git a/internal/datastore/mysql/gc.go b/internal/datastore/mysql/gc.go index d4375eddbd..2137fe427e 100644 --- a/internal/datastore/mysql/gc.go +++ b/internal/datastore/mysql/gc.go @@ -33,8 +33,8 @@ func (mds *Datastore) LockForGCRun(ctx context.Context) (bool, error) { return mds.tryAcquireLock(ctx, gcRunLock) } -func (mds *Datastore) UnlockAfterGCRun(ctx context.Context) error { - return mds.releaseLock(ctx, gcRunLock) +func (mds *Datastore) UnlockAfterGCRun() error { + return mds.releaseLock(context.Background(), gcRunLock) } func (mds *Datastore) Now(ctx context.Context) (time.Time, error) { diff --git a/internal/datastore/postgres/gc.go b/internal/datastore/postgres/gc.go index 674082171d..7252876d94 100644 --- a/internal/datastore/postgres/gc.go +++ b/internal/datastore/postgres/gc.go @@ -25,8 +25,8 @@ func (pgd *pgDatastore) LockForGCRun(ctx context.Context) (bool, error) { return pgd.tryAcquireLock(ctx, gcRunLock) } -func (pgd *pgDatastore) UnlockAfterGCRun(ctx context.Context) error { - return pgd.releaseLock(ctx, gcRunLock) +func (pgd *pgDatastore) UnlockAfterGCRun() error { + return pgd.releaseLock(context.Background(), gcRunLock) } func (pgd *pgDatastore) HasGCRun() bool {