Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for leaky bucket throttling (rps + burst) in ffresty client #141

Merged
merged 11 commits into from
Jun 7, 2024
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
gitlab.com/hfuss/mux-prometheus v0.0.5
golang.org/x/crypto v0.18.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gotest.tools v2.2.0+incompatible
)
Expand Down
14 changes: 13 additions & 1 deletion pkg/ffresty/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -67,6 +67,14 @@ const (
HTTPIdleTimeout = "idleTimeout"
// HTTPMaxIdleConns the max number of idle connections to hold pooled
HTTPMaxIdleConns = "maxIdleConns"
// HTTPThrottleRequestsPerSecond The average rate at which requests are allowed to pass through over time. Default to RPS
// requests over the limit will be blocked using a buffered channel
// the blocked time period is not counted in request timeout
HTTPThrottleRequestsPerSecond = "throttle.requestsPerSecond"

// HTTPThrottleBurst The maximum number of requests that can be made in a short period of time before the RPS throttling kicks in.
HTTPThrottleBurst = "throttle.burst"

// HTTPMaxConnsPerHost the max number of concurrent connections
HTTPMaxConnsPerHost = "maxConnsPerHost"
// HTTPConnectionTimeout the connection timeout for new connections
Expand Down Expand Up @@ -94,6 +102,8 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime)
conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex)
conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout)
conf.AddKnownKey(HTTPThrottleRequestsPerSecond)
conf.AddKnownKey(HTTPThrottleBurst)
conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout)
conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns)
conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost)
Expand All @@ -120,6 +130,8 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) {
RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)),
RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)),
RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex),
ThrottleRequestsPerSecond: conf.GetInt(HTTPThrottleRequestsPerSecond),
ThrottleBurst: conf.GetInt(HTTPThrottleBurst),
HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)),
HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)),
HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns),
Expand Down
24 changes: 24 additions & 0 deletions pkg/ffresty/ffresty.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-common/pkg/metric"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)

type retryCtxKey struct{}
Expand All @@ -53,6 +54,7 @@ type Config struct {
}

var (
rateLimiter *rate.Limiter
metricsManager metric.MetricsManager
onErrorHooks []resty.ErrorHook
onSuccessHooks []resty.SuccessHook
Expand All @@ -69,6 +71,8 @@ type HTTPConfig struct {
HTTPExpectContinueTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"expectContinueTimeout,omitempty"`
AuthUsername string `ffstruct:"RESTConfig" json:"authUsername,omitempty"`
AuthPassword string `ffstruct:"RESTConfig" json:"authPassword,omitempty"`
ThrottleRequestsPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"`
ThrottleBurst int `ffstruct:"RESTConfig" json:"burst,omitempty"`
Retry bool `ffstruct:"RESTConfig" json:"retry,omitempty"`
RetryCount int `ffstruct:"RESTConfig" json:"retryCount,omitempty"`
RetryInitialDelay fftypes.FFDuration `ffstruct:"RESTConfig" json:"retryInitialDelay,omitempty"`
Expand Down Expand Up @@ -172,6 +176,17 @@ func New(ctx context.Context, staticConfig config.Section) (client *resty.Client
return NewWithConfig(ctx, *ffrestyConfig), nil
}

func getRateLimiter(rps, burst int) *rate.Limiter {
if rps != 0 { // if rps is not set no need for a rate limiter
rpsLimiter := rate.Limit(rps)
if burst == 0 {
burst = rps
}
return rate.NewLimiter(rpsLimiter, burst)
}
return nil
}

// New creates a new Resty client, using static configuration (from the config file)
// from a given section in the static configuration
//
Expand Down Expand Up @@ -210,6 +225,8 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client = resty.NewWithClient(httpClient)
}

rateLimiter = getRateLimiter(ffrestyConfig.ThrottleRequestsPerSecond, ffrestyConfig.ThrottleBurst)

url := strings.TrimSuffix(ffrestyConfig.URL, "/")
if url != "" {
client.SetBaseURL(url)
Expand All @@ -223,6 +240,13 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client.SetTimeout(time.Duration(ffrestyConfig.HTTPRequestTimeout))

client.OnBeforeRequest(func(_ *resty.Client, req *resty.Request) error {
if rateLimiter != nil {
// Wait for permission to proceed with the request
err := rateLimiter.Wait(req.Context())
if err != nil {
return err
}
}
rCtx := req.Context()
rc := rCtx.Value(retryCtxKey{})
if rc == nil {
Expand Down
157 changes: 149 additions & 8 deletions pkg/ffresty/ffresty_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -42,6 +42,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftls"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/metric"
"golang.org/x/time/rate"

"github.com/jarcoal/httpmock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -90,6 +91,148 @@ func TestRequestOK(t *testing.T) {
assert.Equal(t, 1, httpmock.GetTotalCallCount())
}

func TestRequestWithRateLimiter(t *testing.T) {
rps := 5
expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds

customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPThrottleRequestsPerSecond, rps)
utConf.Set(HTTPConfigRetryEnabled, true)
utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
requestChan := make(chan bool, expectedNumberOfRequest)
startTime := time.Now()
for i := 0; i < expectedNumberOfRequest; i++ {
go func() {
resp, err := c.R().Get("/test")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode())
assert.Equal(t, `{"some": "data"}`, resp.String())
requestChan <- true
}()
}
count := 0
for {
<-requestChan
count++
if count == expectedNumberOfRequest {
break

}
}

duration := time.Since(startTime)
assert.GreaterOrEqual(t, duration, 3*time.Second)
assert.LessOrEqual(t, duration, 4*time.Second)
assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount())
}

func TestRequestWithRateLimiterHighBurst(t *testing.T) {
expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds

customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPThrottleRequestsPerSecond, 0)
utConf.Set(HTTPThrottleBurst, expectedNumberOfRequest)
utConf.Set(HTTPConfigRetryEnabled, true)
utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
requestChan := make(chan bool, expectedNumberOfRequest)
startTime := time.Now()
for i := 0; i < expectedNumberOfRequest; i++ {
go func() {
resp, err := c.R().Get("/test")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode())
assert.Equal(t, `{"some": "data"}`, resp.String())
requestChan <- true
}()
}
count := 0
for {
<-requestChan
count++
if count == expectedNumberOfRequest {
break

}
}

