Skip to content

Commit e7e9fba

Browse files
committed
Fix https_batch deadlock due to golang timer changes
It makes tests hopefully more robust. It also replaces most sleeps with Consistently and Eventually. It makes the timings more forgiving. This should make it reliable on weak hardware. It adds the functional options pattern to allow test configuration.
1 parent 58e63a4 commit e7e9fba

File tree

3 files changed

+110
-51
lines changed

3 files changed

+110
-51
lines changed

src/pkg/egress/syslog/https_batch.go

+70-35
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,35 @@ import (
44
"bytes"
55
"crypto/tls"
66
"log"
7+
"sync"
78
"time"
89

910
"code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
1011
metrics "code.cloudfoundry.org/go-metric-registry"
1112
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
1213
)
1314

14-
const BATCHSIZE = 256 * 1024
15-
1615
type HTTPSBatchWriter struct {
1716
HTTPSWriter
18-
msgs chan []byte
1917
batchSize int
2018
sendInterval time.Duration
21-
egrMsgCount float64
19+
msgChan chan []byte
20+
quit chan struct{}
21+
wg sync.WaitGroup
22+
}
23+
24+
type Option func(*HTTPSBatchWriter)
25+
26+
func WithBatchSize(size int) Option {
27+
return func(w *HTTPSBatchWriter) {
28+
w.batchSize = size
29+
}
30+
}
31+
32+
func WithSendInterval(interval time.Duration) Option {
33+
return func(w *HTTPSBatchWriter) {
34+
w.sendInterval = interval
35+
}
2236
}
2337

