From 104eabeca8e082a7fa5a66b161b462b88a1cd045 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 10 Sep 2024 16:10:03 -0700 Subject: [PATCH] Use anyio for timeout in dran test helper --- libs/scheduler-kafka/tests/drain.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/libs/scheduler-kafka/tests/drain.py b/libs/scheduler-kafka/tests/drain.py index b6199adbb0..2a6ed426bd 100644 --- a/libs/scheduler-kafka/tests/drain.py +++ b/libs/scheduler-kafka/tests/drain.py @@ -1,5 +1,4 @@ import asyncio -import functools from typing import Callable, Optional, TypeVar import anyio @@ -17,19 +16,6 @@ R = TypeVar("R") -def timeout(delay: int): - def decorator(func: Callable[C, R]) -> Callable[C, R]: - @functools.wraps(func) - async def new_func(*args: C.args, **kwargs: C.kwargs) -> R: - async with asyncio.timeout(delay): - return await func(*args, **kwargs) - - return new_func - - return decorator - - -@timeout(20) async def drain_topics( topics: Topics, graph: Pregel, @@ -79,6 +65,7 @@ async def poller(expected_next: tuple[str, ...]) -> None: # run the orchestrator and executor until break_when async with anyio.create_task_group() as tg: + tg.cancel_scope.deadline = anyio.current_time() + 20 scope = tg.cancel_scope tg.start_soon(orchestrator, name="orchestrator") tg.start_soon(executor, name="executor")