Skip to content

Commit

Permalink
improve proxy unit tests (#203)
Browse files Browse the repository at this point in the history
* fix tests relying on concurrency

* improvments:
make the timing larger to avoid issue
 allow cancelation of previous queries to avoid having many sleeping goroutines

* speed up test by removing unecessary sleep

* avoid infinite loop in tests

* fix from comment of PR
  • Loading branch information
mga-chka authored Aug 19, 2022
1 parent 2d6a5c7 commit 9582d5d
Showing 1 changed file with 80 additions and 18 deletions.
98 changes: 80 additions & 18 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"math/rand"
"net"
"regexp"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -21,6 +23,15 @@ import (
"github.com/stretchr/testify/assert"
)

var nbHeavyRequestsInflight int64 = 0
var nbRequestsInflight int64 = 0
var totalNbOfRequests uint64 = 0
var shouldStop uint64 = 0

const max_concurrent_goroutines = 256

const heavyRequestDuration = time.Millisecond * 512

const (
okResponse = "1"
)
Expand Down Expand Up @@ -170,8 +181,7 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
expStatusCode: http.StatusTooManyRequests,
f: func(p *reverseProxy) *http.Response {
p.clusters["cluster"].users["web"].maxConcurrentQueries = 1
go makeHeavyRequest(p, time.Millisecond*100)
time.Sleep(time.Millisecond * 10)
runHeavyRequestInGoroutine(p, 1, true)
return makeRequest(p)
},
},
Expand Down Expand Up @@ -214,8 +224,7 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
expStatusCode: http.StatusTooManyRequests,
f: func(p *reverseProxy) *http.Response {
p.users["default"].maxConcurrentQueries = 1
go makeHeavyRequest(p, time.Millisecond*100)
time.Sleep(time.Millisecond * 10)
runHeavyRequestInGoroutine(p, 1, true)
return makeRequest(p)
},
},
Expand All @@ -227,9 +236,8 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
f: func(p *reverseProxy) *http.Response {
p.users["default"].maxConcurrentQueries = 1
p.users["default"].queueCh = make(chan struct{}, 2)
go makeHeavyRequest(p, time.Millisecond*20)
time.Sleep(time.Millisecond * 10)
return makeHeavyRequest(p, time.Millisecond*20)
runHeavyRequestInGoroutine(p, 1, true)
return makeRequest(p)
},
},
{
Expand All @@ -240,9 +248,8 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
f: func(p *reverseProxy) *http.Response {
p.users["default"].maxConcurrentQueries = 1
p.clusters["cluster"].users["web"].queueCh = make(chan struct{}, 2)
go makeHeavyRequest(p, time.Millisecond*20)
time.Sleep(time.Millisecond * 10)
return makeHeavyRequest(p, time.Millisecond*20)
runHeavyRequestInGoroutine(p, 1, true)
return makeRequest(p)
},
},
{
Expand All @@ -253,11 +260,16 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
f: func(p *reverseProxy) *http.Response {
p.users["default"].maxConcurrentQueries = 1
p.users["default"].queueCh = make(chan struct{}, 1)
go makeHeavyRequest(p, time.Millisecond*100)
time.Sleep(time.Millisecond * 5)
go makeHeavyRequest(p, time.Millisecond*100)
time.Sleep(time.Millisecond * 5)
return makeHeavyRequest(p, time.Millisecond*20)
// we don't wait the requests to be handled by the fakeServer because one of them will be enqueued and not handled
// this is why we handle this part manually
nbRequest := atomic.LoadUint64(&totalNbOfRequests)
runHeavyRequestInGoroutine(p, 2, false)
counter := 0
for atomic.LoadUint64(&totalNbOfRequests) < nbRequest+2 && counter < 200 {
time.Sleep(1 * time.Millisecond)
counter++
}
return makeRequest(p)
},
},
{
Expand Down Expand Up @@ -392,6 +404,7 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
stopAllRequestsInFlight()
proxy, err := getProxy(tc.cfg)
if err != nil {
t.Fatalf("unexpected error: %s", err)
Expand Down Expand Up @@ -636,8 +649,18 @@ func getNetwork(s string) *net.IPNet {
const killQueryPattern = "KILL QUERY WHERE query_id"

var (
// quick fix until the test file is refactored.
// Many tests rely on concurrency of goroutines.
// But, because of the use of sleep, only a few goroutine can run in concurrence and the other are blocked
// cf: https://stackoverflow.com/questions/62527705/will-time-sleep-block-goroutine
// Because of that, on most tests the goroutines are running sequencially
// This id why we increase the number concurrent goroutines (the default number is the number of cpu of the computer running the tests, which is quite low on github actions)
// we must increase this number before the instanciation of the fakeServer, otherwise it's not taken into account during the tests
_ = increaseMaxConccurentGoroutine()
registry = newRequestRegistry()
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt64(&nbRequestsInflight, 1)
defer atomic.AddInt64(&nbRequestsInflight, -1)
if r.URL.Path == "/fast" {
fmt.Fprintln(w, okResponse)
return
Expand Down Expand Up @@ -666,7 +689,19 @@ var (
if err != nil {
panic(fmt.Sprintf("error while imitating delay at fakeServer handler: %s", err))
}
time.Sleep(d)

// at this step the query is really processed by the server
// we're only interested by long queries, i.e the one taking more than 0 msec
if d == heavyRequestDuration {
atomic.AddInt64(&nbHeavyRequestsInflight, 1)
}

// instead of sleeping for the whole duration, we sleep msec per msec so that the query can be
// cancel once it's associated test is over because this test suite generate a lot of goroutines,
// which can be an issue on small cpu like github actions
for i := int64(0); i < d.Milliseconds() && atomic.LoadUint64(&shouldStop) == 0; i++ {
time.Sleep(time.Millisecond)
}
}
}

Expand All @@ -693,6 +728,7 @@ func (tcn *testCloseNotifier) CloseNotify() <-chan bool {
}

func makeCustomRequest(p *reverseProxy, req *http.Request) *http.Response {
atomic.AddUint64(&totalNbOfRequests, 1)
rw := httptest.NewRecorder()
cn := &testCloseNotifier{rw}
p.ServeHTTP(cn, req)
Expand All @@ -713,8 +749,6 @@ func getProxy(c *config.Config) (*reverseProxy, error) {
return nil, err
}

// wait till all hosts will do health-checking
time.Sleep(time.Millisecond * 50)
return proxy, nil
}

Expand Down Expand Up @@ -757,6 +791,10 @@ type requestRegistry struct {
r map[string]bool
}

func increaseMaxConccurentGoroutine() int {
nb := runtime.GOMAXPROCS(max_concurrent_goroutines)
return nb
}
func newRequestRegistry() *requestRegistry {
return &requestRegistry{
r: make(map[string]bool),
Expand Down Expand Up @@ -833,3 +871,27 @@ func TestCalcQueryParamsHash(t *testing.T) {
})
}
}

// this function runs an number of heavyRequests using goroutines
// then waits that all the requests are currently handled by the fakeServer
func runHeavyRequestInGoroutine(p *reverseProxy, nbHeavyRequest int64, shouldWait bool) {
atomic.StoreInt64(&nbHeavyRequestsInflight, 0)
for i := 0; i < int(nbHeavyRequest); i++ {
go makeHeavyRequest(p, heavyRequestDuration)
}
counter := 0
//we wait up to 200 msec for the requests to be handled by the fakeServer because
// the code on github actions can be very slow
for shouldWait && atomic.LoadInt64(&nbHeavyRequestsInflight) < nbHeavyRequest && counter < 200 {
time.Sleep(time.Millisecond)
counter++
}
}

func stopAllRequestsInFlight() {
atomic.StoreUint64(&shouldStop, 1)
for atomic.LoadInt64(&nbRequestsInflight) > 0 {
time.Sleep(time.Millisecond)
}
atomic.StoreUint64(&shouldStop, 0)
}

0 comments on commit 9582d5d

Please sign in to comment.