Skip to content

Commit 80f13ab

Browse files
committed
Rate-limitting
1 parent 9afe612 commit 80f13ab

File tree

4 files changed

+21
-41
lines changed

4 files changed

+21
-41
lines changed

nginx/config/base.go

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ events {
2626
}
2727
2828
http {
29+
limit_req_zone $binary_remote_addr zone=per_ip_limit:10m rate=100r/s;
2930
3031
##
3132
# Basic Settings

nginx/config/proxy.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ upstream registry-override {
2424
}
2525
2626
upstream proxy-server {
27-
server {{.proxy-server}};
27+
server {{.proxy_server}};
2828
}
2929
3030
{{range .ports}}
@@ -64,6 +64,9 @@ server {
6464
}
6565
6666
location /proxy {
67+
limit_req zone=per_ip_limit burst=20 nodelay;
68+
limit_req_status 429;
69+
6770
proxy_pass http://proxy-server;
6871
6972
set $hostheader $hostname;

proxy/proxyserver/prefetch.go

+15-39
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
type PrefetchHandler struct {
3939
clusterClient blobclient.ClusterClient
4040
tagClient tagclient.Client
41-
threshold int64
4241

4342
metrics tally.Scope
4443
}
@@ -108,8 +107,8 @@ func writeInternalError(w http.ResponseWriter, msg string, traceId string) {
108107
}
109108

110109
// NewPrefetchHandler creates a new preheat handler.
111-
func NewPrefetchHandler(client blobclient.ClusterClient, tagClient tagclient.Client, threshold int64, metrics tally.Scope) *PrefetchHandler {
112-
return &PrefetchHandler{client, tagClient, threshold, metrics}
110+
func NewPrefetchHandler(client blobclient.ClusterClient, tagClient tagclient.Client, metrics tally.Scope) *PrefetchHandler {
111+
return &PrefetchHandler{client, tagClient, metrics}
113112
}
114113

115114
// Handle processes the prefetch request.
@@ -149,21 +148,13 @@ func (ph *PrefetchHandler) Handle(w http.ResponseWriter, r *http.Request) {
149148
}
150149

151150
// Process manifest (ManifestList or single Manifest)
152-
size, digests, err := ph.processManifest(logger, namespace, buf.Bytes())
151+
digests, err := ph.processManifest(logger, namespace, buf.Bytes())
153152
if err != nil {
154153
writeInternalError(w, fmt.Sprintf("failed to process manifest: %s", err), reqBody.TraceId)
155154
return
156155
}
157-
size += stat.Size
158156

159157
metrics := ph.metrics.SubScope("prefetch")
160-
if size < ph.threshold {
161-
metrics.Counter("below_threshold").Inc(1)
162-
logger.With("size", size, "tag", reqBody.Tag, "threshold", ph.threshold).Info("Size is too large, skipping prefetch")
163-
writePrefetchResponse(w, http.StatusUnprocessableEntity, reqBody.Tag, fmt.Sprintf("prefetching not initiated as imagesize < %d", ph.threshold), reqBody.TraceId)
164-
return
165-
}
166-
167158
metrics.Counter("initiated").Inc(1)
168159
writePrefetchResponse(w, http.StatusOK, reqBody.Tag, "prefetching initiated successfully", reqBody.TraceId)
169160

@@ -209,7 +200,7 @@ func parseTag(tag string) (namespace, name string, err error) {
209200
}
210201

211202
// processManifest handles both ManifestLists and single Manifests.
212-
func (ph *PrefetchHandler) processManifest(logger *zap.SugaredLogger, namespace string, manifestBytes []byte) (int64, []core.Digest, error) {
203+
func (ph *PrefetchHandler) processManifest(logger *zap.SugaredLogger, namespace string, manifestBytes []byte) ([]core.Digest, error) {
213204
reader := bytes.NewReader(manifestBytes)
214205

215206
var manifestList manifestlist.ManifestList
@@ -222,66 +213,51 @@ func (ph *PrefetchHandler) processManifest(logger *zap.SugaredLogger, namespace
222213
var manifest schema2.Manifest
223214
if err := json.NewDecoder(reader).Decode(&manifest); err != nil {
224215
logger.With("namespace", namespace).Errorf("Failed to parse single manifest: %v", err)
225-
return 0, nil, fmt.Errorf("invalid single manifest: %w", err)
216+
return nil, fmt.Errorf("invalid single manifest: %w", err)
226217
}
227218

228-
return ph.processLayers(namespace, manifest.Layers)
219+
return ph.processLayers(manifest.Layers)
229220
}
230221

231-
func (ph *PrefetchHandler) processManifestList(logger *zap.SugaredLogger, namespace string, manifestList manifestlist.ManifestList) (int64, []core.Digest, error) {
222+
func (ph *PrefetchHandler) processManifestList(logger *zap.SugaredLogger, namespace string, manifestList manifestlist.ManifestList) ([]core.Digest, error) {
232223
digestsResult := make([]core.Digest, 0)
233-
sizeResult := int64(0)
234224
buf := &bytes.Buffer{}
235225
for _, manifestDescriptor := range manifestList.Manifests {
236226
manifestDigestHex := manifestDescriptor.Digest.Hex()
237227
digest, err := core.NewSHA256DigestFromHex(manifestDigestHex)
238228
if err != nil {
239-
return 0, nil, fmt.Errorf("digest %s, failed to parse manifest digest: %s", manifestDigestHex, err)
229+
return nil, fmt.Errorf("digest %s, failed to parse manifest digest: %s", manifestDigestHex, err)
240230
}
241231

242-
stat, err := ph.clusterClient.Stat(namespace, digest)
243-
if err != nil {
244-
return 0, nil, fmt.Errorf("failed to get meta info: %w", err)
245-
}
246-
247-
sizeResult += stat.Size
248232
if err := ph.clusterClient.DownloadBlob(namespace, digest, buf); err != nil {
249233
logger.Errorf("Failed to download manifest blob: %s", err)
250234
continue
251235
}
252236

253237
var manifest schema2.Manifest
254238
if err := json.NewDecoder(buf).Decode(&manifest); err != nil {
255-
return 0, nil, fmt.Errorf("failed to parse manifest: %s", err)
239+
return nil, fmt.Errorf("failed to parse manifest: %s", err)
256240
}
257241

258-
size, digests, err := ph.processLayers(namespace, manifest.Layers)
242+
digests, err := ph.processLayers(manifest.Layers)
259243
if err != nil {
260-
return 0, nil, err
244+
return nil, err
261245
}
262246
digestsResult = append(digestsResult, digests...)
263-
sizeResult += size
264247
buf.Reset()
265248
}
266-
return sizeResult, digestsResult, nil
249+
return digestsResult, nil
267250
}
268251

269-
func (ph *PrefetchHandler) processLayers(namespace string, layers []distribution.Descriptor) (int64, []core.Digest, error) {
252+
func (ph *PrefetchHandler) processLayers(layers []distribution.Descriptor) ([]core.Digest, error) {
270253
digests := make([]core.Digest, 0, len(layers))
271-
var totalSize int64
272254

273255
for _, layer := range layers {
274256
digest, err := core.NewSHA256DigestFromHex(layer.Digest.Hex())
275257
if err != nil {
276-
return 0, nil, fmt.Errorf("invalid layer digest: %w", err)
277-
}
278-
279-
stat, err := ph.clusterClient.Stat(namespace, digest)
280-
if err != nil {
281-
return 0, nil, fmt.Errorf("failed to get metadata for layer %s: %w", digest, err)
258+
return nil, fmt.Errorf("invalid layer digest: %w", err)
282259
}
283-
totalSize += stat.Size
284260
digests = append(digests, digest)
285261
}
286-
return totalSize, digests, nil
262+
return digests, nil
287263
}

proxy/proxyserver/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func New(
4848
return &Server{
4949
metricsScope,
5050
NewPreheatHandler(client),
51-
NewPrefetchHandler(client, tagClient, config.Threshold, metricsScope),
51+
NewPrefetchHandler(client, tagClient, metricsScope),
5252
config,
5353
}
5454
}

0 commit comments

Comments
 (0)