This project uses a calendar-based versioning scheme and the "black" style formatter for Python code.

Here's the big idea (how you use it):

import asyncio
from aiorun import run

async def main():
    # Put your application code here
    await asyncio.sleep(1.0)

if __name__ == '__main__':
    run(main())

This package provides a run() function as the starting point of your asyncio-based application. The run() function will run forever. If you want to shut down when main() completes, just call loop.stop() inside it: that will initiate shutdown.

Warning

Note that aiorun.run(coro) will run forever, unlike the standard library's asyncio.run() helper. You can call aiorun.run() without a coroutine parameter, and it will still run forever.

In particular, it keeps running even when a task raises an unhandled exception. This surprises many people, who expect an unhandled exception to abort the program with a traceback. If you want this behaviour, please see the section on error handling further down.

Warning

Note that aiorun.run(coro) will create a new event loop instance every time it is invoked (same as asyncio.run). This might cause confusing errors if your code interacts with the default event loop instance provided by the stdlib asyncio library. For such situations you can provide the actual loop you're using with aiorun.run(coro, loop=loop). There is more info about this further down.

However, generally speaking, configuring your own loop and providing it in this way is a code smell. You will find it much easier to reason about your code if you do all your task creation inside an async context, such as within an async def function, because then there will be no ambiguity about which event loop is in play: it will always be the one returned by asyncio.get_running_loop().
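To make that concrete, here is a small sketch that uses only the standard library. The names worker and main are made up for this example. Every task is created inside an async def function, so each one is bound to the running loop without any loop being passed around.

```python
import asyncio


async def worker(n):
    # Stand-in for real work; yields to the event loop once.
    await asyncio.sleep(0)
    return n * 2


async def main():
    running = asyncio.get_running_loop()
    # Tasks created here are always attached to the running loop.
    tasks = [asyncio.create_task(worker(n)) for n in range(3)]
    assert all(task.get_loop() is running for task in tasks)
    return await asyncio.gather(*tasks)
```

With this shape there is no need for a loop= argument: main() is simply the single starting coroutine that you hand to run().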

The run() function will handle everything that normally needs to be done during the shutdown sequence of the application. All you need to do is write your coroutines and run them.

So what the heck does run() do exactly?? It does these standard, idiomatic actions for asyncio apps:

  • creates a Task for the given coroutine (schedules it on the event loop),
  • calls loop.run_forever(),
  • adds default (and smart) signal handlers for both SIGINT and SIGTERM that will stop the loop;
  • and when the loop stops (either by signal or called directly), then it will...
  • ...gather all outstanding tasks,
  • cancel them using task.cancel(),
  • resume running the loop until all those tasks are done,
  • wait for the executor to complete shutdown, and
  • finally close the loop.
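As a rough illustration of the steps listed above, the following stdlib-only sketch approximates the same sequence. It is not aiorun's actual implementation (the real signal handling and shutdown logic are more careful), and the name run_sketch is invented for this example. It assumes Python 3.9 or later for loop.shutdown_default_executor().

```python
import asyncio
import signal


def run_sketch(coro):
    """Approximate the run() sequence with plain asyncio (illustration only)."""
    loop = asyncio.new_event_loop()
    loop.create_task(coro)  # schedule the starting coroutine
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, loop.stop)
        except (NotImplementedError, RuntimeError, ValueError):
            pass  # e.g. on Windows, or when not in the main thread
    try:
        loop.run_forever()  # returns once loop.stop() has been called
        # Shutdown phase: gather outstanding tasks and cancel them.
        tasks = asyncio.all_tasks(loop)
        for task in tasks:
            task.cancel()
        if tasks:
            # Resume the loop until every cancelled task has finished.
            loop.run_until_complete(
                asyncio.gather(*tasks, return_exceptions=True)
            )
        # Wait for the default executor to finish its jobs.
        loop.run_until_complete(loop.shutdown_default_executor())
    finally:
        loop.close()
```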

All of this stuff is boilerplate that you will never have to write again. So, if you use aiorun this is what you need to remember:

  • Spawn all your work from a single, starting coroutine
  • When a shutdown signal is received, all currently-pending tasks will have CancelledError raised internally. It's up to you whether you want to handle this inside each coroutine with a try/except or not.
  • If you want to protect coros from cancellation, see shutdown_waits_for() further down.
  • Try to have executor jobs be shortish, since the shutdown process will wait for them to finish. If you need long-running thread or process tasks, use a dedicated thread/subprocess and set daemon=True instead.
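If you do decide to handle cancellation inside a coroutine, the usual pattern looks something like the sketch below. It uses only the standard library; worker, demo and the log list are made up for this example, and task.cancel() stands in for what the shutdown sequence does to each pending task.

```python
import asyncio


async def worker(log):
    try:
        await asyncio.sleep(3600)  # stands in for real work
    except asyncio.CancelledError:
        log.append("cleaning up")  # e.g. flush buffers, close connections
        raise  # re-raise so the task still ends up cancelled
    finally:
        log.append("finished")


async def demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # give the worker a chance to start
    task.cancel()  # roughly what shutdown does to each pending task
    await asyncio.gather(task, return_exceptions=True)
    return log, task.cancelled()
```

Re-raising CancelledError after the cleanup is generally the safer choice, because it lets whoever is waiting on the task see that it really was cancelled.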

There's not much else to know for general use. aiorun has a few special tools that you might need in unusual circumstances. These are discussed next.

You will see in many examples online that for servers, startup happens in several run_until_complete() phases before the primary run_forever() which is the "main" running part of the program. How do we handle that with aiorun?

Let's recreate the echo client & server examples from the Standard Library documentation:

Client:

# echo_client.py
import asyncio
from aiorun import run

async def tcp_echo_client(message):
    # Same as original!
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    print('Send: %r' % message)
    writer.write(message.encode())
    data = await reader.read(100)
    print('Received: %r' % data.decode())
    print('Close the socket')
    writer.close()
    asyncio.get_event_loop().stop()  # Exit after one msg like original

message = 'Hello World!'
run(tcp_echo_client(message))

Server:

import asyncio
from aiorun import run

async def handle_echo(reader, writer):
    # Same as original!
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))
    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()
    print("Close the client socket")
    writer.close()

async def main():
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    async with server:
        await server.serve_forever()

run(main())

It works the same as the original examples, except you see this when you hit CTRL-C on the server instance:

$ python echo_server.py
Running forever.
Serving on ('127.0.0.1', 8888)
Received 'Hello World!' from ('127.0.0.1', 57198)
Send: 'Hello World!'
Close the client socket
^CStopping the loop
Entering shutdown phase.
Cancelling pending tasks.
Cancelling task:  <Task pending coro=[...snip...]>
Running pending tasks till complete
Waiting for executor shutdown.
Leaving. Bye!

Task gathering, cancellation, and executor shutdown all happen automatically.

Unlike the standard library's asyncio.run() function, aiorun.run will run forever, and does not stop on unhandled exceptions. This is partly because aiorun predates the standard library function and dates from a time when run_forever() was the recommended API for servers, and partly because it can make sense for long-lived servers to be resilient to unhandled exceptions. For example, if 99% of your API works fine, but the one new endpoint you just added has a bug: do you really want that one new endpoint to crash-loop your deployed service?
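The underlying asyncio behaviour can be seen with a few lines of standard library code. This is only a sketch: buggy, healthy and demo are names invented for the example, and loop.run_forever() is used directly rather than aiorun. One task fails straight away, yet the loop keeps going and the other task completes normally.

```python
import asyncio


async def buggy():
    raise RuntimeError("ouch")


async def healthy(results):
    await asyncio.sleep(0.05)
    results.append("still running")
    asyncio.get_running_loop().stop()  # end the demo


def demo():
    loop = asyncio.new_event_loop()
    results = []
    try:
        bad = loop.create_task(buggy())
        loop.create_task(healthy(results))
        loop.run_forever()  # the failing task does not stop the loop
        # The exception is stored on the task rather than propagated.
        results.append(repr(bad.exception()))
    finally:
        loop.close()
    return results
```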

Nevertheless, not all usages of aiorun are long-lived servers, so some users would prefer that aiorun.run() crash on an unhandled exception, just like any normal Python program. For this, we have an extra parameter that enables it:

# stop_demo.py
from aiorun import run

async def main():
    raise Exception('ouch')

if __name__ == '__main__':
    run(main(), stop_on_unhandled_errors=True)

This produces the following output:

$ python stop_demo.py
Unhandled exception; stopping loop.
Traceback (most recent call last):
  File "/opt/project/examples/stop_unhandled.py", line 9, in <module>
    run(main(), stop_on_unhandled_errors=True)
  File "/opt/project/aiorun.py", line 294, in run
    raise pending_exception_to_raise
  File "/opt/project/aiorun.py", line 206, in new_coro
    await coro
  File "/opt/project/examples/stop_unhandled.py", line 5, in main
    raise Exception("ouch")
Exception: ouch

Error handling scenarios can get very complex, and I suggest that you try to keep your error handling as simple as possible. Nevertheless, sometimes people have special needs that require some complexity, so let's look at a few scenarios where error-handling considerations can be more challenging.

aiorun.run() can also be started without an initial coroutine, in which case any other created tasks still run as normal; in this case exceptions still abort the program if the parameter is supplied:

import asyncio
from aiorun import run


async def job():
    raise Exception("ouch")


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(job())

    run(loop=loop, stop_on_unhandled_errors=True)

The output is the same as the previous program. In this second example, we made our own loop instance and passed that to run(). It is also possible to configure your own exception handler on the loop, but if you do this the stop_on_unhandled_errors parameter is no longer allowed:

import asyncio
from aiorun import run


async def job():
    raise Exception("ouch")


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(job())
    loop.set_exception_handler(lambda loop, context: "Error")

    run(loop=loop, stop_on_unhandled_errors=True)

But this is not allowed:

Traceback (most recent call last):
  File "/opt/project/examples/stop_unhandled_illegal.py", line 15, in <module>
    run(loop=loop, stop_on_unhandled_errors=True)
  File "/opt/project/aiorun.py", line 171, in run
    raise Exception(
Exception: If you provide a loop instance, and you've configured a
custom exception handler on it, then the 'stop_on_unhandled_errors'
parameter is unavailable (all exceptions will be handled).
/usr/local/lib/python3.8/asyncio/base_events.py:633:
    RuntimeWarning: coroutine 'job' was never awaited

Remember that the parameter stop_on_unhandled_errors is just a convenience. If you're going to go to the trouble of making your own loop instance anyway, you can stop the loop yourself inside your own exception handler just fine, and then you no longer need to set stop_on_unhandled_errors:

# custom_stop.py
import asyncio
from aiorun import run


async def job():
    raise Exception("ouch")


async def other_job():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("other_job was cancelled!")


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(job())
    loop.create_task(other_job())

    def handler(loop, context):
        # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_exception_handler
        print(f'Stopping loop due to error: {context["exception"]} ')
        loop.stop()

    loop.set_exception_handler(handler=handler)

    run(loop=loop)

In this example, we schedule two jobs on the loop. One of them raises an exception, and you can see in the output that the other job was still cancelled during shutdown (which is what you expect aiorun to do!):

$ python custom_stop.py
Stopping loop due to error: ouch
other_job was cancelled!

Note however that in this situation the exception is being handled by your custom exception handler, and does not bubble up out of run() like you saw in earlier examples. If you want to do something with that exception, such as reraise it, you need to capture it inside your custom exception handler. For example, you could add it to a list that you check after run() completes, and reraise it there.
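One way to do that capture-and-reraise is sketched below. To keep the sketch free of dependencies, loop.run_forever() stands in for run(loop=loop), and run_and_reraise is a hypothetical helper, not part of aiorun. It also relies on the exception handler being called when the failed task is garbage-collected, so the call_later() line is only a safety net for this example.

```python
import asyncio


async def job():
    raise RuntimeError("ouch")


def run_and_reraise(coro):
    captured = []  # exceptions seen by the custom handler

    def handler(loop, context):
        # The "exception" key is optional in the context dict.
        exc = context.get("exception")
        if exc is not None:
            captured.append(exc)
        loop.stop()

    loop = asyncio.new_event_loop()
    loop.set_exception_handler(handler)
    loop.create_task(coro)  # no reference kept, as in custom_stop.py
    loop.call_later(5.0, loop.stop)  # safety net for this sketch only
    try:
        loop.run_forever()  # stand-in for run(loop=loop)
    finally:
        loop.close()
    if captured:
        raise captured[0]  # reraise once the loop has finished
```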

💨 Do you like uvloop?

import asyncio
from aiorun import run

async def main():
    <snip>

if __name__ == '__main__':
    run(main(), use_uvloop=True)

Note that you have to pip install uvloop yourself.

It's unusual, but sometimes you're going to want a coroutine to not get interrupted by cancellation during the shutdown sequence. You'll look in the official docs and find asyncio.shield().

Unfortunately, shield() doesn't work in shutdown scenarios, because the protection it offers only applies when the specific coroutine that uses shield() is cancelled directly.

Let me explain: if you do a conventional shutdown sequence (like aiorun is doing internally), this is the sequence of steps:

  • tasks = all_tasks(), followed by
  • [t.cancel() for t in tasks], and then
  • run_until_complete(gather(*tasks))

The way shield() works internally is it creates a secret, inner task—which also gets included in the all_tasks() call above! Thus it also receives a cancellation exception just like everything else.
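You can watch this happen with plain asyncio. The sketch below (inner, outer and demo are names invented for the example) runs the conventional shutdown steps by hand, and the shielded coroutine is cancelled anyway because its inner task shows up in all_tasks().

```python
import asyncio


async def inner(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("inner cancelled despite shield()")
        raise


async def outer(log):
    await asyncio.shield(inner(log))


def demo():
    loop = asyncio.new_event_loop()
    log = []
    try:
        loop.create_task(outer(log))
        loop.call_later(0.05, loop.stop)  # simulate a shutdown signal
        loop.run_forever()
        # Conventional shutdown: this set includes shield()'s inner task.
        tasks = asyncio.all_tasks(loop)
        for task in tasks:
            task.cancel()
        loop.run_until_complete(
            asyncio.gather(*tasks, return_exceptions=True)
        )
    finally:
        loop.close()
    return log
```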

Therefore, we have an alternative version of shield() that works better for us: shutdown_waits_for(). If you've got a coroutine that must not be cancelled during the shutdown sequence, just wrap it in shutdown_waits_for()!

Here's an example:

import asyncio
from aiorun import run, shutdown_waits_for

async def corofn():
    for i in range(10):
        print(i)
        await asyncio.sleep(1)
    print('done!')

async def main():
    try:
        await shutdown_waits_for(corofn())
    except asyncio.CancelledError:
        print('oh noes!')

run(main())

If you hit CTRL-C before 10 seconds have passed, you will see oh noes! printed immediately, and then after 10 seconds (since start), done! is printed, and thereafter the program exits.

Output:

$ python testshield.py
0
1
2
3
4
^CStopping the loop
oh noes!
5
6
7
8
9
done!

Behind the scenes, every task returned by all_tasks() would have been cancelled by CTRL-C, except the ones wrapped in shutdown_waits_for() calls. In this respect, it is loosely similar to asyncio.shield(), but with special applicability to our shutdown scenario in aiorun.

Be careful with this: the coroutine should still finish up at some point. The main use case for this is short-lived tasks for which you don't want to write explicit cancellation handling.

Oh, and you can use shutdown_waits_for() as if it were asyncio.shield() too. For that use-case it works the same. If you're using aiorun, there is no reason to use shield().

aiorun also supports Windows! Kinda. Sorta. The root problem with Windows, for a thing like aiorun is that Windows doesn't support signal handling the way Linux or Mac OS X does. Like, at all.

For Linux, aiorun does "the right thing" out of the box for the SIGINT and SIGTERM signals; i.e., it will catch them and initiate a safe shutdown process as described earlier. However, on Windows, these signals don't work.

There are two signals that work on Windows: the CTRL-C signal (happens when you press, unsurprisingly, CTRL-C), and the CTRL-BREAK signal which happens when you...well, you get the picture.

The good news is that, for aiorun, both of these will work. Yay! The bad news is that for them to work, you have to run your code in a Console window. Boo!

Fortunately, you can run an asyncio-based process that is not attached to a Console window, e.g. as a service or a subprocess, and still have it receive a signal to shut down safely in a controlled way. It is possible to send a CTRL-BREAK signal to another process, with no console window involved, but only as long as that process was created in a particular way and (here is the catch) the targeted process is a child process of the one sending the signal. Yeah, I know, it's a downer.

There is an example of how to do this in the tests:

import subprocess as sp

proc = sp.Popen(
    ['python', 'app.py'],
    stdout=sp.PIPE,
    stderr=sp.STDOUT,
    creationflags=sp.CREATE_NEW_PROCESS_GROUP
)
print(proc.pid)

Notice how we print out the process id (pid). Then you can send that process the signal from a completely different process, once you know the pid:

import os, signal

# `pid` is the process id printed in the previous snippet
os.kill(pid, signal.CTRL_BREAK_EVENT)

(Remember, os.kill() doesn't actually kill, it only sends a signal)

aiorun supports this use-case above, although I'll be pretty surprised if anyone actually uses it to manage microservices (does anyone do this?)

So to summarize: aiorun will do a controlled shutdown if either CTRL-C or CTRL-BREAK is entered via keyboard in a Console window with a running instance, or if the CTRL-BREAK signal is sent to a subprocess that was created with the CREATE_NEW_PROCESS_GROUP flag set. Here is a much more detailed explanation of these issues.

Finally, uvloop is not yet supported on Windows so that won't work either.

At the very least, aiorun will, well, run on Windows ¯\_(ツ)_/¯