Skip to content

Latest commit

 

History

History
451 lines (357 loc) · 22.5 KB

README.md

File metadata and controls

451 lines (357 loc) · 22.5 KB

альтернативный event loop на libuv - uvloop

5 способов параллелить задачи ^

  1. на каждую создаем новый поток (в питоне только io-bound, в других языках и cpu-bound)
  2. на каждую создаем новый процесс (io- и cpu-bound задачи)
  3. выполняем задачу на заранее созданном потоке из пула (аналогично 1, меньше расходы)
  4. выполняем задачу на заранее созданном процессе из пула (аналогично 2, меньше расходы)
  5. выполняем задачи асинхронно (только io-bound задачи и только в неблокирующем режиме)

Подробнее про плюсы и минусы каждого метода

Асинхронщина на уровне ОС

Реализуется за счет возможностей ОС наблюдать за группой дескрипторов и выбирать из них готовые

Поддержка в языках

Та или иная реализация есть почти во всех языках, например: Python (Asyncio, Twisted, Gevent и Tornado), Javascript (nodejs, deno), netty (EpollEventLoop.java), tokio (rust), evio (go), amp (php), eventmachine (ruby), coro-async(C++), а также применяется в браузерах, nginx, qt, gtk.

Системные вызовы в linux (epoll)

  • epoll_create - создает экземпляр epoll и возвращает указывающий на него файловый дескриптор
  • epoll_ctl - добавляет, изменяет, удаляет дескрипторы из списка интереса экземпляра epoll. В объекте event содержится тип нужного события и пользовательские данные
  • epoll_wait - ожидает события на экземпляре epoll, возвращает произошедшие отслеживаемые события

Пример кода на C:

int ep = epoll_create(1);
struct epoll_event new_ev;
new_ev.data.fd = server;
new_ev.events = EPOLLIN;
epoll_ctl(ep, EPOLL_CTL_ADD, server, &new_ev);
while(1) {
    if (epoll_wait(ep, &new_ev, 1, 2000) == 0) {
        printf("Timeout\n");
            continue;
    }
    if (new_ev.data.fd == server) {
        int client_sock = accept(server, NULL, NULL);
        printf("New client\n");
        new_ev.data.fd = client_sock;
        new_ev.events = EPOLLIN;
        epoll_ctl(ep, EPOLL_CTL_ADD, client_sock, &new_ev);
    } else {
        printf("Interact with fd %d\n", (int)new_ev.data.fd);
        if (interact(new_ev.data.fd) == 0) {
            printf("Client disconnected\n");
            close(new_ev.data.fd);
            epoll_ctl(ep, EPOLL_CTL_DEL, new_ev.data.fd, NULL);
        } 
    }
} 
close(ep);

Структура подобных приложений ^

Включают компоненты:

  • бесконечный цикл
  • очереди задач (одна или несколько)
  • опрос готовых
  • пул потоков для блокирующих (io-, cpu-bound задач) операций
  • также возможна работа с таймерами
ready = []
while True:
    poll_for_io()
    
    for callback in ready:
        callback()

Архитектура node.js ^

node_os
Node.js ничем не отличается от других асинхронных систем

Node.js (код https://github.com/nodejs/node) использует ряд C/C++ библиотек (https://github.com/nodejs/node/tree/master/deps) и биндинги к ним, а также аддоны на C++.

node_main
Архитектура node.js

Node.js использует движок V8 для выполнения javascript, однако он не содержит своей реализации event loop'а, для чего используется событийный цикл из библиотеки libuv.

В отличие от node.js, барузер Chrome, также основанный на V8, использует реализацию событийного цикла из другой библиотеки - libevent (github), а аналог node.js - deno, разработанный также Райаном Далем - из библиотеки tokio (rust).

Node.js event loop

node_loop

Фазы event loop:

  • timers - коллбеки из setTimeout и setInterval
  • pending callbacks - коллбеки сетевых операций
  • idle, prepare - нет доступа к ним, перед чтением файла
  • poll - опрос IO, выполнение IO-коллбеков
  • check - коллбеки из setImmediate
  • close - коллбеки из .close()-методов

Между фазами event loop выполняет коллбеки из приоритетных очередей:

  • nextTick - коллбеки из process.nextTick()
  • microtaskQueue - коллбеки из промисов, queueMicrotask

В коде библиотеки libuv есть только основная очередь задач и еще одна для setImmediate, приоритетных очередей она не содержит. Микротаски реализованы в V8, а очередь nextTick - в коде самой ноды.

Пример 1 (js) - что будет выведено в консоль?

node node_js_loop.js
strace -e epoll_create1,epoll_wait,epoll_ctl node node_js_loop.js >> /dev/null

libuv

Библиотека libuv (github) была создана для node.js ее автором в 2009 г. для кроссплатформенной работы, один интерфейс - несколько ОС. Она хорошо написана и используется также во многих других популярных проектах.

uv_run, uv__io_poll

В libuv нет очередей nextTick и микротасок, так как первая реализована в самой ноде, а вторая - в V8.

Стандартный событийный цикл asyncio ^

Написан на питоне. Используется по умолчанию.

Аналоги функций node.js:

  • setImmediate(cb) -> loop.call_soon(cb)
  • setTimeout(cb, timeout) -> loop.call_later(timeout, cb)

Содержит

  • очереди: 1) таймеры (с приоритетом); 2) готовые коллбеки
  • фазы: 1) опрос IO; 2) выполнение результирующих коллбеков; 3) планирование call_later; 4) выполнение готовых коллбеков

Сильно сокращенный и упрощенный код в интерпретаторе:

