Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add circuit breaking #447

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,28 @@ type ConnectionPool struct {
// Maximum number of idle connections between chproxy and a particular ClickHouse instance
MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host,omitempty"`

// BreakerOn switches on the CircuitBreaker for the ClickHouse HTTP client.
BreakerOn bool `yaml:"breaker_on,omitempty"`
// BreakerMaxRequests is the maximum number of requests allowed to pass through
// when the CircuitBreaker is half-open.
// If BreakerMaxRequests is 0, the CircuitBreaker allows only 1 request.
BreakerMaxRequests uint32 `yaml:"breaker_max_requests,omitempty"`
// BreakerInterval is the cyclic period of the closed state
// for the CircuitBreaker to clear the internal Counts.
// If BreakerInterval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
BreakerInterval time.Duration `yaml:"breaker_interval,omitempty"`
// BreakerTimeout is the period of the open state,
// after which the state of the CircuitBreaker becomes half-open.
// If BreakerTimeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds.
BreakerTimeout time.Duration `yaml:"breaker_timeout,omitempty"`
// BreakerFailureRatio is the ratio of failed requests to total requests at or above which
// the CircuitBreaker trips, once at least BreakerErrorRequests requests have been counted.
// Default value is 0.6.
BreakerFailureRatio float64 `yaml:"breaker_failure_ratio,omitempty"`
// BreakerErrorRequests is the minimum number of requests that must have been counted
// before the failure ratio is evaluated; with fewer requests the CircuitBreaker does not trip.
// Default value is 3.
BreakerErrorRequests uint32 `yaml:"breaker_error_requests,omitempty"`

// Catches all undefined fields
XXX map[string]interface{} `yaml:",inline"`
}
Expand Down
16 changes: 14 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,14 @@ var fullConfig = Config{
},

ConnectionPool: ConnectionPool{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 2,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 2,
BreakerOn: true,
BreakerMaxRequests: 10,
BreakerInterval: time.Second,
BreakerTimeout: time.Second,
BreakerFailureRatio: 0.1,
BreakerErrorRequests: 5,
},

