Skip to content

Commit

Permalink
Add callback repositories
Browse files Browse the repository at this point in the history
  • Loading branch information
Nimond committed May 24, 2022
1 parent bb703ed commit 0f0de04
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 219 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def status_handler(request: Request) -> JSONResponse:
# выполнения асинхронных методов в BotX.
@app.post("/notification/callback")
async def callback_handler(request: Request) -> JSONResponse:
bot.set_raw_botx_method_result(await request.json())
await bot.set_raw_botx_method_result(await request.json())
return JSONResponse(
build_command_accepted_response(),
status_code=HTTPStatus.ACCEPTED,
Expand Down
2 changes: 2 additions & 0 deletions pybotx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)
from pybotx.bot.api.responses.command_accepted import build_command_accepted_response
from pybotx.bot.bot import Bot
from pybotx.bot.callbacks.callback_repo_proto import CallbackRepoProto
from pybotx.bot.exceptions import (
AnswerDestinationLookupError,
BotShuttingDownError,
Expand Down Expand Up @@ -120,6 +121,7 @@
"CTSLoginEvent",
"CTSLogoutEvent",
"CallbackNotReceivedError",
"CallbackRepoProto",
"CantUpdatePersonalChatError",
"Chat",
"ChatCreatedEvent",
Expand Down
17 changes: 12 additions & 5 deletions pybotx/bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

from pybotx.async_buffer import AsyncBufferReadable, AsyncBufferWritable
from pybotx.bot.bot_accounts_storage import BotAccountsStorage
from pybotx.bot.callbacks_manager import CallbacksManager
from pybotx.bot.callbacks.callback_manager import CallbackManager
from pybotx.bot.callbacks.callback_memory_repo import CallbackMemoryRepo
from pybotx.bot.callbacks.callback_repo_proto import CallbackRepoProto
from pybotx.bot.contextvars import bot_id_var, chat_id_var
from pybotx.bot.exceptions import AnswerDestinationLookupError
from pybotx.bot.handler import Middleware
Expand Down Expand Up @@ -182,6 +184,7 @@ def __init__(
httpx_client: Optional[httpx.AsyncClient] = None,
exception_handlers: Optional[ExceptionHandlersDict] = None,
default_callback_timeout: float = BOTX_DEFAULT_TIMEOUT,
callback_repo: Optional[CallbackRepoProto] = None,
) -> None:
if not collectors:
logger.warning("Bot has no connected collectors")
Expand All @@ -198,7 +201,11 @@ def __init__(
self._default_callback_timeout = default_callback_timeout
self._bot_accounts_storage = BotAccountsStorage(list(bot_accounts))
self._httpx_client = httpx_client or httpx.AsyncClient()
self._callbacks_manager = CallbacksManager()

if not callback_repo:
callback_repo = CallbackMemoryRepo()

self._callbacks_manager = CallbackManager(callback_repo)

self.state: SimpleNamespace = SimpleNamespace()

Expand Down Expand Up @@ -253,7 +260,7 @@ async def get_status(self, status_recipient: StatusRecipient) -> BotMenu:

return await self._handler_collector.get_bot_menu(status_recipient, self)

def set_raw_botx_method_result(
async def set_raw_botx_method_result(
self,
raw_botx_method_result: Dict[str, Any],
) -> None:
Expand All @@ -265,7 +272,7 @@ def set_raw_botx_method_result(
raw_botx_method_result,
)

self._callbacks_manager.set_botx_method_callback_result(callback)
await self._callbacks_manager.set_botx_method_callback_result(callback)

async def wait_botx_method_callback(
self,
Expand Down Expand Up @@ -296,7 +303,7 @@ async def startup(self) -> None:
self._bot_accounts_storage.set_token(bot_account.id, token)

async def shutdown(self) -> None:
self._callbacks_manager.stop_callbacks_waiting()
await self._callbacks_manager.stop_callbacks_waiting()
await self._handler_collector.wait_active_tasks()
await self._httpx_client.aclose()

Expand Down
Empty file.
101 changes: 101 additions & 0 deletions pybotx/bot/callbacks/callback_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import asyncio
from typing import Dict, Literal, NamedTuple, Optional, overload
from uuid import UUID

from pybotx.bot.callbacks.callback_repo_proto import CallbackRepoProto
from pybotx.bot.exceptions import BotXMethodCallbackNotFoundError
from pybotx.logger import logger
from pybotx.models.method_callbacks import BotXMethodCallback


class CallbackAlarm(NamedTuple):
alarm_time: float
# TODO: Fix after dropping Python 3.8
task: asyncio.Future # type: ignore


async def _callback_timeout_alarm(
callbacks_manager: "CallbackManager",
sync_id: UUID,
timeout: float,
) -> None:
await asyncio.sleep(timeout)

callbacks_manager.cancel_callback_timeout_alarm(sync_id)
await callbacks_manager.pop_botx_method_callback(sync_id)

logger.error("Callback `{sync_id}` wasn't waited", sync_id=sync_id)


class CallbackManager:
def __init__(self, callback_repo: CallbackRepoProto) -> None:
self._callback_repo = callback_repo
self._callback_alarms: Dict[UUID, CallbackAlarm] = {}

async def create_botx_method_callback(self, sync_id: UUID) -> None:
await self._callback_repo.create_botx_method_callback(sync_id)

async def set_botx_method_callback_result(
self,
callback: BotXMethodCallback,
) -> None:
await self._callback_repo.set_botx_method_callback_result(callback)

async def wait_botx_method_callback(
self,
sync_id: UUID,
timeout: float,
) -> BotXMethodCallback:
return await self._callback_repo.wait_botx_method_callback(sync_id, timeout)

async def pop_botx_method_callback(
self,
sync_id: UUID,
) -> "asyncio.Future[BotXMethodCallback]":
return await self._callback_repo.pop_botx_method_callback(sync_id)

async def stop_callbacks_waiting(self) -> None:
await self._callback_repo.stop_callbacks_waiting()

def setup_callback_timeout_alarm(self, sync_id: UUID, timeout: float) -> None:
loop = asyncio.get_event_loop()

self._callback_alarms[sync_id] = CallbackAlarm(
alarm_time=loop.time() + timeout,
task=asyncio.create_task(_callback_timeout_alarm(self, sync_id, timeout)),
)

@overload
def cancel_callback_timeout_alarm(
self,
sync_id: UUID,
) -> None:
... # noqa: WPS428

@overload
def cancel_callback_timeout_alarm(
self,
sync_id: UUID,
return_remaining_time: Literal[True],
) -> float:
... # noqa: WPS428

def cancel_callback_timeout_alarm(
self,
sync_id: UUID,
return_remaining_time: bool = False,
) -> Optional[float]:
try:
alarm_time, alarm = self._callback_alarms.pop(sync_id)
except KeyError:
raise BotXMethodCallbackNotFoundError(sync_id) from None

time_before_alarm: Optional[float] = None

if return_remaining_time:
loop = asyncio.get_event_loop()
time_before_alarm = alarm_time - loop.time()

alarm.cancel()

return time_before_alarm
62 changes: 62 additions & 0 deletions pybotx/bot/callbacks/callback_memory_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import asyncio
from typing import TYPE_CHECKING, Dict
from uuid import UUID

from pybotx.bot.callbacks.callback_repo_proto import CallbackRepoProto
from pybotx.bot.exceptions import BotShuttingDownError, BotXMethodCallbackNotFoundError
from pybotx.client.exceptions.callbacks import CallbackNotReceivedError
from pybotx.models.method_callbacks import BotXMethodCallback

if TYPE_CHECKING:
from asyncio import Future # noqa: WPS458


class CallbackMemoryRepo(CallbackRepoProto):
def __init__(self) -> None:
self._callback_futures: Dict[UUID, "Future[BotXMethodCallback]"] = {}

async def create_botx_method_callback(self, sync_id: UUID) -> None:
self._callback_futures[sync_id] = asyncio.Future()

async def set_botx_method_callback_result(
self,
callback: BotXMethodCallback,
) -> None:
sync_id = callback.sync_id

future = self._get_botx_method_callback(sync_id)
future.set_result(callback)

async def wait_botx_method_callback(
self,
sync_id: UUID,
timeout: float,
) -> BotXMethodCallback:
future = self._get_botx_method_callback(sync_id)

try:
return await asyncio.wait_for(future, timeout=timeout)
except asyncio.TimeoutError as exc:
del self._callback_futures[sync_id] # noqa: WPS420
raise CallbackNotReceivedError(sync_id) from exc

async def pop_botx_method_callback(
self,
sync_id: UUID,
) -> "Future[BotXMethodCallback]":
return self._callback_futures.pop(sync_id)

async def stop_callbacks_waiting(self) -> None:
for sync_id, future in self._callback_futures.items():
if not future.done():
future.set_exception(
BotShuttingDownError(
f"Callback with sync_id `{sync_id!s}` can't be received",
),
)

def _get_botx_method_callback(self, sync_id: UUID) -> "Future[BotXMethodCallback]":
try:
return self._callback_futures[sync_id]
except KeyError:
raise BotXMethodCallbackNotFoundError(sync_id) from None
39 changes: 39 additions & 0 deletions pybotx/bot/callbacks/callback_repo_proto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import TYPE_CHECKING
from uuid import UUID

from pybotx.models.method_callbacks import BotXMethodCallback

if TYPE_CHECKING:
from asyncio import Future # noqa: WPS458

try:
from typing import Protocol
except ImportError:
from typing_extensions import Protocol # type: ignore # noqa: WPS440


class CallbackRepoProto(Protocol):
async def create_botx_method_callback(self, sync_id: UUID) -> None:
... # noqa: WPS428

async def set_botx_method_callback_result(
self,
callback: BotXMethodCallback,
) -> None:
... # noqa: WPS428

async def wait_botx_method_callback(
self,
sync_id: UUID,
timeout: float,
) -> BotXMethodCallback:
... # noqa: WPS428

async def pop_botx_method_callback(
self,
sync_id: UUID,
) -> "Future[BotXMethodCallback]":
... # noqa: WPS428

async def stop_callbacks_waiting(self) -> None:
... # noqa: WPS428
Loading

0 comments on commit 0f0de04

Please sign in to comment.