Skip to content

Commit

Permalink
tracker: implement emptying pending removal map once in a while
Browse files Browse the repository at this point in the history
  • Loading branch information
Wessie committed May 3, 2024
1 parent 624690b commit e7aff28
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 21 deletions.
11 changes: 7 additions & 4 deletions tracker/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
)

func TestListenerAddAndRemove(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

recorder := NewRecorder()
recorder := NewRecorder(ctx)
dummy := NewServer(ctx, "", recorder)

srv := httptest.NewServer(dummy.Handler)
Expand Down Expand Up @@ -100,8 +101,10 @@ func TestListenerAddAndRemove(t *testing.T) {
}

func BenchmarkListenerAdd(b *testing.B) {
recorder := NewRecorder()
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

recorder := NewRecorder(ctx)

handler := ListenerAdd(ctx, recorder)

Expand Down
18 changes: 13 additions & 5 deletions tracker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ import (
"github.com/rs/zerolog"
)

var UpdateListenersTickrate = time.Second * 10
const (
UpdateListenersTickrate = time.Second * 10
RemoveStalePendingTickrate = time.Hour * 24
RemoveStalePendingPeriod = time.Minute * 5
)

func Execute(ctx context.Context, cfg config.Config) error {
manager := cfg.Conf().Manager.Client()

var recorder = NewRecorder()
var recorder = NewRecorder(ctx)

go PeriodicallyUpdateListeners(ctx, manager, recorder)
go PeriodicallyUpdateListeners(ctx, manager, recorder, UpdateListenersTickrate)

srv := NewServer(ctx, ":9999", recorder)

Expand All @@ -33,8 +37,12 @@ func Execute(ctx context.Context, cfg config.Config) error {
}
}

func PeriodicallyUpdateListeners(ctx context.Context, manager radio.ManagerService, recorder *Recorder) {
ticker := time.NewTicker(UpdateListenersTickrate)
func PeriodicallyUpdateListeners(ctx context.Context,
manager radio.ManagerService,
recorder *Recorder,
tickrate time.Duration,
) {
ticker := time.NewTicker(tickrate)
defer ticker.Stop()

for {
Expand Down
6 changes: 2 additions & 4 deletions tracker/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestPeriodicallyUpdateListeners(t *testing.T) {
defer cancel()
done := make(chan struct{})

recorder := NewRecorder()
recorder := NewRecorder(ctx)
var last atomic.Int64
var count int
var closeOnce sync.Once
Expand Down Expand Up @@ -53,12 +53,10 @@ func TestPeriodicallyUpdateListeners(t *testing.T) {
},
}

// set the tickrate a bit higher for testing purposes
UpdateListenersTickrate = time.Millisecond * 10
finished := make(chan struct{})
go func() {
defer close(finished)
PeriodicallyUpdateListeners(ctx, manager, recorder)
PeriodicallyUpdateListeners(ctx, manager, recorder, time.Millisecond*10)
}()

// wait for the 10 updates
Expand Down
38 changes: 36 additions & 2 deletions tracker/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/R-a-dio/valkyrie/telemetry"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -38,11 +39,13 @@ type Listener struct {
Info url.Values
}

func NewRecorder() *Recorder {
return &Recorder{
func NewRecorder(ctx context.Context) *Recorder {
r := &Recorder{
pendingRemoval: make(map[ClientID]time.Time),
listeners: make(map[ClientID]*Listener),
}
go r.PeriodicallyRemoveStalePending(ctx, RemoveStalePendingTickrate)
return r
}

type Recorder struct {
Expand All @@ -52,6 +55,37 @@ type Recorder struct {
listenerAmount atomic.Int64
}

func (r *Recorder) PeriodicallyRemoveStalePending(ctx context.Context, tickrate time.Duration) {
ticker := time.NewTicker(tickrate)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stale := r.removeStalePending()
if stale > 0 {
zerolog.Ctx(ctx).Error().Int("amount", stale).Msg("found stale pending removals")
}
}
}
}

func (r *Recorder) removeStalePending() (found_stale int) {
deadline := time.Now().Add(-RemoveStalePendingPeriod)

r.mu.Lock()
defer r.mu.Unlock()
for k, v := range r.pendingRemoval {
if v.Before(deadline) {
delete(r.pendingRemoval, k)
found_stale++
}
}
return found_stale
}

func (r *Recorder) ListenerAmount() int64 {
return r.listenerAmount.Load()
}
Expand Down
81 changes: 75 additions & 6 deletions tracker/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ const (
)

func TestListenerAddAndRemoval(t *testing.T) {
r := NewRecorder()
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

r := NewRecorder(ctx)

req := httptest.NewRequest(http.MethodGet, "/test", nil)

count := ClientID(200)
Expand All @@ -27,23 +30,26 @@ func TestListenerAddAndRemoval(t *testing.T) {

assert.Eventually(t, func() bool {
return int64(count) == r.ListenerAmount()
}, time.Second*5, time.Millisecond*50)
}, eventuallyDelay, eventuallyTick)

for i := range count {
go r.ListenerRemove(ctx, i, req)
}

assert.Eventually(t, func() bool {
return 0 == r.ListenerAmount()
}, time.Second*5, time.Millisecond*50)
}, eventuallyDelay, eventuallyTick)

assert.Len(t, r.listeners, 0)
assert.Len(t, r.pendingRemoval, 0)
}

func TestListenerAddAndRemovalOutOfOrder(t *testing.T) {
r := NewRecorder()
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

r := NewRecorder(ctx)

req := httptest.NewRequest(http.MethodGet, "/test", nil)

count := int64(200)
Expand Down Expand Up @@ -97,3 +103,66 @@ func testPendingLength(t *testing.T, r *Recorder, expected int) bool {
defer r.mu.Unlock()
return assert.Len(t, r.pendingRemoval, expected)
}

func testCtx(t *testing.T, ctxx ...context.Context) (ctx context.Context) {
if len(ctxx) == 0 {
ctx = context.Background()
} else {
ctx = ctxx[0]
}

ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
return ctx
}

func TestRecorderRemoveStalePending(t *testing.T) {

t.Run("simple removal", func(t *testing.T) {
ctx := testCtx(t)
r := NewRecorder(ctx)

id := ClientID(10)
r.mu.Lock()
r.pendingRemoval[id] = time.Now().Add(-RemoveStalePendingPeriod)
r.mu.Unlock()

found := r.removeStalePending()
assert.Equal(t, 1, found)

testPendingLength(t, r, 0)
})
t.Run("many removal", func(t *testing.T) {
ctx := testCtx(t)
r := NewRecorder(ctx)

count := RemoveStalePendingPeriod / time.Second * 2
r.mu.Lock()
for i := range ClientID(count) {
r.pendingRemoval[i] = time.Now().Add(-time.Second * time.Duration(i))
}
r.mu.Unlock()

testPendingLength(t, r, int(count))
found := r.removeStalePending()
testPendingLength(t, r, int(count/2))
assert.Equal(t, int(count/2), found)
})
t.Run("removal by periodic goroutine", func(t *testing.T) {
ctx := testCtx(t)
r := NewRecorder(ctx)

id := ClientID(10)
r.mu.Lock()
r.pendingRemoval[id] = time.Now().Add(-RemoveStalePendingPeriod)
r.mu.Unlock()

// launch an extra period goroutine, since the one we create
// in NewRecorder is very slow
go r.PeriodicallyRemoveStalePending(ctx, eventuallyTick)

assert.Eventually(t, func() bool {
return testPendingLength(t, r, 0)
}, eventuallyDelay, eventuallyTick)
})
}

0 comments on commit e7aff28

Please sign in to comment.