diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 42aec354d..eadcb44a2 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -49,7 +49,9 @@ def test_remote_error(arb_addr, args_err): async def main(): async with tractor.open_nursery() as nursery: - portal = await nursery.run_in_actor(assert_err, name='errorer', **args) + portal = await nursery.run_in_actor( + assert_err, name='errorer', **args + ) # get result(s) from main task try: @@ -168,13 +170,14 @@ async def test_cancel_infinite_streamer(start_method): async with tractor.open_nursery() as n: portal = await n.start_actor( 'donny', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) # this async for loop streams values from the above # async generator running in a separate process - async for letter in await portal.run(stream_forever): - print(letter) + async with portal.open_stream_from(stream_forever) as stream: + async for letter in stream: + print(letter) # we support trio's cancellation system assert cancel_scope.cancelled_caught @@ -430,7 +433,6 @@ async def main(): tractor.run(main) - async def spin_for(period=3): "Sync sleep." time.sleep(period) @@ -438,7 +440,7 @@ async def spin_for(period=3): async def spawn(): async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor( + await tn.run_in_actor( spin_for, name='sleeper', ) @@ -460,7 +462,7 @@ def test_cancel_while_childs_child_in_sync_sleep( async def main(): with trio.fail_after(2): async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor( + await tn.run_in_actor( spawn, name='spawn', ) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index eff289740..d1f84740b 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -108,8 +108,9 @@ async def cancel(use_signal, delay=0): async def stream_from(portal): - async for value in await portal.result(): - print(value) + async with portal.open_stream_from(stream_forever) as stream: + async for value in stream: + print(value) async def spawn_and_check_registry( @@ -139,18 +140,20 @@ async def get_reg(): registry = await get_reg() assert actor.uid in registry - if with_streaming: - to_run = stream_forever - else: - to_run = trio.sleep_forever + try: + async with tractor.open_nursery() as n: + async with trio.open_nursery() as trion: - async with trio.open_nursery() as trion: - try: - async with tractor.open_nursery() as n: portals = {} for i in range(3): name = f'a{i}' - portals[name] = await n.run_in_actor(to_run, name=name) + if with_streaming: + portals[name] = await n.start_actor( + name=name, enable_modules=[__name__]) + + else: # no streaming + portals[name] = await n.run_in_actor( + trio.sleep_forever, name=name) # wait on last actor to come up async with tractor.wait_for_actor(name): @@ -171,19 +174,19 @@ async def get_reg(): trion.start_soon(cancel, use_signal, 1) last_p = pts[-1] - async for value in await last_p.result(): - print(value) + await stream_from(last_p) + else: await cancel(use_signal) - finally: - with trio.CancelScope(shield=True): - await trio.sleep(0.5) + finally: + with trio.CancelScope(shield=True): + await trio.sleep(0.5) - # all subactors should have de-registered - registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + # all subactors should have de-registered + registry = await get_reg() + assert len(registry) == extra + assert actor.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) @@ -260,36 +263,36 @@ async def close_chans_before_nursery( get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') async with tractor.open_nursery() as tn: - portal1 = await tn.run_in_actor( - stream_forever, - name='consumer1', - ) - agen1 = await portal1.result() - - portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) - agen2 = await portal2.run(stream_forever) - - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. - # tractor.current_actor()._root_nursery.cancel_scope.cancel() - - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block - - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() + portal1 = await tn.start_actor( + name='consumer1', enable_modules=[__name__]) + portal2 = await tn.start_actor( + 'consumer2', enable_modules=[__name__]) + + async with ( + portal1.open_stream_from(stream_forever) as agen1, + portal2.open_stream_from(stream_forever) as agen2, + ): + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() + + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block + + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() finally: with trio.CancelScope(shield=True): - await trio.sleep(.5) + await trio.sleep(1) # all subactors should have de-registered registry = await get_reg() diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 3fcb45dcd..48e65b26e 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -61,44 +61,47 @@ def pred(i): return isinstance(i, int) async with tractor.find_actor(pub_actor_name) as portal: - stream = await portal.run( - pubber, - topics=which, - seed=seed, - ) - task_status.started(stream) - times = 10 - count = 0 - await stream.__anext__() - async for pkt in stream: - for topic, value in pkt.items(): - assert pred(value) - count += 1 - if count >= times: - break - - await stream.aclose() - - stream = await portal.run( - pubber, - topics=['odd'], - seed=seed, - ) - - await stream.__anext__() - count = 0 - # async with aclosing(stream) as stream: - try: + async with ( + portal.open_stream_from( + pubber, + topics=which, + seed=seed, + ) as stream + ): + task_status.started(stream) + times = 10 + count = 0 + await stream.__anext__() async for pkt in stream: for topic, value in pkt.items(): - pass - # assert pred(value) + assert pred(value) count += 1 if count >= times: break - finally: + await stream.aclose() + async with ( + portal.open_stream_from( + pubber, + topics=['odd'], + seed=seed, + ) as stream + ): + await stream.__anext__() + count = 0 + # async with aclosing(stream) as stream: + try: + async for pkt in stream: + for topic, value in pkt.items(): + pass + # assert pred(value) + count += 1 + if count >= times: + break + finally: + await stream.aclose() + @tractor.msg.pub(tasks=['one', 'two']) async def multilock_pubber(get_topics): @@ -128,11 +131,10 @@ async def test_required_args(callwith_expecterror): await func(**kwargs) else: async with tractor.open_nursery() as n: - # await func(**kwargs) - portal = await n.run_in_actor( - multilock_pubber, + + portal = await n.start_actor( name='pubber', - **kwargs + enable_modules=[__name__], ) async with tractor.wait_for_actor('pubber'): @@ -140,8 +142,14 @@ async def test_required_args(callwith_expecterror): await trio.sleep(0.5) - async for val in await portal.result(): - assert val == {'doggy': 10} + async with portal.open_stream_from( + multilock_pubber, + **kwargs + ) as stream: + async for val in stream: + assert val == {'doggy': 10} + + await portal.cancel_actor() @pytest.mark.parametrize( diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 45fbd5b8d..9aba327f6 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -61,37 +61,38 @@ async def stream_from_single_subactor(stream_func): # no brokerd actor found portal = await nursery.start_actor( 'streamerd', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) seq = range(10) - stream = await portal.run( - stream_func, # one of the funcs above + async with portal.open_stream_from( + stream_func, sequence=list(seq), # has to be msgpack serializable - ) - # it'd sure be nice to have an asyncitertools here... - iseq = iter(seq) - ival = next(iseq) + ) as stream: - async for val in stream: - assert val == ival + # it'd sure be nice to have an asyncitertools here... + iseq = iter(seq) + ival = next(iseq) - try: - ival = next(iseq) - except StopIteration: - # should cancel far end task which will be - # caught and no error is raised - await stream.aclose() + async for val in stream: + assert val == ival + + try: + ival = next(iseq) + except StopIteration: + # should cancel far end task which will be + # caught and no error is raised + await stream.aclose() - await trio.sleep(0.3) + await trio.sleep(0.3) - try: - await stream.__anext__() - except StopAsyncIteration: - # stop all spawned subactors - await portal.cancel_actor() - # await nursery.cancel() + try: + await stream.__anext__() + except StopAsyncIteration: + # stop all spawned subactors + await portal.cancel_actor() + # await nursery.cancel() @pytest.mark.parametrize( @@ -132,7 +133,7 @@ async def aggregate(seed): # fork point portal = await nursery.start_actor( name=f'streamer_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) portals.append(portal) @@ -141,11 +142,14 @@ async def aggregate(seed): async def push_to_chan(portal, send_chan): async with send_chan: - async for value in await portal.run( - __name__, 'stream_data', seed=seed - ): - # leverage trio's built-in backpressure - await send_chan.send(value) + + async with portal.open_stream_from( + stream_data, seed=seed, + ) as stream: + + async for value in stream: + # leverage trio's built-in backpressure + await send_chan.send(value) print(f"FINISHED ITERATING {portal.channel.uid}") @@ -183,22 +187,24 @@ async def a_quadruple_example(): seed = int(1e3) pre_start = time.time() - portal = await nursery.run_in_actor( - aggregate, - seed=seed, + portal = await nursery.start_actor( name='aggregator', + enable_modules=[__name__], ) start = time.time() # the portal call returns exactly what you'd expect # as if the remote "aggregate" function was called locally result_stream = [] - async for value in await portal.result(): - result_stream.append(value) + + async with portal.open_stream_from(aggregate, seed=seed) as stream: + async for value in stream: + result_stream.append(value) print(f"STREAM TIME = {time.time() - start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") assert result_stream == list(range(seed)) + await portal.cancel_actor() return result_stream @@ -272,48 +278,55 @@ async def test_respawn_consumer_task( async with tractor.open_nursery() as n: - stream = await(await n.run_in_actor( + portal = await n.start_actor( + name='streamer', + enable_modules=[__name__] + ) + async with portal.open_stream_from( stream_data, seed=11, - name='streamer', - )).result() + ) as stream: + + expect = set(range(11)) + received = [] - expect = set(range(11)) - received = [] + # this is the re-spawn task routine + async def consume(task_status=trio.TASK_STATUS_IGNORED): + print('starting consume task..') + nonlocal stream - # this is the re-spawn task routine - async def consume(task_status=trio.TASK_STATUS_IGNORED): - print('starting consume task..') - nonlocal stream + with trio.CancelScope() as cs: + task_status.started(cs) - with trio.CancelScope() as cs: - task_status.started(cs) + # shield stream's underlying channel from cancellation + with stream.shield(): - # shield stream's underlying channel from cancellation - with stream.shield(): + async for v in stream: + print(f'from stream: {v}') + expect.remove(v) + received.append(v) - async for v in stream: - print(f'from stream: {v}') - expect.remove(v) - received.append(v) + print('exited consume') - print('exited consume') + async with trio.open_nursery() as ln: + cs = await ln.start(consume) - async with trio.open_nursery() as ln: - cs = await ln.start(consume) + while True: - while True: + await trio.sleep(0.1) - await trio.sleep(0.1) + if received[-1] % 2 == 0: - if received[-1] % 2 == 0: + print('cancelling consume task..') + cs.cancel() - print('cancelling consume task..') - cs.cancel() + # respawn + cs = await ln.start(consume) - # respawn - cs = await ln.start(consume) + if not expect: + print("all values streamed, BREAKING") + break - if not expect: - print("all values streamed, BREAKING") - break + # TODO: this is justification for a + # ``ActorNursery.stream_from_actor()`` helper? + await portal.cancel_actor()