Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using request method in metrics to seperate the OPTIONS request #228

Merged
merged 1 commit into from
May 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func writeError(w http.ResponseWriter, r *http.Request, err error, labels middle
log.Errorf("Failed to handle request: %s: %+v", r.URL.String(), err)
statusCode = 502
}
requestsTotalIncr(labels, statusCode)
requestsTotalIncr(r, labels, statusCode)
if labels.Protocol() == config.Protocol_GRPC.String() {
// see https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
code := strconv.Itoa(int(status.ToGRPCCode(statusCode)))
Expand Down Expand Up @@ -192,22 +192,22 @@ func (p *Proxy) buildMiddleware(ms []*config.Middleware, next http.RoundTripper)
return next, nil
}

func splitRetryMetricsHandler(e *config.Endpoint) (func(int), func(int, error)) {
func splitRetryMetricsHandler(e *config.Endpoint) (func(*http.Request, int), func(*http.Request, int, error)) {
labels := middleware.NewMetricsLabels(e)
success := func(i int) {
success := func(req *http.Request, i int) {
if i <= 0 {
return
}
retryStateIncr(labels, true)
retryStateIncr(req, labels, true)
}
failed := func(i int, err error) {
failed := func(req *http.Request, i int, err error) {
if i <= 0 {
return
}
if errors.Is(err, context.Canceled) {
return
}
retryStateIncr(labels, false)
retryStateIncr(req, labels, false)
}
return success, failed
}
Expand Down Expand Up @@ -236,14 +236,14 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht
labels := middleware.NewMetricsLabels(e)
markSuccessStat, markFailedStat := splitRetryMetricsHandler(e)
retryBreaker := sre.NewBreaker(sre.WithSuccess(0.8))
markSuccess := func(i int) {
markSuccessStat(i)
markSuccess := func(req *http.Request, i int) {
markSuccessStat(req, i)
if i > 0 {
retryBreaker.MarkSuccess()
}
}
markFailed := func(i int, err error) {
markFailedStat(i, err)
markFailed := func(req *http.Request, i int, err error) {
markFailedStat(req, i, err)
if i > 0 {
retryBreaker.MarkFailed()
}
Expand All @@ -257,15 +257,15 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht
ctx, cancel := context.WithTimeout(ctx, retryStrategy.timeout)
defer cancel()
defer func() {
requestsDurationObserve(labels, time.Since(startTime).Seconds())
requestsDurationObserve(req, labels, time.Since(startTime).Seconds())
}()

body, err := io.ReadAll(req.Body)
if err != nil {
writeError(w, req, err, labels)
return
}
receivedBytesAdd(labels, int64(len(body)))
receivedBytesAdd(req, labels, int64(len(body)))
req.GetBody = func() (io.ReadCloser, error) {
reader := bytes.NewReader(body)
return io.NopCloser(reader), nil
Expand All @@ -278,7 +278,7 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht
break
}
if err := retryBreaker.Allow(); err != nil {
markFailed(i, err)
markFailed(req, i, err)
break
}
}
Expand All @@ -288,7 +288,7 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht
}
// canceled or deadline exceeded
if err = ctx.Err(); err != nil {
markFailed(i, err)
markFailed(req, i, err)
break
}
tryCtx, cancel := p.Interceptors.prepareAttemptTimeoutContext(ctx, req, retryStrategy.perTryTimeout)
Expand All @@ -297,16 +297,16 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht
req.Body = io.NopCloser(reader)
resp, err = tripper.RoundTrip(req.Clone(tryCtx))
if err != nil {
markFailed(i, err)
markFailed(req, i, err)
log.Errorf("Attempt at [%d/%d], failed to handle request: %s: %+v", i+1, retryStrategy.attempts, req.URL.String(), err)
continue
}
if !judgeRetryRequired(retryStrategy.conditions, resp) {
reqOpts.LastAttempt = true
markSuccess(i)
markSuccess(req, i)
break
}
markFailed(i, errors.New("assertion failed"))
markFailed(req, i, errors.New("assertion failed"))
// continue the retry loop
}
if err != nil {
Expand All @@ -328,11 +328,11 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht
sent, err := io.Copy(w, resp.Body)
if err != nil {
reqOpts.DoneFunc(ctx, selector.DoneInfo{Err: err})
sentBytesAdd(labels, sent)
sentBytesAdd(req, labels, sent)
log.Errorf("Failed to copy backend response body to client: [%s] %s %s %d %+v\n", e.Protocol, e.Method, e.Path, sent, err)
return false
}
sentBytesAdd(labels, sent)
sentBytesAdd(req, labels, sent)
reqOpts.DoneFunc(ctx, selector.DoneInfo{ReplyMD: getReplyMD(e, resp)})
// see https://pkg.go.dev/net/http#example-ResponseWriter-Trailers
for k, v := range resp.Trailer {
Expand All @@ -341,7 +341,7 @@ func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (_ ht
return true
}
doCopyBody()
requestsTotalIncr(labels, resp.StatusCode)
requestsTotalIncr(req, labels, resp.StatusCode)
}), closer, nil
}

Expand All @@ -352,28 +352,28 @@ func getReplyMD(ep *config.Endpoint, resp *http.Response) selector.ReplyMD {
return resp.Header
}

func receivedBytesAdd(labels middleware.MetricsLabels, received int64) {
_metricReceivedBytes.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath()).Add(float64(received))
func receivedBytesAdd(req *http.Request, labels middleware.MetricsLabels, received int64) {
_metricReceivedBytes.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath()).Add(float64(received))
}

func sentBytesAdd(labels middleware.MetricsLabels, sent int64) {
_metricSentBytes.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath()).Add(float64(sent))
func sentBytesAdd(req *http.Request, labels middleware.MetricsLabels, sent int64) {
_metricSentBytes.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath()).Add(float64(sent))
}

func requestsTotalIncr(labels middleware.MetricsLabels, statusCode int) {
_metricRequestsTotal.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), strconv.Itoa(statusCode), labels.Service(), labels.BasePath()).Inc()
func requestsTotalIncr(req *http.Request, labels middleware.MetricsLabels, statusCode int) {
_metricRequestsTotal.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), strconv.Itoa(statusCode), labels.Service(), labels.BasePath()).Inc()
}

func requestsDurationObserve(labels middleware.MetricsLabels, seconds float64) {
_metricRequestsDuration.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath()).Observe(seconds)
func requestsDurationObserve(req *http.Request, labels middleware.MetricsLabels, seconds float64) {
_metricRequestsDuration.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath()).Observe(seconds)
}

func retryStateIncr(labels middleware.MetricsLabels, success bool) {
func retryStateIncr(req *http.Request, labels middleware.MetricsLabels, success bool) {
if success {
_metricRetryState.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath(), "true").Inc()
_metricRetryState.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath(), "true").Inc()
return
}
_metricRetryState.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath(), "false").Inc()
_metricRetryState.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath(), "false").Inc()
}

func closeOnError(closer io.Closer, err *error) {
Expand Down
Loading