Skip to content

Commit

Permalink
ACM-14080: update retry logic on forward (#1697)
Browse files Browse the repository at this point in the history
* update retry logic on forward

Signed-off-by: Thibault Mange <[email protected]>

* add tests

Signed-off-by: Thibault Mange <[email protected]>

---------

Signed-off-by: Thibault Mange <[email protected]>
  • Loading branch information
thibaultmg authored Dec 4, 2024
1 parent 9e7521f commit 55809a7
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 20 deletions.
5 changes: 5 additions & 0 deletions collectors/metrics/pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ func (w *Worker) LastMetrics() []*clientmodel.MetricFamily {
}

func (w *Worker) Run(ctx context.Context) {
// Forward metrics immediately on startup.
if err := w.forward(ctx); err != nil {
rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err)
}

ticker := time.NewTicker(w.interval)
defer ticker.Stop()

Expand Down
62 changes: 45 additions & 17 deletions collectors/metrics/pkg/metricsclient/metricsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand Down Expand Up @@ -503,12 +504,13 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
}
b.MaxElapsedTime = interval / time.Duration(halfInterval)
retryable := func() error {
return c.sendRequest(req.URL.String(), compressed)
return c.sendRequest(ctx, req.URL.String(), compressed)
}
notify := func(err error, t time.Duration) {
msg := fmt.Sprintf("error: %v happened at time: %v", err, t)
logger.Log(c.logger, logger.Warn, "msg", msg)
}

err = backoff.RetryNotify(retryable, b, notify)
if err != nil {
return err
Expand All @@ -518,43 +520,69 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
return nil
}

func (c *Client) sendRequest(serverURL string, body []byte) error {
func (c *Client) sendRequest(ctx context.Context, serverURL string, body []byte) error {
req1, err := http.NewRequest(http.MethodPost, serverURL, bytes.NewBuffer(body))
if err != nil {
msg := "failed to create forwarding request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
wrappedErr := fmt.Errorf("failed to create forwarding request: %w", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)
return backoff.Permanent(wrappedErr)
}

// req.Header.Add("THANOS-TENANT", tenantID)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

req1 = req1.WithContext(ctx)

resp, err := c.client.Do(req1)
if err != nil {
msg := "failed to forward request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)

wrappedErr := fmt.Errorf("failed to forward request: %w", err)
if isTransientError(err) {
return wrappedErr
}

return backoff.Permanent(wrappedErr)
}

c.metrics.ForwardRemoteWriteRequests.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()

if resp.StatusCode/100 != 2 {
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
// surfacing upstreams error to our users too
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
logger.Log(c.logger, logger.Warn, err)
}
bodyString := string(bodyBytes)
msg := fmt.Sprintf("response status code is %s, response body is %s", resp.Status, bodyString)
logger.Log(c.logger, logger.Warn, msg)
return errors.New(msg)

retErr := fmt.Errorf("response status code is %s, response body is %s", resp.Status, string(bodyBytes))

if isTransientResponseError(resp) {
return retErr
}

return backoff.Permanent(retErr)
}

return nil
}

// isTransientError reports whether a transport-level error is worth retrying.
// Only errors whose chain contains a *url.Error that reports a timeout are
// considered transient; every other error, including nil, yields false.
func isTransientError(err error) bool {
	// errors.As walks the wrap chain, so the timeout is still recognised when
	// the *url.Error has been wrapped with additional context via %w.
	var urlErr *url.Error
	if errors.As(err, &urlErr) {
		return urlErr.Timeout()
	}

	return false
}

// isTransientResponseError reports whether the status code of resp indicates a
// failure that may succeed on a later attempt: 429 Too Many Requests, or any
// code of 500 and above other than 501 Not Implemented.
func isTransientResponseError(resp *http.Response) bool {
	switch code := resp.StatusCode; {
	case code == http.StatusTooManyRequests:
		return true
	case code == http.StatusNotImplemented:
		return false
	default:
		return code >= 500
	}
}
103 changes: 103 additions & 0 deletions collectors/metrics/pkg/metricsclient/metricsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@
package metricsclient

import (
"bytes"
"context"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
clientmodel "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
)

