Skip to content

Commit

Permalink
optimize locking in katalogus.py, reuse available data (#3752)
Browse files Browse the repository at this point in the history
Co-authored-by: JP Bruins Slot <[email protected]>
  • Loading branch information
underdarknl and jpbruinsslot authored Nov 21, 2024
1 parent d7d2669 commit f751a99
Showing 1 changed file with 57 additions and 52 deletions.
109 changes: 57 additions & 52 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,90 +44,95 @@ def __init__(self, host: str, source: str, timeout: int, pool_connections: int,

def flush_caches(self) -> None:
self.flush_plugin_cache()
self.flush_normalizer_cache()
self.flush_boefje_cache()
self.flush_boefje_cache(self.plugin_cache)
self.flush_normalizer_cache(self.plugin_cache)

def flush_plugin_cache(self) -> None:
def flush_plugin_cache(self):
self.logger.debug("Flushing the katalogus plugin cache for organisations")

plugin_cache: dict = {}
orgs = self.get_organisations()
for org in orgs:
plugin_cache.setdefault(org.id, {})

plugins = self.get_plugins_by_organisation(org.id)
plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}

with self.plugin_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.plugin_cache.expiration_enabled = False
self.plugin_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.plugin_cache.setdefault(org.id, {})

plugins = self.get_plugins_by_organisation(org.id)
self.plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}

self.plugin_cache.cache = plugin_cache
self.plugin_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus plugin cache for organisations")

def flush_boefje_cache(self) -> None:
def flush_boefje_cache(self, plugins=None) -> None:
"""boefje.consumes -> plugin type boefje"""
self.logger.debug("Flushing the katalogus boefje type cache for organisations")

with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.boefje_cache[org.id] = {}
boefje_cache: dict = {}
orgs = self.get_organisations()
for org in orgs:
boefje_cache.setdefault(org.id, {})

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "boefje":
continue
org_plugins = plugins[org.id].values() if plugins else self.get_plugins_by_organisation(org.id)
for plugin in org_plugins:
if plugin.type != "boefje":
continue

if plugin.enabled is False:
continue
if plugin.enabled is False:
continue

if not plugin.consumes:
continue
if not plugin.consumes:
continue

# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
self.boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue
# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue

for type_ in plugin.consumes:
self.boefje_cache[org.id].setdefault(type_, []).append(plugin)
for type_ in plugin.consumes:
boefje_cache[org.id].setdefault(type_, []).append(plugin)

with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()
self.boefje_cache.cache = boefje_cache
self.boefje_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus boefje type cache for organisations")

def flush_normalizer_cache(self) -> None:
def flush_normalizer_cache(self, plugins=None) -> None:
"""normalizer.consumes -> plugin type normalizer"""
self.logger.debug("Flushing the katalogus normalizer type cache for organisations")

with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()
normalizer_cache: dict = {}
orgs = self.get_organisations()
for org in orgs:
normalizer_cache.setdefault(org.id, {})

orgs = self.get_organisations()
for org in orgs:
self.normalizer_cache[org.id] = {}
org_plugins = plugins[org.id].values() if plugins else self.get_plugins_by_organisation(org.id)
for plugin in org_plugins:
if plugin.type != "normalizer":
continue

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "normalizer":
continue
if plugin.enabled is False:
continue

if plugin.enabled is False:
continue
if not plugin.consumes:
continue

if not plugin.consumes:
continue

for type_ in plugin.consumes:
self.normalizer_cache[org.id].setdefault(type_, []).append(plugin)
for type_ in plugin.consumes:
normalizer_cache[org.id].setdefault(type_, []).append(plugin)

with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()
self.normalizer_cache.cache = normalizer_cache
self.normalizer_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus normalizer type cache for organisations")
Expand Down

0 comments on commit f751a99

Please sign in to comment.