
Gateway rate limiting #1942

Merged
merged 18 commits into main from ziga/gateway_rate_limiting on Jul 24, 2024

Conversation

zkokelj
Contributor

@zkokelj zkokelj commented Jun 3, 2024

Why this change is needed

Due to a huge volume of calls from some IPs, we want to guard some of our expensive endpoints with a rate limiter.

What changes were made as part of this PR

Please provide a high level list of the changes made

PR checks pre-merging

Please indicate below by ticking the checkbox that you have read and performed the required
PR checks

  • PR checks reviewed and performed

Collaborator

@tudor-malene tudor-malene left a comment

I suggested a slight change in the approach, as it looks like we need to handle a few more cases.

Each request that has the calculateRateLimitScore lambda defined in the ExecConfig will append that value to the current score for the user.

We need to make the RateLimiter type a service with Start and Stop methods.
In the Start method, there should be an infinite loop with a timer that fires every second (or some configurable interval) and reduces the score of all users by some amount.

This way, requests add to the score as they are made, and in the background we reduce it.

tools/walletextension/ratelimiter/rate_limiter.go
tools/walletextension/rpcapi/blockchain_api.go
tools/walletextension/rpcapi/utils.go
tools/walletextension/ratelimiter/rate_limiter.go
Collaborator

@tudor-malene tudor-malene left a comment

lgtm
(after avoiding the conversion of the userid to string)

@zkokelj zkokelj force-pushed the ziga/gateway_rate_limiting branch from 1982a94 to 7ed9dfe Compare June 7, 2024 15:46
@zkokelj zkokelj marked this pull request as ready for review June 10, 2024 07:50
@ten-protocol ten-protocol deleted a comment from vercel bot Jun 11, 2024
@zkokelj zkokelj force-pushed the ziga/gateway_rate_limiting branch from 935ee5a to 1baf55c Compare June 12, 2024 11:12
Collaborator

@tudor-malene tudor-malene left a comment

The logic is clean now. It needs an additional mechanism.

// The execution cost is 100 times the execution duration in milliseconds;
// we can change how much a user can run by changing the decay rate
executionCost := uint32(executionDuration) * 100
w.RateLimiter.UpdateScore(gethcommon.Address(userID), executionCost)
Collaborator

I'd suggest passing the duration into the rate-limiter, so we have all rate-limiting logic in a single place.

Contributor Author

fixed

tools/walletextension/ratelimiter/rate_limiter.go

// UpdateScore updates the score of the user based on the additional score (time taken to process the request)
func (rl *RateLimiter) UpdateScore(userID common.Address, additionalScore uint32) {
rl.mu.Lock()
Collaborator

Worth adding the threshold == 0 check here as well. Or maybe create an internal method: "isEnabled".

Contributor Author

added

tools/walletextension/ratelimiter/rate_limiter.go
rateLimitThresholdUsage = "Rate limit threshold per user. Default: 1000."

rateLimitDecayName = "rateLimitDecay"
rateLimitDecayDefault = 0.2
Collaborator

If we make this "1", then we keep parity with the milliseconds, and we can reason in units of time.

Contributor Author

@zkokelj zkokelj Jul 3, 2024

If we make this 1, it means we are decaying by 1ms every 1ms.

Collaborator

yeah. It's easier to reason about it.

Contributor Author

Yeah, but it means that by default every user is allowed to use 1000ms of compute per 1000ms on average (100%). Rate limits will be hit very rarely, and only with concurrent requests.

rateLimitDecayDefault = 0.2
rateLimitDecayUsage = "Rate limit decay per user. Default: 0.2"

rateLimitMaxScoreName = "rateLimitMaxScore"
Collaborator

this parameter is confusing.

Contributor Author

removed this parameter

@@ -59,6 +59,18 @@ const (
storeIncomingTxs = "storeIncomingTxs"
storeIncomingTxsDefault = true
storeIncomingTxsUsage = "Flag to enable storing incoming transactions in the database for debugging purposes. Default: true"

rateLimitThresholdName = "rateLimitThreshold"
rateLimitThresholdDefault = 1000
Collaborator

If this is 200 and the decay is 1, we allocate 200ms per second to each user.

Contributor Author

If the decay is 1, then every ms one ms is subtracted from the score.
Setting the decay to 0.2 means that every second the score decays by 200ms.
Setting the threshold to 1000 means that a user can use 1000ms in a second, but it will decay at the decay rate above. So we are not rate limiting a user who makes a request that takes 250ms and immediately wants to make another request. But if they keep doing it, rate limiting will kick in.
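A quick worked example of these numbers (illustrative values only, using the decay 0.2 and threshold 1000 from the discussion):

```go
package main

import "fmt"

func main() {
	// Illustrative config values from the discussion above.
	const decayPerMs = 0.2   // score units removed per elapsed millisecond
	const threshold = 1000.0 // max accumulated score, in ms of compute

	score := 250.0               // one request that took 250ms of compute
	score -= decayPerMs * 1000.0 // one second later: decayed by 200ms
	fmt.Println(score)           // 50

	// An immediate second 250ms request would bring the score to 300,
	// still under the threshold, so the user is not rate limited.
	fmt.Println(score+250.0 < threshold) // true
}
```

Only a user who sustains more than the decayed budget over time accumulates enough score to cross the threshold.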

// decays based on the time since the last request.

type Score struct {
lastRequest time.Time
Collaborator

Let's add a "concurrentRequests" value here as well, which we increment during "Allow" and decrement during "Update".

In "Allow" we should also check that concurrentRequests is below a configured threshold (a small value like 3).

With this mechanism we protect against someone DoS-ing the gateway by triggering a very large number of expensive requests.
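The suggested guard could be sketched like this; the type and method names are assumptions, not the PR's exact code, and the underflow guard in Update reflects a check discussed later in this thread:

```go
package main

import (
	"fmt"
	"sync"
)

type userState struct {
	score              uint32
	concurrentRequests uint32
}

type Limiter struct {
	mu            sync.Mutex
	users         map[string]*userState
	scoreLimit    uint32
	maxConcurrent uint32
}

func NewLimiter(scoreLimit, maxConcurrent uint32) *Limiter {
	return &Limiter{
		users:         make(map[string]*userState),
		scoreLimit:    scoreLimit,
		maxConcurrent: maxConcurrent,
	}
}

// Allow admits the request only if both the score and the number of
// in-flight requests are under their thresholds, and claims a slot.
func (l *Limiter) Allow(user string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	u, ok := l.users[user]
	if !ok {
		u = &userState{}
		l.users[user] = u
	}
	if u.score >= l.scoreLimit || u.concurrentRequests >= l.maxConcurrent {
		return false
	}
	u.concurrentRequests++
	return true
}

// Update records the request cost and releases the concurrency slot.
func (l *Limiter) Update(user string, cost uint32) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if u, ok := l.users[user]; ok {
		u.score += cost
		if u.concurrentRequests > 0 { // guard against unsigned underflow
			u.concurrentRequests--
		}
	}
}

func main() {
	l := NewLimiter(1000, 3)
	for i := 0; i < 4; i++ {
		fmt.Println(l.Allow("alice")) // the 4th in-flight request is rejected
	}
}
```

Even a user whose score is low cannot fire more than maxConcurrent expensive requests at once.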

Contributor Author

added

Collaborator

@tudor-malene tudor-malene left a comment

I think we can simplify a bit by keeping the responsibilities more compact.

Also, the rate limit allow/update needs to be added to the filter_api.GetLogs method, which doesn't use ExecAuthRPC.


// Increase the score of the user based on the execution duration of the request
// and update the last request time
newScore := rl.users[userID].score + executionDuration - scoreDecay
rl.users[userID] = Score{lastRequest: now, score: newScore, concurrentRequests: userScore.concurrentRequests - 1}
Collaborator

The -1 needs to be checked: if concurrentRequests is already 0, it will underflow and wrap around.

Actually, it can't ever be 0 at this point.
Anyway, worth checking to avoid something weird.

Contributor Author

should never happen, but added additional check for it

userScore, exists := rl.users[userID]
scoreDecay := uint32(0)
// if user exists decay the score based on the time since the last request
if exists {
Collaborator

If there is no score for that user, it's a programming error. This method should always be called after the previous one.

Contributor Author

logging that error now

// if user exists decay the score based on the time since the last request
if exists {
// Decay the score based on the time since the last request and the decay rate
timeSinceLastRequest := float64(now.Sub(userScore.lastRequest).Milliseconds())
Collaborator

I don't quite understand why we need to decay the score here. It's hard to reason about what's going on, as that responsibility is now in 2 places.

Contributor Author

We decay it only here in UpdateScore.
But we also need to calculate it in the Allow function to compare it to the threshold. Is this what you meant in this comment: #1942 (comment)?

// Increase the score of the user based on the execution duration of the request
// and update the last request time
newScore := rl.users[userID].score + executionDuration - scoreDecay
rl.users[userID] = Score{lastRequest: now, score: newScore, concurrentRequests: userScore.concurrentRequests - 1}
Collaborator

let's not change the lastRequest here.

Contributor Author

fixed

@zkokelj zkokelj requested a review from tudor-malene July 3, 2024 16:05
Collaborator

@BedrockSquirrel BedrockSquirrel left a comment

LGTM to what Tudor requested. My comments are minor things, please ignore any you disagree with!

${{ vars.DOCKER_BUILD_TAG_GATEWAY }} \
-host=0.0.0.0 -port=8080 -portWS=81 -nodeHost=${{ vars.L2_RPC_URL_VALIDATOR }} -verbose=true \
-logPath=sys_out -dbType=mariaDB -dbConnectionURL="obscurouser:${{ secrets.OBSCURO_GATEWAY_MARIADB_USER_PWD }}@tcp(obscurogateway-mariadb-${{ github.event.inputs.testnet_type }}.uksouth.cloudapp.azure.com:3306)/ogdb" \
-rateLimitThreshold=${{ vars.GATEWAY_RATE_LIMIT_THRESHOLD }} -rateLimitDecay=${{ vars.GATEWAY_RATE_LIMIT_DECAY }} -maxConcurrentRequestsPerUser=${{ vars.GATEWAY_MAX_CONCURRENT_REQUESTS_PER_USER }} '
Collaborator

Looks like this config usage is out of date now since the latest changes (also you probably need to duplicate the config flags in the manual-upgrade yaml script).

Contributor Author

Fixed, thanks 👍

@@ -59,6 +59,18 @@ const (
storeIncomingTxs = "storeIncomingTxs"
storeIncomingTxsDefault = true
storeIncomingTxsUsage = "Flag to enable storing incoming transactions in the database for debugging purposes. Default: true"

rateLimitUserComputeTimeName = "rateLimitUserComputeTime"
Collaborator

For time duration config we usually either use a string and then call time.ParseDuration on it (like BatchInterval), or we include the units in the flag name (like l1RPCTimeoutSecs). Or if you think it's clear enough then maybe just mention millis in the usage description here.

Contributor Author

Using duration now, thanks.

}

// UpdateRequest updates the end time of a request interval given its UUID.
func (rl *RateLimiter) UpdateRequest(userID common.Address, id uuid.UUID) {
Collaborator

I'd maybe call this method like EndRequest() or SetRequestEnd() or something for clarity.

Contributor Author

Renamed 👍

request.End = &now
user.CurrentRequests[id] = request
} else {
log.Printf("Request with ID %s not found for user %s.", id, userID.Hex())
Collaborator

Do we use the builtin log library in the wallet extension server? That seems quite convenient to work with.

Contributor Author

Logger used

if user, exists := rl.users[userID]; exists {
cutoff := time.Now().Add(-time.Duration(rl.window) * time.Millisecond)
for _, interval := range user.CurrentRequests {
if interval.End != nil && interval.End.After(cutoff) {
Collaborator

I guess we could include in-flight requests in this sum, like totalComputeTime += now.Sub(interval.Start) if End is nil, maybe?

Contributor Author

fixed


// Check if user is in limits of rate limiting
userComputeTimeForUser := rl.SumComputeTime(userID)
if userComputeTimeForUser <= rl.userComputeTime {
Collaborator

This is maybe a bit subjective, but I think in Go they usually put the happy path at the end of the method. So check if userComputeTimeForUser > threshold here for the bad case, and have return true, requestUUID as the last line of the method. Feel free to ignore that though, nitpicking haha.
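The shape being suggested is roughly this (purely illustrative names and types, simplified from the real method):

```go
package main

import "fmt"

// allow illustrates the Go guard-clause style: the limit-exceeded case
// returns early, and the happy path is the last line of the method.
func allow(totalComputeMs, thresholdMs int) (bool, string) {
	if totalComputeMs > thresholdMs {
		return false, "" // bad case handled first
	}
	return true, "hypothetical-request-id" // happy path last
}

func main() {
	ok, id := allow(500, 1000)
	fmt.Println(ok, id)
}
```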

Contributor Author

fixed & noted. Will stick with the happy path at the end of methods when possible.

defer rl.mu.Unlock()

// delete all the requests that ended before the rate limiter's window
cutoff := time.Now().Add(-time.Duration(rl.window) * time.Millisecond)
Collaborator

It might be nice if rl.window was already a Duration (like I mentioned in the config stuff) for lines like this.

Contributor Author

removed unneeded conversions to Duration

}

// PruneRequests deletes all requests that have ended before the rate limiter's window.
func (rl *RateLimiter) PruneRequests() {
Collaborator

It'd be nice to know if this starts being slow; maybe a log line at the end if it's over some amount, like:

startTime := time.Now()
// ... pruning work ...
timeTaken := time.Since(startTime)
if timeTaken > 1*time.Second {
   log.Printf("PruneRequests completed in %s", timeTaken)
}

Bit worried the contention on the mutex lock could become troublesome. If it does, I guess we can add a per-user rw-lock in the User object so pruning doesn't block new requests from existing users being allowed.

Contributor Author

added logs if time taken > 1s.

I am aware of the mutex lock problems (created getters and increments, and don't hold the lock for too long).
Do you think we should try it like this and later (if it becomes a problem) switch to per-user locking?

Collaborator

Yeah, I think it's fine for now; all your locks looked good to me and it might not be an issue at all. But something to consider if we start seeing unexplained slowness at some point.

Collaborator

@BedrockSquirrel BedrockSquirrel left a comment

lgtm

@@ -149,6 +154,8 @@ func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method s
return nil, rpcErr
})

w.RateLimiter.SetRequestEnd(gethcommon.Address(userID), requestUUID)
Collaborator

One comment: I think this would be better in a defer.
(Same for filter_api.)
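The point of the defer is that the request end is recorded even when the RPC call errors out or panics. A minimal sketch, with a stand-in limiter type and hypothetical names:

```go
package main

import "fmt"

// fakeLimiter stands in for the real rate limiter to show the pattern.
type fakeLimiter struct{ ended []string }

func (f *fakeLimiter) SetRequestEnd(user, requestID string) {
	f.ended = append(f.ended, requestID)
}

// execAuthRPC sketches the wrapper: SetRequestEnd runs via defer on
// every exit path, including early returns and errors.
func execAuthRPC(rl *fakeLimiter, user, requestID string) error {
	defer rl.SetRequestEnd(user, requestID) // always record the end time
	// ... perform the actual RPC call here ...
	return fmt.Errorf("upstream error") // even on error, the defer fires
}

func main() {
	rl := &fakeLimiter{}
	_ = execAuthRPC(rl, "alice", "req-1")
	fmt.Println(len(rl.ended)) // 1
}
```

Without the defer, any error path that returns before the explicit SetRequestEnd call would leave the request interval open forever, permanently inflating the user's compute-time sum.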

Contributor Author

fixed.

Collaborator

@tudor-malene tudor-malene left a comment

lgtm

@zkokelj zkokelj merged commit ad61bdb into main Jul 24, 2024
5 checks passed
@zkokelj zkokelj deleted the ziga/gateway_rate_limiting branch July 24, 2024 12:16