Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for leaky bucket throttling (rps + burst) in ffresty client #141

Merged
merged 11 commits into from
Jun 7, 2024
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
gitlab.com/hfuss/mux-prometheus v0.0.5
golang.org/x/crypto v0.18.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gotest.tools v2.2.0+incompatible
)
Expand Down
6 changes: 5 additions & 1 deletion pkg/ffresty/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -67,6 +67,8 @@ const (
HTTPIdleTimeout = "idleTimeout"
// HTTPMaxIdleConns the max number of idle connections to hold pooled
HTTPMaxIdleConns = "maxIdleConns"
// HTTPRPS the max number of request to submit per second, default value is 0, which turns off the RPS control
HTTPRPS = "rps"
// HTTPMaxConnsPerHost the max number of concurrent connections
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
HTTPMaxConnsPerHost = "maxConnsPerHost"
// HTTPConnectionTimeout the connection timeout for new connections
Expand Down Expand Up @@ -94,6 +96,7 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime)
conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex)
conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout)
conf.AddKnownKey(HTTPRPS)
conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout)
conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns)
conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost)
Expand All @@ -120,6 +123,7 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) {
RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)),
RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)),
RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex),
RequestPerSecond: conf.GetInt(HTTPRPS),
HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)),
HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)),
HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns),
Expand Down
15 changes: 15 additions & 0 deletions pkg/ffresty/ffresty.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)

type retryCtxKey struct{}
Expand All @@ -61,6 +62,7 @@ type HTTPConfig struct {
HTTPExpectContinueTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"expectContinueTimeout,omitempty"`
AuthUsername string `ffstruct:"RESTConfig" json:"authUsername,omitempty"`
AuthPassword string `ffstruct:"RESTConfig" json:"authPassword,omitempty"`
RequestPerSecond int `ffstruct:"RESTConfig" json:"rps,omitempty"`
Retry bool `ffstruct:"RESTConfig" json:"retry,omitempty"`
RetryCount int `ffstruct:"RESTConfig" json:"retryCount,omitempty"`
RetryInitialDelay fftypes.FFDuration `ffstruct:"RESTConfig" json:"retryInitialDelay,omitempty"`
Expand Down Expand Up @@ -148,6 +150,12 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client = resty.NewWithClient(httpClient)
}

var rpsLimiter *rate.Limiter
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
if ffrestyConfig.RequestPerSecond != 0 { // NOTE: 0 is treated as RPS control disabled
rps := ffrestyConfig.RequestPerSecond
rpsLimiter = rate.NewLimiter(rate.Limit(rps), rps)
}

url := strings.TrimSuffix(ffrestyConfig.URL, "/")
if url != "" {
client.SetBaseURL(url)
Expand All @@ -161,6 +169,13 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client.SetTimeout(time.Duration(ffrestyConfig.HTTPRequestTimeout))

client.OnBeforeRequest(func(_ *resty.Client, req *resty.Request) error {
if rpsLimiter != nil {
// Wait for permission to proceed with the request
err := rpsLimiter.Wait(context.Background())
if err != nil {
return err
}
}
rCtx := req.Context()
rc := rCtx.Value(retryCtxKey{})
if rc == nil {
Expand Down
47 changes: 46 additions & 1 deletion pkg/ffresty/ffresty_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -88,6 +88,51 @@ func TestRequestOK(t *testing.T) {
assert.Equal(t, 1, httpmock.GetTotalCallCount())
}

func TestRequestWithRateLimiter(t *testing.T) {
rps := 5
expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds

customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPRPS, rps)
utConf.Set(HTTPConfigRetryEnabled, true)
utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})

startTime := time.Now()
for i := 0; i < expectedNumberOfRequest; i++ {
resp, err := c.R().Get("/test")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode())
assert.Equal(t, `{"some": "data"}`, resp.String())
}

duration := time.Since(startTime)

assert.GreaterOrEqual(t, duration, 3*time.Second)
assert.LessOrEqual(t, duration, 4*time.Second)

assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount())
}

func TestRequestRetry(t *testing.T) {

ctx := context.Background()
Expand Down
1 change: 1 addition & 0 deletions pkg/i18n/en_base_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ var (
ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType)
ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType)
ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType)
ConfigGlobalRPS = ffc("config.global.rps", "The max number of requests to submit per second, this rate limiter is off by default with value 0", IntType)
ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType)
ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType)
ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType)
Expand Down
1 change: 1 addition & 0 deletions pkg/i18n/en_base_field_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var (
RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call")
RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection")
RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool")
RESTConfigRPS = ffm("RESTConfig.rps", "Maximum requests to submit per second")
RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host")
RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool")
RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool")
Expand Down
Loading