From 2e507f71f51084deeb70d2a7e73c0119d4f01307 Mon Sep 17 00:00:00 2001 From: Jeroen Dekkers Date: Fri, 15 Dec 2023 14:53:35 +0100 Subject: [PATCH] Add locking to katalogus service (1.13) (#2144) Co-authored-by: JP Bruins Slot --- .../connectors/services/katalogus.py | 109 ++++++++++-------- 1 file changed, 61 insertions(+), 48 deletions(-) diff --git a/mula/scheduler/connectors/services/katalogus.py b/mula/scheduler/connectors/services/katalogus.py index d8eeb3bcb57..0fe3a62afc4 100644 --- a/mula/scheduler/connectors/services/katalogus.py +++ b/mula/scheduler/connectors/services/katalogus.py @@ -1,3 +1,4 @@ +import threading from typing import Dict, List from scheduler.connectors.errors import exception_handler @@ -15,6 +16,8 @@ class Katalogus(HTTPService): def __init__(self, host: str, source: str, timeout: int = 5, cache_ttl: int = 30): super().__init__(host, source, timeout) + self.lock = threading.Lock() + # For every organisation we cache its plugins, it references the # plugin-id as key and the plugin as value. self.organisations_plugin_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=cache_ttl) @@ -44,76 +47,82 @@ def flush_caches(self) -> None: def flush_organisations_plugin_cache(self) -> None: self.logger.debug("flushing plugin cache [cache=%s]", self.organisations_plugin_cache.cache) - # First, we reset the cache, to make sure we won't get any ExpiredError - self.organisations_plugin_cache.expiration_enabled = False - self.organisations_plugin_cache.reset() + with self.lock: + # First, we reset the cache, to make sure we won't get any ExpiredError + self.organisations_plugin_cache.expiration_enabled = False + self.organisations_plugin_cache.reset() + + orgs = self.get_organisations() + for org in orgs: + if org.id not in self.organisations_plugin_cache: + self.organisations_plugin_cache[org.id] = {} + self.organisations_new_boefjes_cache[org.id] = {} - orgs = self.get_organisations() - for org in orgs: - if org.id not in self.organisations_plugin_cache: - self.organisations_plugin_cache[org.id] = {} - self.organisations_new_boefjes_cache[org.id] = {} + plugins = self.get_plugins_by_organisation(org.id) + self.organisations_plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled} - plugins = self.get_plugins_by_organisation(org.id) - self.organisations_plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled} + self.organisations_plugin_cache.expiration_enabled = True - self.organisations_plugin_cache.expiration_enabled = True self.logger.debug("flushed plugins cache [cache=%s]", self.organisations_plugin_cache.cache) def flush_organisations_boefje_type_cache(self) -> None: """boefje.consumes -> plugin type boefje""" self.logger.debug("flushing boefje cache [cache=%s]", self.organisations_boefje_type_cache.cache) - # First, we reset the cache, to make sure we won't get any ExpiredError - self.organisations_boefje_type_cache.expiration_enabled = False - self.organisations_boefje_type_cache.reset() + with self.lock: + # First, we reset the cache, to make sure we won't get any ExpiredError + self.organisations_boefje_type_cache.expiration_enabled = False + self.organisations_boefje_type_cache.reset() - orgs = self.get_organisations() - for org in orgs: - self.organisations_boefje_type_cache[org.id] = {} + orgs = self.get_organisations() + for org in orgs: + self.organisations_boefje_type_cache[org.id] = {} - for plugin in self.get_plugins_by_organisation(org.id): - if plugin.type != "boefje": - continue + for plugin in self.get_plugins_by_organisation(org.id): + if plugin.type != "boefje": + continue - if plugin.enabled is False: - continue + if plugin.enabled is False: + continue - # NOTE: backwards compatibility, when it is a boefje the - # consumes field is a string field. - if isinstance(plugin.consumes, str): - self.organisations_boefje_type_cache[org.id].setdefault(plugin.consumes, []).append(plugin) - continue + # NOTE: backwards compatibility, when it is a boefje the + # consumes field is a string field. + if isinstance(plugin.consumes, str): + self.organisations_boefje_type_cache[org.id].setdefault(plugin.consumes, []).append(plugin) + continue - for type_ in plugin.consumes: - self.organisations_boefje_type_cache[org.id].setdefault(type_, []).append(plugin) + for type_ in plugin.consumes: + self.organisations_boefje_type_cache[org.id].setdefault(type_, []).append(plugin) + + self.organisations_boefje_type_cache.expiration_enabled = True - self.organisations_boefje_type_cache.expiration_enabled = True self.logger.debug("flushed boefje cache [cache=%s]", self.organisations_boefje_type_cache.cache) def flush_organisations_normalizer_type_cache(self) -> None: """normalizer.consumes -> plugin type normalizer""" self.logger.debug("flushing normalizer cache [cache=%s]", self.organisations_normalizer_type_cache.cache) - # First, we reset the cache, to make sure we won't get any ExpiredError - self.organisations_normalizer_type_cache.expiration_enabled = False - self.organisations_normalizer_type_cache.reset() + with self.lock: + # First, we reset the cache, to make sure we won't get any ExpiredError + self.organisations_normalizer_type_cache.expiration_enabled = False + self.organisations_normalizer_type_cache.reset() + + orgs = self.get_organisations() + for org in orgs: + self.organisations_normalizer_type_cache[org.id] = {} - orgs = self.get_organisations() - for org in orgs: - self.organisations_normalizer_type_cache[org.id] = {} + for plugin in self.get_plugins_by_organisation(org.id): + if plugin.type != "normalizer": + continue - for plugin in self.get_plugins_by_organisation(org.id): - if plugin.type != "normalizer": - continue + if plugin.enabled is False: + continue - if plugin.enabled is False: - continue + for type_ in plugin.consumes: + self.organisations_normalizer_type_cache[org.id].setdefault(type_, []).append(plugin) - for type_ in plugin.consumes: - self.organisations_normalizer_type_cache[org.id].setdefault(type_, []).append(plugin) + self.organisations_normalizer_type_cache.expiration_enabled = True - self.organisations_normalizer_type_cache.expiration_enabled = True self.logger.debug("flushed normalizer cache [cache=%s]", self.organisations_normalizer_type_cache.cache) @exception_handler @@ -147,28 +156,32 @@ def get_plugins_by_organisation(self, organisation_id: str) -> List[Plugin]: def get_plugins_by_org_id(self, organisation_id: str) -> List[Plugin]: try: - return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id]) + with self.lock: + return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id]) except dict_utils.ExpiredError: self.flush_organisations_plugin_cache() return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id]) def get_plugin_by_id_and_org_id(self, plugin_id: str, organisation_id: str) -> Plugin: try: - return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id, plugin_id]) + with self.lock: + return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id, plugin_id]) except dict_utils.ExpiredError: self.flush_organisations_plugin_cache() return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id, plugin_id]) def get_boefjes_by_type_and_org_id(self, boefje_type: str, organisation_id: str) -> List[Plugin]: try: - return dict_utils.deep_get(self.organisations_boefje_type_cache, [organisation_id, boefje_type]) + with self.lock: + return dict_utils.deep_get(self.organisations_boefje_type_cache, [organisation_id, boefje_type]) except dict_utils.ExpiredError: self.flush_organisations_boefje_type_cache() return dict_utils.deep_get(self.organisations_boefje_type_cache, [organisation_id, boefje_type]) def get_normalizers_by_org_id_and_type(self, organisation_id: str, normalizer_type: str) -> List[Plugin]: try: - return dict_utils.deep_get(self.organisations_normalizer_type_cache, [organisation_id, normalizer_type]) + with self.lock: + return dict_utils.deep_get(self.organisations_normalizer_type_cache, [organisation_id, normalizer_type]) except dict_utils.ExpiredError: self.flush_organisations_normalizer_type_cache() return dict_utils.deep_get(self.organisations_normalizer_type_cache, [organisation_id, normalizer_type])