Skip to content

Commit

Permalink
chore: black .
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] authored and BobTheBuidler committed Nov 22, 2024
1 parent 09a7432 commit 3ec1ec7
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 53 deletions.
11 changes: 9 additions & 2 deletions a_sync/primitives/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

class _Queue(asyncio.Queue, Generic[T]):
__slots__ = (
"_queue",
"_queue",
"_maxsize",
"_loop",
"_getters",
Expand All @@ -42,7 +42,14 @@ class _Queue(asyncio.Queue, Generic[T]):
else:

class _Queue(asyncio.Queue[T]):
__slots__ = "_queue", "_maxsize", "_getters", "_putters", "_unfinished_tasks", "_finished"
__slots__ = (
"_queue",
"_maxsize",
"_getters",
"_putters",
"_unfinished_tasks",
"_finished",
)


class Queue(_Queue[T]):
Expand Down
129 changes: 78 additions & 51 deletions tests/primitives/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,93 +2,105 @@
import asyncio
from a_sync.primitives.queue import Queue


@pytest.mark.asyncio_cooperative
async def test_queue_initialization():
queue = Queue()
assert isinstance(queue, Queue)


@pytest.mark.asyncio_cooperative
async def test_put_and_get():
queue = Queue()
await queue.put('item1')
await queue.put("item1")
result = await queue.get()
assert result == 'item1'
assert result == "item1"


@pytest.mark.asyncio_cooperative
async def test_put_nowait_and_get_nowait():
queue = Queue()
queue.put_nowait('item2')
queue.put_nowait("item2")
result = queue.get_nowait()
assert result == 'item2'
assert result == "item2"

with pytest.raises(asyncio.QueueEmpty):
queue.get_nowait()


@pytest.mark.asyncio_cooperative
async def test_get_all():
queue = Queue()
await queue.put('item3')
await queue.put('item4')
await queue.put("item3")
await queue.put("item4")
result = await queue.get_all()
assert result == ['item3', 'item4']
assert result == ["item3", "item4"]


@pytest.mark.asyncio_cooperative
async def test_get_all_nowait():
queue = Queue()
queue.put_nowait('item5')
queue.put_nowait('item6')
queue.put_nowait("item5")
queue.put_nowait("item6")
result = queue.get_all_nowait()
assert result == ['item5', 'item6']
assert result == ["item5", "item6"]

with pytest.raises(asyncio.QueueEmpty):
queue.get_all_nowait()


@pytest.mark.asyncio_cooperative
async def test_get_multi():
queue = Queue()
await queue.put('item7')
await queue.put('item8')
await queue.put("item7")
await queue.put("item8")
result = await queue.get_multi(2)
assert result == ['item7', 'item8']
assert result == ["item7", "item8"]


@pytest.mark.asyncio_cooperative
async def test_get_multi_nowait():
queue = Queue()
queue.put_nowait('item9')
queue.put_nowait('item10')
queue.put_nowait("item9")
queue.put_nowait("item10")
result = queue.get_multi_nowait(2)
assert result == ['item9', 'item10']
assert result == ["item9", "item10"]

with pytest.raises(ValueError, match="`i` must be an integer greater than 1. You passed 1"):
with pytest.raises(
ValueError, match="`i` must be an integer greater than 1. You passed 1"
):
queue.get_multi_nowait(1)
with pytest.raises(asyncio.QueueEmpty):
queue.get_multi_nowait(2)


@pytest.mark.asyncio_cooperative
async def test_queue_length():
queue = Queue()
await queue.put('item11')
await queue.put("item11")
assert not queue.empty()
assert len(queue) == 1
await queue.get()
assert len(queue) == 0


@pytest.mark.asyncio_cooperative
async def test_concurrent_access():
queue = Queue()
results = []

async def producer():
for i in range(5):
await queue.put(f'item{i}')
await queue.put(f"item{i}")

async def consumer():
for _ in range(5):
item = await queue.get()
results.append(item)

await asyncio.gather(producer(), consumer())
assert results == [f'item{i}' for i in range(5)]
assert results == [f"item{i}" for i in range(5)]


@pytest.mark.asyncio_cooperative
async def test_empty_queue_behavior():
Expand All @@ -99,9 +111,10 @@ async def consumer():

consumer_task = asyncio.create_task(consumer())
await asyncio.sleep(0.1) # Ensure the consumer is waiting
await queue.put('item1')
await queue.put("item1")
result = await consumer_task
assert result == 'item1'
assert result == "item1"


@pytest.mark.asyncio_cooperative
async def test_cancellation():
Expand All @@ -111,39 +124,42 @@ async def consumer():
try:
await queue.get()
except asyncio.CancelledError:
return 'cancelled'
return "cancelled"

consumer_task = asyncio.create_task(consumer())
await asyncio.sleep(0.1) # Ensure the consumer is waiting
consumer_task.cancel()
result = await consumer_task
assert result == 'cancelled'
assert result == "cancelled"


@pytest.mark.asyncio_cooperative
async def test_invalid_get_multi():
queue = Queue()
with pytest.raises(ValueError):
await queue.get_multi(-1)


@pytest.mark.asyncio_cooperative
async def test_type_consistency():
queue = Queue()
await queue.put(1)
await queue.put('string')
await queue.put("string")
await queue.put(None)

assert await queue.get() == 1
assert await queue.get() == 'string'
assert await queue.get() == "string"
assert await queue.get() is None


@pytest.mark.asyncio_cooperative
async def test_stress_testing():
queue = Queue()
num_items = 1000

async def producer():
for i in range(num_items):
await queue.put(f'item{i}')
await queue.put(f"item{i}")

async def consumer():
for _ in range(num_items):
Expand All @@ -152,26 +168,29 @@ async def consumer():
await asyncio.gather(producer(), consumer())
assert len(queue) == 0


@pytest.mark.asyncio_cooperative
async def test_order_preservation():
queue = Queue()
items = ['item1', 'item2', 'item3']
items = ["item1", "item2", "item3"]
for item in items:
await queue.put(item)

for item in items:
assert await queue.get() == item


@pytest.mark.asyncio_cooperative
async def test_edge_case_values():
queue = Queue()
await queue.put(None)
await queue.put('')
await queue.put(' ')
await queue.put("")
await queue.put(" ")

assert await queue.get() is None
assert await queue.get() == ''
assert await queue.get() == ' '
assert await queue.get() == ""
assert await queue.get() == " "


@pytest.mark.asyncio_cooperative
async def test_timeout_on_get():
Expand All @@ -181,10 +200,11 @@ async def consumer():
try:
await asyncio.wait_for(queue.get(), timeout=0.1)
except asyncio.TimeoutError:
return 'timeout'
return "timeout"

result = await consumer()
assert result == 'timeout'
assert result == "timeout"


@pytest.mark.asyncio_cooperative
async def test_exception_handling_in_callbacks():
Expand All @@ -197,25 +217,27 @@ async def faulty_consumer():
except ValueError as e:
return str(e)

await queue.put('item')
await queue.put("item")
result = await faulty_consumer()
assert result == "Intentional error"


@pytest.mark.asyncio_cooperative
async def test_memory_usage():
queue = Queue()
large_object = 'x' * 10**6 # 1 MB string
large_object = "x" * 10**6 # 1 MB string
await queue.put(large_object)
result = await queue.get()
assert result == large_object


@pytest.mark.asyncio_cooperative
async def test_thread_safety():
queue = Queue()

async def producer():
for i in range(5):
await queue.put(f'item{i}')
await queue.put(f"item{i}")

async def consumer():
results = []
Expand All @@ -229,6 +251,7 @@ async def consumer():
await asyncio.gather(producer_task, consumer_task)
assert len(queue) == 0


@pytest.mark.asyncio_cooperative
async def test_custom_object_handling():
class CustomObject:
Expand All @@ -247,14 +270,16 @@ def __eq__(self, other):
assert await queue.get() == obj1
assert await queue.get() == obj2


@pytest.mark.asyncio_cooperative
async def test_queue_capacity():
queue = Queue(maxsize=2)
await queue.put('item1')
await queue.put('item2')
await queue.put("item1")
await queue.put("item2")
assert queue.full()
with pytest.raises(asyncio.QueueFull):
queue.put_nowait('item3')
queue.put_nowait("item3")


@pytest.mark.asyncio_cooperative
async def test_performance_under_load():
Expand All @@ -263,7 +288,7 @@ async def test_performance_under_load():

async def producer():
for i in range(num_items):
await queue.put(f'item{i}')
await queue.put(f"item{i}")

async def consumer():
for _ in range(num_items):
Expand All @@ -272,11 +297,12 @@ async def consumer():
await asyncio.gather(producer(), consumer())
assert len(queue) == 0


@pytest.mark.asyncio_cooperative
async def test_state_persistence():
queue = Queue()
await queue.put('item1')
await queue.put('item2')
await queue.put("item1")
await queue.put("item2")

# Simulate state persistence
state = list(queue._queue)
Expand All @@ -285,16 +311,17 @@ async def test_state_persistence():
restored_queue = Queue()
restored_queue._queue.extend(state)

assert await restored_queue.get() == 'item1'
assert await restored_queue.get() == 'item2'
assert await restored_queue.get() == "item1"
assert await restored_queue.get() == "item2"


@pytest.mark.asyncio_cooperative
async def test_unusual_data_types():
queue = Queue()
await queue.put({'key': 'value'})
await queue.put(['list', 'of', 'items'])
await queue.put(('tuple', 'of', 'items'))
await queue.put({"key": "value"})
await queue.put(["list", "of", "items"])
await queue.put(("tuple", "of", "items"))

assert await queue.get() == {'key': 'value'}
assert await queue.get() == ['list', 'of', 'items']
assert await queue.get() == ('tuple', 'of', 'items')
assert await queue.get() == {"key": "value"}
assert await queue.get() == ["list", "of", "items"]
assert await queue.get() == ("tuple", "of", "items")

0 comments on commit 3ec1ec7

Please sign in to comment.