import concurrent.futures
import heapq

class EventLoop:
    def __init__(self):
        self._ready = collections.deque()
        self._scheduled = []
        self.stopping = False
        
    def run_forever(self):
        while True:
            self._run_once()
            
            if self._stopping:
                break
    
    def _run_once(self):
        """Run one full iteration of the event loop.
        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        heapq.heapify(_scheduled)   # преобразовывает список в кучу

        # 1 ----------- Polls for IO -----------
        event_list = self._selector.select(timeout)  # epoll_wait

        # 2 ----------- Schedules the resulting callbacks ---------
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ:
                self._ready.append(reader)
            if mask & selectors.EVENT_WRITE:
                self._ready.append(writer)

        # 3 ------------- Schedules 'call_later' callbacks --------------
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # 4 ----------- Calls all currently ready callbacks -------------
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            handle._run()
    
    def stop(self):
        self._stopping = True
        
    def create_task(self, coro, *, name=None):
        return tasks.Task(coro, loop=self, name=name)
        
    def call_soon(self, callback, *args, context=None):
        handle = events.Handle(callback, args, self, context)
        self._ready.append(handle)

    def call_later(self, delay, callback, *args, context=None):        
        timer = events.TimerHandle(self.time() + delay, callback, args, self, context)
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer
        
    def run_in_executor(self, executor, func, *args):
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor(
                    thread_name_prefix='asyncio'
                )
                self._default_executor = executor
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)


class Task(Future):  

    """A coroutine wrapped in a Future."""

    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(loop=loop)

        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        
    def __step(self, exc=None):
        coro = self._coro

        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
                super().set_result(exc.value)


class Future:
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None

    def __init__(self, *, loop=None):
        self._loop = loop
        self._callbacks = []

    def cancel(self, msg=None):
        """Cancel the future and schedule callbacks.
        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)
        return True

    def add_done_callback(self, fn, *, context=None):
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

    def set_result(self, result):
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)
        return True

    def set_exception(self, exception):
        self._exception = exception
        self._state = _FINISHED
        
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)
        return True

    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  

Пример 2 (python) - что будет выведено в консоль?

python3 python_asyncio_loop.js
strace -e epoll_create1,epoll_wait,epoll_ctl python3 python_asyncio_loop.js >> /dev/null

Коллбеки vs корутины

В отличие от node.js, где в коде допустимы и коллбеки, и асинхронные функции, а первые до недавнего времени еще и были лучше по производительности, а питоне в пользовательском коде должны быть только корутины и таски, а коллбеки, фьючи - в низкоуровневом коде библиотек. Но как же работают корутины, когда архитектура системы предусмаотривает работу с коллбеками? В двух словах, корутина - это генератор, имеющий несколько шагов, и в коллбеке на какое-то событие корутина прокручивается на один шаг вызовом coro.send().

Подробнее

Альтернативные реализации цикла событий для asyncio ^

Написаны на C (либо Rust) для увеличения производительности системы. Для использования с библиотекой asyncio event loop должен реализовывать методы класса AbstractEventLoop (run_forever, run_until_complete, stop, is_running, is_closed, close, shutdown_asyncgens, shutdown_default_executor, call_soon, call_later, call_at, time, create_future, create_task, call_soon_threadsafe, run_in_executor, set_default_executor, getaddrinfo, getnameinfo, create_connection, create_server, sendfile, start_tls, create_unix_connection, create_unix_server, create_datagram_endpoint, connect_read_pipe, connect_write_pipe, subprocess_shell, subprocess_exec, add_reader, remove_reader, add_writer, remove_writer, sock_recv, sock_recv_into, sock_sendall, sock_connect, sock_accept, sock_sendfile, add_signal_handler, remove_signal_handler, set_task_factory, get_task_factory, get_exception_handler, set_exception_handler, default_exception_handler, call_exception_handler, get_debug, set_debug).

uvloop ^

Написан Юрием Селивановым (core-разработчик питона) с использованием libuv в 2016 г. В настоящее время используется по умолчанию во всех современных асинхронных фреймворках, кроме aiohttp, куда он легко подключается отдельно при желании. По сравнению со стандартным дает прирост производительности в 2-4 раза (по данным статьи), на реальных проектах результат скромнее, но тоже ощутим (согласно замерам, проведенным в компании Rambler, ~30%).

Как использовать:

import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())  # старый способ
uvloop.install()  # новый способ

tokio ^

Написан с использованием tokio.rs в 2017 г., с 2018 г. проект не поддерживается более.

Как использовать:

import asyncio
import tokio

asyncio.set_event_loop_policy(tokio.EventLoopPolicy())

Подключение uvloop и его результаты ^

В каждый воркер (процесс с собственым event loop'ом).

uvloop.install()

Исследуемый сервис - относительно новый проект, торнадо 5 + нативные корутины. Альтернативный event loop, написанный на C (либо Rust) нужен нам для снижения потребления CPU. Помог ли он?

По результатам нагрузочного тестирования отличия незначительные. После релиза на графиках потребление CPU несколько снизилось, но ярко выраженного скачка, как это было при переходе с питона 3.5 на версию 3.7, не было.

Результат полностью соответствует ожиданиям, так как в tornado 5 все еще много костылей, и поэтому event loop здесь не является узким местом.

Ожидается, что для tornado 6, а тем более современных фреймворков, результаты будут гораздо более ощутимые - и это подтверждается исследованиями, проведенными в ряде компаний.

Напротив, для более проблемного, с большим количеством блокирующих операций, старого проекта на tornado 5, да еще и на торнадовским корутинах, выигрыша вообще практически не будет.

Дополнительные материалы: