From f405c4c7e0ce4f217b0c744ed3a66629a4677532 Mon Sep 17 00:00:00 2001 From: "v.raskin" Date: Fri, 19 Jul 2024 17:43:32 +0300 Subject: [PATCH] add circuit breaking --- config/config.go | 22 +++++ config/config_test.go | 16 +++- config/testdata/full.yml | 15 ++++ go.mod | 3 +- go.sum | 4 + proxy.go | 69 ++++++++++++++- proxy_test.go | 185 +++++++++++++++++++++++++++++++++++++-- 7 files changed, 303 insertions(+), 11 deletions(-) diff --git a/config/config.go b/config/config.go index 7d32d08d..933d8224 100644 --- a/config/config.go +++ b/config/config.go @@ -1052,6 +1052,28 @@ type ConnectionPool struct { // Maximum number of idle connections between chproxy and particuler ClickHouse instance MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host,omitempty"` + // BreakerOn switch on CircuitBreaker for clickhouse http client. + BreakerOn bool `yaml:"breaker_on,omitempty"` + // BreakerMaxRequests is the maximum number of requests allowed to pass through + // when the CircuitBreaker is half-open. + // If BreakerMaxRequests is 0, the CircuitBreaker allows only 1 request. + BreakerMaxRequests uint32 `yaml:"breaker_max_requests,omitempty"` + // BreakerInterval is the cyclic period of the closed state + // for the CircuitBreaker to clear the internal Counts. + // If BreakerInterval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state. + BreakerInterval time.Duration `yaml:"breaker_interval,omitempty"` + // BreakerTimeout is the period of the open state, + // after which the state of the CircuitBreaker becomes half-open. + // If BreakerTimeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds. + BreakerTimeout time.Duration `yaml:"breaker_timeout,omitempty"` + // BreakerFailureRatio is a threshold for determining whether a system is considered ready to handle requests based on its recent failure rate + // Default value is 0.6. + BreakerFailureRatio float64 `yaml:"breaker_failure_ratio,omitempty"` + // BreakerErrorRequests is a variable that represents a threshold for the total number of failed requests + // that should be considered significant enough to trigger some action or state change within the system + // Default value is 3. + BreakerErrorRequests uint32 `yaml:"breaker_error_requests,omitempty"` + // Catches all undefined fields XXX map[string]interface{} `yaml:",inline"` } diff --git a/config/config_test.go b/config/config_test.go index 1d07cecc..b281a3ab 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -210,8 +210,14 @@ var fullConfig = Config{ }, ConnectionPool: ConnectionPool{ - MaxIdleConns: 100, - MaxIdleConnsPerHost: 2, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 2, + BreakerOn: true, + BreakerMaxRequests: 10, + BreakerInterval: time.Second, + BreakerTimeout: time.Second, + BreakerFailureRatio: 0.1, + BreakerErrorRequests: 5, }, Users: []User{ @@ -935,6 +941,12 @@ param_groups: connection_pool: max_idle_conns: 100 max_idle_conns_per_host: 2 + breaker_on: true + breaker_max_requests: 10 + breaker_interval: 1s + breaker_timeout: 1s + breaker_failure_ratio: 0.1 + breaker_error_requests: 5 `, redisPort) tested := fullConfig.String() if tested != expected { diff --git a/config/testdata/full.yml b/config/testdata/full.yml index 140d6f3c..1c3dc4e3 100644 --- a/config/testdata/full.yml +++ b/config/testdata/full.yml @@ -106,6 +106,21 @@ connection_pool: # Number of connections per ClickHouse host to keep open # when they are not needed for clients. max_idle_conns_per_host: 2 + # switch on CircuitBreaker for clickhouse http client. + breaker_on: true + # Number of requests allowed to pass through when the CircuitBreaker is half-open. + breaker_max_requests: 10 + # Is the cyclic period of the closed state for the CircuitBreaker + # to clear the internal Counts. + breaker_interval: 1s + # Is the period of the open state, + # after which the state of the CircuitBreaker becomes half-open. + breaker_timeout: 1s + # Is a threshold for determining whether a system is considered ready to handle + # requests based on its recent failure rate. + breaker_failure_ratio: 0.1 + # Is a variable that represents a threshold for the total number of failed requests. + breaker_error_requests: 5 # Settings for `chproxy` input interfaces. server: diff --git a/go.mod b/go.mod index 6bf4f59c..fb077627 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,8 @@ require ( github.com/pierrec/lz4 v2.4.0+incompatible github.com/prometheus/client_golang v1.11.1 github.com/redis/go-redis/v9 v9.0.2 - github.com/stretchr/testify v1.8.1 + github.com/sony/gobreaker/v2 v2.0.0 + github.com/stretchr/testify v1.8.4 golang.org/x/crypto v0.21.0 golang.org/x/time v0.3.0 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 61868a65..574b6c64 100644 --- a/go.sum +++ b/go.sum @@ -115,6 +115,8 @@ github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEt github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/sony/gobreaker/v2 v2.0.0 h1:23AaR4JQ65y4rz8JWMzgXw2gKOykZ/qfqYunll4OwJ4= +github.com/sony/gobreaker/v2 v2.0.0/go.mod h1:8JnRUz80DJ1/ne8M8v7nmTs2713i58nIt4s7XcGe/DI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -126,6 +128,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw= github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/proxy.go b/proxy.go index 24940762..e400eb6f 100644 --- a/proxy.go +++ b/proxy.go @@ -15,6 +15,8 @@ import ( "sync" "time" + "github.com/sony/gobreaker/v2" + "github.com/contentsquare/chproxy/cache" "github.com/contentsquare/chproxy/config" "github.com/contentsquare/chproxy/internal/topology" @@ -51,8 +53,46 @@ type reverseProxy struct { maxErrorReasonSize int64 } +type circuitBreakingTransport struct { + transport http.RoundTripper + cb *gobreaker.CircuitBreaker[*http.Response] +} + +var ( + // errCircuitBreaking is an error instance indicating a circuit breaker has been triggered. + errCircuitBreaking = errors.New("circuit breaking status code") + + // clickhouseBadStatusForCircuitBreaking is a map that associates HTTP status codes with an empty struct. + // This map is used to quickly check if a ClickHouse operation resulted in a status code that indicates + // a condition severe enough to trigger circuit breaking. + clickhouseBadStatusForCircuitBreaking = map[int]struct{}{ + 201: {}, // QUOTA_EXCEEDED + 241: {}, // MEMORY_LIMIT_EXCEEDED + 439: {}, // CANNOT_SCHEDULE_TASK + } +) + +func (b *circuitBreakingTransport) RoundTrip(r *http.Request) (*http.Response, error) { + return b.cb.Execute(func() (*http.Response, error) { + resp, err := b.transport.RoundTrip(r) + if err != nil { + return nil, err + } + + if _, ok := clickhouseBadStatusForCircuitBreaking[resp.StatusCode]; ok { + return nil, errCircuitBreaking + } + + if resp.StatusCode >= http.StatusInternalServerError { + return nil, errCircuitBreaking + } + + return resp, err + }) +} + func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy { - transport := &http.Transport{ + var transport http.RoundTripper = &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { dialer := &net.Dialer{ @@ -69,6 +109,33 @@ func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy { ExpectContinueTimeout: 1 * time.Second, } + if cfgCp.BreakerOn { + var st gobreaker.Settings + st.Name = "ch-proxy" + st.MaxRequests = cfgCp.BreakerMaxRequests + st.Interval = cfgCp.BreakerInterval + st.Timeout = cfgCp.BreakerTimeout + + errorRequests := uint32(3) + if cfgCp.BreakerErrorRequests != 0 { + errorRequests = cfgCp.BreakerErrorRequests + } + failureRatio := 0.6 + if cfgCp.BreakerFailureRatio != 0 { + failureRatio = cfgCp.BreakerFailureRatio + } + + st.ReadyToTrip = func(counts gobreaker.Counts) bool { + fr := float64(counts.TotalFailures) / float64(counts.Requests) + return counts.Requests >= errorRequests && fr >= failureRatio + } + + transport = &circuitBreakingTransport{ + transport: transport, + cb: gobreaker.NewCircuitBreaker[*http.Response](st), //nolint:bodyclose + } + } + return &reverseProxy{ rp: &httputil.ReverseProxy{ Director: func(*http.Request) {}, diff --git a/proxy_test.go b/proxy_test.go index aa2f149a..b67a5fd4 100644 --- a/proxy_test.go +++ b/proxy_test.go @@ -8,25 +8,24 @@ import ( "io" "math/rand" "net" + "net/http" + "net/http/httptest" + "net/url" "os" "regexp" "runtime" + "strconv" "strings" "sync" "sync/atomic" "testing" "time" - "golang.org/x/time/rate" - "github.com/contentsquare/chproxy/cache" - - "net/http" - "net/http/httptest" - "net/url" - "github.com/contentsquare/chproxy/config" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "golang.org/x/time/rate" ) var nbHeavyRequestsInflight int64 = 0 @@ -160,6 +159,44 @@ var goodCfgWithCacheAndMaxErrorReasonSize = &config.Config{ }, }, } +var goodCfgWithBreaker = &config.Config{ + Server: config.Server{ + Metrics: config.Metrics{ + Namespace: "proxy_test"}, + }, + Clusters: []config.Cluster{ + { + Name: "cluster", + Scheme: "http", + Replicas: []config.Replica{ + { + Nodes: []string{"localhost:8123"}, + }, + }, + ClusterUsers: []config.ClusterUser{ + { + Name: "web", + }, + }, + }, + }, + Users: []config.User{ + { + Name: defaultUsername, + ToCluster: "cluster", + ToUser: "web", + }, + }, + ParamGroups: []config.ParamGroup{ + {Name: "param_test", Params: []config.Param{{Key: "param_key", Value: "param_value"}}}, + }, + ConnectionPool: config.ConnectionPool{ + BreakerOn: true, + BreakerErrorRequests: 1, + BreakerFailureRatio: 0.5, + BreakerTimeout: time.Second, + }, +} func newConfiguredProxy(cfg *config.Config) (*reverseProxy, error) { p := newReverseProxy(&cfg.ConnectionPool) @@ -795,6 +832,17 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) { }, transactionFailReason: "unknown error reason", }, + { + cfg: goodCfgWithBreaker, + name: "error with breaker on", + expResponse: badGatewayResponse, + expStatusCode: http.StatusBadGateway, + f: func(p *reverseProxy) *http.Response { + req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil) + return makeCustomRequest(p, req) + }, + transactionFailReason: "unknown error reason1", + }, } for _, tc := range testCases { @@ -807,6 +855,7 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) { resp := tc.f(proxy) b := bbToString(t, resp.Body) resp.Body.Close() + if len(tc.cfg.Caches) != 0 { compareTransactionFailReason(t, proxy, tc.cfg.Clusters[0].ClusterUsers[0], query, tc.transactionFailReason) } @@ -1321,3 +1370,125 @@ func stopAllRequestsInFlight() { } atomic.StoreUint64(&shouldStop, 0) } + +func Test_CircuitBreaking(t *testing.T) { + t.Parallel() + + promLabels := prometheus.Labels{ + "user": "", "cluster": "", "cluster_user": "", "replica": "", "cluster_node": "", + } + + testCases := []struct { + name string + statusCodes []int + visitServer int32 + connectionPool *config.ConnectionPool + }{ + { + name: "all ok", + statusCodes: []int{200, 200, 200, 200, 200, 200, 200, 200, 200, 200}, + visitServer: 10, + }, + { + name: "50% rate errors all", + statusCodes: []int{439, 201, 241, 439, 201, 241, 439, 201, 241}, + visitServer: 1, + }, + { + name: "50% rate errors float", + statusCodes: []int{200, 200, 200, 439, 201, 241, 439, 201, 241, 439, 201, 241}, + visitServer: 6, + }, + { + name: "10% rate errors all", + statusCodes: []int{439, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200}, + visitServer: 1, + connectionPool: &config.ConnectionPool{ + BreakerOn: true, + BreakerErrorRequests: 1, + BreakerFailureRatio: 0.1, + BreakerTimeout: time.Second, + }, + }, + { + name: "50% rate errors after 5 times", + statusCodes: []int{439, 439, 439, 439, 439, 439, 439, 439, 439, 439, 439, 439, 439, 439, 439}, + visitServer: 5, + connectionPool: &config.ConnectionPool{ + BreakerOn: true, + BreakerErrorRequests: 5, + BreakerFailureRatio: 0.5, + BreakerTimeout: time.Second, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var visitServer int32 + // init test serv + serv := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + atomic.AddInt32(&visitServer, 1) + + code := request.URL.Query().Get("code") + + if statusCode, err := strconv.Atoi(code); err == nil { + writer.WriteHeader(statusCode) // CH code + + if _, ok := clickhouseBadStatusForCircuitBreaking[statusCode]; ok { + writer.Write([]byte("Fail CH")) + + return + } + } + + writer.Write([]byte("OK")) + })) + defer serv.Close() + + cfg := *goodCfgWithBreaker + if tc.connectionPool != nil { + cfg.ConnectionPool = *tc.connectionPool + } + + initMetrics(&cfg) + + p, err := getProxy(&cfg) + assert.NoError(t, err) + + for _, code := range tc.statusCodes { + target := fmt.Sprintf("%s/test?code=%d", serv.URL, code) + req := httptest.NewRequest(http.MethodGet, target, http.NoBody) + + rec := httptest.NewRecorder() + recNot200 := httptest.NewRecorder() + + cls := testGetCluster() + p.proxyRequest(&scope{ + user: testGetUser(), + clusterUser: testGetClusterUser(), + cluster: cls, + host: cls.getHost(), + labels: promLabels, + }, &statResponseWriter{ + ResponseWriter: &testCloseNotifier{ + ResponseWriter: rec, + }, + bytesWritten: configSuccess, + }, &statResponseWriter{ + ResponseWriter: recNot200, + bytesWritten: configSuccess, + }, req) + + if code == http.StatusOK { + assert.Equal(t, http.StatusOK, rec.Result().StatusCode) + assert.Equal(t, http.StatusOK, recNot200.Result().StatusCode) + } else { + assert.NotEqual(t, http.StatusOK, recNot200.Result().StatusCode) + } + } + + assert.Equal(t, tc.visitServer, visitServer) + }) + } +}