diff --git a/proxy/proxy.go b/proxy/proxy.go index adf5c87..fbd642e 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -99,7 +99,7 @@ func writeError(w http.ResponseWriter, r *http.Request, err error, labels middle log.Errorf("Failed to handle request: %s: %+v", r.URL.String(), err) statusCode = 502 } - requestsTotalIncr(labels, statusCode) + requestsTotalIncr(r, labels, statusCode) if labels.Protocol() == config.Protocol_GRPC.String() { // see https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto code := strconv.Itoa(int(status.ToGRPCCode(statusCode))) @@ -192,22 +192,22 @@ func (p *Proxy) buildMiddleware(ms []*config.Middleware, next http.RoundTripper) return next, nil } -func splitRetryMetricsHandler(e *config.Endpoint) (func(int), func(int, error)) { +func splitRetryMetricsHandler(e *config.Endpoint) (func(*http.Request, int), func(*http.Request, int, error)) { labels := middleware.NewMetricsLabels(e) - success := func(i int) { + success := func(req *http.Request, i int) { if i <= 0 { return } - retryStateIncr(labels, true) + retryStateIncr(req, labels, true) } - failed := func(i int, err error) { + failed := func(req *http.Request, i int, err error) { if i <= 0 { return } if errors.Is(err, context.Canceled) { return } - retryStateIncr(labels, false) + retryStateIncr(req, labels, false) } return success, failed } @@ -236,14 +236,14 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht labels := middleware.NewMetricsLabels(e) markSuccessStat, markFailedStat := splitRetryMetricsHandler(e) retryBreaker := sre.NewBreaker(sre.WithSuccess(0.8)) - markSuccess := func(i int) { - markSuccessStat(i) + markSuccess := func(req *http.Request, i int) { + markSuccessStat(req, i) if i > 0 { retryBreaker.MarkSuccess() } } - markFailed := func(i int, err error) { - markFailedStat(i, err) + markFailed := func(req *http.Request, i int, err error) { + markFailedStat(req, i, err) if i > 0 { retryBreaker.MarkFailed() } @@ -257,7 +257,7 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht ctx, cancel := context.WithTimeout(ctx, retryStrategy.timeout) defer cancel() defer func() { - requestsDurationObserve(labels, time.Since(startTime).Seconds()) + requestsDurationObserve(req, labels, time.Since(startTime).Seconds()) }() body, err := io.ReadAll(req.Body) @@ -265,7 +265,7 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht writeError(w, req, err, labels) return } - receivedBytesAdd(labels, int64(len(body))) + receivedBytesAdd(req, labels, int64(len(body))) req.GetBody = func() (io.ReadCloser, error) { reader := bytes.NewReader(body) return io.NopCloser(reader), nil @@ -278,7 +278,7 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht break } if err := retryBreaker.Allow(); err != nil { - markFailed(i, err) + markFailed(req, i, err) break } } @@ -288,7 +288,7 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht } // canceled or deadline exceeded if err = ctx.Err(); err != nil { - markFailed(i, err) + markFailed(req, i, err) break } tryCtx, cancel := p.Interceptors.prepareAttemptTimeoutContext(ctx, req, retryStrategy.perTryTimeout) @@ -297,16 +297,16 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht req.Body = io.NopCloser(reader) resp, err = tripper.RoundTrip(req.Clone(tryCtx)) if err != nil { - markFailed(i, err) + markFailed(req, i, err) log.Errorf("Attempt at [%d/%d], failed to handle request: %s: %+v", i+1, retryStrategy.attempts, req.URL.String(), err) continue } if !judgeRetryRequired(retryStrategy.conditions, resp) { reqOpts.LastAttempt = true - markSuccess(i) + markSuccess(req, i) break } - markFailed(i, errors.New("assertion failed")) + markFailed(req, i, errors.New("assertion failed")) // continue the retry loop } if err != nil { @@ -328,11 +328,11 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht sent, err := io.Copy(w, resp.Body) if err != nil { reqOpts.DoneFunc(ctx, selector.DoneInfo{Err: err}) - sentBytesAdd(labels, sent) + sentBytesAdd(req, labels, sent) log.Errorf("Failed to copy backend response body to client: [%s] %s %s %d %+v\n", e.Protocol, e.Method, e.Path, sent, err) return false } - sentBytesAdd(labels, sent) + sentBytesAdd(req, labels, sent) reqOpts.DoneFunc(ctx, selector.DoneInfo{ReplyMD: getReplyMD(e, resp)}) // see https://pkg.go.dev/net/http#example-ResponseWriter-Trailers for k, v := range resp.Trailer { @@ -341,7 +341,7 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht return true } doCopyBody() - requestsTotalIncr(labels, resp.StatusCode) + requestsTotalIncr(req, labels, resp.StatusCode) }), closer, nil } @@ -352,28 +352,28 @@ func getReplyMD(ep *config.Endpoint, resp *http.Response) selector.ReplyMD { return resp.Header } -func receivedBytesAdd(labels middleware.MetricsLabels, received int64) { - _metricReceivedBytes.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath()).Add(float64(received)) +func receivedBytesAdd(req *http.Request, labels middleware.MetricsLabels, received int64) { + _metricReceivedBytes.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath()).Add(float64(received)) } -func sentBytesAdd(labels middleware.MetricsLabels, sent int64) { - _metricSentBytes.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath()).Add(float64(sent)) +func sentBytesAdd(req *http.Request, labels middleware.MetricsLabels, sent int64) { + _metricSentBytes.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath()).Add(float64(sent)) } -func requestsTotalIncr(labels middleware.MetricsLabels, statusCode int) { - _metricRequestsTotal.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), strconv.Itoa(statusCode), labels.Service(), labels.BasePath()).Inc() +func requestsTotalIncr(req *http.Request, labels middleware.MetricsLabels, statusCode int) { + _metricRequestsTotal.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), strconv.Itoa(statusCode), labels.Service(), labels.BasePath()).Inc() } -func requestsDurationObserve(labels middleware.MetricsLabels, seconds float64) { - _metricRequestsDuration.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath()).Observe(seconds) +func requestsDurationObserve(req *http.Request, labels middleware.MetricsLabels, seconds float64) { + _metricRequestsDuration.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath()).Observe(seconds) } -func retryStateIncr(labels middleware.MetricsLabels, success bool) { +func retryStateIncr(req *http.Request, labels middleware.MetricsLabels, success bool) { if success { - _metricRetryState.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath(), "true").Inc() + _metricRetryState.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath(), "true").Inc() return } - _metricRetryState.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath(), "false").Inc() + _metricRetryState.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath(), "false").Inc() } func closeOnError(closer io.Closer, err *error) {