Skip to content

Commit

Permalink
fix: as_yielded
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler committed Oct 10, 2023
1 parent 8b10f78 commit 7c21716
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions a_sync/utils/iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ async def exhaust_iterator(iterator: AsyncIterator[T], *, queue: Optional[asynci

async def exhaust_iterators(iterators, *, queue: Optional[asyncio.Queue] = None) -> None:
await asyncio.gather(*[exhaust_iterator(iterator, queue=queue) for iterator in iterators])
if queue:
queue.put_nowait(_Done())

T0 = TypeVar('T0')
T1 = TypeVar('T1')
Expand Down Expand Up @@ -58,16 +60,20 @@ async def as_yielded(*iterators: AsyncIterator[T]) -> AsyncIterator[T]:
def done_callback(t: asyncio.Task) -> None:
if t.exception() and not next_fut.done():
next_fut.set_exception(t.exception())

task.add_done_callback(done_callback)
while not task.done():
next_fut = asyncio.get_event_loop().create_future()
get_task = asyncio.create_task(coro=queue.get(), name=str(queue))
_chain_future(get_task, next_fut)
yield await next_fut
for next in queue.get_nowait(-1):
yield next

for item in (await next_fut, *queue.get_nowait(-1)):
if isinstance(item, _Done):
return
yield item

if e := task.exception():
get_task.cancel()
raise e

class _Done:
pass

0 comments on commit 7c21716

Please sign in to comment.