duration := time.Since(startTime)
assert.Less(t, duration, 1*time.Second)
assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount())
}

func TestRateLimiterFailure(t *testing.T) {
customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPConfigRetryEnabled, true)

utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
rateLimiter = rate.NewLimiter(rate.Limit(1), 0) // artificially create an broken rate limiter, this is not possible with our config default
resp, err := c.R().Get("/test")
assert.Error(t, err)
assert.Regexp(t, "exceeds", err)
assert.Nil(t, resp)
rateLimiter = nil // reset limiter
}

func TestRequestRetry(t *testing.T) {

ctx := context.Background()
Expand Down Expand Up @@ -529,17 +672,15 @@ func TestEnableClientMetrics(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")

err := EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)

}



func TestEnableClientMetricsIdempotent(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")
_ = EnableClientMetrics(ctx, mr)
_ = EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)
}
Expand All @@ -549,17 +690,17 @@ func TestHooks(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")

err := EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)

onErrorCount := 0
onSuccessCount := 0

customOnError := func(req *resty.Request, err error){
customOnError := func(req *resty.Request, err error) {
onErrorCount++
}

customOnSuccess := func(c *resty.Client, resp *resty.Response){
customOnSuccess := func(c *resty.Client, resp *resty.Response) {
onSuccessCount++
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/i18n/en_base_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ var (
ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType)
ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType)
ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType)
ConfigGlobalThrottleRPS = ffc("config.global.throttle.requestsPerSecond", "The average rate at which requests are allowed to pass through over time.", IntType)
ConfigGlobalThrottleBurst = ffc("config.global.throttle.burst", "The maximum number of requests that can be made in a short period of time before the throttling kicks in.", IntType)
ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType)
ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType)
ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType)
Expand Down
2 changes: 2 additions & 0 deletions pkg/i18n/en_base_field_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ var (
RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call")
RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection")
RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool")
RESTConfigThrottleRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Requests per second")
RESTConfigThrottleBurst = ffm("RESTConfig.burst", "Burst")
RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host")
RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool")
RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool")
Expand Down
Loading