Skip to content

Commit

Permalink
update retry logic on forward
Browse files Browse the repository at this point in the history
Signed-off-by: Thibault Mange <[email protected]>
  • Loading branch information
thibaultmg committed Dec 13, 2024
1 parent 854a6e8 commit ac0bdc0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 19 deletions.
62 changes: 45 additions & 17 deletions collectors/metrics/pkg/metricsclient/metricsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand Down Expand Up @@ -509,12 +510,13 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
}
b.MaxElapsedTime = interval / time.Duration(halfInterval)
retryable := func() error {
return c.sendRequest(req.URL.String(), compressed)
return c.sendRequest(ctx, req.URL.String(), compressed)
}
notify := func(err error, t time.Duration) {
msg := fmt.Sprintf("error: %v happened at time: %v", err, t)
logger.Log(c.logger, logger.Warn, "msg", msg)
}

err = backoff.RetryNotify(retryable, b, notify)
if err != nil {
return err
Expand All @@ -524,43 +526,69 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
return nil
}

func (c *Client) sendRequest(serverURL string, body []byte) error {
func (c *Client) sendRequest(ctx context.Context, serverURL string, body []byte) error {
req1, err := http.NewRequest(http.MethodPost, serverURL, bytes.NewBuffer(body))
if err != nil {
msg := "failed to create forwarding request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
wrappedErr := fmt.Errorf("failed to create forwarding request: %w", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)
return backoff.Permanent(wrappedErr)
}

// req.Header.Add("THANOS-TENANT", tenantID)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

req1 = req1.WithContext(ctx)

resp, err := c.client.Do(req1)
if err != nil {
msg := "failed to forward request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)

wrappedErr := fmt.Errorf("failed to forward request: %w", err)
if isTransientError(err) {
return wrappedErr
}

return backoff.Permanent(wrappedErr)
}

c.metrics.ForwardRemoteWriteRequests.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()

if resp.StatusCode/100 != 2 {
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
// surfacing upstreams error to our users too
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
logger.Log(c.logger, logger.Warn, err)
}
bodyString := string(bodyBytes)
msg := fmt.Sprintf("response status code is %s, response body is %s", resp.Status, bodyString)
logger.Log(c.logger, logger.Warn, msg)
return errors.New(msg)

retErr := fmt.Errorf("response status code is %s, response body is %s", resp.Status, string(bodyBytes))

if isTransientResponseError(resp) {
return retErr
}

return backoff.Permanent(retErr)
}

return nil
}

// isTransientError reports whether a transport-level error is worth retrying.
//
// An error is considered transient only when its chain contains a *url.Error
// whose Timeout method reports true. Any other error, including nil, is
// reported as non-transient.
func isTransientError(err error) bool {
	// errors.As walks the Unwrap chain, so a *url.Error that an intermediate
	// layer wrapped with %w is still recognised; a direct type assertion
	// would only match when the *url.Error is the outermost error.
	var urlErr *url.Error
	if errors.As(err, &urlErr) {
		return urlErr.Timeout()
	}

	return false
}

// isTransientResponseError reports whether the status code of resp indicates
// a failure that may succeed on a later attempt: 429 Too Many Requests, or
// any code of 500 and above except 501 Not Implemented.
func isTransientResponseError(resp *http.Response) bool {
	switch code := resp.StatusCode; {
	case code == http.StatusTooManyRequests:
		return true
	case code == http.StatusNotImplemented:
		// 501 is in the 5xx range but is excluded from the retryable set.
		return false
	default:
		return code >= 500
	}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

require (
github.com/IBM/controller-filtered-cache v0.3.6
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cloudflare/cfssl v1.6.4
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-co-op/gocron v1.37.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -844,10 +844,11 @@ github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
github.com/brancz/locutus v0.0.0-20210511124350-7a84f4d1bcb3 h1:N7vTNNytk6OFY5WoPJ+cSxxlRbNpCUWdyPW8nDHp0Sw=
github.com/brancz/locutus v0.0.0-20210511124350-7a84f4d1bcb3/go.mod h1:n+EREm6Tinr9eHmIls4DzojRkkA4IrBe6xRpO4HEw0I=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
Expand Down

0 comments on commit ac0bdc0

Please sign in to comment.