- 5 способов параллелить задачи
- структура асинхронного приложения
- архитектура node.js
- фазы событийного цикла node.js
- событийный цикл asyncio
- альтернативные реализации цикла событий для asyncio
- подключение uvloop и его результаты
5 способов параллелить задачи ^
- на каждую создаем новый поток (в питоне только io-bound, в других языках и cpu-bound)
- на каждую создаем новый процесс (io- и cpu-bound задачи)
- выполняем задачу на заранее созданном потоке из пула (аналогично 1, меньше расходы)
- выполняем задачу на заранее созданном процессе из пула (аналогично 2, меньше расходы)
- выполняем задачи асинхронно (только io-bound задачи и только в неблокирующем режиме)
Подробнее про плюсы и минусы каждого метода
Реализуется за счет возможностей ОС наблюдать за группой дескрипторов и выбирать из них готовые
- linux: poll, select, epoll (reactor), aio, io_uring
- mac, *bsd: kqueue
- solaris: event ports
- windows: IOCP (proactor)
Та или иная реализация есть почти во всех языках, например: Python (Asyncio, Twisted, Gevent и Tornado), Javascript (nodejs, deno), netty (EpollEventLoop.java), tokio (rust), evio (go), amp (php), eventmachine (ruby), coro-async(C++), а также применяется в браузерах, nginx, qt, gtk.
- epoll_create - создает экземпляр epoll и возвращает указывающий на него файловый дескриптор
- epoll_ctl - добавляет, изменяет, удаляет дескрипторы из списка интереса экземпляра epoll. В объекте event содержится тип нужного события и пользовательские данные
- epoll_wait - ожидает события на экземпляре epoll, возвращает произошедшие отслеживаемые события
Пример кода на C:
int ep = epoll_create(1);
struct epoll_event new_ev;
new_ev.data.fd = server;
new_ev.events = EPOLLIN;
epoll_ctl(ep, EPOLL_CTL_ADD, server, &new_ev);
while(1) {
if (epoll_wait(ep, &new_ev, 1, 2000) == 0) {
printf("Timeout\n");
continue;
}
if (new_ev.data.fd == server) {
int client_sock = accept(server, NULL, NULL);
printf("New client\n");
new_ev.data.fd = client_sock;
new_ev.events = EPOLLIN;
epoll_ctl(ep, EPOLL_CTL_ADD, client_sock, &new_ev);
} else {
printf("Interact with fd %d\n", (int)new_ev.data.fd);
if (interact(new_ev.data.fd) == 0) {
printf("Client disconnected\n");
close(new_ev.data.fd);
epoll_ctl(ep, EPOLL_CTL_DEL, new_ev.data.fd, NULL);
}
}
}
close(ep);
Структура подобных приложений ^
Включают компоненты:
- бесконечный цикл
- очереди задач (одна или несколько)
- опрос готовых
- пул потоков для блокирующих (io-, cpu-bound задач) операций
- также возможна работа с таймерами
ready = []
while True:
poll_for_io()
for callback in ready:
callback()
Архитектура node.js ^
Node.js ничем не отличается от других асинхронных систем
Node.js (код https://github.com/nodejs/node) использует ряд C/C++ библиотек (https://github.com/nodejs/node/tree/master/deps) и биндинги к ним, а также аддоны на C++.
Node.js использует движок V8 для выполнения javascript, однако он не содержит своей реализации event loop'а, для чего используется событийный цикл из библиотеки libuv.
В отличие от node.js, барузер Chrome, также основанный на V8, использует реализацию событийного цикла из другой библиотеки - libevent (github), а аналог node.js - deno, разработанный также Райаном Далем - из библиотеки tokio (rust).
Фазы event loop:
- timers - коллбеки из setTimeout и setInterval
- pending callbacks - коллбеки сетевых операций
- idle, prepare - нет доступа к ним, перед чтением файла
- poll - опрос IO, выполнение IO-коллбеков
- check - коллбеки из setImmediate
- close - коллбеки из .close()-методов
Между фазами event loop выполняет коллбеки из приоритетных очередей:
- nextTick - коллбеки из process.nextTick()
- microtaskQueue - коллбеки из промисов, queueMicrotask
В коде библиотеки libuv есть только основная очередь задач и еще одна для setImmediate, приоритетных очередей она не
содержит. Микротаски реализованы в V8, а очередь nextTick
- в коде самой ноды.
Пример 1 (js) - что будет выведено в консоль?
node node_js_loop.js
strace -e epoll_create1,epoll_wait,epoll_ctl node node_js_loop.js >> /dev/null
Библиотека libuv (github) была создана для node.js ее автором в 2009 г. для кроссплатформенной работы, один интерфейс - несколько ОС. Она хорошо написана и используется также во многих других популярных проектах.
В libuv нет очередей nextTick и микротасок, так как первая реализована в самой ноде, а вторая - в V8.
Стандартный событийный цикл asyncio ^
Написан на питоне. Используется по умолчанию.
Аналоги функций node.js:
setImmediate(cb)
->loop.call_soon(cb)
setTimeout(cb, timeout)
->loop.call_later(timeout, cb)
Содержит
- очереди: 1) таймеры (с приоритетом); 2) готовые коллбеки
- фазы: 1) опрос IO; 2) выполнение результирующих коллбеков; 3) планирование call_later; 4) выполнение готовых коллбеков
Сильно сокращенный и упрощенный код в интерпретаторе:
import concurrent.futures
import heapq
class EventLoop:
def __init__(self):
self._ready = collections.deque()
self._scheduled = []
self.stopping = False
def run_forever(self):
while True:
self._run_once()
if self._stopping:
break
def _run_once(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
heapq.heapify(_scheduled) # преобразовывает список в кучу
# 1 ----------- Polls for IO -----------
event_list = self._selector.select(timeout) # epoll_wait
# 2 ----------- Schedules the resulting callbacks ---------
for key, mask in event_list:
fileobj, (reader, writer) = key.fileobj, key.data
if mask & selectors.EVENT_READ:
self._ready.append(reader)
if mask & selectors.EVENT_WRITE:
self._ready.append(writer)
# 3 ------------- Schedules 'call_later' callbacks --------------
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
# 4 ----------- Calls all currently ready callbacks -------------
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
handle._run()
def stop(self):
self._stopping = True
def create_task(self, coro, *, name=None):
return tasks.Task(coro, loop=self, name=name)
def call_soon(self, callback, *args, context=None):
handle = events.Handle(callback, args, self, context)
self._ready.append(handle)
def call_later(self, delay, callback, *args, context=None):
timer = events.TimerHandle(self.time() + delay, callback, args, self, context)
heapq.heappush(self._scheduled, timer)
timer._scheduled = True
return timer
def run_in_executor(self, executor, func, *args):
if executor is None:
executor = self._default_executor
if executor is None:
executor = concurrent.futures.ThreadPoolExecutor(
thread_name_prefix='asyncio'
)
self._default_executor = executor
return futures.wrap_future(
executor.submit(func, *args), loop=self)
class Task(Future):
"""A coroutine wrapped in a Future."""
def __init__(self, coro, *, loop=None, name=None):
super().__init__(loop=loop)
self._coro = coro
self._context = contextvars.copy_context()
self._loop.call_soon(self.__step, context=self._context)
def __step(self, exc=None):
coro = self._coro
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
super().set_result(exc.value)
class Future:
_state = _PENDING
_result = None
_exception = None
_loop = None
def __init__(self, *, loop=None):
self._loop = loop
self._callbacks = []
def cancel(self, msg=None):
"""Cancel the future and schedule callbacks.
If the future is already done or cancelled, return False. Otherwise,
change the future's state to cancelled, schedule the callbacks and
return True.
"""
if self._state != _PENDING:
return False
self._state = _CANCELLED
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
return True
def add_done_callback(self, fn, *, context=None):
if self._state != _PENDING:
self._loop.call_soon(fn, self, context=context)
else:
if context is None:
context = contextvars.copy_context()
self._callbacks.append((fn, context))
def set_result(self, result):
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
return True
def set_exception(self, exception):
self._exception = exception
self._state = _FINISHED
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
return True
def __await__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self
if not self.done():
raise RuntimeError("await wasn't used with future")
return self.result()
Пример 2 (python) - что будет выведено в консоль?
python3 python_asyncio_loop.js
strace -e epoll_create1,epoll_wait,epoll_ctl python3 python_asyncio_loop.js >> /dev/null
В отличие от node.js, где в коде допустимы и коллбеки, и асинхронные функции, а первые до
недавнего времени еще и были лучше по производительности, а питоне в пользовательском коде должны
быть только корутины и таски, а коллбеки, фьючи - в низкоуровневом коде библиотек.
Но как же работают корутины, когда архитектура системы предусмаотривает работу с коллбеками?
В двух словах, корутина - это генератор, имеющий несколько шагов, и в коллбеке на какое-то событие
корутина прокручивается на один шаг вызовом coro.send()
.
Альтернативные реализации цикла событий для asyncio ^
Написаны на C (либо Rust) для увеличения производительности системы.
Для использования с библиотекой asyncio event loop должен реализовывать
методы класса AbstractEventLoop
(run_forever, run_until_complete, stop, is_running, is_closed, close, shutdown_asyncgens,
shutdown_default_executor, call_soon, call_later, call_at, time, create_future, create_task,
call_soon_threadsafe, run_in_executor, set_default_executor, getaddrinfo, getnameinfo,
create_connection, create_server, sendfile, start_tls, create_unix_connection,
create_unix_server, create_datagram_endpoint, connect_read_pipe, connect_write_pipe,
subprocess_shell, subprocess_exec, add_reader, remove_reader, add_writer, remove_writer,
sock_recv, sock_recv_into, sock_sendall, sock_connect, sock_accept, sock_sendfile,
add_signal_handler, remove_signal_handler, set_task_factory, get_task_factory,
get_exception_handler, set_exception_handler, default_exception_handler,
call_exception_handler, get_debug, set_debug).
uvloop ^
Написан Юрием Селивановым (core-разработчик питона) с использованием libuv в 2016 г. В настоящее время используется по умолчанию во всех современных асинхронных фреймворках, кроме aiohttp, куда он легко подключается отдельно при желании. По сравнению со стандартным дает прирост производительности в 2-4 раза (по данным статьи), на реальных проектах результат скромнее, но тоже ощутим (согласно замерам, проведенным в компании Rambler, ~30%).
- Биндинги: https://github.com/MagicStack/uvloop (loop)
- Сама сишная либа: https://libuv.org/, https://github.com/libuv/libuv, loop
Как использовать:
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # старый способ
uvloop.install() # новый способ
tokio ^
Написан с использованием tokio.rs в 2017 г., с 2018 г. проект не поддерживается более.
Как использовать:
import asyncio
import tokio
asyncio.set_event_loop_policy(tokio.EventLoopPolicy())
- Биндинги: https://github.com/PyO3/tokio (loop)
- Сама библиотека на rust: https://tokio.rs/, https://github.com/tokio-rs/tokio
Подключение uvloop и его результаты ^
В каждый воркер (процесс с собственым event loop'ом).
uvloop.install()
Исследуемый сервис - относительно новый проект, торнадо 5 + нативные корутины. Альтернативный event loop, написанный на C (либо Rust) нужен нам для снижения потребления CPU. Помог ли он?
По результатам нагрузочного тестирования отличия незначительные. После релиза на графиках потребление CPU несколько снизилось, но ярко выраженного скачка, как это было при переходе с питона 3.5 на версию 3.7, не было.
Результат полностью соответствует ожиданиям, так как в tornado 5 все еще много костылей, и поэтому event loop здесь не является узким местом.
Ожидается, что для tornado 6, а тем более современных фреймворков, результаты будут гораздо более ощутимые - и это подтверждается исследованиями, проведенными в ряде компаний.
Напротив, для более проблемного, с большим количеством блокирующих операций, старого проекта на tornado 5, да еще и на торнадовским корутинах, выигрыша вообще практически не будет.
- Event loop в node.js
- Знай свой инструмент: Event Loop в libuv
- Browser event loop explainer
- Further Adventures of the Event Loop - Erin Zimmer
- Документация libuv
- The Node.js Event Loop, Timers, and process.nextTick()
- The method to epoll’s madness
- Node.js event loop architecture
- Node.js Under The Hood #3 - Deep Dive Into the Event Loop
- Introduction to libuv: What's a Unicorn Velociraptor? - Colin Ihrig, Joyent