Users: []User{
Expand Down Expand Up @@ -935,6 +941,12 @@ param_groups:
connection_pool:
max_idle_conns: 100
max_idle_conns_per_host: 2
breaker_on: true
breaker_max_requests: 10
breaker_interval: 1s
breaker_timeout: 1s
breaker_failure_ratio: 0.1
breaker_error_requests: 5
`, redisPort)
tested := fullConfig.String()
if tested != expected {
Expand Down
15 changes: 15 additions & 0 deletions config/testdata/full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ connection_pool:
# Number of connections per ClickHouse host to keep open
# when they are not needed for clients.
max_idle_conns_per_host: 2
# Switches on the CircuitBreaker for the ClickHouse HTTP client.
breaker_on: true
# Number of requests allowed to pass through when the CircuitBreaker is half-open.
breaker_max_requests: 10
# Is the cyclic period of the closed state for the CircuitBreaker
# to clear the internal Counts.
breaker_interval: 1s
# Is the period of the open state,
# after which the state of the CircuitBreaker becomes half-open.
breaker_timeout: 1s
# Ratio of failed requests to total requests at or above which
# the CircuitBreaker trips once enough requests have been counted.
breaker_failure_ratio: 0.1
# Minimum number of requests that must be counted before the failure ratio is evaluated.
breaker_error_requests: 5

# Settings for `chproxy` input interfaces.
server:
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ require (
github.com/pierrec/lz4 v2.4.0+incompatible
github.com/prometheus/client_golang v1.11.1
github.com/redis/go-redis/v9 v9.0.2
github.com/stretchr/testify v1.8.1
github.com/sony/gobreaker/v2 v2.0.0
github.com/stretchr/testify v1.8.4
golang.org/x/crypto v0.21.0
golang.org/x/time v0.3.0
gopkg.in/yaml.v2 v2.4.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEt
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sony/gobreaker/v2 v2.0.0 h1:23AaR4JQ65y4rz8JWMzgXw2gKOykZ/qfqYunll4OwJ4=
github.com/sony/gobreaker/v2 v2.0.0/go.mod h1:8JnRUz80DJ1/ne8M8v7nmTs2713i58nIt4s7XcGe/DI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand All @@ -126,6 +128,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
69 changes: 68 additions & 1 deletion proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"sync"
"time"

"github.com/sony/gobreaker/v2"

"github.com/contentsquare/chproxy/cache"
"github.com/contentsquare/chproxy/config"
"github.com/contentsquare/chproxy/internal/topology"
Expand Down Expand Up @@ -51,8 +53,46 @@ type reverseProxy struct {
maxErrorReasonSize int64
}

// circuitBreakingTransport is an http.RoundTripper that runs every round trip
// of the wrapped transport through a circuit breaker.
type circuitBreakingTransport struct {
// transport is the underlying round tripper that actually performs the request.
transport http.RoundTripper
// cb is the circuit breaker through which each call to transport.RoundTrip is executed.
cb *gobreaker.CircuitBreaker[*http.Response]
}

// errCircuitBreaking is the sentinel error reported to the circuit breaker
// when a response carries a status code that must be counted as a failure.
var errCircuitBreaking = errors.New("circuit breaking status code")

// clickhouseBadStatusForCircuitBreaking is a set of codes that are treated as
// failures by the circuit breaker. It is used as a set: only key presence
// matters, the values are empty structs.
//
// NOTE(review): the codes and their names look like ClickHouse exception
// codes rather than HTTP status codes, yet the set is looked up with the HTTP
// status code of the response — confirm this is intended.
var clickhouseBadStatusForCircuitBreaking = map[int]struct{}{
	201: {}, // QUOTA_EXCEEDED
	241: {}, // MEMORY_LIMIT_EXCEEDED
	439: {}, // CANNOT_SCHEDULE_TASK
}

// RoundTrip implements http.RoundTripper. The request is performed by the
// wrapped transport inside the circuit breaker's Execute call, so the outcome
// of every round trip is reported to the breaker.
//
// A round trip is reported as failed when the wrapped transport returns an
// error, when the response status code is listed in
// clickhouseBadStatusForCircuitBreaking, or when it is a 5xx status. In the
// latter two cases the response is discarded and errCircuitBreaking is
// returned instead.
func (b *circuitBreakingTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	return b.cb.Execute(func() (*http.Response, error) {
		resp, err := b.transport.RoundTrip(r)
		if err != nil {
			return nil, err
		}

		_, listed := clickhouseBadStatusForCircuitBreaking[resp.StatusCode]
		if listed || resp.StatusCode >= http.StatusInternalServerError {
			// The caller never sees this response, so nobody else can close
			// its body; close it here to release the underlying connection.
			// The close error is ignored on purpose: the round trip is
			// already being reported as failed.
			_ = resp.Body.Close()
			return nil, errCircuitBreaking
		}

		return resp, nil
	})
}

func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy {
transport := &http.Transport{
var transport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{
Expand All @@ -69,6 +109,33 @@ func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy {
ExpectContinueTimeout: 1 * time.Second,
}

if cfgCp.BreakerOn {
var st gobreaker.Settings
st.Name = "ch-proxy"
st.MaxRequests = cfgCp.BreakerMaxRequests
st.Interval = cfgCp.BreakerInterval
st.Timeout = cfgCp.BreakerTimeout

errorRequests := uint32(3)
if cfgCp.BreakerErrorRequests != 0 {
errorRequests = cfgCp.BreakerErrorRequests
}
failureRatio := 0.6
if cfgCp.BreakerFailureRatio != 0 {
failureRatio = cfgCp.BreakerFailureRatio
}

st.ReadyToTrip = func(counts gobreaker.Counts) bool {
fr := float64(counts.TotalFailures) / float64(counts.Requests)
return counts.Requests >= errorRequests && fr >= failureRatio
}

transport = &circuitBreakingTransport{
transport: transport,
cb: gobreaker.NewCircuitBreaker[*http.Response](st), //nolint:bodyclose
}
}

return &reverseProxy{
rp: &httputil.ReverseProxy{
Director: func(*http.Request) {},
Expand Down
Loading