From dde4ca8ca5f528aadf23fa2fdcf1f9be3fda7b80 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Thu, 30 May 2024 10:29:17 +0100 Subject: [PATCH 1/9] Add Request Per Second rate limiter Signed-off-by: Chengxuan Xing --- go.mod | 1 + pkg/ffresty/config.go | 4 ++++ pkg/ffresty/ffresty.go | 17 +++++++++++++- pkg/ffresty/ffresty_test.go | 45 +++++++++++++++++++++++++++++++++++++ 4 files changed, 66 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index ac65640..d14f8e1 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( gitlab.com/hfuss/mux-prometheus v0.0.5 golang.org/x/crypto v0.18.0 golang.org/x/text v0.14.0 + golang.org/x/time v0.5.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gotest.tools v2.2.0+incompatible ) diff --git a/pkg/ffresty/config.go b/pkg/ffresty/config.go index 5706fb5..9958cac 100644 --- a/pkg/ffresty/config.go +++ b/pkg/ffresty/config.go @@ -67,6 +67,8 @@ const ( HTTPIdleTimeout = "idleTimeout" // HTTPMaxIdleConns the max number of idle connections to hold pooled HTTPMaxIdleConns = "maxIdleConns" + // HTTPRPS the max number of request to submit per second, default value is 0, which turns off the RPS control + HTTPRPS = "rps" // HTTPMaxConnsPerHost the max number of concurrent connections HTTPMaxConnsPerHost = "maxConnsPerHost" // HTTPConnectionTimeout the connection timeout for new connections @@ -94,6 +96,7 @@ func InitConfig(conf config.Section) { conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime) conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex) conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout) + conf.AddKnownKey(HTTPRPS) conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout) conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns) conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost) @@ -120,6 +123,7 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) { RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)), RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)), RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex), + RequestPerSecond: conf.GetInt(HTTPRPS), HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)), HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)), HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns), diff --git a/pkg/ffresty/ffresty.go b/pkg/ffresty/ffresty.go index 656f889..ce4b3d6 100644 --- a/pkg/ffresty/ffresty.go +++ b/pkg/ffresty/ffresty.go @@ -35,6 +35,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" "github.com/sirupsen/logrus" + "golang.org/x/time/rate" ) type retryCtxKey struct{} @@ -61,6 +62,7 @@ type HTTPConfig struct { HTTPExpectContinueTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"expectContinueTimeout,omitempty"` AuthUsername string `ffstruct:"RESTConfig" json:"authUsername,omitempty"` AuthPassword string `ffstruct:"RESTConfig" json:"authPassword,omitempty"` + RequestPerSecond int `ffstruct:"RESTConfig" json:"rps,omitempty"` Retry bool `ffstruct:"RESTConfig" json:"retry,omitempty"` RetryCount int `ffstruct:"RESTConfig" json:"retryCount,omitempty"` RetryInitialDelay fftypes.FFDuration `ffstruct:"RESTConfig" json:"retryInitialDelay,omitempty"` @@ -148,6 +150,12 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli client = resty.NewWithClient(httpClient) } + var rpsLimiter *rate.Limiter + if ffrestyConfig.RequestPerSecond != 0 { // NOTE: 0 is treated as RPS control disabled + rps := ffrestyConfig.RequestPerSecond + rpsLimiter = rate.NewLimiter(rate.Limit(rps), rps) + } + url := strings.TrimSuffix(ffrestyConfig.URL, "/") if url != "" { client.SetBaseURL(url) @@ -160,7 +168,14 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli client.SetTimeout(time.Duration(ffrestyConfig.HTTPRequestTimeout)) - client.OnBeforeRequest(func(c *resty.Client, req *resty.Request) error { + client.OnBeforeRequest(func(_ *resty.Client, req *resty.Request) error { + if rpsLimiter != nil { + // Wait for permission to proceed with the request + err := rpsLimiter.Wait(context.Background()) + if err != nil { + return err + } + } rCtx := req.Context() rc := rCtx.Value(retryCtxKey{}) if rc == nil { diff --git a/pkg/ffresty/ffresty_test.go b/pkg/ffresty/ffresty_test.go index 85236c9..820b6e7 100644 --- a/pkg/ffresty/ffresty_test.go +++ b/pkg/ffresty/ffresty_test.go @@ -88,6 +88,51 @@ func TestRequestOK(t *testing.T) { assert.Equal(t, 1, httpmock.GetTotalCallCount()) } +func TestRequestWithRateLimiter(t *testing.T) { + rps := 5 + expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds + + customClient := &http.Client{} + + resetConf() + utConf.Set(HTTPConfigURL, "http://localhost:12345") + utConf.Set(HTTPConfigHeaders, map[string]interface{}{ + "someheader": "headervalue", + }) + utConf.Set(HTTPConfigAuthUsername, "user") + utConf.Set(HTTPConfigAuthPassword, "pass") + utConf.Set(HTTPRPS, rps) + utConf.Set(HTTPConfigRetryEnabled, true) + utConf.Set(HTTPCustomClient, customClient) + + c, err := New(context.Background(), utConf) + assert.Nil(t, err) + httpmock.ActivateNonDefault(customClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/test", + func(req *http.Request) (*http.Response, error) { + assert.Equal(t, "headervalue", req.Header.Get("someheader")) + assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization")) + return httpmock.NewStringResponder(200, `{"some": "data"}`)(req) + }) + + startTime := time.Now() + for i := 0; i < expectedNumberOfRequest; i++ { + resp, err := c.R().Get("/test") + assert.NoError(t, err) + assert.Equal(t, 200, resp.StatusCode()) + assert.Equal(t, `{"some": "data"}`, resp.String()) + } + + duration := time.Since(startTime) + + assert.GreaterOrEqual(t, duration, 3*time.Second) + assert.LessOrEqual(t, duration, 4*time.Second) + + assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount()) +} + func TestRequestRetry(t *testing.T) { ctx := context.Background() From 225fc90042b083b3107a23573329b29ee5cce47e Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Thu, 30 May 2024 10:44:37 +0100 Subject: [PATCH 2/9] fixing copy right year Signed-off-by: Chengxuan Xing --- pkg/ffresty/config.go | 2 +- pkg/ffresty/ffresty_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ffresty/config.go b/pkg/ffresty/config.go index 9958cac..ccc3097 100644 --- a/pkg/ffresty/config.go +++ b/pkg/ffresty/config.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/pkg/ffresty/ffresty_test.go b/pkg/ffresty/ffresty_test.go index 820b6e7..f8b5391 100644 --- a/pkg/ffresty/ffresty_test.go +++ b/pkg/ffresty/ffresty_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // From 1c46446a565fdf0ef2c03e4552a5313872c64961 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Thu, 30 May 2024 10:48:39 +0100 Subject: [PATCH 3/9] nls Signed-off-by: Chengxuan Xing --- pkg/i18n/en_base_config_descriptions.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/i18n/en_base_config_descriptions.go b/pkg/i18n/en_base_config_descriptions.go index 770ee70..a7babca 100644 --- a/pkg/i18n/en_base_config_descriptions.go +++ b/pkg/i18n/en_base_config_descriptions.go @@ -84,6 +84,7 @@ var ( ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType) ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType) ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType) + ConfigGlobalRPS = ffc("config.global.rps", "The max number of requests to submit per second, this rate limiter is off by default with value 0", IntType) ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType) ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType) ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType) From 6231febe21a837881775058560fa23bb33c7f9b5 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Thu, 30 May 2024 10:51:43 +0100 Subject: [PATCH 4/9] add field description Signed-off-by: Chengxuan Xing --- pkg/i18n/en_base_field_descriptions.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/i18n/en_base_field_descriptions.go b/pkg/i18n/en_base_field_descriptions.go index 7dde0ac..a05c718 100644 --- a/pkg/i18n/en_base_field_descriptions.go +++ b/pkg/i18n/en_base_field_descriptions.go @@ -88,6 +88,7 @@ var ( RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call") RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection") RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool") + RESTConfigRPS = ffm("RESTConfig.rps", "Maximum requests to submit per second") RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host") RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool") RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool") From 5a3000a42901c7b400b59494a7ee45217e731d5c Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 5 Jun 2024 15:59:30 +0100 Subject: [PATCH 5/9] address review comment Signed-off-by: Chengxuan Xing --- pkg/ffresty/ffresty.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ffresty/ffresty.go b/pkg/ffresty/ffresty.go index 7fe78db..a0b2d89 100644 --- a/pkg/ffresty/ffresty.go +++ b/pkg/ffresty/ffresty.go @@ -171,7 +171,7 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli client.OnBeforeRequest(func(_ *resty.Client, req *resty.Request) error { if rpsLimiter != nil { // Wait for permission to proceed with the request - err := rpsLimiter.Wait(context.Background()) + err := rpsLimiter.Wait(req.Context()) if err != nil { return err } From 6d069196781a3c221e8e2acf44651fb6e0791d11 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 5 Jun 2024 16:06:53 +0100 Subject: [PATCH 6/9] update config name Signed-off-by: Chengxuan Xing --- pkg/ffresty/config.go | 10 ++++++---- pkg/ffresty/ffresty_test.go | 2 +- pkg/i18n/en_base_config_descriptions.go | 2 +- pkg/i18n/en_base_field_descriptions.go | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/ffresty/config.go b/pkg/ffresty/config.go index ccc3097..f2ccc32 100644 --- a/pkg/ffresty/config.go +++ b/pkg/ffresty/config.go @@ -67,8 +67,10 @@ const ( HTTPIdleTimeout = "idleTimeout" // HTTPMaxIdleConns the max number of idle connections to hold pooled HTTPMaxIdleConns = "maxIdleConns" - // HTTPRPS the max number of request to submit per second, default value is 0, which turns off the RPS control - HTTPRPS = "rps" + // HTTPRateControlRPS the max number of request to submit per second, default value is 0, which turns off the RPS throttling + // requests over the limit will be blocked using a buffered channel + // the blocked time period is not counted in request timeout + HTTPRateControlRPS = "rateControl.rps" // HTTPMaxConnsPerHost the max number of concurrent connections HTTPMaxConnsPerHost = "maxConnsPerHost" // HTTPConnectionTimeout the connection timeout for new connections @@ -96,7 +98,7 @@ func InitConfig(conf config.Section) { conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime) conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex) conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout) - conf.AddKnownKey(HTTPRPS) + conf.AddKnownKey(HTTPRateControlRPS) conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout) conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns) conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost) @@ -123,7 +125,7 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) { RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)), RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)), RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex), - RequestPerSecond: conf.GetInt(HTTPRPS), + RequestPerSecond: conf.GetInt(HTTPRateControlRPS), HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)), HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)), HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns), diff --git a/pkg/ffresty/ffresty_test.go b/pkg/ffresty/ffresty_test.go index f8b5391..7f1d0b6 100644 --- a/pkg/ffresty/ffresty_test.go +++ b/pkg/ffresty/ffresty_test.go @@ -101,7 +101,7 @@ func TestRequestWithRateLimiter(t *testing.T) { }) utConf.Set(HTTPConfigAuthUsername, "user") utConf.Set(HTTPConfigAuthPassword, "pass") - utConf.Set(HTTPRPS, rps) + utConf.Set(HTTPRateControlRPS, rps) utConf.Set(HTTPConfigRetryEnabled, true) utConf.Set(HTTPCustomClient, customClient) diff --git a/pkg/i18n/en_base_config_descriptions.go b/pkg/i18n/en_base_config_descriptions.go index a7babca..b667648 100644 --- a/pkg/i18n/en_base_config_descriptions.go +++ b/pkg/i18n/en_base_config_descriptions.go @@ -84,7 +84,7 @@ var ( ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType) ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType) ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType) - ConfigGlobalRPS = ffc("config.global.rps", "The max number of requests to submit per second, this rate limiter is off by default with value 0", IntType) + ConfigGlobalRPS = ffc("config.global.rateControl.rps", "The max number of requests to submit per second, this rate limiter is off by default with value 0. Request over the limit will wait.", IntType) ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType) ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType) ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType) diff --git a/pkg/i18n/en_base_field_descriptions.go b/pkg/i18n/en_base_field_descriptions.go index a05c718..f41296b 100644 --- a/pkg/i18n/en_base_field_descriptions.go +++ b/pkg/i18n/en_base_field_descriptions.go @@ -88,7 +88,7 @@ var ( RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call") RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection") RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool") - RESTConfigRPS = ffm("RESTConfig.rps", "Maximum requests to submit per second") + RESTConfigRateControlRPS = ffm("RESTConfig.rps", "Maximum requests to submit per second") RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host") RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool") RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool") From 7838b57d1b57c473733491bd96fee16d4960fe7b Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Thu, 6 Jun 2024 14:17:45 +0100 Subject: [PATCH 7/9] wording and better tests Signed-off-by: Chengxuan Xing --- pkg/ffresty/config.go | 8 ++-- pkg/ffresty/ffresty.go | 12 ++--- pkg/ffresty/ffresty_test.go | 59 +++++++++++++++++++++---- pkg/i18n/en_base_config_descriptions.go | 2 +- pkg/i18n/en_base_field_descriptions.go | 2 +- 5 files changed, 63 insertions(+), 20 deletions(-) diff --git a/pkg/ffresty/config.go b/pkg/ffresty/config.go index f2ccc32..20bf75a 100644 --- a/pkg/ffresty/config.go +++ b/pkg/ffresty/config.go @@ -67,10 +67,10 @@ const ( HTTPIdleTimeout = "idleTimeout" // HTTPMaxIdleConns the max number of idle connections to hold pooled HTTPMaxIdleConns = "maxIdleConns" - // HTTPRateControlRPS the max number of request to submit per second, default value is 0, which turns off the RPS throttling + // HTTPRequestsPerSecond the max number of request to submit per second // requests over the limit will be blocked using a buffered channel // the blocked time period is not counted in request timeout - HTTPRateControlRPS = "rateControl.rps" + HTTPRequestsPerSecond = "requestsPerSecond" // HTTPMaxConnsPerHost the max number of concurrent connections HTTPMaxConnsPerHost = "maxConnsPerHost" // HTTPConnectionTimeout the connection timeout for new connections @@ -98,7 +98,7 @@ func InitConfig(conf config.Section) { conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime) conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex) conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout) - conf.AddKnownKey(HTTPRateControlRPS) + conf.AddKnownKey(HTTPRequestsPerSecond) conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout) conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns) conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost) @@ -125,7 +125,7 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) { RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)), RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)), RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex), - RequestPerSecond: conf.GetInt(HTTPRateControlRPS), + RequestPerSecond: conf.GetInt(HTTPRequestsPerSecond), HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)), HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)), HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns), diff --git a/pkg/ffresty/ffresty.go b/pkg/ffresty/ffresty.go index a0b2d89..62efd8c 100644 --- a/pkg/ffresty/ffresty.go +++ b/pkg/ffresty/ffresty.go @@ -51,6 +51,8 @@ type Config struct { HTTPConfig } +var rateLimiter *rate.Limiter + // HTTPConfig is all the optional configuration separate to the URL you wish to invoke. // This is JSON serializable with docs, so you can embed it into API objects. type HTTPConfig struct { @@ -62,7 +64,7 @@ type HTTPConfig struct { HTTPExpectContinueTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"expectContinueTimeout,omitempty"` AuthUsername string `ffstruct:"RESTConfig" json:"authUsername,omitempty"` AuthPassword string `ffstruct:"RESTConfig" json:"authPassword,omitempty"` - RequestPerSecond int `ffstruct:"RESTConfig" json:"rps,omitempty"` + RequestPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"` Retry bool `ffstruct:"RESTConfig" json:"retry,omitempty"` RetryCount int `ffstruct:"RESTConfig" json:"retryCount,omitempty"` RetryInitialDelay fftypes.FFDuration `ffstruct:"RESTConfig" json:"retryInitialDelay,omitempty"` @@ -150,10 +152,8 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli client = resty.NewWithClient(httpClient) } - var rpsLimiter *rate.Limiter if ffrestyConfig.RequestPerSecond != 0 { // NOTE: 0 is treated as RPS control disabled - rps := ffrestyConfig.RequestPerSecond - rpsLimiter = rate.NewLimiter(rate.Limit(rps), rps) + rateLimiter = rate.NewLimiter(rate.Limit(ffrestyConfig.RequestPerSecond), ffrestyConfig.RequestPerSecond) } url := strings.TrimSuffix(ffrestyConfig.URL, "/") @@ -169,9 +169,9 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli client.SetTimeout(time.Duration(ffrestyConfig.HTTPRequestTimeout)) client.OnBeforeRequest(func(_ *resty.Client, req *resty.Request) error { - if rpsLimiter != nil { + if rateLimiter != nil { // Wait for permission to proceed with the request - err := rpsLimiter.Wait(req.Context()) + err := rateLimiter.Wait(req.Context()) if err != nil { return err } diff --git a/pkg/ffresty/ffresty_test.go b/pkg/ffresty/ffresty_test.go index 7f1d0b6..16f467e 100644 --- a/pkg/ffresty/ffresty_test.go +++ b/pkg/ffresty/ffresty_test.go @@ -43,6 +43,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/jarcoal/httpmock" "github.com/stretchr/testify/assert" + "golang.org/x/time/rate" ) const configDir = "../../test/data/config" @@ -101,7 +102,7 @@ func TestRequestWithRateLimiter(t *testing.T) { }) utConf.Set(HTTPConfigAuthUsername, "user") utConf.Set(HTTPConfigAuthPassword, "pass") - utConf.Set(HTTPRateControlRPS, rps) + utConf.Set(HTTPRequestsPerSecond, rps) utConf.Set(HTTPConfigRetryEnabled, true) utConf.Set(HTTPCustomClient, customClient) @@ -116,23 +117,65 @@ func TestRequestWithRateLimiter(t *testing.T) { assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization")) return httpmock.NewStringResponder(200, `{"some": "data"}`)(req) }) - + requestChan := make(chan bool, expectedNumberOfRequest) startTime := time.Now() for i := 0; i < expectedNumberOfRequest; i++ { - resp, err := c.R().Get("/test") - assert.NoError(t, err) - assert.Equal(t, 200, resp.StatusCode()) - assert.Equal(t, `{"some": "data"}`, resp.String()) + go func() { + resp, err := c.R().Get("/test") + assert.NoError(t, err) + assert.Equal(t, 200, resp.StatusCode()) + assert.Equal(t, `{"some": "data"}`, resp.String()) + requestChan <- true + }() } + count := 0 + for { + <-requestChan + count++ + if count == expectedNumberOfRequest { + break - duration := time.Since(startTime) + } + } + duration := time.Since(startTime) assert.GreaterOrEqual(t, duration, 3*time.Second) assert.LessOrEqual(t, duration, 4*time.Second) - assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount()) } +func TestRateLimiterFailure(t *testing.T) { + customClient := &http.Client{} + + resetConf() + utConf.Set(HTTPConfigURL, "http://localhost:12345") + utConf.Set(HTTPConfigHeaders, map[string]interface{}{ + "someheader": "headervalue", + }) + utConf.Set(HTTPConfigAuthUsername, "user") + utConf.Set(HTTPConfigAuthPassword, "pass") + utConf.Set(HTTPConfigRetryEnabled, true) + utConf.Set(HTTPCustomClient, customClient) + + c, err := New(context.Background(), utConf) + assert.Nil(t, err) + httpmock.ActivateNonDefault(customClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/test", + func(req *http.Request) (*http.Response, error) { + assert.Equal(t, "headervalue", req.Header.Get("someheader")) + assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization")) + return httpmock.NewStringResponder(200, `{"some": "data"}`)(req) + }) + rateLimiter = rate.NewLimiter(rate.Limit(0), 0) // fake limiter with error + resp, err := c.R().Get("/test") + assert.Error(t, err) + assert.Regexp(t, "exceeds", err) + assert.Nil(t, resp) + rateLimiter = nil // reset limiter +} + func TestRequestRetry(t *testing.T) { ctx := context.Background() diff --git a/pkg/i18n/en_base_config_descriptions.go b/pkg/i18n/en_base_config_descriptions.go index b667648..aee2e33 100644 --- a/pkg/i18n/en_base_config_descriptions.go +++ b/pkg/i18n/en_base_config_descriptions.go @@ -84,7 +84,7 @@ var ( ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType) ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType) ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType) - ConfigGlobalRPS = ffc("config.global.rateControl.rps", "The max number of requests to submit per second, this rate limiter is off by default with value 0. Request over the limit will wait.", IntType) + ConfigGlobalRPS = ffc("config.global.requestsPerSecond", "The max number of requests to submit per second, requests over the limit will wait.", IntType) ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType) ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType) ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType) diff --git a/pkg/i18n/en_base_field_descriptions.go b/pkg/i18n/en_base_field_descriptions.go index f41296b..dd9ad21 100644 --- a/pkg/i18n/en_base_field_descriptions.go +++ b/pkg/i18n/en_base_field_descriptions.go @@ -88,7 +88,7 @@ var ( RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call") RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection") RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool") - RESTConfigRateControlRPS = ffm("RESTConfig.rps", "Maximum requests to submit per second") + RESTConfigRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Maximum requests to submit per second") RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host") RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool") RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool") From b96a7149ecbc06ec699246fa7bbc87c05e439115 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Thu, 6 Jun 2024 14:54:39 +0100 Subject: [PATCH 8/9] address review comments Signed-off-by: Chengxuan Xing --- pkg/ffresty/config.go | 14 +++-- pkg/ffresty/ffresty.go | 21 +++++-- pkg/ffresty/ffresty_test.go | 73 +++++++++++++++++++++---- pkg/i18n/en_base_config_descriptions.go | 3 +- pkg/i18n/en_base_field_descriptions.go | 3 +- 5 files changed, 94 insertions(+), 20 deletions(-) diff --git a/pkg/ffresty/config.go b/pkg/ffresty/config.go index 20bf75a..7e3e592 100644 --- a/pkg/ffresty/config.go +++ b/pkg/ffresty/config.go @@ -67,10 +67,14 @@ const ( HTTPIdleTimeout = "idleTimeout" // HTTPMaxIdleConns the max number of idle connections to hold pooled HTTPMaxIdleConns = "maxIdleConns" - // HTTPRequestsPerSecond the max number of request to submit per second + // HTTPThrottleRequestsPerSecond The average rate at which requests are allowed to pass through over time. Default to RPS // requests over the limit will be blocked using a buffered channel // the blocked time period is not counted in request timeout - HTTPRequestsPerSecond = "requestsPerSecond" + HTTPThrottleRequestsPerSecond = "throttle.requestsPerSecond" + + // HTTPThrottleBurst The maximum number of requests that can be made in a short period of time before the RPS throttling kicks in. + HTTPThrottleBurst = "throttle.burst" + // HTTPMaxConnsPerHost the max number of concurrent connections HTTPMaxConnsPerHost = "maxConnsPerHost" // HTTPConnectionTimeout the connection timeout for new connections @@ -98,7 +102,8 @@ func InitConfig(conf config.Section) { conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime) conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex) conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout) - conf.AddKnownKey(HTTPRequestsPerSecond) + conf.AddKnownKey(HTTPThrottleRequestsPerSecond) + conf.AddKnownKey(HTTPThrottleBurst) conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout) conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns) conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost) @@ -125,7 +130,8 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) { RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)), RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)), RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex), - RequestPerSecond: conf.GetInt(HTTPRequestsPerSecond), + ThrottleRequestsPerSecond: conf.GetInt(HTTPThrottleRequestsPerSecond), + ThrottleBurst: conf.GetInt(HTTPThrottleBurst), HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)), HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)), HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns), diff --git a/pkg/ffresty/ffresty.go b/pkg/ffresty/ffresty.go index b6b0dcc..96bad3c 100644 --- a/pkg/ffresty/ffresty.go +++ b/pkg/ffresty/ffresty.go @@ -71,7 +71,8 @@ type HTTPConfig struct { HTTPExpectContinueTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"expectContinueTimeout,omitempty"` AuthUsername string `ffstruct:"RESTConfig" json:"authUsername,omitempty"` AuthPassword string `ffstruct:"RESTConfig" json:"authPassword,omitempty"` - RequestPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"` + ThrottleRequestsPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"` + ThrottleBurst int `ffstruct:"RESTConfig" json:"burst,omitempty"` Retry bool `ffstruct:"RESTConfig" json:"retry,omitempty"` RetryCount int `ffstruct:"RESTConfig" json:"retryCount,omitempty"` RetryInitialDelay fftypes.FFDuration `ffstruct:"RESTConfig" json:"retryInitialDelay,omitempty"` @@ -175,6 +176,20 @@ func New(ctx context.Context, staticConfig config.Section) (client *resty.Client return NewWithConfig(ctx, *ffrestyConfig), nil } +func getRateLimiter(rps, burst int) *rate.Limiter { + if rps != 0 || burst != 0 { // if neither was set, no need for a rate limiter + rpsLimiter := rate.Limit(rps) + if rps == 0 { // only want to control max concurrent requests + rpsLimiter = rate.Inf + } + if rps != 0 && burst == 0 { + burst = rps + } + return rate.NewLimiter(rpsLimiter, burst) + } + return nil +} + // New creates a new Resty client, using static configuration (from the config file) // from a given section in the static configuration // @@ -213,9 +228,7 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli client = resty.NewWithClient(httpClient) } - if ffrestyConfig.RequestPerSecond != 0 { // NOTE: 0 is treated as RPS control disabled - rateLimiter = rate.NewLimiter(rate.Limit(ffrestyConfig.RequestPerSecond), ffrestyConfig.RequestPerSecond) - } + rateLimiter = getRateLimiter(ffrestyConfig.ThrottleRequestsPerSecond, ffrestyConfig.ThrottleBurst) url := strings.TrimSuffix(ffrestyConfig.URL, "/") if url != "" { diff --git a/pkg/ffresty/ffresty_test.go b/pkg/ffresty/ffresty_test.go index 08a5dd7..b7028fd 100644 --- a/pkg/ffresty/ffresty_test.go +++ b/pkg/ffresty/ffresty_test.go @@ -42,10 +42,10 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftls" "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/metric" + "golang.org/x/time/rate" "github.com/jarcoal/httpmock" "github.com/stretchr/testify/assert" - "golang.org/x/time/rate" ) const configDir = "../../test/data/config" @@ -104,7 +104,7 @@ func TestRequestWithRateLimiter(t *testing.T) { }) utConf.Set(HTTPConfigAuthUsername, "user") utConf.Set(HTTPConfigAuthPassword, "pass") - utConf.Set(HTTPRequestsPerSecond, rps) + utConf.Set(HTTPThrottleRequestsPerSecond, rps) utConf.Set(HTTPConfigRetryEnabled, true) utConf.Set(HTTPCustomClient, customClient) @@ -146,6 +146,60 @@ func TestRequestWithRateLimiter(t *testing.T) { assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount()) } +func TestRequestWithRateLimiterHighBurst(t *testing.T) { + expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds + + customClient := &http.Client{} + + resetConf() + utConf.Set(HTTPConfigURL, "http://localhost:12345") + utConf.Set(HTTPConfigHeaders, map[string]interface{}{ + "someheader": "headervalue", + }) + utConf.Set(HTTPConfigAuthUsername, "user") + utConf.Set(HTTPConfigAuthPassword, "pass") + utConf.Set(HTTPThrottleRequestsPerSecond, 0) + utConf.Set(HTTPThrottleBurst, expectedNumberOfRequest) + utConf.Set(HTTPConfigRetryEnabled, true) + utConf.Set(HTTPCustomClient, customClient) + + c, err := New(context.Background(), utConf) + assert.Nil(t, err) + httpmock.ActivateNonDefault(customClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/test", + func(req *http.Request) (*http.Response, error) { + assert.Equal(t, "headervalue", req.Header.Get("someheader")) + assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization")) + return httpmock.NewStringResponder(200, `{"some": "data"}`)(req) + }) + requestChan := make(chan bool, expectedNumberOfRequest) + startTime := time.Now() + for i := 0; i < expectedNumberOfRequest; i++ { + go func() { + resp, err := c.R().Get("/test") + assert.NoError(t, err) + assert.Equal(t, 200, resp.StatusCode()) + assert.Equal(t, `{"some": "data"}`, resp.String()) + requestChan <- true + }() + } + count := 0 + for { + <-requestChan + count++ + if count == expectedNumberOfRequest { + break + + } + } + + duration := time.Since(startTime) + assert.Less(t, duration, 1*time.Second) + assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount()) +} + func TestRateLimiterFailure(t *testing.T) { customClient := &http.Client{} @@ -157,6 +211,7 @@ func TestRateLimiterFailure(t *testing.T) { utConf.Set(HTTPConfigAuthUsername, "user") utConf.Set(HTTPConfigAuthPassword, "pass") utConf.Set(HTTPConfigRetryEnabled, true) + utConf.Set(HTTPCustomClient, customClient) c, err := New(context.Background(), utConf) @@ -170,7 +225,7 @@ func TestRateLimiterFailure(t *testing.T) { assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization")) return httpmock.NewStringResponder(200, `{"some": "data"}`)(req) }) - rateLimiter = rate.NewLimiter(rate.Limit(0), 0) // fake limiter with error + rateLimiter = rate.NewLimiter(rate.Limit(1), 0) // artificially create an broken rate limiter, this is not possible with our config default resp, err := c.R().Get("/test") assert.Error(t, err) assert.Regexp(t, "exceeds", err) @@ -617,17 +672,15 @@ func TestEnableClientMetrics(t *testing.T) { ctx := context.Background() mr := metric.NewPrometheusMetricsRegistry("test") - err := EnableClientMetrics(ctx, mr) + err := EnableClientMetrics(ctx, mr) assert.NoError(t, err) } - - func TestEnableClientMetricsIdempotent(t *testing.T) { ctx := context.Background() mr := metric.NewPrometheusMetricsRegistry("test") - _ = EnableClientMetrics(ctx, mr) + _ = EnableClientMetrics(ctx, mr) err := EnableClientMetrics(ctx, mr) assert.NoError(t, err) } @@ -637,17 +690,17 @@ func TestHooks(t *testing.T) { ctx := context.Background() mr := metric.NewPrometheusMetricsRegistry("test") - err := EnableClientMetrics(ctx, mr) + err := EnableClientMetrics(ctx, mr) assert.NoError(t, err) onErrorCount := 0 onSuccessCount := 0 - customOnError := func(req *resty.Request, err error){ + customOnError := func(req *resty.Request, err error) { onErrorCount++ } - customOnSuccess := func(c *resty.Client, resp *resty.Response){ + customOnSuccess := func(c *resty.Client, resp *resty.Response) { onSuccessCount++ } diff --git a/pkg/i18n/en_base_config_descriptions.go b/pkg/i18n/en_base_config_descriptions.go index aee2e33..733395c 100644 --- a/pkg/i18n/en_base_config_descriptions.go +++ b/pkg/i18n/en_base_config_descriptions.go @@ -84,7 +84,8 @@ var ( ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType) ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType) ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType) - ConfigGlobalRPS = ffc("config.global.requestsPerSecond", "The max number of requests to submit per second, requests over the limit will wait.", IntType) + ConfigGlobalThrottleRPS = ffc("config.global.throttle.requestsPerSecond", "The average rate at which requests are allowed to pass through over time.", IntType) + ConfigGlobalThrottleBurst = ffc("config.global.throttle.burst", "The maximum number of requests that can be made in a short period of time before the throttling kicks in.", IntType) ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType) ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType) ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType) diff --git a/pkg/i18n/en_base_field_descriptions.go b/pkg/i18n/en_base_field_descriptions.go index dd9ad21..88ccb17 100644 --- a/pkg/i18n/en_base_field_descriptions.go +++ b/pkg/i18n/en_base_field_descriptions.go @@ -88,7 +88,8 @@ var ( RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call") RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection") RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool") - RESTConfigRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Maximum requests to submit per second") + RESTConfigThrottleRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Requests per second") + RESTConfigThrottleBurst = ffm("RESTConfig.burst", "Burst") RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host") RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool") RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool") From 802321f20359f16c1158cceec20a60a6ba1ca0d6 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Thu, 6 Jun 2024 14:59:42 +0100 Subject: [PATCH 9/9] clean up logic Signed-off-by: Chengxuan Xing --- pkg/ffresty/ffresty.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/ffresty/ffresty.go b/pkg/ffresty/ffresty.go index 96bad3c..4ecef4e 100644 --- a/pkg/ffresty/ffresty.go +++ b/pkg/ffresty/ffresty.go @@ -177,12 +177,9 @@ func New(ctx context.Context, staticConfig config.Section) (client *resty.Client } func getRateLimiter(rps, burst int) *rate.Limiter { - if rps != 0 || burst != 0 { // if neither was set, no need for a rate limiter + if rps != 0 { // if rps is not set no need for a rate limiter rpsLimiter := rate.Limit(rps) - if rps == 0 { // only want to control max concurrent requests - rpsLimiter = rate.Inf - } - if rps != 0 && burst == 0 { + if burst == 0 { burst = rps } return rate.NewLimiter(rpsLimiter, burst)