From 63785eb9836468bb75c1f66dee8d42a5461fe5e2 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sat, 19 Aug 2017 19:59:04 -0700 Subject: [PATCH] Add nursery.start and nursery.start_soon start_soon is just a new name for spawn, except it doesn't return the new task (in preparation for gh-136, where we're going to stop emphasizing task objects in the main api) start is a major new feature: it provides a very simple way to start up a long running task, while blocking until it's finished whatever initialization it wants to do. At least... it's simple from the user's point of view. Internally it's quite tricky indeed. The whole _run.py file probably needs some refactoring and splitting up, but this is one of those cases where I think it's best to first get the new functionality working and nailed down, and then we can see what shape the new abstractions should be. Fixes gh-284. --- trio/_core/_run.py | 183 +++++++++++++++++++++++++++++++---- trio/_core/tests/conftest.py | 5 + trio/_core/tests/test_run.py | 159 ++++++++++++++++++++++++++++++ 3 files changed, 327 insertions(+), 20 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index bca1b50c3c..34ac17e95a 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -43,7 +43,8 @@ # namespaces. 
__all__ = [ "Task", "run", "open_nursery", "open_cancel_scope", "yield_briefly", - "current_task", "current_effective_deadline", "yield_if_cancelled" + "current_task", "current_effective_deadline", "yield_if_cancelled", + "STATUS_IGNORED" ] GLOBAL_RUN_CONTEXT = threading.local() @@ -156,6 +157,15 @@ def _remove_task(self, task): assert task._cancel_stack[-1] is self task._cancel_stack.pop() + # Used by the nursery.start trickiness + def _tasks_removed_by_adoption(self, tasks): + with self._might_change_effective_deadline(): + self._tasks.difference_update(tasks) + + # Used by the nursery.start trickiness + def _tasks_added_by_adoption(self, tasks): + self._tasks.update(tasks) + def _make_exc(self): exc = Cancelled() exc._scope = self @@ -192,6 +202,99 @@ def open_cancel_scope(*, deadline=inf, shield=False): ################################################################ +# This code needs to be read alongside the code from Nursery.start to make +# sense. +@attr.s(slots=True, cmp=False, hash=False, repr=False) +class _TaskStatus: + _old_nursery = attr.ib() + _new_nursery = attr.ib() + _called_started = attr.ib(default=False) + _value = attr.ib(default=None) + + def __repr__(self): + return "".format(id(self)) + + def started(self, value=None): + if self._called_started: + raise RuntimeError( + "called 'started' twice on the same task status" + ) + self._called_started = True + self._value = value + + # If the old nursery is cancelled, then quietly quit now; the child + # will eventually exit on its own, and we don't want to risk moving + # the children into a different scope while they might have + # propagating Cancelled exceptions that assume they're under the old + # scope. 
+ if _pending_cancel_scope(self._old_nursery._cancel_stack) is not None: + return + + # if the new nursery is not accepting new children, then inject an + # error into the old nursery (which will implicitly cancel the child) + if self._new_nursery._closed: + + async def raise_new_nursery_closed(): + raise RuntimeError("Nursery is closed to new arrivals") + + self._old_nursery.start_soon(raise_new_nursery_closed) + return + + # otherwise, find all the tasks under the old nursery, and move them + # under the new nursery instead. This means: + # - changing parents of direct children + # - changing cancel stack of all direct+indirect children + # - changing cancel stack of all direct+indirect children's nurseries + # - checking for cancellation in all changed cancel stacks + old_stack = self._old_nursery._cancel_stack + new_stack = self._new_nursery._cancel_stack + # LIFO todo stack for depth-first traversal + todo = list(self._old_nursery._children) + munged_tasks = [] + while todo: + task = todo.pop() + # Direct children need to be reparented + if task._parent_nursery is self._old_nursery: + self._old_nursery._children.remove(task) + task._parent_nursery = self._new_nursery + self._new_nursery._children.add(task) + # Everyone needs their cancel scopes fixed up... + assert task._cancel_stack[:len(old_stack)] == old_stack + task._cancel_stack[:len(old_stack)] = new_stack + # ...and their nurseries' cancel scopes fixed up. + for nursery in task._child_nurseries: + assert nursery._cancel_stack[:len(old_stack)] == old_stack + nursery._cancel_stack[:len(old_stack)] = new_stack + # And then add all the nursery's children to our todo list + todo.extend(nursery._children) + # And make a note to check for cancellation later + munged_tasks.append(task) + + # Tell all the cancel scopes about the change. (There are probably + # some scopes in common between the two stacks, so some scopes will + # get the same tasks removed and then immediately re-added. This is + # fine though.) 
+ for cancel_scope in old_stack: + cancel_scope._tasks_removed_by_adoption(munged_tasks) + for cancel_scope in new_stack: + cancel_scope._tasks_added_by_adoption(munged_tasks) + + # That should have removed all the children from the old nursery + assert not self._old_nursery._children + + # After all the delicate surgery is done, check for cancellation in + # all the tasks that had their cancel scopes munged. This can trigger + # arbitrary abort() callbacks, so we put it off until our internal + # data structures are all self-consistent again. + for task in munged_tasks: + task._attempt_delivery_of_any_pending_cancel() + + # And finally, we cancel the old nursery's scope, so that its + # __aexit__ notices that all the children are gone and it can exit. + # (This is a bit of a hack.) + self._old_nursery.cancel_scope.cancel() + + @acontextmanager @async_generator @enable_ki_protection @@ -223,13 +326,13 @@ async def open_nursery(): # async def __aenter__(self): # self._scope_manager = open_cancel_scope() # scope = self._scope_manager.__enter__() -# self._nursery = Nursery(current_task(), scope) -# return self._nursery +# self._parent_nursery = Nursery(current_task(), scope) +# return self._parent_nursery # # @enable_ki_protection # async def __aexit__(self, etype, exc, tb): # try: -# await self._nursery._clean_up(exc) +# await self._parent_nursery._clean_up(exc) # except BaseException as new_exc: # if not self._scope_manager.__exit__( # type(new_exc), new_exc, new_exc.__traceback__): @@ -250,6 +353,7 @@ def __init__(self, parent, cancel_scope): # the parent task -- only used for introspection, to implement # task.parent_task self._parent = parent + parent._child_nurseries.append(self) # the cancel stack that children inherit - we take a snapshot, so it # won't be affected by any changes in the parent. 
self._cancel_stack = list(parent._cancel_stack) @@ -275,9 +379,28 @@ def _child_finished(self, task): self._zombies.add(task) self.monitor.put_nowait(task) + def start_soon(self, async_fn, *args, name=None): + GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name) + + # Returns the task, unlike start_soon + #@deprecated("nursery.spawn", version="0.2.0", "nursery.start_soon") def spawn(self, async_fn, *args, name=None): return GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name) + async def start(self, async_fn, *args, name=None): + async with open_nursery() as old_nursery: + task_status = _TaskStatus(old_nursery, self) + thunk = functools.partial(async_fn, task_status=task_status) + old_nursery.start_soon(thunk, *args, name=name) + # If we get here, then the child either got reparented or exited + # normally. The complicated logic is all in _TaskStatus.started(). + # (Any exceptions propagate directly out of the above.) + if not task_status._called_started: + raise RuntimeError( + "child exited without calling task_status.started()" + ) + return task_status._value + def reap(self, task): try: self._zombies.remove(task) @@ -329,6 +452,8 @@ async def _clean_up(self, pending_exc): exceptions.append(exc) self._closed = True + popped = self._parent._child_nurseries.pop() + assert popped is self if exceptions: mexc = MultiError(exceptions) if (pending_exc and mexc.__cause__ is None @@ -363,9 +488,22 @@ def __del__(self): ################################################################ +def _pending_cancel_scope(cancel_stack): + # Return the outermost exception that is not outside a shield. 
+ pending_scope = None + for scope in cancel_stack: + # Check shield before _exc, because shield should not block + # processing of *this* scope's exception + if scope.shield: + pending_scope = None + if pending_scope is None and scope.cancel_called: + pending_scope = scope + return pending_scope + + @attr.s(slots=True, cmp=False, hash=False, repr=False) class Task: - _nursery = attr.ib() + _parent_nursery = attr.ib() coro = attr.ib() _runner = attr.ib() name = attr.ib() @@ -380,6 +518,9 @@ class Task: _next_send = attr.ib(default=None) _abort_func = attr.ib(default=None) + # For introspection and nursery.start() + _child_nurseries = attr.ib(default=attr.Factory(list)) + # Task-local values, see _local.py _locals = attr.ib(default=attr.Factory(dict)) @@ -399,10 +540,10 @@ def parent_task(self): Example use case: drawing a visualization of the task tree. """ - if self._nursery is None: + if self._parent_nursery is None: return None else: - return self._nursery._parent + return self._parent_nursery._parent ################ # Monitoring task exit @@ -469,16 +610,7 @@ async def wait(self): _cancel_stack = attr.ib(default=attr.Factory(list), repr=False) def _pending_cancel_scope(self): - # Return the outermost exception that is is not outside a shield. 
- pending_scope = None - for scope in self._cancel_stack: - # Check shield before _exc, because shield should not block - # processing of *this* scope's exception - if scope.shield: - pending_scope = None - if pending_scope is None and scope.cancel_called: - pending_scope = scope - return pending_scope + return _pending_cancel_scope(self._cancel_stack) def _attempt_abort(self, raise_cancel): # Either the abort succeeds, in which case we will reschedule the @@ -757,7 +889,7 @@ def _return_value_looks_like_wrong_library(value): name = "{}.{}".format(name.__module__, name.__qualname__) except AttributeError: name = repr(name) - task = Task(coro=coro, nursery=nursery, runner=self, name=name) + task = Task(coro=coro, parent_nursery=nursery, runner=self, name=name) self.tasks.add(task) if nursery is not None: nursery._children.add(task) @@ -784,11 +916,11 @@ def task_exited(self, task, result): while task._cancel_stack: task._cancel_stack[-1]._remove_task(task) self.tasks.remove(task) - if task._nursery is None: + if task._parent_nursery is None: # the init task should be the last task to exit assert not self.tasks else: - task._nursery._child_finished(task) + task._parent_nursery._child_finished(task) for monitor in task._monitors: monitor.put_nowait(task) task._monitors.clear() @@ -1443,6 +1575,17 @@ def run_impl(runner, async_fn, args): ################################################################ +class _StatusIgnored: + def __repr__(self): + return "STATUS_IGNORED" + + def started(self, value=None): + pass + + +STATUS_IGNORED = _StatusIgnored() + + def current_task(): """Return the :class:`Task` object representing the current task. 
diff --git a/trio/_core/tests/conftest.py b/trio/_core/tests/conftest.py index a60b6d16a2..551e650456 100644 --- a/trio/_core/tests/conftest.py +++ b/trio/_core/tests/conftest.py @@ -10,6 +10,11 @@ def mock_clock(): return MockClock() +@pytest.fixture +def autojump_clock(): + return MockClock(autojump_threshold=0) + + # FIXME: split off into a package (or just make part of trio's public # interface?), with config file to enable? and I guess a mark option too; I # guess it's useful with the class- and file-level marking machinery (where diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 48a96141a4..5b32888193 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -1625,3 +1625,162 @@ async def trivial(): assert t.result is not None with assert_yields(): await t.wait() + + +async def test_nursery_start(autojump_clock): + async def no_args(): # pragma: no cover + pass + + # Errors in calling convention get raised immediately from start + async with _core.open_nursery() as nursery: + with pytest.raises(TypeError): + await nursery.start(no_args) + + async def sleep_then_start(seconds, *, task_status=_core.STATUS_IGNORED): + await sleep(seconds) + task_status.started(seconds) + await sleep(seconds) + + # Basic happy-path check: start waits for the task to call started(), then + # returns, passes back the value, and the given nursery then waits for it + # to exit. 
+ for seconds in [1, 2]: + async with _core.open_nursery() as nursery: + assert len(nursery.children) == 0 + t0 = _core.current_time() + assert await nursery.start(sleep_then_start, seconds) == seconds + assert _core.current_time() - t0 == seconds + assert len(nursery.children) == 1 + assert _core.current_time() - t0 == 2 * seconds + + # Make sure STATUS_IGNORED works so task function can be called directly + t0 = _core.current_time() + await sleep_then_start(3) + assert _core.current_time() - t0 == 2 * 3 + + # calling started twice + async def double_started(task_status=_core.STATUS_IGNORED): + task_status.started() + with pytest.raises(RuntimeError): + task_status.started() + + async with _core.open_nursery() as nursery: + await nursery.start(double_started) + + # child crashes before calling started -> error comes out of .start() + async def raise_keyerror(task_status=_core.STATUS_IGNORED): + raise KeyError("oops") + + async with _core.open_nursery() as nursery: + with pytest.raises(KeyError): + await nursery.start(raise_keyerror) + + # child exiting cleanly before calling started -> triggers a RuntimeError + async def nothing(task_status=_core.STATUS_IGNORED): + return + + async with _core.open_nursery() as nursery: + with pytest.raises(RuntimeError) as excinfo: + await nursery.start(nothing) + assert "exited without calling" in str(excinfo.value) + + # if the call to start() is cancelled, then the call to started() does + # nothing -- the child keeps executing under start(). The value it passed + # is ignored; start() raises Cancelled. 
+ async def just_started(task_status=_core.STATUS_IGNORED): + task_status.started("hi") + + async with _core.open_nursery() as nursery: + with _core.open_cancel_scope() as cs: + cs.cancel() + with pytest.raises(_core.Cancelled): + await nursery.start(just_started) + + # and if after the no-op started(), the child crashes, the error comes out + # of start() + async def raise_keyerror_after_started(task_status=_core.STATUS_IGNORED): + task_status.started() + raise KeyError("whoopsiedaisy") + + async with _core.open_nursery() as nursery: + with _core.open_cancel_scope() as cs: + cs.cancel() + with pytest.raises(_core.MultiError) as excinfo: + await nursery.start(raise_keyerror_after_started) + assert set(type(e) for e in excinfo.value.exceptions) == { + _core.Cancelled, KeyError} + + # trying to start in a closed nursery raises an error, but not until + # calling task_status.started(), at which point the child is cancelled + async with _core.open_nursery() as closed_nursery: + pass + t0 = _core.current_time() + with pytest.raises(RuntimeError): + await closed_nursery.start(sleep_then_start, 7) + # Should have done the first sleep, but then the second sleep was + # cancelled: + assert _core.current_time() - t0 == 7 + + # and if the task being put into the closed nursery *also* raises an + # error, then again, both come out of start() + with pytest.raises(_core.MultiError) as excinfo: + await closed_nursery.start(raise_keyerror_after_started) + assert set(type(e) for e in excinfo.value.exceptions) == { + RuntimeError, KeyError} + + # if start() is cancelled, then that takes priority -- we don't notice + # that the new nursery is closed + with _core.open_cancel_scope() as cs: + cs.cancel() + with pytest.raises(_core.MultiError) as excinfo: + await closed_nursery.start(raise_keyerror_after_started) + assert set(type(e) for e in excinfo.value.exceptions) == { + _core.Cancelled, KeyError} + + +async def test_task_nursery_stack(): + task = _core.current_task() + assert 
task._child_nurseries == [] + async with _core.open_nursery() as nursery1: + assert task._child_nurseries == [nursery1] + with pytest.raises(KeyError): + async with _core.open_nursery() as nursery2: + assert task._child_nurseries == [nursery1, nursery2] + raise KeyError + assert task._child_nurseries == [nursery1] + assert task._child_nurseries == [] + + +async def test_nursery_start_with_cancelled_nursery(): + # This function isn't testing task_status, it's using task_status as a + # convenient way to get a nursery that we can test spawning stuff into. + async def setup_nursery(task_status=_core.STATUS_IGNORED): + async with _core.open_nursery() as nursery: + task_status.started(nursery) + await sleep_forever() + + # Calls started() while children are asleep, so we can make sure + # that the cancellation machinery notices and aborts when a sleeping task + # is moved into a cancelled scope. + async def sleeping_children(fn, *, task_status=_core.STATUS_IGNORED): + async with _core.open_nursery() as nursery: + nursery.start_soon(sleep_forever) + nursery.start_soon(sleep_forever) + await wait_all_tasks_blocked() + fn() + task_status.started() + + # Cancelling the setup_nursery just *before* calling started() + async with _core.open_nursery() as nursery: + target_nursery = await nursery.start(setup_nursery) + await target_nursery.start(sleeping_children, target_nursery.cancel_scope.cancel) + + # Cancelling the setup_nursery just *after* calling started() + async with _core.open_nursery() as nursery: + target_nursery = await nursery.start(setup_nursery) + await target_nursery.start(sleeping_children, lambda: None) + target_nursery.cancel_scope.cancel() + + + +# maybe start() should hold the new nursery open?