Skip to content

Commit

Permalink
Better names in throttle.py
Browse files Browse the repository at this point in the history
  • Loading branch information
leshchenko1979 committed Mar 3, 2024
1 parent e7f197c commit e8846a7
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 26 deletions.
2 changes: 1 addition & 1 deletion fast_bitrix24/srh.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def request_attempt(self, method, params=None) -> dict:
logger.debug("Response: %s", json)

request_run_time = json["time"]["operating"]
self.throttlers[method].register(request_run_time)
self.throttlers[method].add_request_record(request_run_time)

return json

Expand Down
41 changes: 21 additions & 20 deletions fast_bitrix24/throttle.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,54 @@


class SlidingWindowThrottler:
"""The class emulates a leaky bucket where the consumer may only run requests
until he has used up X seconds of request running time in total
during a period of Y seconds.
"""The class emulates a sliding window throttler.
The consumer may only run requests until he has used up X seconds
of request running time in total during a period of Y seconds.
When the consumer has hit the limit, he will have to wait.
"""

def __init__(self, max_request_running_time: float, measurement_period: float):
# how much time fits into the bucket before it starts failing
self.max_request_running_time = max_request_running_time
self._max_request_running_time = max_request_running_time

# over what period of time should the max_request_running_time be measured
self.measurement_period = measurement_period
self._measurement_period = measurement_period

# request register. left - most recent, right - least recent
self.request_register = collections.deque()
# request history. left - most recent, right - least recent
self._request_history = collections.deque()

@contextlib.asynccontextmanager
async def acquire(self):
"""A context manager that will wait until it's safe to make the next request"""
await asyncio.sleep(self.get_needed_sleep_time())
await asyncio.sleep(self._calculate_needed_sleep_time())

try:
yield
finally:
self.clean_up()
self._remove_stale_records()

def get_needed_sleep_time(self) -> float:
def _calculate_needed_sleep_time(self) -> float:
"""How much time to sleep before it's safe to make a request"""
acc = 0
for record in self.request_register:
for record in self._request_history:
acc += record.duration
if acc >= self.max_request_running_time:
return record.when + self.measurement_period - time.monotonic()
if acc >= self._max_request_running_time:
return record.when + self._measurement_period - time.monotonic()
return 0

def clean_up(self):
def _remove_stale_records(self):
"""Remove all stale records from the record register"""
if not self.request_register:
if not self._request_history:
return

cut_off = time.monotonic() - self.measurement_period
while self.request_register[-1].when < cut_off:
self.request_register.pop()
cut_off = time.monotonic() - self._measurement_period
while self._request_history[-1].when < cut_off:
self._request_history.pop()

def register(self, request_duration: float):
def add_request_record(self, request_duration: float):
"""Register how long the last request has taken"""
self.request_register.appendleft(
self._request_history.appendleft(
RequestRecord(time.monotonic(), request_duration)
)
10 changes: 5 additions & 5 deletions tests/test_throttle.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ def test_needed_sleep_time(
):
when, duration = requests.pop(0)
monkeypatch.setattr("time.monotonic", lambda: when)
throttler.register(duration)
throttler.add_request_record(duration)
else:
call_point, expected = measurements.pop(0)
monkeypatch.setattr("time.monotonic", lambda: call_point)
print("Request record:", throttler.request_register)
print("Request record:", throttler._request_history)
print("Time", call_point)
assert math.isclose(throttler.get_needed_sleep_time(), expected)
assert math.isclose(throttler._calculate_needed_sleep_time(), expected)


@pytest.mark.asyncio
Expand Down Expand Up @@ -90,11 +90,11 @@ async def fake_sleep(duration):
for duration in request_durations:
async with throttler.acquire():
pass
throttler.register(duration)
throttler.add_request_record(duration)
start_time += duration
await asyncio.sleep(duration)

# Assert
assert math.isclose(
throttler.get_needed_sleep_time(), expected_sleep_time
throttler._calculate_needed_sleep_time(), expected_sleep_time
), f"Test failed for {test_id}"

0 comments on commit e8846a7

Please sign in to comment.