Skip to content

Commit

Permalink
Update: add delete bucket (#181)
Browse files Browse the repository at this point in the history
* Add dispose method

* Update test & refactor bucket factory

* Update Limiter & BucketFactory api

* update readme

* update readme

* update

* Update

* update
  • Loading branch information
vutran1710 authored Aug 11, 2024
1 parent bcb8b5a commit b309854
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 100 deletions.
74 changes: 52 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Full project documentation can be found at [pyratelimiter.readthedocs.io](https:
- [Defining rate limits & buckets](#defining-rate-limits-and-buckets)
- [Defining clock & routing logic](#defining-clock--routing-logic-with-bucketfactory)
- [Wrapping all up with Limiter](#wrapping-all-up-with-limiter)
- [Limiter API](#limiter-api)
- [Weight](#weight)
- [Handling exceeded limits](#handling-exceeded-limits)
- [Bucket analogy](#bucket-analogy)
Expand All @@ -36,7 +37,6 @@ Full project documentation can be found at [pyratelimiter.readthedocs.io](https:
- [SQLiteBucket](#sqlitebucket)
- [RedisBucket](#redisbucket)
- [PostgresBucket](#postgresbucket)
- [Decorator](#decorator)
- [Async or Sync?](#async-or-sync)
- [Advanced Usage](#advanced-usage)
- [Component-level Diagram](#component-level-diagram)
Expand Down Expand Up @@ -317,6 +317,57 @@ async def async_request_function(some_number: int):
requests.get('https://example.com')
```

### Limiter API

#### `bucket()`: get list of all active buckets
Return list of all active buckets with `limiter.buckets()`


#### `dispose(bucket: int | BucketObject)`: dispose/remove/delete the given bucket

Method signature:
```python
def dispose(self, bucket: Union[int, AbstractBucket]) -> bool:
"""Dispose/Remove a specific bucket,
using bucket-id or bucket object as param
"""
```

Example of usage:
```python
active_buckets = limiter.buckets()
assert len(active_buckets) > 0

bucket_to_remove = active_buckets[0]
assert limiter.dispose(bucket_to_remove)
```

If a bucket is found and get deleted, calling this method will return **True**, otherwise **False**.
If there is no more buckets in the limiter's bucket-factory, all the leaking tasks will be stopped.


#### `as_decorator()`: use limiter as decorator

Limiter can be used as decorator, but you have to provide a `mapping` function that maps the wrapped function's arguments to `limiter.try_acquire` function arguments. The mapping function must return either a tuple of `(str, int)` or just a `str`

The decorator can work with both sync & async function

```python
decorator = limiter.as_decorator()

def mapping(*args, **kwargs):
return "demo", 1

@decorator(mapping)
def handle_something(*args, **kwargs):
"""function logic"""

@decorator(mapping)
async def handle_something_async(*args, **kwargs):
"""function logic"""
```


### Weight

Item can have weight. By default item's weight = 1, but you can modify the weight before passing to `limiter.try_acquire`.
Expand Down Expand Up @@ -535,27 +586,6 @@ rates = [Rate(3, 1000), Rate(4, 1500)]
bucket = PostgresBucket(connection_pool, "my-bucket-table", rates)
```

### Decorator

Limiter can be used as decorator, but you have to provide a `mapping` function that maps the wrapped function's arguments to `limiter.try_acquire` function arguments. The mapping function must return either a tuple of `(str, int)` or just a `str`

The decorator can work with both sync & async function

```python
decorator = limiter.as_decorator()

def mapping(*args, **kwargs):
return "demo", 1

@decorator(mapping)
def handle_something(*args, **kwargs):
"""function logic"""

@decorator(mapping)
async def handle_something_async(*args, **kwargs):
"""function logic"""
```

### Async or Sync?

The Limiter is basically made of a Clock backend and a Bucket backend. Depends on how each of these component works async-or-sync wise, PyrateLimiter will change its methods' signatures to sync or async accordingly.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyrate-limiter"
version = "3.6.2"
version = "3.7.0"
description = "Python Rate-Limiter using Leaky-Bucket Algorithm"
authors = ["vutr <[email protected]>"]
license = "MIT"
Expand Down
1 change: 1 addition & 0 deletions pyrate_limiter/abstracts/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .bucket import * # noqa
from .clock import * # noqa
from .rate import * # noqa
from .wrappers import * # noqa
136 changes: 59 additions & 77 deletions pyrate_limiter/abstracts/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class Leaker(Thread):
async_buckets: Optional[Dict[int, AbstractBucket]] = None
clocks: Optional[Dict[int, AbstractClock]] = None
leak_interval: int = 10_000
is_async_leak_started = False
aio_leak_task: Optional[asyncio.Task] = None

def __init__(self, leak_interval: int):
self.sync_buckets = defaultdict()
Expand All @@ -132,22 +132,41 @@ def register(self, bucket: AbstractBucket, clock: AbstractClock):
assert self.async_buckets is not None

try_leak = bucket.leak(0)
bucket_id = id(bucket)

if iscoroutine(try_leak):
try_leak.close()
self.async_buckets[id(bucket)] = bucket
self.async_buckets[bucket_id] = bucket
else:
self.sync_buckets[id(bucket)] = bucket
self.sync_buckets[bucket_id] = bucket

self.clocks[id(bucket)] = clock
self.clocks[bucket_id] = clock

async def _leak(self, sync=True) -> None:
assert self.clocks
def deregister(self, bucket_id: int) -> bool:
"""Deregister a bucket"""
if self.sync_buckets and bucket_id in self.sync_buckets:
del self.sync_buckets[bucket_id]
assert self.clocks
del self.clocks[bucket_id]
return True

if self.async_buckets and bucket_id in self.async_buckets:
del self.async_buckets[bucket_id]
assert self.clocks
del self.clocks[bucket_id]

if not self.async_buckets and self.aio_leak_task:
self.aio_leak_task.cancel()
self.aio_leak_task = None

return True

while True:
buckets = self.sync_buckets if sync else self.async_buckets
assert buckets
return False

async def _leak(self, buckets: Dict[int, AbstractBucket]) -> None:
assert self.clocks

while buckets:
for bucket_id, bucket in list(buckets.items()):
clock = self.clocks[bucket_id]
now = clock.now()
Expand All @@ -163,21 +182,23 @@ async def _leak(self, sync=True) -> None:

assert isinstance(leak, int)

bucket_type = "sync" if sync else "async"
logger.debug("> Leaking (%s) bucket: %s, %s items", bucket_type, bucket, leak)

await asyncio.sleep(self.leak_interval / 1000)

def leak_async(self):
if self.async_buckets and not self.is_async_leak_started:
self.is_async_leak_started = True
asyncio.create_task(self._leak(sync=False))
if self.async_buckets and not self.aio_leak_task:
self.aio_leak_task = asyncio.create_task(self._leak(self.async_buckets))

def run(self) -> None:
""" Override the original method of Thread
Not meant to be called directly
"""
assert self.sync_buckets
asyncio.run(self._leak(sync=True))
asyncio.run(self._leak(self.sync_buckets))

def start(self) -> None:
""" Override the original method of Thread
Call to run leaking sync buckets
"""
if self.sync_buckets and not self.is_alive():
super().start()

Expand Down Expand Up @@ -234,7 +255,7 @@ def create(

def schedule_leak(self, new_bucket: AbstractBucket, associated_clock: AbstractClock) -> None:
"""Schedule all the buckets' leak, reset bucket's failing rate"""
assert new_bucket.rates
assert new_bucket.rates, "Bucket rates are not set"

if not self._leaker:
self._leaker = Leaker(self.leak_interval)
Expand All @@ -243,71 +264,32 @@ def schedule_leak(self, new_bucket: AbstractBucket, associated_clock: AbstractCl
self._leaker.start()
self._leaker.leak_async()

def get_buckets(self) -> List[AbstractBucket]:
"""Iterator over all buckets in the factory
"""
if not self._leaker:
return []

class BucketAsyncWrapper(AbstractBucket):
"""BucketAsyncWrapper is a wrapping over any bucket
that turns a async/synchronous bucket into an async one
"""

def __init__(self, bucket: AbstractBucket):
assert isinstance(bucket, AbstractBucket)
self.bucket = bucket

async def put(self, item: RateItem):
result = self.bucket.put(item)

while isawaitable(result):
result = await result

return result

async def count(self):
result = self.bucket.count()

while isawaitable(result):
result = await result

return result

async def leak(self, current_timestamp: Optional[int] = None) -> int:
result = self.bucket.leak(current_timestamp)

while isawaitable(result):
result = await result

assert isinstance(result, int)
return result

async def flush(self) -> None:
result = self.bucket.flush()

while isawaitable(result):
result = await result

return None

async def peek(self, index: int) -> Optional[RateItem]:
item = self.bucket.peek(index)
buckets = []

while isawaitable(item):
item = await item
if self._leaker.sync_buckets:
for _, bucket in self._leaker.sync_buckets.items():
buckets.append(bucket)

assert item is None or isinstance(item, RateItem)
return item
if self._leaker.async_buckets:
for _, bucket in self._leaker.async_buckets.items():
buckets.append(bucket)

async def waiting(self, item: RateItem) -> int:
wait = super().waiting(item)
return buckets

if isawaitable(wait):
wait = await wait
def dispose(self, bucket: Union[int, AbstractBucket]) -> bool:
"""Delete a bucket from the factory"""
if isinstance(bucket, AbstractBucket):
bucket = id(bucket)

assert isinstance(wait, int)
return wait
assert isinstance(bucket, int), "not valid bucket id"

@property
def failing_rate(self):
return self.bucket.failing_rate
if not self._leaker:
return False

@property
def rates(self):
return self.bucket.rates
return self._leaker.deregister(bucket)
76 changes: 76 additions & 0 deletions pyrate_limiter/abstracts/wrappers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
""" Wrappers over different abstract types
"""
from inspect import isawaitable
from typing import Optional

from .bucket import AbstractBucket
from .rate import RateItem


class BucketAsyncWrapper(AbstractBucket):
"""BucketAsyncWrapper is a wrapping over any bucket
that turns a async/synchronous bucket into an async one
"""

def __init__(self, bucket: AbstractBucket):
assert isinstance(bucket, AbstractBucket)
self.bucket = bucket

async def put(self, item: RateItem):
result = self.bucket.put(item)

while isawaitable(result):
result = await result

return result

async def count(self):
result = self.bucket.count()

while isawaitable(result):
result = await result

return result

async def leak(self, current_timestamp: Optional[int] = None) -> int:
result = self.bucket.leak(current_timestamp)

while isawaitable(result):
result = await result

assert isinstance(result, int)
return result

async def flush(self) -> None:
result = self.bucket.flush()

while isawaitable(result):
result = await result

return None

async def peek(self, index: int) -> Optional[RateItem]:
item = self.bucket.peek(index)

while isawaitable(item):
item = await item

assert item is None or isinstance(item, RateItem)
return item

async def waiting(self, item: RateItem) -> int:
wait = super().waiting(item)

if isawaitable(wait):
wait = await wait

assert isinstance(wait, int)
return wait

@property
def failing_rate(self):
return self.bucket.failing_rate

@property
def rates(self):
return self.bucket.rates
Loading

0 comments on commit b309854

Please sign in to comment.