Skip to content

Latest commit

 

History

History
199 lines (172 loc) · 10.2 KB

asyncio-futures-tasks-coroutines.md

File metadata and controls

199 lines (172 loc) · 10.2 KB

引言

协程语法 中我们介绍了生成器和协程的语法原理。本章开始介绍TasksFuturesCoroutines相关原理。

asyncio基本框架

asyncio库的核心是事件循环,调度的基本单位是TaskTaskFuture 的子类。 异步编程中,Future用来表示未来的结果(Future对象)。协程是用户编写的具体业务程序。 asyncio 运行一个协程会首先将其包装为一个 Task给事件循环调度。

事件循环类比为操作系统,协程(内部其实被包装为Task)类比为线程或进程。

Coroutines

协程建议使用 async/await 语法声明。下面是一个简单的协程样例:

import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

asyncio.run(main())

协程不能通过调用而单独运行。协程必须交给事件循环调度运行,就好比线程或进程必须交给操作系统调度运行一样。asyncio提供了asyncio.run入口来运行协程。 asyncio.run的工作原理会在调度原理中详细说明。

Futures

Future是一个占位符,表示一个任务执行的未来结果,通常用于异步编程中。它也是一个有状态的容器,初始状态是PENDING,终态是CANCELLED/FINISHEDFuture对象详细介绍了Future的实现理念。

因为在编程逻辑中,其它执行体如线程、进程、协程等需要等待某一个任务的执行结果,所以Future必须是一个可等待对象。 在基于线程、进程异步编程中,Future通过条件变量实现可等待能力。基于协程的异步编程,由于是单线程模式,所以Future对象需要实现__await__方法以实现可等待能力。

class Future:
    ...
    _asyncio_future_blocking = False
    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.

变量_asyncio_future_blocking有两个作用:

  • 值取bool值,表示对象是asyncio实现的Future对象。
  • 值取True,表示Future对象还未完成,在阻塞中,也就是暂停在yield处。

同时考虑到一个协程在某处被暂停后,例如暂停在await <exp>处,当等待的对象(一般都是Future对象)完成后,此协程应该有机会恢复接着运行。 所以Future对象应该支持完成回调。而且回调方法的执行应该在协程对应在Task的上下文。同时为统一所有的Task都被事件循环调度。 因此Future需要将回调方法注册到事件循环中。

def __schedule_callbacks(self):
    """Internal: Ask the event loop to call all callbacks.

    The callbacks are scheduled to be called as soon as possible. Also
    clears the callback list.
    """
    callbacks = self._callbacks[:]
    if not callbacks:
        return

    self._callbacks[:] = []
    for callback, ctx in callbacks:
        # 往事件循环注册回调
        self._loop.call_soon(callback, self, context=ctx)

Tasks

协程不能单独被调用执行,也就是说不能将协程直接给事件循环调度。协程只能通过调用方执行send进行驱动执行。为此有Task这么一个概念。 协程在asyncio内部会被包装为一个Task,进而将Task交给事件循环调度执行。所以说,Task是协程的驱动器。又因每一个Task都应该有对应的执行结果, 所以TaskFuture的子类。

Task的核心方法是__step,其是被包装协程的驱动器。__step的工作流程总结如下:

__step工作流程

可以看到,协程的内部等待对象Future会注册一个完成回调方法__wakeup__wakeup的目的是当此协程等待的Future完成后, 会将此协程对应的Task继续添加到事件循环中等待被事件循环调度,也就是恢复Task的继续执行。__wakeup实现如下:

def __wakeup(self, future):
    try:
        future.result()
    except BaseException as exc:
        # This may also be a cancellation.
        self.__step(exc)
    else:
        # Don't pass the value of `future.result()` explicitly,
        # as `Future.__iter__` and `Future.__await__` don't need it.
        # If we call `_step(value, None)` instead of `_step()`,
        # Python eval loop would use `.send(value)` method call,
        # instead of `__next__()`, which is slower for futures
        # that return non-generator iterators from their `__iter__`.
        self.__step()
    self = None  # Needed to break cycles when an exception occurs.

协程内部等待对象其实就是协程内await future表达式中的future

python3.12中,Task增加了eager_start参数。当eager_start设置为True且事件循环正在运行时, 任务会在创建时立即开始执行其协程,直至协程首次遇到阻塞操作。如果协程在首次阻塞前就返回或引发异常,任务将直接完成, 并且不会被添加到事件循环的调度队列中。

