Skip to content

Commit 8de7a7b

Browse files
faec authored and mergify[bot] committed
Fix flaky logstash output test (#43402)
Fix a flaky logstash output test for the `deadlockListener`. The old test used live time tickers and occasionally failed because timing on CI machines is highly variable. This PR refactors `deadlockListener` to allow tests to override the current time and the ticker channel, so the test can run deterministically in a single goroutine with no calls to `time.Sleep`. (cherry picked from commit 6d92a40)
1 parent b5a823d commit 8de7a7b

File tree

2 files changed

+96
-48
lines changed

2 files changed

+96
-48
lines changed

libbeat/outputs/logstash/deadlock.go

+67-36
Original file line number | Diff line number | Diff line change
@@ -18,19 +18,25 @@
1818
package logstash
1919

2020
import (
21+
"context"
2122
"time"
2223

2324
"github.com/elastic/elastic-agent-libs/logp"
2425
)
2526

2627
type deadlockListener struct {
27-
log *logp.Logger
28-
timeout time.Duration
29-
ticker *time.Ticker
28+
log *logp.Logger
29+
timeout time.Duration
30+
tickerChan <-chan time.Time
31+
32+
// getTime returns the current time (allows overriding time.Now in the tests)
33+
getTime func() time.Time
34+
lastTime time.Time
3035

3136
ackChan chan int
3237

33-
doneChan chan struct{}
38+
ctx context.Context
39+
ctxCancel context.CancelFunc
3440
}
3541

3642
const logstashDeadlockTimeout = 5 * time.Minute
@@ -39,57 +45,82 @@ func newDeadlockListener(log *logp.Logger, timeout time.Duration) *deadlockListe
3945
if timeout <= 0 {
4046
return nil
4147
}
42-
r := &deadlockListener{
43-
log: log,
44-
timeout: timeout,
45-
ticker: time.NewTicker(timeout),
4648

47-
ackChan: make(chan int),
48-
doneChan: make(chan struct{}),
49+
dl := idleDeadlockListener(log, timeout, time.Now)
50+
51+
// Check timeouts at a steady interval of once a second. This loses
52+
// sub-second granularity (which we don't need) in exchange for making
53+
// the API deterministically testable (using real timers in the tests
54+
// was causing flakiness).
55+
ticker := time.NewTicker(time.Second)
56+
dl.tickerChan = ticker.C
57+
go func() {
58+
defer ticker.Stop()
59+
dl.run()
60+
}()
61+
62+
return dl
63+
}
64+
65+
// Initialize the listener without an active ticker, and don't start the run()
66+
// goroutine, so unit tests can control the execution timing.
67+
func idleDeadlockListener(log *logp.Logger, timeout time.Duration, getTime func() time.Time) *deadlockListener {
68+
ctx, cancel := context.WithCancel(context.Background())
69+
return &deadlockListener{
70+
log: log,
71+
timeout: timeout,
72+
getTime: getTime,
73+
lastTime: getTime(),
74+
75+
ackChan: make(chan int),
76+
77+
ctx: ctx,
78+
ctxCancel: cancel,
4979
}
50-
go r.run()
51-
return r
5280
}
5381

54-
func (r *deadlockListener) run() {
55-
defer r.ticker.Stop()
56-
defer close(r.doneChan)
57-
for {
58-
select {
59-
case n, ok := <-r.ackChan:
60-
if !ok {
61-
// Listener has been closed
62-
return
63-
}
64-
if n > 0 {
65-
// If progress was made, reset the countdown.
66-
r.ticker.Reset(r.timeout)
67-
}
68-
case <-r.ticker.C:
82+
func (dl *deadlockListener) run() {
83+
for dl.ctx.Err() == nil {
84+
dl.runIteration()
85+
}
86+
}
87+
88+
func (dl *deadlockListener) runIteration() {
89+
select {
90+
case <-dl.ctx.Done():
91+
// Listener has been closed
92+
return
93+
case n := <-dl.ackChan:
94+
if n > 0 {
95+
// If progress was made, reset the countdown.
96+
dl.lastTime = dl.getTime()
97+
}
98+
case <-dl.tickerChan:
99+
if dl.getTime().Sub(dl.lastTime) >= dl.timeout {
69100
// No progress was made within the timeout, log error so users
70101
// know there is likely a problem with the upstream host
71-
r.log.Errorf("Logstash batch hasn't reported progress in the last %v, the Logstash host may be stalled. This problem can be prevented by configuring Logstash to use PipelineBusV1 or by upgrading Logstash to 8.17+, for details see https://github.com/elastic/logstash/issues/16657", r.timeout)
72-
return
102+
dl.log.Errorf("Logstash batch hasn't reported progress in the last %v, the Logstash host may be stalled. This problem can be prevented by configuring Logstash to use PipelineBusV1 or by upgrading Logstash to 8.17+, for details see https://github.com/elastic/logstash/issues/16657", dl.timeout)
103+
dl.ctxCancel()
73104
}
74105
}
75106
}
76107

77-
func (r *deadlockListener) ack(n int) {
78-
if r == nil {
108+
func (dl *deadlockListener) ack(n int) {
109+
if dl == nil {
79110
return
80111
}
81112
// Send the new ack to the run loop, unless it has already shut down in
82113
// which case it can be safely ignored.
83114
select {
84-
case r.ackChan <- n:
85-
case <-r.doneChan:
115+
case dl.ackChan <- n:
116+
case <-dl.ctx.Done():
86117
}
87118
}
88119

89-
func (r *deadlockListener) close() {
90-
if r == nil {
120+
func (dl *deadlockListener) close() {
121+
if dl == nil {
91122
return
92123
}
93124
// Signal the run loop to shut down
94-
close(r.ackChan)
125+
dl.ctxCancel()
95126
}

libbeat/outputs/logstash/deadlock_test.go

+29-12
Original file line number | Diff line number | Diff line change
@@ -21,31 +21,48 @@ import (
2121
"testing"
2222
"time"
2323

24+
"github.com/stretchr/testify/assert"
2425
"github.com/stretchr/testify/require"
2526

2627
"github.com/elastic/elastic-agent-libs/logp"
2728
)
2829

2930
func TestDeadlockListener(t *testing.T) {
30-
const timeout = 5 * time.Millisecond
31-
log := logp.NewLogger("test")
32-
listener := newDeadlockListener(log, timeout)
31+
const timeout = time.Second
32+
var currentTime time.Time
33+
getTime := func() time.Time { return currentTime }
34+
35+
dl := idleDeadlockListener(logp.NewLogger("test"), timeout, getTime)
36+
37+
// Channels get a buffer so we can trigger them deterministically in
38+
// one goroutine.
39+
tickerChan := make(chan time.Time, 1)
40+
dl.tickerChan = tickerChan
41+
dl.ackChan = make(chan int, 1)
3342

3443
// Verify that the listener doesn't trigger when receiving regular acks
3544
for i := 0; i < 5; i++ {
36-
time.Sleep(timeout / 2)
37-
listener.ack(1)
38-
}
39-
select {
40-
case <-listener.doneChan:
41-
require.Fail(t, "Deadlock listener should not trigger unless there is no progress for the configured time interval")
42-
case <-time.After(timeout / 2):
45+
// Advance the "current time" and ping the ticker channel to refresh
46+
// the timeout check, then send an ack and confirm that it hasn't timed
47+
// out yet.
48+
currentTime = currentTime.Add(timeout - 1)
49+
tickerChan <- currentTime
50+
dl.runIteration()
51+
52+
dl.ack(1)
53+
dl.runIteration()
54+
assert.Equal(t, currentTime, dl.lastTime)
55+
assert.Nil(t, dl.ctx.Err(), "Deadlock listener context shouldn't expire until the timeout is reached")
4356
}
4457

4558
// Verify that the listener does trigger when the acks stop
59+
currentTime = currentTime.Add(timeout)
60+
tickerChan <- currentTime
61+
dl.runIteration()
62+
4663
select {
47-
case <-time.After(timeout):
64+
case <-dl.ctx.Done():
65+
default:
4866
require.Fail(t, "Deadlock listener should trigger when there is no progress for the configured time interval")
49-
case <-listener.doneChan:
5067
}
5168
}

0 commit comments

Comments
 (0)