forked from kubernetes/test-infra
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcoalesce.go
139 lines (119 loc) · 4.04 KB
/
coalesce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ghcache
import (
"bufio"
"bytes"
"net/http"
"net/http/httputil"
"sync"
"github.com/sirupsen/logrus"
"k8s.io/test-infra/ghproxy/ghmetrics"
)
// requestCoalescer allows concurrent requests for the same URI to share a
// single upstream request and response.
//
// It is used as an http.RoundTripper (see RoundTrip) that wraps delegate.
type requestCoalescer struct {
// Mutex guards keys; RoundTrip locks it around every read, insert and
// delete on the map.
sync.Mutex
// keys holds one responseWaiter per request currently in flight, indexed by
// the request URL string (req.URL.String()). An entry is added by the first
// request for a URL and deleted once that request's round trip returns.
keys map[string]*responseWaiter
// delegate performs the actual round trips: all non-GET requests, and the
// first GET request for each key.
delegate http.RoundTripper
// hasher produces the per-request hash value that RoundTrip passes to
// ghmetrics.CollectCacheRequestMetrics.
hasher ghmetrics.Hasher
}
// responseWaiter is the rendezvous point between the first request for a key
// and any later requests for the same key that arrive while it is in flight.
type responseWaiter struct {
// Cond is what later requests Wait on; the first request Broadcasts on it
// after storing the outcome in resp/err. Cond.L guards the fields below
// while the hand-off is in progress.
*sync.Cond
// waiting is set to true by a later request before it starts waiting. The
// first request only dumps its response and broadcasts when this is true.
waiting bool
// resp is the serialized response (as produced by httputil.DumpResponse
// with the body included) from which each waiting request parses its own
// *http.Response. It is nil when err is set from a failed round trip.
resp []byte
// err is the error from the first request's round trip, or the error from
// dumping its response; waiting requests return it as their own error.
err error
}
// RoundTrip coalesces concurrent GET requests for the same URI by blocking
// the later requests until the first request returns and then sharing the
// response between all requests. Requests using any other method are handed
// straight to the delegate; they are not coalesced and no metrics are
// recorded for them here.
//
// The first request for a key registers a responseWaiter, performs the round
// trip through the delegate, removes the key from the map and, only if a
// later request marked itself as waiting, stores a byte dump of the response
// (or the error) on the waiter and broadcasts. Every waiting request then
// parses its own *http.Response from that dump, so bodies are not shared.
//
// Each coalescable request is reported to ghmetrics together with its cache
// mode, and the same mode is set in the CacheModeHeader header of any
// response that is returned: ModeCoalesced for a shared response, the result
// of cacheResponseMode for the first request's response, and ModeError
// (metrics only) when an error is returned.
//
// Notes: Deadlock shouldn't be possible because the map lock is always
// acquired before responseWaiter lock if both locks are to be held and we
// never hold multiple responseWaiter locks.
func (r *requestCoalescer) RoundTrip(req *http.Request) (*http.Response, error) {
	// Only coalesce GET requests
	if req.Method != http.MethodGet {
		return r.delegate.RoundTrip(req)
	}

	// Stays ModeError unless one of the success paths below overwrites it.
	var cacheMode = ModeError
	resp, err := func() (*http.Response, error) {
		key := req.URL.String()
		r.Lock()
		waiter, ok := r.keys[key]
		if ok {
			// Earlier request in flight. Wait for its response.
			if req.Body != nil {
				defer req.Body.Close() // Since we won't pass the request we must close it.
			}
			// Take the waiter lock BEFORE releasing the map lock. The first
			// requester must delete the key (under the map lock) before it
			// takes the waiter lock, so having found the key here guarantees
			// it cannot broadcast until we have released the waiter lock
			// inside Wait().
			waiter.L.Lock()
			r.Unlock()
			waiter.waiting = true
			// The documentation for Wait() says:
			// "Because c.L is not locked when Wait first resumes, the caller typically
			// cannot assume that the condition is true when Wait returns. Instead, the
			// caller should Wait in a loop."
			// This does not apply to this use of Wait() because the condition we are
			// waiting for remains true once it becomes true. This lets us avoid the
			// normal check to see if the condition has switched back to false between
			// the signal being sent and this thread acquiring the lock.
			waiter.Wait()
			waiter.L.Unlock()
			// Earlier request completed.

			if waiter.err != nil {
				// Don't log the error, it will be logged by requester.
				return nil, waiter.err
			}
			// Parse a private copy of the shared response. Passing req (always
			// a GET here) associates the response with the request it answers,
			// so the returned Response.Request is not left nil.
			shared, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(waiter.resp)), req)
			if err != nil {
				logrus.WithField("cache-key", key).WithError(err).Error("Error loading response.")
				return nil, err
			}

			cacheMode = ModeCoalesced
			return shared, nil
		}
		// No earlier request in flight (common case).
		// Register a new responseWaiter and make the request ourself.
		waiter = &responseWaiter{Cond: sync.NewCond(&sync.Mutex{})}
		r.keys[key] = waiter
		r.Unlock()

		resp, err := r.delegate.RoundTrip(req)
		// Real response received. Remove this responseWaiter from the map THEN
		// wake any requesters that were waiting on this response.
		r.Lock()
		delete(r.keys, key)
		r.Unlock()

		waiter.L.Lock()
		// Skip the dump entirely when nobody is waiting, so the common case
		// does not buffer the response body.
		if waiter.waiting {
			if err != nil {
				waiter.resp, waiter.err = nil, err
			} else {
				// Copy the response before releasing to waiter(s).
				waiter.resp, waiter.err = httputil.DumpResponse(resp, true)
			}
			waiter.Broadcast()
		}
		waiter.L.Unlock()

		if err != nil {
			logrus.WithField("cache-key", key).WithError(err).Warn("Error from cache transport layer.")
			return nil, err
		}
		cacheMode = cacheResponseMode(resp.Header)
		return resp, nil
	}()

	ghmetrics.CollectCacheRequestMetrics(string(cacheMode), req.URL.Path, req.Header.Get("User-Agent"), r.hasher.Hash(req))
	if resp != nil {
		resp.Header.Set(CacheModeHeader, string(cacheMode))
	}
	return resp, err
}