-
Notifications
You must be signed in to change notification settings - Fork 0
/
polling_manager.py
118 lines (108 loc) · 3.55 KB
/
polling_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import asyncio
import logging
from asyncio import AbstractEventLoop, CancelledError, Task, get_running_loop
from contextvars import Context
from typing import Any, Awaitable, Dict, List, Optional
from aiogram import Bot
from aiogram.dispatcher.dispatcher import DEFAULT_BACKOFF_CONFIG, Dispatcher
from aiogram.types import User
from aiogram.utils.backoff import BackoffConfig
logger = logging.getLogger(__name__)
class PollingManager:
    """Start and stop long-polling loops for several bots on one event loop.

    Each bot gets its own polling task, stored in ``polling_tasks`` under the
    bot's id so that it can be cancelled later with ``stop_bot_polling``.
    """

    def __init__(self):
        # bot id -> task that runs ``dp._polling(...)`` for that bot.
        self.polling_tasks: Dict[int, Task] = {}
        # Strong references to the wrapper tasks created by
        # ``_create_pooling_task``. The event loop itself only keeps weak
        # references to tasks, so an otherwise unreferenced task could be
        # garbage-collected before it finishes.
        self._runner_tasks: set = set()

    def _create_pooling_task(
        self,
        dp: Dispatcher,
        bot: Bot,
        polling_timeout: int,
        handle_as_tasks: bool,
        backoff_config: BackoffConfig,
        allowed_updates: Optional[List[str]],
        **kwargs: Any,
    ):
        """Schedule ``_start_bot_polling`` as a task on the running loop.

        Must be called while an event loop is running. All arguments are
        forwarded unchanged to ``_start_bot_polling``.
        """
        runner = asyncio.create_task(
            self._start_bot_polling(
                dp=dp,
                bot=bot,
                polling_timeout=polling_timeout,
                handle_as_tasks=handle_as_tasks,
                backoff_config=backoff_config,
                allowed_updates=allowed_updates,
                **kwargs,
            )
        )
        # Keep the task alive until it completes, then drop the reference.
        self._runner_tasks.add(runner)
        runner.add_done_callback(self._runner_tasks.discard)

    def start_bot_polling(
        self,
        dp: Dispatcher,
        bot: Bot,
        polling_timeout: int = 10,
        handle_as_tasks: bool = True,
        backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
        allowed_updates: Optional[List[str]] = None,
        **kwargs: Any,
    ):
        """Request that polling for ``bot`` starts on the running event loop.

        This method returns immediately; the polling task is created on a
        later loop iteration. Extra keyword arguments (including the optional
        ``on_bot_startup`` / ``on_bot_shutdown`` awaitables) are forwarded to
        ``_start_bot_polling``.

        Raises:
            RuntimeError: if there is no running event loop
                (raised by ``asyncio.get_running_loop``).
        """
        loop: AbstractEventLoop = get_running_loop()
        # The callback has to be a callable that is invoked *by the loop*:
        # calling ``_create_pooling_task`` directly here would create the task
        # right away in the caller's context and hand ``None`` to
        # ``call_soon``. Deferring it lets the loop run it inside the fresh
        # ``Context()``, and the task created there copies that context.
        # noinspection PyArgumentList
        loop.call_soon(
            lambda: self._create_pooling_task(
                dp=dp,
                bot=bot,
                polling_timeout=polling_timeout,
                handle_as_tasks=handle_as_tasks,
                backoff_config=backoff_config,
                allowed_updates=allowed_updates,
                **kwargs,
            ),
            context=Context(),
        )

    async def _start_bot_polling(
        self,
        dp: Dispatcher,
        bot: Bot,
        polling_timeout: int = 10,
        handle_as_tasks: bool = True,
        backoff_config: BackoffConfig = DEFAULT_BACKOFF_CONFIG,
        allowed_updates: Optional[List[str]] = None,
        on_bot_startup: Optional[Awaitable] = None,
        on_bot_shutdown: Optional[Awaitable] = None,
        **kwargs: Any,
    ):
        """Run the polling loop for ``bot`` until it finishes or is cancelled.

        Args:
            dp: dispatcher whose ``_polling`` coroutine is run.
            bot: bot to poll; its session is closed when polling ends.
            polling_timeout: forwarded to ``dp._polling``.
            handle_as_tasks: forwarded to ``dp._polling``.
            backoff_config: forwarded to ``dp._polling``.
            allowed_updates: forwarded to ``dp._polling``.
            on_bot_startup: optional awaitable awaited once before polling
                starts.
            on_bot_shutdown: optional awaitable awaited once after polling
                has stopped.
            **kwargs: forwarded to ``dp._polling``.
        """
        logger.info("Start poling")

        user: User = await bot.me()
        if on_bot_startup:
            await on_bot_startup

        try:
            logger.info(
                "Run polling for bot @%s id=%d - %r",
                user.username,
                bot.id,
                user.full_name,
            )
            polling_task = asyncio.create_task(
                dp._polling(
                    bot=bot,
                    handle_as_tasks=handle_as_tasks,
                    polling_timeout=polling_timeout,
                    backoff_config=backoff_config,
                    allowed_updates=allowed_updates,
                    **kwargs,
                )
            )
            # Register the task so ``stop_bot_polling`` can cancel it.
            self.polling_tasks[bot.id] = polling_task
            await polling_task
        except CancelledError:
            # Cancellation is how ``stop_bot_polling`` ends polling, so it is
            # logged and not re-raised; cleanup below still runs.
            logger.info("Polling task Canceled")
        finally:
            logger.info(
                "Polling stopped for bot @%s id=%d - %r",
                user.username,
                bot.id,
                user.full_name,
            )
            if on_bot_shutdown:
                await on_bot_shutdown

            await bot.session.close()

    def stop_bot_polling(self, bot_id: int):
        """Cancel the polling task registered for ``bot_id`` and forget it.

        Raises:
            KeyError: if no polling task is registered under ``bot_id``.
        """
        polling_task = self.polling_tasks.pop(bot_id)
        polling_task.cancel()