Skip to content

Commit

Permalink
Use anyio for timeout in dran test helper
Browse files Browse the repository at this point in the history
  • Loading branch information
nfcampos committed Sep 10, 2024
1 parent 8e390db commit 104eabe
Showing 1 changed file with 1 addition and 14 deletions.
15 changes: 1 addition & 14 deletions libs/scheduler-kafka/tests/drain.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import functools
from typing import Callable, Optional, TypeVar

import anyio
Expand All @@ -17,19 +16,6 @@
R = TypeVar("R")


def timeout(delay: int):
def decorator(func: Callable[C, R]) -> Callable[C, R]:
@functools.wraps(func)
async def new_func(*args: C.args, **kwargs: C.kwargs) -> R:
async with asyncio.timeout(delay):
return await func(*args, **kwargs)

return new_func

return decorator


@timeout(20)
async def drain_topics(
topics: Topics,
graph: Pregel,
Expand Down Expand Up @@ -79,6 +65,7 @@ async def poller(expected_next: tuple[str, ...]) -> None:

# run the orchestrator and executor until break_when
async with anyio.create_task_group() as tg:
tg.cancel_scope.deadline = anyio.current_time() + 20
scope = tg.cancel_scope
tg.start_soon(orchestrator, name="orchestrator")
tg.start_soon(executor, name="executor")
Expand Down

0 comments on commit 104eabe

Please sign in to comment.