Skip to content

Commit

Permalink
Update all tests to new streaming API
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Apr 28, 2021
1 parent fdf21dd commit fdd7dd5
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 154 deletions.
16 changes: 9 additions & 7 deletions tests/test_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def test_remote_error(arb_addr, args_err):
async def main():
async with tractor.open_nursery() as nursery:

portal = await nursery.run_in_actor(assert_err, name='errorer', **args)
portal = await nursery.run_in_actor(
assert_err, name='errorer', **args
)

# get result(s) from main task
try:
Expand Down Expand Up @@ -168,13 +170,14 @@ async def test_cancel_infinite_streamer(start_method):
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'donny',
rpc_module_paths=[__name__],
enable_modules=[__name__],
)

# this async for loop streams values from the above
# async generator running in a separate process
async for letter in await portal.run(stream_forever):
print(letter)
async with portal.open_stream_from(stream_forever) as stream:
async for letter in stream:
print(letter)

# we support trio's cancellation system
assert cancel_scope.cancelled_caught
Expand Down Expand Up @@ -430,15 +433,14 @@ async def main():
tractor.run(main)



async def spin_for(period=3):
"Sync sleep."
time.sleep(period)


async def spawn():
async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor(
await tn.run_in_actor(
spin_for,
name='sleeper',
)
Expand All @@ -460,7 +462,7 @@ def test_cancel_while_childs_child_in_sync_sleep(
async def main():
with trio.fail_after(2):
async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor(
await tn.run_in_actor(
spawn,
name='spawn',
)
Expand Down
97 changes: 50 additions & 47 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ async def cancel(use_signal, delay=0):


async def stream_from(portal):
async for value in await portal.result():
print(value)
async with portal.open_stream_from(stream_forever) as stream:
async for value in stream:
print(value)


async def spawn_and_check_registry(
Expand Down Expand Up @@ -139,18 +140,20 @@ async def get_reg():
registry = await get_reg()
assert actor.uid in registry

if with_streaming:
to_run = stream_forever
else:
to_run = trio.sleep_forever
try:
async with tractor.open_nursery() as n:
async with trio.open_nursery() as trion:

async with trio.open_nursery() as trion:
try:
async with tractor.open_nursery() as n:
portals = {}
for i in range(3):
name = f'a{i}'
portals[name] = await n.run_in_actor(to_run, name=name)
if with_streaming:
portals[name] = await n.start_actor(
name=name, enable_modules=[__name__])

else: # no streaming
portals[name] = await n.run_in_actor(
trio.sleep_forever, name=name)

# wait on last actor to come up
async with tractor.wait_for_actor(name):
Expand All @@ -171,19 +174,19 @@ async def get_reg():
trion.start_soon(cancel, use_signal, 1)

last_p = pts[-1]
async for value in await last_p.result():
print(value)
await stream_from(last_p)

else:
await cancel(use_signal)

finally:
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
finally:
with trio.CancelScope(shield=True):
await trio.sleep(0.5)

# all subactors should have de-registered
registry = await get_reg()
assert len(registry) == extra
assert actor.uid in registry
# all subactors should have de-registered
registry = await get_reg()
assert len(registry) == extra
assert actor.uid in registry


@pytest.mark.parametrize('use_signal', [False, True])
Expand Down Expand Up @@ -260,36 +263,36 @@ async def close_chans_before_nursery(
get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')

async with tractor.open_nursery() as tn:
portal1 = await tn.run_in_actor(
stream_forever,
name='consumer1',
)
agen1 = await portal1.result()

portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
agen2 = await portal2.run(stream_forever)

async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()

# XXX: THIS IS THE KEY THING that happens
# **before** exiting the actor nursery block

# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__])
portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])

async with (
portal1.open_stream_from(stream_forever) as agen1,
portal2.open_stream_from(stream_forever) as agen2,
):
async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()

# XXX: THIS IS THE KEY THING that happens
# **before** exiting the actor nursery block

# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
finally:
with trio.CancelScope(shield=True):
await trio.sleep(.5)
await trio.sleep(1)

# all subactors should have de-registered
registry = await get_reg()
Expand Down
82 changes: 45 additions & 37 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,44 +61,47 @@ def pred(i):
return isinstance(i, int)

async with tractor.find_actor(pub_actor_name) as portal:
stream = await portal.run(
pubber,
topics=which,
seed=seed,
)
task_status.started(stream)
times = 10
count = 0
await stream.__anext__()
async for pkt in stream:
for topic, value in pkt.items():
assert pred(value)
count += 1
if count >= times:
break

await stream.aclose()

stream = await portal.run(
pubber,
topics=['odd'],
seed=seed,
)

await stream.__anext__()
count = 0
# async with aclosing(stream) as stream:
try:
async with (
portal.open_stream_from(
pubber,
topics=which,
seed=seed,
) as stream
):
task_status.started(stream)
times = 10
count = 0
await stream.__anext__()
async for pkt in stream:
for topic, value in pkt.items():
pass
# assert pred(value)
assert pred(value)
count += 1
if count >= times:
break
finally:

await stream.aclose()

async with (
portal.open_stream_from(
pubber,
topics=['odd'],
seed=seed,
) as stream
):
await stream.__anext__()
count = 0
# async with aclosing(stream) as stream:
try:
async for pkt in stream:
for topic, value in pkt.items():
pass
# assert pred(value)
count += 1
if count >= times:
break
finally:
await stream.aclose()


@tractor.msg.pub(tasks=['one', 'two'])
async def multilock_pubber(get_topics):
Expand Down Expand Up @@ -128,20 +131,25 @@ async def test_required_args(callwith_expecterror):
await func(**kwargs)
else:
async with tractor.open_nursery() as n:
# await func(**kwargs)
portal = await n.run_in_actor(
multilock_pubber,

portal = await n.start_actor(
name='pubber',
**kwargs
enable_modules=[__name__],
)

async with tractor.wait_for_actor('pubber'):
pass

await trio.sleep(0.5)

async for val in await portal.result():
assert val == {'doggy': 10}
async with portal.open_stream_from(
multilock_pubber,
**kwargs
) as stream:
async for val in stream:
assert val == {'doggy': 10}

await portal.cancel_actor()


@pytest.mark.parametrize(
Expand Down
Loading

0 comments on commit fdd7dd5

Please sign in to comment.