Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for leaky bucket throttling (rps + burst) in ffresty client #141

Merged
merged 11 commits into from
Jun 7, 2024
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
gitlab.com/hfuss/mux-prometheus v0.0.5
golang.org/x/crypto v0.18.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gotest.tools v2.2.0+incompatible
)
Expand Down
8 changes: 7 additions & 1 deletion pkg/ffresty/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -67,6 +67,10 @@ const (
HTTPIdleTimeout = "idleTimeout"
// HTTPMaxIdleConns the max number of idle connections to hold pooled
HTTPMaxIdleConns = "maxIdleConns"
// HTTPRequestsPerSecond the max number of request to submit per second
// requests over the limit will be blocked using a buffered channel
// the blocked time period is not counted in request timeout
HTTPRequestsPerSecond = "requestsPerSecond"
// HTTPMaxConnsPerHost the max number of concurrent connections
HTTPMaxConnsPerHost = "maxConnsPerHost"
// HTTPConnectionTimeout the connection timeout for new connections
Expand Down Expand Up @@ -94,6 +98,7 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime)
conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex)
conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout)
conf.AddKnownKey(HTTPRequestsPerSecond)
conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout)
conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns)
conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost)
Expand All @@ -120,6 +125,7 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) {
RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)),
RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)),
RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex),
RequestPerSecond: conf.GetInt(HTTPRequestsPerSecond),
HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)),
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)),
HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns),
Expand Down
14 changes: 14 additions & 0 deletions pkg/ffresty/ffresty.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-common/pkg/metric"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)

type retryCtxKey struct{}
Expand All @@ -53,6 +54,7 @@ type Config struct {
}

var (
rateLimiter *rate.Limiter
metricsManager metric.MetricsManager
onErrorHooks []resty.ErrorHook
onSuccessHooks []resty.SuccessHook
Expand All @@ -69,6 +71,7 @@ type HTTPConfig struct {
HTTPExpectContinueTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"expectContinueTimeout,omitempty"`
AuthUsername string `ffstruct:"RESTConfig" json:"authUsername,omitempty"`
AuthPassword string `ffstruct:"RESTConfig" json:"authPassword,omitempty"`
RequestPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"`
Retry bool `ffstruct:"RESTConfig" json:"retry,omitempty"`
RetryCount int `ffstruct:"RESTConfig" json:"retryCount,omitempty"`
RetryInitialDelay fftypes.FFDuration `ffstruct:"RESTConfig" json:"retryInitialDelay,omitempty"`
Expand Down Expand Up @@ -210,6 +213,10 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client = resty.NewWithClient(httpClient)
}

if ffrestyConfig.RequestPerSecond != 0 { // NOTE: 0 is treated as RPS control disabled
rateLimiter = rate.NewLimiter(rate.Limit(ffrestyConfig.RequestPerSecond), ffrestyConfig.RequestPerSecond)
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
}

url := strings.TrimSuffix(ffrestyConfig.URL, "/")
if url != "" {
client.SetBaseURL(url)
Expand All @@ -223,6 +230,13 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client.SetTimeout(time.Duration(ffrestyConfig.HTTPRequestTimeout))

client.OnBeforeRequest(func(_ *resty.Client, req *resty.Request) error {
if rateLimiter != nil {
// Wait for permission to proceed with the request
err := rateLimiter.Wait(req.Context())
if err != nil {
return err
}
}
rCtx := req.Context()
rc := rCtx.Value(retryCtxKey{})
if rc == nil {
Expand Down
90 changes: 89 additions & 1 deletion pkg/ffresty/ffresty_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -45,6 +45,7 @@ import (

"github.com/jarcoal/httpmock"
"github.com/stretchr/testify/assert"
"golang.org/x/time/rate"
)

const configDir = "../../test/data/config"
Expand Down Expand Up @@ -90,6 +91,93 @@ func TestRequestOK(t *testing.T) {
assert.Equal(t, 1, httpmock.GetTotalCallCount())
}

func TestRequestWithRateLimiter(t *testing.T) {
rps := 5
expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds

customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPRequestsPerSecond, rps)
utConf.Set(HTTPConfigRetryEnabled, true)
utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
requestChan := make(chan bool, expectedNumberOfRequest)
startTime := time.Now()
for i := 0; i < expectedNumberOfRequest; i++ {
go func() {
resp, err := c.R().Get("/test")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode())
assert.Equal(t, `{"some": "data"}`, resp.String())
requestChan <- true
}()
}
count := 0
for {
<-requestChan
count++
if count == expectedNumberOfRequest {
break

}
}

duration := time.Since(startTime)
assert.GreaterOrEqual(t, duration, 3*time.Second)
assert.LessOrEqual(t, duration, 4*time.Second)
assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount())
}

func TestRateLimiterFailure(t *testing.T) {
customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPConfigRetryEnabled, true)
utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
rateLimiter = rate.NewLimiter(rate.Limit(0), 0) // fake limiter with error
resp, err := c.R().Get("/test")
assert.Error(t, err)
assert.Regexp(t, "exceeds", err)
assert.Nil(t, resp)
rateLimiter = nil // reset limiter
}

func TestRequestRetry(t *testing.T) {

ctx := context.Background()
Expand Down
1 change: 1 addition & 0 deletions pkg/i18n/en_base_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ var (
ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType)
ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType)
ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType)
ConfigGlobalRPS = ffc("config.global.requestsPerSecond", "The max number of requests to submit per second, requests over the limit will wait.", IntType)
ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType)
ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType)
ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType)
Expand Down
1 change: 1 addition & 0 deletions pkg/i18n/en_base_field_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var (
RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call")
RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection")
RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool")
RESTConfigRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Maximum requests to submit per second")
RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host")
RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool")
RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool")
Expand Down
Loading