Skip to content

Commit

Permalink
fix: flaky asm tests fixed by wrapping async provider (#2102)
Browse files Browse the repository at this point in the history
fixes #2092
- mock clock was causing the same race conditions as this issue:
#1368 (comment)
- instead we are now wrapping the ASM provider with a manual sync
provider
- This wrapper allows us to trigger when syncs should happen in tests,
and block until sync completes
  • Loading branch information
matt2e authored Jul 18, 2024
1 parent b64f7cf commit 60acb0b
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 63 deletions.
69 changes: 27 additions & 42 deletions common/configuration/asm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ import (
"path"
"sort"
"testing"
"time"

"connectrpc.com/connect"
"github.com/TBD54566975/ftl/backend/controller/leases"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/benbjohnson/clock"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
Expand All @@ -30,11 +28,9 @@ import (
"github.com/aws/aws-sdk-go-v2/service/secretsmanager/types"
)

func setUp(ctx context.Context, t *testing.T, router optional.Option[Router[Secrets]]) (*Manager[Secrets], *ASM, *asmLeader, *secretsmanager.Client, *clock.Mock, *leases.FakeLeaser) {
func setUp(ctx context.Context, t *testing.T, router optional.Option[Router[Secrets]]) (*Manager[Secrets], *ASM, *asmLeader, *secretsmanager.Client, *ManualSyncProvider[Secrets], *leases.FakeLeaser) {
t.Helper()

mockClock := clock.NewMock()

if _, ok := router.Get(); !ok {
dir := t.TempDir()
projectPath := path.Join(dir, "ftl-project.toml")
Expand All @@ -51,14 +47,16 @@ func setUp(ctx context.Context, t *testing.T, router optional.Option[Router[Secr
})
leaser := leases.NewFakeLeaser()
asm := NewASM(ctx, externalClient, URL("http://localhost:1234"), leaser)
manualSyncProvider := NewManualSyncProvider[Secrets](asm)

sm := newForTesting(ctx, router.MustGet(), []Provider[Secrets]{asm}, mockClock)
sm, err := New(ctx, router.MustGet(), []Provider[Secrets]{manualSyncProvider})
assert.NoError(t, err)

leaderOrFollower, err := asm.coordinator.Get()
assert.NoError(t, err)
leader, ok := leaderOrFollower.(*asmLeader)
assert.True(t, ok, "expected test to get an asm leader not a follower")
return sm, asm, leader, externalClient, mockClock, leaser
return sm, asm, leader, externalClient, manualSyncProvider, leaser
}

func waitForUpdatesToProcess(c *cache[Secrets]) {
Expand Down Expand Up @@ -172,23 +170,17 @@ func TestASMPagination(t *testing.T) {
// TestLeaderSync sets and gets values via the leader, as well as directly with ASM
func TestLeaderSync(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
sm, _, _, externalClient, clock, _ := setUp(ctx, t, None[Router[Secrets]]())
testClientSync(ctx, t, sm, externalClient, true, func(percentage float64) {
clock.Add(time.Second * (time.Duration(asmLeaderSyncInterval.Seconds()*percentage) + 1.0))
if percentage == 1.0 {
time.Sleep(time.Second * 4)
}
})
sm, _, _, externalClient, manualSync, _ := setUp(ctx, t, None[Router[Secrets]]())
testClientSync(ctx, t, sm, externalClient, true, []*ManualSyncProvider[Secrets]{manualSync})
}

// TestFollowerSync tests setting and getting values from a follower to the leader to ASM, and vice versa
func TestFollowerSync(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
leaderManager, _, _, externalClient, leaderClock, leaser := setUp(ctx, t, None[Router[Secrets]]())
leaderManager, _, _, externalClient, leaderManualSync, leaser := setUp(ctx, t, None[Router[Secrets]]())

// fakeRPCClient connects the follower to the leader
fakeRPCClient := &fakeAdminClient{sm: leaderManager}
followerClock := clock.NewMock()
follower := newASMFollower(fakeRPCClient, "fake")

followerASM := newASMForTesting(ctx, externalClient, URL("http://localhost:1235"), leaser, optional.Some[asmClient](follower))
Expand All @@ -197,22 +189,12 @@ func TestFollowerSync(t *testing.T) {
_, ok := asmClient.(*asmFollower)
assert.True(t, ok, "expected test to get an asm follower not a leader")

sm := newForTesting(ctx, leaderManager.router, []Provider[Secrets]{followerASM}, followerClock)
assert.NoError(t, err)
followerManualSync := NewManualSyncProvider(followerASM)

testClientSync(ctx, t, sm, externalClient, false, func(percentage float64) {
// sync leader
leaderClock.Add(time.Second * (time.Duration(asmLeaderSyncInterval.Seconds()*percentage) + 1.0))
if percentage == 1.0 {
time.Sleep(time.Second * 4)
}
sm, err := New(ctx, leaderManager.router, []Provider[Secrets]{followerManualSync})
assert.NoError(t, err)

// then sync follower
followerClock.Add(time.Second * (time.Duration(asmFollowerSyncInterval.Seconds()*percentage) + 1.0))
if percentage == 1.0 {
time.Sleep(time.Second * 4)
}
})
testClientSync(ctx, t, sm, externalClient, false, []*ManualSyncProvider[Secrets]{leaderManualSync, followerManualSync})
}

// testClientSync uses a Manager and a secretsmanager.Client to test setting and getting secrets
Expand All @@ -221,19 +203,14 @@ func testClientSync(ctx context.Context,
sm *Manager[Secrets],
externalClient *secretsmanager.Client,
isLeader bool,
progressByIntervalPercentage func(percentage float64)) {
manualSyncProviders []*ManualSyncProvider[Secrets]) {
t.Helper()

// advance clock to half way between syncs
progressByIntervalPercentage(0.5)

// wait for initial load
err := sm.cache.providers["asm"].waitForInitialSync()
assert.NoError(t, err)
waitForManualSync(t, manualSyncProviders)

// write a secret via asmClient
clientRef := Ref{Module: Some("sync"), Name: "set-by-client"}
err = sm.Set(ctx, "asm", clientRef, "client-first")
err := sm.Set(ctx, "asm", clientRef, "client-first")
assert.NoError(t, err)
waitForUpdatesToProcess(sm.cache)
value, err := sm.getData(ctx, clientRef)
Expand Down Expand Up @@ -270,8 +247,7 @@ func testClientSync(ctx context.Context,
assert.NoError(t, err, "failed to load secret via asm")
assert.Equal(t, value, jsonBytes(t, "sm-client-second"), "unexpected secret value")

// give client a change to sync
progressByIntervalPercentage(1.0)
waitForManualSync(t, manualSyncProviders)

// confirm that all secrets are up to date
list, err := sm.List(ctx)
Expand Down Expand Up @@ -316,15 +292,24 @@ func testClientSync(ctx context.Context,
})
assert.NoError(t, err)

// give client a change to sync
progressByIntervalPercentage(1.0)
waitForManualSync(t, manualSyncProviders)

_, err = sm.getData(ctx, smRef)
assert.Error(t, err, "expected to fail because secret was deleted")
_, err = sm.getData(ctx, smClientRef)
assert.Error(t, err, "expected to fail because secret was deleted")
}

// waitForManualSync syncs each provider in order
func waitForManualSync[R Role](t *testing.T, providers []*ManualSyncProvider[R]) {
t.Helper()

for _, provider := range providers {
err := provider.SyncAndWait()
assert.NoError(t, err)
}
}

func storeUnobfuscatedValueInASM(ctx context.Context, sm *Manager[Secrets], externalClient *secretsmanager.Client, ref Ref, value []byte, isNew bool) error {
obfuscator := Secrets{}.obfuscator()
obfuscatedValue, err := obfuscator.Obfuscate(value)
Expand Down
27 changes: 13 additions & 14 deletions common/configuration/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/pubsub"
"github.com/benbjohnson/clock"
"github.com/puzpuzpuz/xsync/v3"
)

Expand Down Expand Up @@ -48,7 +47,7 @@ type cache[R Role] struct {
topicWaitGroup *sync.WaitGroup
}

func newCache[R Role](ctx context.Context, providers []AsynchronousProvider[R], listProvider listProvider, clock clock.Clock) *cache[R] {
func newCache[R Role](ctx context.Context, providers []AsynchronousProvider[R], listProvider listProvider) *cache[R] {
cacheProviders := make(map[string]*cacheProvider[R], len(providers))
for _, provider := range providers {
cacheProviders[provider.Key()] = &cacheProvider[R]{
Expand All @@ -64,7 +63,7 @@ func newCache[R Role](ctx context.Context, providers []AsynchronousProvider[R],
topic: pubsub.New[updateCacheEvent](),
topicWaitGroup: &sync.WaitGroup{},
}
go cache.sync(ctx, clock)
go cache.sync(ctx)

return cache
}
Expand Down Expand Up @@ -121,7 +120,7 @@ func (c *cache[R]) deletedValue(ref Ref, pkey string) {
// Errors returned by a provider cause retries with exponential backoff.
//
// Events are processed when all providers are not being synced
func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
func (c *cache[R]) sync(ctx context.Context) {
if len(c.providers) == 0 {
// nothing to sync
return
Expand All @@ -134,7 +133,7 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
defer c.topic.Unsubscribe(events)

// start syncing immediately
next := clock.Now()
next := time.Now()

for {
select {
Expand All @@ -145,12 +144,12 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
c.processEvent(e)

// Can not calculate next sync date for each provider as sync intervals can change (eg when follower becomes leader)
case <-clock.After(next.Sub(clock.Now())):
case <-time.After(time.Until(next)):
wg := &sync.WaitGroup{}

providersToSync := []*cacheProvider[R]{}
for _, cp := range c.providers {
if cp.needsSync(clock) {
if cp.needsSync() {
providersToSync = append(providersToSync, cp)
}
}
Expand All @@ -168,12 +167,12 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
})
wg.Add(1)
go func(cp *cacheProvider[R]) {
cp.sync(ctx, entriesForProvider, clock)
cp.sync(ctx, entriesForProvider)
wg.Done()
}(cp)
}
wg.Wait()
next = clock.Now().Add(time.Second)
next = time.Now().Add(time.Second)
}
}
}
Expand Down Expand Up @@ -211,22 +210,22 @@ func (c *cacheProvider[R]) waitForInitialSync() error {
}

// needsSync returns true if the provider needs to be synced.
func (c *cacheProvider[R]) needsSync(clock clock.Clock) bool {
func (c *cacheProvider[R]) needsSync() bool {
lastSyncAttempt, ok := c.lastSyncAttempt.Get()
if !ok {
return true
}
if currentBackoff, ok := c.currentBackoff.Get(); ok {
return clock.Now().After(lastSyncAttempt.Add(currentBackoff))
return time.Now().After(lastSyncAttempt.Add(currentBackoff))
}
return clock.Now().After(lastSyncAttempt.Add(c.provider.SyncInterval()))
return time.Now().After(lastSyncAttempt.Add(c.provider.SyncInterval()))
}

// sync executes sync on the provider and updates the cacheProvider sync state
func (c *cacheProvider[R]) sync(ctx context.Context, entries []Entry, clock clock.Clock) {
func (c *cacheProvider[R]) sync(ctx context.Context, entries []Entry) {
logger := log.FromContext(ctx)

c.lastSyncAttempt = optional.Some(clock.Now())
c.lastSyncAttempt = optional.Some(time.Now())
err := c.provider.Sync(ctx, entries, c.values)
if err != nil {
logger.Errorf(err, "Error syncing %s", c.provider.Key())
Expand Down
9 changes: 2 additions & 7 deletions common/configuration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strings"

"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"
)

// Role of [Manager], either Secrets or Configuration.
Expand Down Expand Up @@ -65,10 +64,6 @@ func NewDefaultConfigurationManagerFromConfig(ctx context.Context, config string

// New configuration manager.
func New[R Role](ctx context.Context, router Router[R], providers []Provider[R]) (*Manager[R], error) {
return newForTesting(ctx, router, providers, clock.New()), nil
}

func newForTesting[R Role](ctx context.Context, router Router[R], providers []Provider[R], clock clock.Clock) *Manager[R] {
m := &Manager[R]{
providers: map[string]Provider[R]{},
}
Expand All @@ -86,9 +81,9 @@ func newForTesting[R Role](ctx context.Context, router Router[R], providers []Pr
asyncProviders = append(asyncProviders, sp)
}
}
m.cache = newCache[R](ctx, asyncProviders, m, clock)
m.cache = newCache[R](ctx, asyncProviders, m)

return m
return m, nil
}

func ProviderKeyForAccessor(accessor *url.URL) string {
Expand Down
86 changes: 86 additions & 0 deletions common/configuration/manual_sync_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//go:build !release

package configuration

import (
"context"
"net/url"
"time"

"github.com/alecthomas/atomic"

"github.com/alecthomas/types/optional"
"github.com/puzpuzpuz/xsync/v3"
)

type manualSyncBlock struct {
sync chan optional.Option[error]
}

// ManualSyncProvider prevents normal syncs by returning a very high sync interval
// when syncAndWait() is called, it starts returning a 0 sync interval and then then blocks until sync completes.
// See why we didn't use mock clocks to schedule syncs here: https://github.com/TBD54566975/ftl/issues/2092
type ManualSyncProvider[R Role] struct {
syncRequested atomic.Value[optional.Option[manualSyncBlock]]

provider AsynchronousProvider[R]
}

var _ AsynchronousProvider[Secrets] = &ManualSyncProvider[Secrets]{}

func NewManualSyncProvider[R Role](provider AsynchronousProvider[R]) *ManualSyncProvider[R] {
return &ManualSyncProvider[R]{
provider: provider,
}
}

func (a *ManualSyncProvider[R]) SyncAndWait() error {
block := manualSyncBlock{
sync: make(chan optional.Option[error]),
}
a.syncRequested.Store(optional.Some(block))
err := <-block.sync
if err, hasErr := err.Get(); hasErr {
return err //nolint:wrapcheck
}
return nil
}

func (a *ManualSyncProvider[R]) Role() R {
return a.provider.Role()
}

func (a *ManualSyncProvider[R]) Key() string {
return a.provider.Key()
}

func (a *ManualSyncProvider[R]) Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) {
return a.provider.Store(ctx, ref, value) //nolint:wrapcheck
}

func (a *ManualSyncProvider[R]) Delete(ctx context.Context, ref Ref) error {
return a.provider.Delete(ctx, ref) //nolint:wrapcheck
}

func (a *ManualSyncProvider[R]) SyncInterval() time.Duration {
if _, ok := a.syncRequested.Load().Get(); ok {
// sync now
return 0
}
// prevent sync
return time.Hour * 24 * 365
}

func (a *ManualSyncProvider[R]) Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error {
err := a.provider.Sync(ctx, entries, values)

if block, ok := a.syncRequested.Load().Get(); ok {
a.syncRequested.Store(optional.None[manualSyncBlock]())
if err == nil {
block.sync <- optional.None[error]()
} else {
block.sync <- optional.Some(err)
}
}
return err //nolint:wrapcheck
}

0 comments on commit 60acb0b

Please sign in to comment.