Add (and use) conditional fillup (#12)
For some features - such as "deliver this message at most once" - you need to be able to tell whether a leaky bucket has accepted a topup or not. This is especially treacherous when the bucket has a capacity of 1. Observe:

* A bucket of `capacity=1` gets configured for a specific leak rate (say, `over_time=1` for 1 second)
* The bucket gets incremented by 1 (one message should be delivered)
* The bucket is now exactly at capacity, so it reports that it "did fill up", and the caller will suppress the message even though this first token was accepted (to the caller it looks like the bucket was already full)

The proper way to deal with this is to have "compare and set" semantics instead:

* A bucket of `capacity=1` gets configured for a specific leak rate (say, `over_time=1` for 1 second)
* The bucket gets incremented by 1 (one message should be delivered). The caller gets both the achieved level (`1`) and a flag indicating that the bucket _did_ in fact accept the write
* The caller delivers the "once per second" message
* On the second call, after 0.5 seconds, the caller tries to fill up with `1` and gets the achieved level (which remains `~0.5` because no write was granted) and a flag indicating that the write was rejected

For this we need to juggle some SQL around, but there is nothing making this impossible.
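The compare-and-set flow above can be sketched with a tiny in-memory model. Note this is a hypothetical `TinyLeakyBucket` class written purely for illustration (the clock is injected so the example is deterministic); the actual implementation keeps bucket state in the database, as the diff below shows:

```ruby
# Illustration only: an in-memory leaky bucket with conditional fillup.
class TinyLeakyBucket
  Result = Struct.new(:level, :accepted)

  def initialize(capacity:, leak_rate:, clock: -> { Time.now.to_f })
    @capacity = capacity.to_f
    @leak_rate = leak_rate.to_f # tokens leaked per second
    @clock = clock
    @level = 0.0
    @last_touched_at = @clock.call
  end

  # Only accepts the write if it fits within capacity ("compare and set").
  def fillup_conditionally(n_tokens)
    leak!
    if @level + n_tokens <= @capacity
      @level += n_tokens
      Result.new(@level, true)
    else
      Result.new(@level, false)
    end
  end

  private

  # Leak tokens for the time elapsed since the last touch, clamping at 0.
  def leak!
    now = @clock.call
    @level = [@level - ((now - @last_touched_at) * @leak_rate), 0.0].max
    @last_touched_at = now
  end
end

# "At most once per second" with capacity 1:
t = 0.0
bucket = TinyLeakyBucket.new(capacity: 1, leak_rate: 1.0, clock: -> { t })
bucket.fillup_conditionally(1).accepted # => true, deliver the message
t += 0.5
bucket.fillup_conditionally(1).accepted # => false, level stays at ~0.5
```

With the naive "fill, then check full?" approach the very first write above would already look like an overflow; the conditional fillup instead reports that the first write was accepted and only rejects the premature second one.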
julik authored Jan 22, 2024
1 parent ba8778c commit 9845a74
Showing 10 changed files with 332 additions and 88 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,12 @@
- Use `Bucket#fillup_conditionally` inside `Throttle` and throttle only when the capacity _would_ be exceeded, as opposed
to throttling when capacity has already been exceeded. This allows for finer-grained throttles such as
"at most once in", where filling "exactly to capacity" is a requirement. It also provides for more accurate
and easier-to-understand throttling in general.
- Make sure `Bucket#able_to_accept?` allows the bucket to be filled to capacity, not only to below capacity
- Improve YARD documentation
- Allow "conditional fillup" - only add tokens to the leaky bucket if the bucket has enough space.
- Fix `over_time` leading to incorrect `leak_rate`. The dividend and divisor were swapped, leading to the inverse leak rate getting computed.

## [0.3.0] - 2024-01-18

- Allow `over_time` in addition to `leak_rate`, which is a more intuitive parameter to tweak
11 changes: 7 additions & 4 deletions README.md
@@ -22,7 +22,7 @@ And then execute:

## Usage

Once the installation is done you can use Pecorino to start defining your throttles. Imagine you have a resource called `vault` and you want to limit the number of updates to it to 5 per second. To achieve that, instantiate a new `Throttle` in your controller or job code, and then trigger it using `Throttle#request!`. A call to `request!` registers 1 token getting added to the bucket. If the bucket is full, or the throttle is currently in "block" mode (has recently been triggered), a `Pecorino::Throttle::Throttled` exception will be raised.
Once the installation is done you can use Pecorino to start defining your throttles. Imagine you have a resource called `vault` and you want to limit the number of updates to it to 5 per second. To achieve that, instantiate a new `Throttle` in your controller or job code, and then trigger it using `Throttle#request!`. A call to `request!` registers 1 token getting added to the bucket. If the bucket would overspill (your request would make it overflow), or the throttle is currently in "block" mode (has recently been triggered), a `Pecorino::Throttle::Throttled` exception will be raised.

```ruby
throttle = Pecorino::Throttle.new(key: "vault", over_time: 1.second, capacity: 5)
@@ -58,7 +58,7 @@ return render :capacity_exceeded unless throttle.able_to_accept?
If you are dealing with a metered resource (like throughput, money, amount of storage...) you can supply the number of tokens to either `request!` or `able_to_accept?` to indicate the desired top-up of the leaky bucket. For example, if you are maintaining user wallets and want to ensure no more than 100 dollars may be taken from the wallet within a certain amount of time, you can do it like so:

```ruby
throttle = Pecorino::Throttle.new(key: "wallet_t_#{current_user.id}", over_time: 1.hour, capacity: 100, block_for: 60*60*3)
throttle = Pecorino::Throttle.new(key: "wallet_t_#{current_user.id}", over_time: 1.hour, capacity: 100, block_for: 3.hours)
throttle.request!(20) # Attempt to withdraw 20 dollars
throttle.request!(20) # Attempt to withdraw 20 dollars more
throttle.request!(20) # Attempt to withdraw 20 dollars more
@@ -67,6 +67,8 @@ throttle.request!(20) # Attempt to withdraw 20 dollars more
throttle.request!(2) # Attempt to withdraw 2 dollars more, will raise `Throttled` and block withdrawals for 3 hours
```

## Using just the leaky bucket

Sometimes you don't want to use a throttle, but you want to track the amount added to the leaky bucket over time. A lower-level abstraction is available for that purpose in the form of the `LeakyBucket` class. It will not raise any exceptions and will not install blocks, but will permit you to track a bucket's state over time:


@@ -77,9 +79,10 @@ sleep 0.2
b.state #=> Pecorino::LeakyBucket::State(full?: false, level: 1.8)
```

Check out the inline YARD documentation for more options.
Check out the inline YARD documentation for more options. Do take note of the differences between `fillup` and `fillup_conditionally`, as you might want to pick one or the other depending on your use case.

## Cleaning out stale locks from the database
## Cleaning out stale buckets and blocks from the database

We recommend running the following bit of code every couple of hours (via cron or similar) to delete the stale blocks and leaky buckets from the system:

96 changes: 61 additions & 35 deletions lib/pecorino/leaky_bucket.rb
@@ -25,37 +25,39 @@
# The storage use is one DB row per leaky bucket you need to manage (likely - one throttled entity such
# as a combination of an IP address + the URL you need to protect). The `key` is an arbitrary string you provide.
class Pecorino::LeakyBucket
State = Struct.new(:level, :full) do
# Returns the level of the bucket after the operation on the LeakyBucket
# object has taken place. There is a guarantee that no tokens have leaked
# from the bucket between the operation and the freezing of the State
# struct.
#
# @!attribute [r] level
# @return [Float]
# Returned from `.state` and `.fillup`
class State
def initialize(level, is_full)
@level = level.to_f
@full = !!is_full
end

# Returns the level of the bucket
# @return [Float]
attr_reader :level

# Tells whether the bucket was detected to be full when the operation on
# the LeakyBucket was performed. There is a guarantee that no tokens have leaked
# from the bucket between the operation and the freezing of the State
# struct.
#
# @!attribute [r] full
# @return [Boolean]
# the LeakyBucket was performed.
# @return [Boolean]
def full?
@full
end

alias_method :full?, :full
alias_method :full, :full?
end

# Returns the bucket level of the bucket state as a Float
#
# @return [Float]
def to_f
level.to_f
# Same as `State` but also communicates whether the write has been permitted or not. A conditional fillup
# may refuse a write if it would make the bucket overflow.
class ConditionalFillupResult < State
def initialize(level, is_full, accepted)
super(level, is_full)
@accepted = !!accepted
end

# Returns the bucket level of the bucket state rounded to an Integer
#
# @return [Integer]
def to_i
level.to_i
# Tells whether the bucket did accept the requested fillup
# @return [Boolean]
def accepted?
@accepted
end
end

@@ -91,22 +93,46 @@ def to_i
def initialize(key:, capacity:, leak_rate: nil, over_time: nil)
raise ArgumentError, "Either leak_rate: or over_time: must be specified" if leak_rate.nil? && over_time.nil?
raise ArgumentError, "Either leak_rate: or over_time: may be specified, but not both" if leak_rate && over_time
@leak_rate = leak_rate || (over_time.to_f / capacity)
@leak_rate = leak_rate || (capacity / over_time.to_f)
@key = key
@capacity = capacity.to_f
end

# Places `n` tokens in the bucket. Once tokens are placed, the bucket is set to expire
# within 2 times the time it would take it to leak to 0, regardless of how many tokens
# get put in - since the amount of tokens put in the bucket will always be capped
# to the `capacity:` value you pass to the constructor. Calling `fillup` also deletes
# leaky buckets which have expired.
# Places `n` tokens in the bucket. If the bucket has less capacity than `n` tokens, it will be filled to capacity.
# If the bucket is already full when the fillup is requested, the bucket stays at capacity.
#
# @param n_tokens[Float]
# Once tokens are placed, the bucket is set to expire within 2 times the time it would take it to leak to 0,
# regardless of how many tokens get put in - since the amount of tokens put in the bucket will always be capped
# to the `capacity:` value you pass to the constructor.
#
# @param n_tokens[Float] How many tokens to fillup by
# @return [State] the state of the bucket after the operation
def fillup(n_tokens)
capped_level_after_fillup, did_overflow = Pecorino.adapter.add_tokens(capacity: @capacity, key: @key, leak_rate: @leak_rate, n_tokens: n_tokens)
State.new(capped_level_after_fillup, did_overflow)
capped_level_after_fillup, is_full = Pecorino.adapter.add_tokens(capacity: @capacity, key: @key, leak_rate: @leak_rate, n_tokens: n_tokens)
State.new(capped_level_after_fillup, is_full)
end

# Places `n` tokens in the bucket. If the bucket has less capacity than `n` tokens, the fillup will be rejected.
# This can be used for "exactly once" semantics or just more precise rate limiting. Note that if the bucket has
# _exactly_ `n` tokens of capacity the fillup will be accepted.
#
# Once tokens are placed, the bucket is set to expire within 2 times the time it would take it to leak to 0,
# regardless of how many tokens get put in - since the amount of tokens put in the bucket will always be capped
# to the `capacity:` value you pass to the constructor.
#
# @example
#   withdrawals = LeakyBucket.new(key: "wallet-#{user.id}", capacity: 200, over_time: 1.day)
# if withdrawals.fillup_conditionally(amount_to_withdraw).accepted?
# user.wallet.withdraw(amount_to_withdraw)
# else
# raise "You need to wait a bit before withdrawing more"
# end
# @param n_tokens[Float] How many tokens to fillup by
# @return [ConditionalFillupResult] the state of the bucket after the operation and whether the operation succeeded
def fillup_conditionally(n_tokens)
capped_level_after_fillup, is_full, did_accept = Pecorino.adapter.add_tokens_conditionally(capacity: @capacity, key: @key, leak_rate: @leak_rate, n_tokens: n_tokens)
ConditionalFillupResult.new(capped_level_after_fillup, is_full, did_accept)
end

# Returns the current state of the bucket, containing the level and whether the bucket is full.
@@ -127,6 +153,6 @@ def state
# @param n_tokens[Float]
# @return [boolean]
def able_to_accept?(n_tokens)
(state.level + n_tokens) < @capacity
(state.level + n_tokens) <= @capacity
end
end
71 changes: 65 additions & 6 deletions lib/pecorino/postgres.rb
@@ -72,16 +72,75 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
RETURNING
level,
-- Compare level to the capacity inside the DB so that we won't have rounding issues
level >= :capacity AS did_overflow
level >= :capacity AS at_capacity
SQL

# Note the use of .uncached here. The AR query cache will actually see our
# query as a repeat (since we use "select_one" for the RETURNING bit) and will not call into Postgres
# correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here.
# See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test
upserted = model_class.connection.uncached { model_class.connection.select_one(sql) }
capped_level_after_fillup, did_overflow = upserted.fetch("level"), upserted.fetch("did_overflow")
[capped_level_after_fillup, did_overflow]
capped_level_after_fillup, at_capacity = upserted.fetch("level"), upserted.fetch("at_capacity")
[capped_level_after_fillup, at_capacity]
end

def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
# Take double the time it takes the bucket to empty under normal circumstances
# until the bucket may be deleted.
may_be_deleted_after_seconds = (capacity.to_f / leak_rate.to_f) * 2.0

# Create the leaky bucket if it does not exist, and update
# to the new level, taking the leak rate into account - if the bucket exists.
query_params = {
key: key.to_s,
capacity: capacity.to_f,
delete_after_s: may_be_deleted_after_seconds,
leak_rate: leak_rate.to_f,
fillup: n_tokens.to_f
}

sql = model_class.sanitize_sql_array([<<~SQL, query_params])
WITH pre AS MATERIALIZED (
SELECT
-- Note the double clamping here. First we clamp the "current level - leak" to not go below zero,
-- then we also clamp the above + fillup to not go below 0
GREATEST(0.0,
GREATEST(0.0, level - (EXTRACT(EPOCH FROM (clock_timestamp() - last_touched_at)) * :leak_rate)) + :fillup
) AS level_post_with_uncapped_fillup,
GREATEST(0.0,
level - (EXTRACT(EPOCH FROM (clock_timestamp() - last_touched_at)) * :leak_rate)
) AS level_post
FROM pecorino_leaky_buckets
WHERE key = :key
)
INSERT INTO pecorino_leaky_buckets AS t
(key, last_touched_at, may_be_deleted_after, level)
VALUES
(
:key,
clock_timestamp(),
clock_timestamp() + ':delete_after_s second'::interval,
GREATEST(0.0,
(CASE WHEN :fillup > :capacity THEN 0.0 ELSE :fillup END)
)
)
ON CONFLICT (key) DO UPDATE SET
last_touched_at = EXCLUDED.last_touched_at,
may_be_deleted_after = EXCLUDED.may_be_deleted_after,
level = CASE WHEN (SELECT level_post_with_uncapped_fillup FROM pre) <= :capacity THEN
(SELECT level_post_with_uncapped_fillup FROM pre)
ELSE
(SELECT level_post FROM pre)
END
RETURNING
COALESCE((SELECT level_post FROM pre), 0.0) AS level_before,
level AS level_after
SQL

upserted = model_class.connection.uncached { model_class.connection.select_one(sql) }
level_after = upserted.fetch("level_after")
level_before = upserted.fetch("level_before")
[level_after, level_after >= capacity, level_after != level_before]
end

def set_block(key:, block_for:)
@@ -90,17 +149,17 @@ def set_block(key:, block_for:)
INSERT INTO pecorino_blocks AS t
(key, blocked_until)
VALUES
(:key, NOW() + ':block_for seconds'::interval)
(:key, clock_timestamp() + ':block_for seconds'::interval)
ON CONFLICT (key) DO UPDATE SET
blocked_until = GREATEST(EXCLUDED.blocked_until, t.blocked_until)
RETURNING blocked_until;
RETURNING blocked_until
SQL
model_class.connection.uncached { model_class.connection.select_value(block_set_query) }
end

def blocked_until(key:)
block_check_query = model_class.sanitize_sql_array([<<~SQL, key])
SELECT blocked_until FROM pecorino_blocks WHERE key = ? AND blocked_until >= NOW() LIMIT 1
SELECT blocked_until FROM pecorino_blocks WHERE key = ? AND blocked_until >= clock_timestamp() LIMIT 1
SQL
model_class.connection.uncached { model_class.connection.select_value(block_check_query) }
end
68 changes: 68 additions & 0 deletions lib/pecorino/sqlite.rb
@@ -94,6 +94,74 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
[capped_level_after_fillup, one_if_did_overflow == 1]
end

def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
# Take double the time it takes the bucket to empty under normal circumstances
# until the bucket may be deleted.
may_be_deleted_after_seconds = (capacity.to_f / leak_rate.to_f) * 2.0

# Create the leaky bucket if it does not exist, and update
# to the new level, taking the leak rate into account - if the bucket exists.
query_params = {
key: key.to_s,
capacity: capacity.to_f,
delete_after_s: may_be_deleted_after_seconds,
leak_rate: leak_rate.to_f,
now_s: Time.now.to_f, # See above as to why we are using a time value passed in
fillup: n_tokens.to_f,
id: SecureRandom.uuid # SQLite3 does not autogenerate UUIDs
}

# Sadly with SQLite we need to do an INSERT first, because otherwise the inserted row is visible
# to the WITH clause, so we cannot combine the initial fillup and the update into one statement.
# This should be fine, however, since we will suppress the INSERT on a key conflict
insert_sql = model_class.sanitize_sql_array([<<~SQL, query_params])
INSERT INTO pecorino_leaky_buckets AS t
(id, key, last_touched_at, may_be_deleted_after, level)
VALUES
(
:id,
:key,
:now_s, -- Precision loss must be avoided here as it is used for calculations
DATETIME('now', '+:delete_after_s seconds'), -- Precision loss is acceptable here
0.0
)
ON CONFLICT (key) DO UPDATE SET
-- Make sure we extend the lifetime of the row
-- so that it can't be deleted between our INSERT and our UPDATE
may_be_deleted_after = EXCLUDED.may_be_deleted_after
SQL
model_class.connection.execute(insert_sql)

sql = model_class.sanitize_sql_array([<<~SQL, query_params])
-- With SQLite MATERIALIZED has to be used so that level_post is calculated before the UPDATE takes effect
WITH pre(level_post_with_uncapped_fillup, level_post) AS MATERIALIZED (
SELECT
-- Note the double clamping here. First we clamp the "current level - leak" to not go below zero,
-- then we also clamp the above + fillup to not go below 0
MAX(0.0, MAX(0.0, level - ((:now_s - last_touched_at) * :leak_rate)) + :fillup) AS level_post_with_uncapped_fillup,
MAX(0.0, level - ((:now_s - last_touched_at) * :leak_rate)) AS level_post
FROM
pecorino_leaky_buckets
WHERE key = :key
) UPDATE pecorino_leaky_buckets SET
last_touched_at = :now_s,
may_be_deleted_after = DATETIME('now', '+:delete_after_s seconds'),
level = CASE WHEN (SELECT level_post_with_uncapped_fillup FROM pre) <= :capacity THEN
(SELECT level_post_with_uncapped_fillup FROM pre)
ELSE
(SELECT level_post FROM pre)
END
RETURNING
(SELECT level_post FROM pre) AS level_before,
level AS level_after
SQL

upserted = model_class.connection.uncached { model_class.connection.select_one(sql) }
level_after = upserted.fetch("level_after")
level_before = upserted.fetch("level_before")
[level_after, level_after >= capacity, level_after != level_before]
end

def set_block(key:, block_for:)
query_params = {id: SecureRandom.uuid, key: key.to_s, block_for: block_for.to_f, now_s: Time.now.to_f}
block_set_query = model_class.sanitize_sql_array([<<~SQL, query_params])