func TestDefaultTransport(t *testing.T) {
Expand Down Expand Up @@ -271,3 +278,99 @@ func timeseriesEqual(t1 []prompb.TimeSeries, t2 []prompb.TimeSeries) (bool, erro

return true, nil
}

// TestClient_RemoteWrite runs Client.RemoteWrite against a local httptest
// server. For each scenario it checks the returned error and the number of
// HTTP requests the server received.
func TestClient_RemoteWrite(t *testing.T) {
	// respondWith builds a handler that answers every request with status.
	respondWith := func(status int) http.HandlerFunc {
		return func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(status)
		}
	}

	type testCase struct {
		name          string
		families      []*clientmodel.MetricFamily
		serverHandler http.HandlerFunc
		expect        func(t *testing.T, err error, retryCount int)
	}

	cases := []testCase{
		{
			name:          "successful write with metrics",
			families:      []*clientmodel.MetricFamily{mockMetricFamily()},
			serverHandler: respondWith(http.StatusOK),
			expect: func(t *testing.T, err error, retryCount int) {
				assert.NoError(t, err)
				assert.Equal(t, 1, retryCount)
			},
		},
		{
			name:          "no metrics to write",
			families:      []*clientmodel.MetricFamily{},
			serverHandler: respondWith(http.StatusOK),
			expect: func(t *testing.T, err error, retryCount int) {
				assert.NoError(t, err)
				assert.Equal(t, 0, retryCount)
			},
		},
		{
			name:          "retryable error",
			families:      []*clientmodel.MetricFamily{mockMetricFamily()},
			serverHandler: respondWith(http.StatusServiceUnavailable),
			expect: func(t *testing.T, err error, retryCount int) {
				assert.Error(t, err)
				assert.Greater(t, retryCount, 1)
			},
		},
		{
			name:          "non-retryable error",
			families:      []*clientmodel.MetricFamily{mockMetricFamily()},
			serverHandler: respondWith(http.StatusConflict),
			expect: func(t *testing.T, err error, retryCount int) {
				assert.Error(t, err)
				assert.Equal(t, 1, retryCount)
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// hits counts every request that reaches the test server.
			var hits int

			srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				hits++
				tc.serverHandler(w, r)
			}))
			defer srv.Close()

			// A fresh registry per subtest, so the counter is never registered twice.
			requests := promauto.With(prometheus.NewRegistry()).NewCounterVec(prometheus.CounterOpts{
				Name: "forward_write_requests_total",
				Help: "Counter of forward remote write requests.",
			}, []string{"status_code"})

			client := &Client{
				logger:  log.NewNopLogger(),
				client:  srv.Client(),
				metrics: &ClientMetrics{ForwardRemoteWriteRequests: requests},
			}

			req, err := http.NewRequest("POST", srv.URL, bytes.NewBuffer([]byte{}))
			assert.NoError(t, err)

			// Read hits only after RemoteWrite has returned; keep this as a
			// separate statement so the read cannot be evaluated before the call.
			err = client.RemoteWrite(context.Background(), req, tc.families, 30*time.Second)

			tc.expect(t, err, hits)
		})
	}
}

// mockMetricFamily returns a counter metric family named "test_metric" holding
// a single sample of value 1, timestamped with the current wall-clock time in
// milliseconds.
func mockMetricFamily() *clientmodel.MetricFamily {
	return &clientmodel.MetricFamily{
		Name: proto.String("test_metric"),
		Type: clientmodel.MetricType_COUNTER.Enum(),
		Metric: []*clientmodel.Metric{
			{
				Counter: &clientmodel.Counter{Value: proto.Float64(1)},
				// UnixMilli yields the millisecond timestamp directly, replacing
				// the manual nanosecond division.
				TimestampMs: proto.Int64(time.Now().UnixMilli()),
			},
		},
	}
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.0

require (
github.com/IBM/controller-filtered-cache v0.3.6
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cloudflare/cfssl v1.6.3
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-co-op/gocron v1.37.0
Expand Down Expand Up @@ -75,7 +75,6 @@ require (
github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/brancz/locutus v0.0.0-20210511124350-7a84f4d1bcb3 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ github.com/caarlos0/ctrlc v1.0.0/go.mod h1:CdXpj4rmq0q/1Eb44M9zi2nKB0QraNKuRGYGr
github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e/go.mod h1:9IOqJGCPMSc6E5ydlp5NIonxObaeu/Iub/X03EKPVYo=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cavaliercoder/go-cpio v0.0.0-20180626203310-925f9528c45e/go.mod h1:oDpT4efm8tSYHXV5tHSdRvBet/b/QzxZ+XyyPehvm3A=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
Expand Down

0 comments on commit 55809a7

Please sign in to comment.