From 28a13a62e6edf206b209b5b395db4efe2cfb2210 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Thu, 9 Nov 2023 13:28:44 +0100 Subject: [PATCH] [resotocore][feat] Maintain subscribers in memory (#1819) --- resotocore/resotocore/__main__.py | 4 +- resotocore/resotocore/db/db_access.py | 4 - resotocore/resotocore/db/subscriberdb.py | 11 --- resotocore/resotocore/task/subscribers.py | 47 ++++------- resotocore/tests/resotocore/conftest.py | 3 +- .../tests/resotocore/db/subscriberdb_test.py | 78 ------------------- .../tests/resotocore/task/subscribers_test.py | 23 +----- 7 files changed, 22 insertions(+), 148 deletions(-) delete mode 100644 resotocore/resotocore/db/subscriberdb.py delete mode 100644 resotocore/tests/resotocore/db/subscriberdb_test.py diff --git a/resotocore/resotocore/__main__.py b/resotocore/resotocore/__main__.py index f6c67e17fc..035c388176 100644 --- a/resotocore/resotocore/__main__.py +++ b/resotocore/resotocore/__main__.py @@ -198,9 +198,7 @@ async def direct_tenant(deps: TenantDependencies) -> None: cli = deps.add(ServiceNames.cli, CLIService(deps, all_commands(deps), default_env, alias_names())) deps.add(ServiceNames.template_expander, TemplateExpanderService(db.template_entity_db, cli)) inspector = deps.add(ServiceNames.inspector, InspectorConfigService(cli)) - subscriptions = deps.add( - ServiceNames.subscription_handler, SubscriptionHandlerService(db.subscribers_db, message_bus) - ) + subscriptions = deps.add(ServiceNames.subscription_handler, SubscriptionHandlerService(message_bus)) core_config_handler = deps.add( ServiceNames.core_config_handler, CoreConfigHandler(config, message_bus, worker_task_queue, config_handler, event_sender, inspector), diff --git a/resotocore/resotocore/db/db_access.py b/resotocore/resotocore/db/db_access.py index d761e2ae23..e88ea734b3 100644 --- a/resotocore/resotocore/db/db_access.py +++ b/resotocore/resotocore/db/db_access.py @@ -24,7 +24,6 @@ from resotocore.db.modeldb import ModelDb, model_db from resotocore.db.packagedb import app_package_entity_db from resotocore.db.runningtaskdb import running_task_db -from resotocore.db.subscriberdb import subscriber_db from resotocore.db.system_data_db import SystemDataDb from resotocore.db.templatedb import template_entity_db from resotocore.error import NoSuchGraph, RequiredDependencyMissingError @@ -46,7 +45,6 @@ def __init__( event_sender: AnalyticsEventSender, adjust_node: AdjustNode, config: CoreConfig, - subscriber_name: str = "subscribers", running_task_name: str = "running_tasks", job_name: str = "jobs", deferred_outer_edge_name: str = "deferred_outer_edges", @@ -62,7 +60,6 @@ def __init__( self.db = AsyncArangoDB(arango_database) self.adjust_node = adjust_node self.graph_model_dbs: Dict[GraphName, ModelDb] = {} - self.subscribers_db = EventEntityDb(subscriber_db(self.db, subscriber_name), event_sender, subscriber_name) self.system_data_db = SystemDataDb(self.db) self.running_task_db = running_task_db(self.db, running_task_name) self.deferred_outer_edge_db = deferred_outer_edge_db(self.db, deferred_outer_edge_name) @@ -78,7 +75,6 @@ def __init__( async def start(self) -> None: if not self.config.multi_tenant_setup: - await self.subscribers_db.create_update_schema() await self.running_task_db.create_update_schema() await self.job_db.create_update_schema() await self.config_entity_db.create_update_schema() diff --git a/resotocore/resotocore/db/subscriberdb.py b/resotocore/resotocore/db/subscriberdb.py deleted file mode 100644 index 85eb758674..0000000000 --- a/resotocore/resotocore/db/subscriberdb.py +++ /dev/null @@ -1,11 +0,0 @@ -from resotocore.db.async_arangodb import AsyncArangoDB -from resotocore.db.entitydb import EntityDb, EventEntityDb, ArangoEntityDb -from resotocore.ids import SubscriberId -from resotocore.task.model import Subscriber - -SubscriberDb = EntityDb[SubscriberId, Subscriber] -EventSubscriberDb = EventEntityDb[SubscriberId, Subscriber] - - -def subscriber_db(db: AsyncArangoDB, collection: str) -> ArangoEntityDb[SubscriberId, Subscriber]: - return ArangoEntityDb(db, collection, Subscriber, lambda k: k.id) diff --git a/resotocore/resotocore/task/subscribers.py b/resotocore/resotocore/task/subscribers.py index 9669542a94..7e0e628833 100644 --- a/resotocore/resotocore/task/subscribers.py +++ b/resotocore/resotocore/task/subscribers.py @@ -7,7 +7,6 @@ from datetime import timedelta, datetime from typing import Optional, Iterable, Dict, List -from resotocore.db.subscriberdb import SubscriberDb from resotocore.ids import SubscriberId from resotocore.message_bus import MessageBus from resotocore.service import Service @@ -52,10 +51,6 @@ async def remove_subscriber(self, subscriber_id: SubscriberId) -> Optional[Subsc def subscribers_by_event(self) -> Dict[str, List[Subscriber]]: pass - @abstractmethod - def update_subscriber_by_event(self, subscribers: Iterable[Subscriber]) -> Dict[str, List[Subscriber]]: - pass - class SubscriptionHandlerService(SubscriptionHandler): """ @@ -64,21 +59,17 @@ class SubscriptionHandlerService(SubscriptionHandler): This handler belongs to the event system, which assumes there is only one instance running in each cluster! """ - def __init__(self, db: SubscriberDb, message_bus: MessageBus) -> None: + def __init__(self, message_bus: MessageBus) -> None: super().__init__() - self.db = db self.message_bus = message_bus self._subscribers_by_id: Dict[SubscriberId, Subscriber] = {} self._subscribers_by_event: Dict[str, List[Subscriber]] = {} self.started_at = utc() self.cleaner = Periodic("subscription_cleaner", self.check_outdated_handler, timedelta(seconds=10)) self.not_connected_since: Dict[str, datetime] = {} - self.lock: Optional[Lock] = None + self.lock: Lock = Lock() async def start(self) -> None: - self.lock = Lock() - await self.__load_from_db() - log.info(f"Loaded {len(self._subscribers_by_id)} subscribers for {len(self._subscribers_by_event)} events") await self.cleaner.start() async def stop(self) -> None: @@ -100,8 +91,7 @@ async def add_subscription( updated = existing.add_subscription(event_type, wait_for_completion, timeout) if existing != updated: log.info(f"Subscriber {subscriber_id}: add subscription={event_type} ({wait_for_completion}, {timeout})") - await self.db.update(updated) - await self.__load_from_db() + await self.__update_subscriber(updated) return updated async def remove_subscription(self, subscriber_id: SubscriberId, event_type: str) -> Subscriber: @@ -109,11 +99,7 @@ async def remove_subscription(self, subscriber_id: SubscriberId, event_type: str updated = existing.remove_subscription(event_type) if existing != updated: log.info(f"Subscriber {subscriber_id}: remove subscription={event_type}") - if updated.subscriptions: - await self.db.update(updated) - else: - await self.db.delete(subscriber_id) - await self.__load_from_db() + await self.__update_subscriber(updated) return updated async def update_subscriptions(self, subscriber_id: SubscriberId, subscriptions: List[Subscription]) -> Subscriber: @@ -121,33 +107,32 @@ async def update_subscriptions(self, subscriber_id: SubscriberId, subscriptions: updated = Subscriber.from_list(subscriber_id, subscriptions) if existing != updated: log.info(f"Subscriber {subscriber_id}: update all subscriptions={subscriptions}") - await self.db.update(updated) - await self.__load_from_db() + await self.__update_subscriber(updated) return updated async def remove_subscriber(self, subscriber_id: SubscriberId) -> Optional[Subscriber]: existing = self._subscribers_by_id.get(subscriber_id, None) if existing: log.info(f"Subscriber {subscriber_id}: remove subscriber") - await self.db.delete(subscriber_id) - await self.__load_from_db() + async with self.lock: + self._subscribers_by_id.pop(subscriber_id, None) + self.__update_subscriber_by_event() return existing - async def __load_from_db(self) -> None: - assert self.lock is not None - async with self.lock: - self._subscribers_by_id = {s.id: s async for s in self.db.all()} - self._subscribers_by_event = self.update_subscriber_by_event(self._subscribers_by_id.values()) - def subscribers_by_event(self) -> Dict[str, List[Subscriber]]: return self._subscribers_by_event - def update_subscriber_by_event(self, subscribers: Iterable[Subscriber]) -> Dict[str, List[Subscriber]]: + def __update_subscriber_by_event(self) -> None: result: Dict[str, List[Subscriber]] = defaultdict(list) - for subscriber in subscribers: + for subscriber in self._subscribers_by_id.values(): for subscription in subscriber.subscriptions.values(): result[subscription.message_type].append(subscriber) - return result + self._subscribers_by_event = result + + async def __update_subscriber(self, subscriber: Subscriber) -> None: + async with self.lock: + self._subscribers_by_id[subscriber.id] = subscriber + self.__update_subscriber_by_event() async def check_outdated_handler(self) -> None: """ diff --git a/resotocore/tests/resotocore/conftest.py b/resotocore/tests/resotocore/conftest.py index b702bfec43..6ea9f5bce1 100644 --- a/resotocore/tests/resotocore/conftest.py +++ b/resotocore/tests/resotocore/conftest.py @@ -730,8 +730,7 @@ def workflow_instance( @fixture async def subscription_handler(message_bus: MessageBus) -> AsyncIterator[SubscriptionHandlerService]: - in_mem = InMemoryDb(Subscriber, lambda x: x.id) - async with SubscriptionHandlerService(in_mem, message_bus) as handler: + async with SubscriptionHandlerService(message_bus) as handler: yield handler diff --git a/resotocore/tests/resotocore/db/subscriberdb_test.py b/resotocore/tests/resotocore/db/subscriberdb_test.py deleted file mode 100644 index 91f1f81758..0000000000 --- a/resotocore/tests/resotocore/db/subscriberdb_test.py +++ /dev/null @@ -1,78 +0,0 @@ -import asyncio -from typing import List - -import pytest -from arango.database import StandardDatabase - -from resotocore.analytics import AnalyticsEventSender, InMemoryEventSender -from resotocore.db import subscriberdb -from resotocore.db.async_arangodb import AsyncArangoDB -from resotocore.db.entitydb import EventEntityDb -from resotocore.db.subscriberdb import SubscriberDb, EventSubscriberDb -from resotocore.ids import SubscriberId -from resotocore.task.model import Subscriber, Subscription - - -@pytest.fixture -async def subscriber_db(test_db: StandardDatabase) -> SubscriberDb: - async_db = AsyncArangoDB(test_db) - subscriber_db = subscriberdb.subscriber_db(async_db, "subscriber") - await subscriber_db.create_update_schema() - await subscriber_db.wipe() - return subscriber_db - - -@pytest.fixture -def event_db(subscriber_db: SubscriberDb, event_sender: AnalyticsEventSender) -> EventSubscriberDb: - return EventEntityDb(subscriber_db, event_sender, "subscriber") - - -@pytest.fixture -def subscribers() -> List[Subscriber]: - subs = [Subscription("foo", True) for _ in range(0, 10)] - return [Subscriber.from_list(SubscriberId(str(a)), subs) for a in range(0, 10)] - - -@pytest.mark.asyncio -async def test_load(subscriber_db: SubscriberDb, subscribers: List[Subscriber]) -> None: - await subscriber_db.update_many(subscribers) - loaded = [sub async for sub in subscriber_db.all()] - assert subscribers.sort() == loaded.sort() - - -@pytest.mark.asyncio -async def test_update(subscriber_db: SubscriberDb, subscribers: List[Subscriber]) -> None: - # multiple updates should work as expected - await subscriber_db.update_many(subscribers) - await subscriber_db.update_many(subscribers) - await subscriber_db.update_many(subscribers) - loaded = [sub async for sub in subscriber_db.all()] - assert subscribers.sort() == loaded.sort() - - -@pytest.mark.asyncio -async def test_delete(subscriber_db: SubscriberDb, subscribers: List[Subscriber]) -> None: - await subscriber_db.update_many(subscribers) - remaining = list(subscribers) - for _ in subscribers: - sub = remaining.pop() - await subscriber_db.delete_value(sub) - loaded = [sub async for sub in subscriber_db.all()] - assert remaining.sort() == loaded.sort() - assert len([sub async for sub in subscriber_db.all()]) == 0 - - -@pytest.mark.asyncio -async def test_events( - event_db: EventSubscriberDb, subscribers: List[Subscriber], event_sender: InMemoryEventSender -) -> None: - # 2 times update - await event_db.update_many(subscribers) - await event_db.update_many(subscribers) - # 6 times delete - for sub in subscribers: - await event_db.delete_value(sub) - # make sure all events will arrive - await asyncio.sleep(0.1) - # ensure the correct count and order of events - assert [a.kind for a in event_sender.events] == ["subscriber-updated-many"] * 2 + ["subscriber-deleted"] * 10 diff --git a/resotocore/tests/resotocore/task/subscribers_test.py b/resotocore/tests/resotocore/task/subscribers_test.py index e064105df4..a60813c2cf 100644 --- a/resotocore/tests/resotocore/task/subscribers_test.py +++ b/resotocore/tests/resotocore/task/subscribers_test.py @@ -4,23 +4,16 @@ from deepdiff import DeepDiff from pytest import fixture, mark -from resotocore.db.subscriberdb import SubscriberDb from resotocore.ids import SubscriberId from resotocore.message_bus import MessageBus from resotocore.model.typed_model import to_js, from_js from resotocore.task.model import Subscription, Subscriber from resotocore.task.subscribers import SubscriptionHandler, SubscriptionHandlerService -from tests.resotocore.db.entitydb import InMemoryDb @fixture -def in_mem_db() -> SubscriberDb: - return InMemoryDb[SubscriberId, Subscriber](Subscriber, lambda x: x.id) - - -@fixture -async def handler(in_mem_db: SubscriberDb) -> AsyncIterator[SubscriptionHandlerService]: - async with SubscriptionHandlerService(in_mem_db, MessageBus()) as handler: +async def handler() -> AsyncIterator[SubscriptionHandlerService]: + async with SubscriptionHandlerService(MessageBus()) as handler: await handler.add_subscription(SubscriberId("sub_1"), "test", True, timedelta(seconds=3)) yield handler @@ -46,23 +39,19 @@ def test_json_marshalling_subscribers() -> None: @mark.asyncio -async def test_subscribe(handler: SubscriptionHandler, in_mem_db: SubscriberDb) -> None: +async def test_subscribe(handler: SubscriptionHandler) -> None: # register first time result = await handler.add_subscription(SubscriberId("foo"), "event_bla", True, timedelta(seconds=3)) assert len(result.subscriptions) == 1 assert result.subscriptions["event_bla"].message_type == "event_bla" - # should be persisted in database as well - assert len((await in_mem_db.get("foo")).subscriptions) == 1 # type: ignore # register again is ignored result = await handler.add_subscription(SubscriberId("foo"), "event_bla", True, timedelta(seconds=3)) assert len(result.subscriptions) == 1 assert result.subscriptions["event_bla"].message_type == "event_bla" - # should be persisted in database as well - assert len((await in_mem_db.get("foo")).subscriptions) == 1 # type: ignore @mark.asyncio -async def test_unsubscribe(handler: SubscriptionHandler, in_mem_db: SubscriberDb) -> None: +async def test_unsubscribe(handler: SubscriptionHandler) -> None: # register first time subscriber_id = SubscriberId("foo") subs = [Subscription("event_bla"), Subscription("event_bar")] @@ -70,16 +59,12 @@ async def test_unsubscribe(handler: SubscriptionHandler, in_mem_db: SubscriberDb assert len(result.subscriptions) == 2 updated = await handler.remove_subscription(subscriber_id, "event_bla") assert len(updated.subscriptions) == 1 - # should be persisted in database as well - assert len((await in_mem_db.get(subscriber_id)).subscriptions) == 1 # type: ignore # second time should be ignored updated = await handler.remove_subscription(subscriber_id, "event_bla") assert len(updated.subscriptions) == 1 # last subscription is removed updated = await handler.remove_subscription(subscriber_id, "event_bar") assert len(updated.subscriptions) == 0 - # should be persisted in database as well - assert await in_mem_db.get(subscriber_id) is None @mark.asyncio