每一个任务Task创建的时候都会往事件循环添加__step方法,以使得任务自身可以被事件循环调度。

class Task(futures._PyFuture):
    def __init__(self, coro, *, loop=None, name=None, context=None,
                 eager_start=False):
        super().__init__(loop=loop)
        ...
        if eager_start and self._loop.is_running():
            # 事件循环在运行,且设置eager_start
            self.__eager_start()
        else:
            # 往事件循环注册__step
            self._loop.call_soon(self.__step, context=self._context)
            _register_task(self)

为了对 Task是事件循环调度的基本单位加深理解,下面看一个样例:

import asyncio
import time

async def counter(name: str):
    for i in range(0, 2):
        print("{0}: {1}".format(name, i))
        await asyncio.sleep(1)

async def main_task():
    start_time = time.time()
    tasks = []
    for n in range(4):
        tasks.append(asyncio.create_task(counter("task{0}".format(n))))
    for task in tasks:
        res = await task
    print("main_task cost {0}s".format(time.time() - start_time))


async def main_coro():
    start_time = time.time()
    for n in range(4):
        await counter("coro{0}".format(n))
    print("main_coro cost {0}s".format(time.time() - start_time))


print("Start run task...")
asyncio.run(main_task())
print("Start run coro...")
asyncio.run(main_coro())

main_coro方法是直接运行协程。main_task方法是先将协程转为任务,然后运行。运行结果如下:

code $ (master) python3 coro_and_task.py
Start run task...
task0: 0
task1: 0
task2: 0
task3: 0
task0: 1
task2: 1
task1: 1
task3: 1
main_task cost 2.0148472785949707s
Start run coro...
coro0: 0
coro0: 1
coro1: 0
coro1: 1
coro2: 0
coro2: 1
coro3: 0
coro3: 1
main_coro cost 8.061261653900146s

main_task4个任务是并发调度运行,耗时2s,而main_coro4个协程是串行运行,耗时8s

main_coro 的执行流程如下:

  • aysncio.run(main_coro())首先将协程main_coro包装创建一个Task给事件循环调度,此时事件循环就绪队列只有一个main_coro_task.__step
  • 事件循环从就绪队列取出main_coro_task.__step开始执行。当遇到await asyncio.sleep(1),获取的协程内部等待对象Future会添加当前Task__wakeup方法, 当等待的Future完成,此Task可恢复运行。同时asyncio.sleep(1)内部会往事件循环中注册一个1s后执行futures._set_result_unless_cancelled方法的handle。 然后把控制权从新交给事件循环,但由于此时就绪队列为空(1s时间还没到),事件循环会等待。
  • 1s时间到后,事件循环就绪队列会有futures._set_result_unless_cancelled方法的可执行handle,方法futures._set_result_unless_cancelled 会被调用, 将第2步获取的协程内部等待对象Future 设置结果,此时注册的回调函数main_coro_task.__wakeup会被调用,恢复Task执行,继续下个for循环。

main_coro方法内,每次只有一个Task在事件循环中,所以4个协程是串行执行。

main_task 的执行流程如下:

  • aysncio.run(main_task())首先将协程main_task包装创建一个Task给事件循环调度,此时事件循环就绪队列只有一个main_task_task.__step
  • 事件循环从就绪队列取出main_task_task.__step开始执行。asyncio.create_task会将协程counter包装创建Task。 此时有事件循环中有5Task (一个 main_task,四个 counter),事件循环就绪队列有4Task(counter("task{}")).__stephandle。 当遇到await task,获取的协程内部等待对象Future会添加当前Task__wakeup方法,当等待的Future完成,此Task可恢复运行。 然后把控制权从新交给事件循环。
  • 事件循环从就绪队列取出一个Task(counter("task{}")).__step执行。当遇到asyncio.sleep(1),获取的协程内部等待对象Future会添加当前Task__wakeup方法, 当此Future完成,此Task可恢复运行。同时asyncio.sleep(1)内部会往事件循环中注册一个1s后执行futures._set_result_unless_cancelled方法的handle。 然后把控制权交给事件循环,继续处理后面3Task(counter("task{}")).__step。此步可以认为4个任务同时运行。
  • 1s时间到后,事件循环就绪队列会有4futures._set_result_unless_cancelled的可执行handle,方法futures._set_result_unless_cancelled 会被调用, 将第3步获取的协程内部等待对象Future设置结果,此时注册的回调函数Tasks.__wakeup会被调用,继续程序往下执行。

main_task方法内,同时有4Task在事件循环中,所以4个协程是并发执行。