2438
func NewHTTPSBatchWriter(
@@ -27,10 +41,12 @@ func NewHTTPSBatchWriter(
2741
tlsConf *tls.Config,
2842
egressMetric metrics.Counter,
2943
c *Converter,
44+
options ...Option,
3045
) egress.WriteCloser {
3146
client := httpClient(netConf, tlsConf)
32-
binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme
33-
BatchWriter := &HTTPSBatchWriter{
47+
binding.URL.Scheme = "https"
48+
49+
writer := &HTTPSBatchWriter{
3450
HTTPSWriter: HTTPSWriter{
3551
url: binding.URL,
3652
appID: binding.AppID,
@@ -39,60 +55,79 @@ func NewHTTPSBatchWriter(
3955
egressMetric: egressMetric,
4056
syslogConverter: c,
4157
},
42-
batchSize: BATCHSIZE,
43-
sendInterval: 1 * time.Second,
44-
egrMsgCount: 0,
45-
msgs: make(chan []byte),
58+
batchSize: 256 * 1024, // Default value
59+
sendInterval: 1 * time.Second, // Default value
60+
msgChan: make(chan []byte), // Unbuffered channel for messages
61+
quit: make(chan struct{}),
62+
}
63+
64+
for _, opt := range options {
65+
opt(writer)
4666
}
47-
go BatchWriter.startSender()
48-
return BatchWriter
67+
68+
writer.wg.Add(1)
69+
go writer.startSender()
70+
71+
return writer
4972
}
5073

51-
// Modified Write function
5274
func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error {
5375
msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname)
5476
if err != nil {
55-
log.Printf("Failed to parse syslog, dropping faulty message, err: %s", err)
77+
log.Printf("Failed to parse syslog, dropping message, err: %s", err)
5678
return nil
5779
}
5880

5981
for _, msg := range msgs {
60-
//There is no correct way of implementing error based retries in the current architecture.
61-
//Retries for https-batching will be implemented at a later point in time.
62-
w.msgs <- msg
82+
w.msgChan <- msg
6383
}
84+
6485
return nil
6586
}
6687

6788
func (w *HTTPSBatchWriter) startSender() {
68-
t := time.NewTimer(w.sendInterval)
89+
defer w.wg.Done()
90+
91+
ticker := time.NewTicker(w.sendInterval)
92+
defer ticker.Stop()
6993

7094
var msgBatch bytes.Buffer
7195
var msgCount float64
72-
reset := func() {
73-
msgBatch.Reset()
74-
msgCount = 0
75-
t.Reset(w.sendInterval)
96+
97+
sendBatch := func() {
98+
if msgBatch.Len() > 0 {
99+
w.sendHttpRequest(msgBatch.Bytes(), msgCount) // nolint:errcheck
100+
msgBatch.Reset()
101+
msgCount = 0
102+
}
76103
}
104+
77105
for {
78106
select {
79-
case msg := <-w.msgs:
80-
length, buffer_err := msgBatch.Write(msg)
81-
if buffer_err != nil {
82-
log.Printf("Failed to write to buffer, dropping buffer of size %d , err: %s", length, buffer_err)
83-
reset()
107+
case msg := <-w.msgChan:
108+
_, err := msgBatch.Write(msg)
109+
if err != nil {
110+
log.Printf("Failed to write to buffer, dropping buffer of size %d , err: %s", msgBatch.Len(), err)
111+
msgBatch.Reset()
112+
msgCount = 0
84113
} else {
85114
msgCount++
86-
if length >= w.batchSize {
87-
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
88-
reset()
115+
if msgBatch.Len() >= w.batchSize {
116+
sendBatch()
89117
}
90118
}
91-
case <-t.C:
92-
if msgBatch.Len() > 0 {
93-
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
94-
reset()
95-
}
119+
case <-ticker.C:
120+
sendBatch()
121+
case <-w.quit:
122+
sendBatch()
123+
return
96124
}
97125
}
98126
}
127+
128+
func (w *HTTPSBatchWriter) Close() error {
129+
close(w.quit)
130+
w.wg.Wait() // Ensure sender finishes processing before closing
131+
close(w.msgChan)
132+
return nil
133+
}

src/pkg/egress/syslog/https_batch_test.go

+35-16
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,14 @@ import (
1717
. "github.com/onsi/gomega"
1818
)
1919

20-
var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf"
20+
var stringTo256Chars string
21+
22+
func init() {
23+
//With the rest of the syslog, this results in a syslogenvelope of the size 400
24+
for i := 0; i < 256; i++ {
25+
stringTo256Chars += "a"
26+
}
27+
}
2128

2229
var _ = Describe("HTTPS_batch", func() {
2330
var (
@@ -30,10 +37,10 @@ var _ = Describe("HTTPS_batch", func() {
3037
b *syslog.URLBinding
3138
writer egress.WriteCloser
3239
)
33-
string_to_1024_chars += string_to_1024_chars
3440

3541
BeforeEach(func() {
3642
drain = newBatchMockDrain(200)
43+
drain.Reset()
3744
b = buildURLBinding(
3845
drain.URL,
3946
"test-app-id",
@@ -45,6 +52,8 @@ var _ = Describe("HTTPS_batch", func() {
4552
skipSSLTLSConfig,
4653
&metricsHelpers.SpyMetric{},
4754
c,
55+
syslog.WithBatchSize(5000),
56+
syslog.WithSendInterval(100*time.Millisecond),
4857
)
4958
})
5059

@@ -53,7 +62,7 @@ var _ = Describe("HTTPS_batch", func() {
5362
Expect(writer.Write(env1)).To(Succeed())
5463
env2 := buildLogEnvelope("APP", "2", "message 2", loggregator_v2.Log_OUT)
5564
Expect(writer.Write(env2)).To(Succeed())
56-
time.Sleep(1050 * time.Millisecond)
65+
time.Sleep(150 * time.Millisecond)
5766

5867
Expect(drain.getMessagesSize()).Should(Equal(2))
5968
expected := &rfc5424.Message{
@@ -83,24 +92,36 @@ var _ = Describe("HTTPS_batch", func() {
8392
})
8493

8594
It("test batch dispatching with all logs in a given timeframe", func() {
86-
env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT)
95+
env1 := buildLogEnvelope("APP", "1", "string to get log to 400 characters:"+stringTo256Chars, loggregator_v2.Log_OUT)
8796
for i := 0; i < 10; i++ {
8897
Expect(writer.Write(env1)).To(Succeed())
89-
time.Sleep(99 * time.Millisecond)
9098
}
91-
Expect(drain.getMessagesSize()).Should(Equal(0))
92-
time.Sleep(100 * time.Millisecond)
93-
Expect(drain.getMessagesSize()).Should(Equal(10))
99+
Expect(drain.getMessagesSize()).To(Equal(0))
100+
Eventually(drain.getMessagesSize, 180*time.Millisecond).Should(Equal(10))
94101
})
95102

96-
It("probabilistic test for race condition", func() {
97-
env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT)
98-
for i := 0; i < 10; i++ {
103+
It("test dispatching for batches before timewindow is finished", func() {
104+
// One envelope has the size of 400byte
105+
env1 := buildLogEnvelope("APP", "1", "string to get log to 400 characters:"+stringTo256Chars, loggregator_v2.Log_OUT)
106+
107+
for i := 0; i < 20; i++ {
99108
Expect(writer.Write(env1)).To(Succeed())
100-
time.Sleep(99 * time.Millisecond)
101109
}
102-
time.Sleep(100 * time.Millisecond)
103-
Expect(drain.getMessagesSize()).Should(Equal(10))
110+
// DefaultBatchSize = 5000byte, 12 * 400byte = 4800byte, 13 * 400byte = 5200byte
111+
// -> The batch will trigger after 13 messages, and this is not a direct hit to prevent inconsistencies.
112+
Eventually(drain.getMessagesSize, 50*time.Millisecond).Should(Equal(13))
113+
Eventually(drain.getMessagesSize, 120*time.Millisecond).Should(Equal(20))
114+
})
115+
116+
It("test for hanging after some ticks", func() {
117+
// This test will not succeed on the timer based implementation,
118+
// it works fine with a ticker based implementation.
119+
env1 := buildLogEnvelope("APP", "1", "only a short test message", loggregator_v2.Log_OUT)
120+
for i := 0; i < 5; i++ {
121+
Expect(writer.Write(env1)).To(Succeed())
122+
time.Sleep(220 * time.Millisecond) // this sleeps at least 2 ticks, to trigger once without events
123+
}
124+
Eventually(drain.getMessagesSize, 120*time.Millisecond).Should(Equal(5))
104125
})
105126
})
106127

@@ -112,8 +133,6 @@ func newBatchMockDrain(status int) *SpyDrain {
112133
Expect(err).ToNot(HaveOccurred())
113134
defer r.Body.Close()
114135

115-
println(body)
116-
117136
message := &rfc5424.Message{}
118137

119138
messages := bytes.SplitAfter(body, []byte("\n"))

src/pkg/egress/syslog/https_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,11 @@ type SpyDrain struct {
251251
headers []http.Header
252252
}
253253

254+
func (d *SpyDrain) Reset() {
255+
d.messages = nil
256+
d.headers = nil
257+
}
258+
254259
func (d *SpyDrain) appendMessage(message *rfc5424.Message) {
255260
d.mu.Lock()
256261
defer d.mu.Unlock()

0 commit comments

Comments
 (0)