Skip to content

Commit

Permalink
Move GC tests in PG out of parallel running
Browse files Browse the repository at this point in the history
This is necessary because GC takes an exclusive lock now
  • Loading branch information
josephschorr committed Dec 10, 2024
1 parent c5d52b1 commit b7d77b7
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 35 deletions.
30 changes: 15 additions & 15 deletions internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,21 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

ctx, span := tracer.Start(ctx, "RunGarbageCollection")
defer span.End()

// Before attempting anything, check if the datastore is ready.
startTime := time.Now()
ready, err := gc.ReadyState(ctx)
if err != nil {
return err
}
if !ready.IsReady {
log.Ctx(ctx).Warn().
Msgf("datastore wasn't ready when attempting garbage collection: %s", ready.Message)
return nil
}

ok, err := gc.LockForGCRun(ctx)
if err != nil {
return fmt.Errorf("error locking for gc run: %w", err)
Expand All @@ -189,21 +204,6 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er
}
}()

ctx, span := tracer.Start(ctx, "RunGarbageCollection")
defer span.End()

// Before attempting anything, check if the datastore is ready.
startTime := time.Now()
ready, err := gc.ReadyState(ctx)
if err != nil {
return err
}
if !ready.IsReady {
log.Ctx(ctx).Warn().
Msgf("datastore wasn't ready when attempting garbage collection: %s", ready.Message)
return nil
}

now, err := gc.Now(ctx)
if err != nil {
return fmt.Errorf("error retrieving now: %w", err)
Expand Down
4 changes: 3 additions & 1 deletion internal/datastore/postgres/locks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package postgres

import "context"
import (
"context"
)

type lockID uint32

Expand Down
77 changes: 60 additions & 17 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
if config.pgbouncer {
pgbouncerStr = "pgbouncer-"
}
t.Run(fmt.Sprintf("%spostgres-%s-%s-%s", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) {
t.Parallel()

t.Run(fmt.Sprintf("%spostgres-%s-%s-%s-gc", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) {
b := testdatastore.RunPostgresForTesting(t, "", config.targetMigration, config.pgVersion, config.pgbouncer)
ctx := context.Background()

test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
// NOTE: gc tests take exclusive locks, so they are run under non-parallel.
test.OnlyGCTests(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID,
RevisionQuantization(revisionQuantization),
Expand All @@ -101,6 +102,28 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
})
return ds, nil
}), false)
})

t.Run(fmt.Sprintf("%spostgres-%s-%s-%s", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) {
t.Parallel()
b := testdatastore.RunPostgresForTesting(t, "", config.targetMigration, config.pgVersion, config.pgbouncer)
ctx := context.Background()

test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID,
RevisionQuantization(revisionQuantization),
GCWindow(gcWindow),
GCInterval(veryLargeGCInterval),
WatchBufferLength(watchBufferLength),
DebugAnalyzeBeforeStatistics(),
MigrationPhase(config.migrationPhase),
)
require.NoError(t, err)
return ds
})
return ds, nil
}), test.WithCategories(test.GCCategory), false)

t.Run("TransactionTimestamps", createDatastoreTest(
b,
Expand Down Expand Up @@ -130,7 +153,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
RevisionInversionTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -140,7 +163,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
ConcurrentRevisionHeadTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -150,7 +173,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
ConcurrentRevisionWatchTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -160,7 +183,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
OverlappingRevisionWatchTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -170,7 +193,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
RepairTransactionsTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -180,7 +203,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
NullCaveatWatchTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -190,7 +213,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
RevisionTimestampAndTransactionIDTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -200,7 +223,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
CheckpointsOnOutOfBandChangesTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -210,7 +233,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
SerializationErrorTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -220,7 +243,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
StrictReadModeTest,
RevisionQuantization(0),
GCWindow(1000*time.Second),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
ReadStrictMode(true),
Expand All @@ -231,7 +254,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
LockingTest,
RevisionQuantization(0),
GCWindow(1000*time.Second),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -242,7 +265,7 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
OTelTracingTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(1*time.Hour),
GCInterval(veryLargeGCInterval),
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))
Expand All @@ -261,7 +284,27 @@ func testPostgresDatastoreWithoutCommitTimestamps(t *testing.T, pc []postgresCon
b := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", "head", false, pgVersion, enablePgbouncer)

// NOTE: watch API requires the commit timestamps, so we skip those tests here.
test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
// NOTE: gc tests take exclusive locks, so they are run under non-parallel.
test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID,
RevisionQuantization(revisionQuantization),
GCWindow(gcWindow),
GCInterval(veryLargeGCInterval),
WatchBufferLength(watchBufferLength),
DebugAnalyzeBeforeStatistics(),
)
require.NoError(t, err)
return ds
})
return ds, nil
}), test.WithCategories(test.WatchCategory, test.GCCategory), false)
})

t.Run(fmt.Sprintf("postgres-%s-gc", pgVersion), func(t *testing.T) {
ctx := context.Background()
b := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", "head", false, pgVersion, enablePgbouncer)
test.OnlyGCTests(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID,
RevisionQuantization(revisionQuantization),
Expand All @@ -274,7 +317,7 @@ func testPostgresDatastoreWithoutCommitTimestamps(t *testing.T, pc []postgresCon
return ds
})
return ds, nil
}), test.WithCategories(test.WatchCategory), false)
}), false)
})
}
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/datastore/test/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories,
t.Run("TestCheckRevisions", runner(tester, CheckRevisionsTest))

if !except.GC() {
t.Run("TestRevisionGC", runner(tester, RevisionGCTest))
t.Run("TestInvalidReads", runner(tester, InvalidReadsTest))
OnlyGCTests(t, tester, concurrent)
}

t.Run("TestBulkUpload", runner(tester, BulkUploadTest))
Expand Down Expand Up @@ -201,6 +200,16 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories,
t.Run("TestRelationshipCounterOverExpired", runner(tester, RelationshipCounterOverExpiredTest))
}

func OnlyGCTests(t *testing.T, tester DatastoreTester, concurrent bool) {
runner := serial
if concurrent {
runner = parallel
}

t.Run("TestRevisionGC", runner(tester, RevisionGCTest))
t.Run("TestInvalidReads", runner(tester, InvalidReadsTest))
}

// All runs all generic datastore tests on a DatastoreTester.
func All(t *testing.T, tester DatastoreTester, concurrent bool) {
AllWithExceptions(t, tester, noException, concurrent)
Expand Down

0 comments on commit b7d77b7

Please sign in to comment.