Skip to content

Commit

Permalink
Issue #74: create console script `openeo-aggregator-prime-caches`
Browse files: browse the repository at this point in the history
  • Loading branch information
soxofaan committed Sep 22, 2023
1 parent ccdd292 commit 8224458
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 45 deletions.
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
extras_require={
"dev": tests_require,
},
entry_points={
"console_scripts": ["openeo-aggregator-prime-caches=openeo_aggregator.background.prime_caches:main"],
},
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
Expand Down
3 changes: 3 additions & 0 deletions src/openeo_aggregator/background/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Background tasks to support the web app: cache priming, maintenance, ...
"""
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
"""
Background tasks to support the web app: cache priming, maintenance, ...
"""

import logging
from pathlib import Path
from typing import Any, Callable, Optional, Sequence, Tuple, Union
from typing import Any, Optional, Sequence, Union

from kazoo.client import KazooClient
from openeo.util import TimingLogger
Expand All @@ -23,6 +19,7 @@ class AttrStatsProxy:
Proxy object to wrap a given object and keep stats of attribute/method usage.
"""

# TODO: move this to a utilities module
# TODO: avoid all these public attributes that could collide with existing attributes of the proxied object
__slots__ = ["target", "to_track", "stats"]

Expand All @@ -37,66 +34,59 @@ def __getattr__(self, name):
return getattr(self.target, name)


def build_kazoo_client_with_stats() -> Tuple[Callable, dict]:
"""
Build KazooClient factory, for Kazoo clients with additional ZooKeeper operation stats
"""
stats = {}

def kazoo_client_factory(**kwargs):
zk = KazooClient(**kwargs)
return AttrStatsProxy(
target=zk,
to_track=["start", "stop", "create", "get", "set"],
stats=stats,
)

return kazoo_client_factory, stats

def main(config: Union[str, Path, AggregatorConfig, None] = None):
"""CLI entrypoint"""
setup_logging(config=get_aggregator_logging_config(context="background-task"))
prime_caches(config=config)

def prime_caches(config: Optional[Union[str, Path]]):
log = logging.getLogger(f"{__name__}.prime_caches")

with TimingLogger(title="Prime caches", logger=log):

def prime_caches(config: Union[str, Path, AggregatorConfig, None] = None):
with TimingLogger(title="Prime caches", logger=_log):
config: AggregatorConfig = get_config(config)
log.info(f"Using config: {config.get('config_source')=}")
_log.info(f"Using config: {config.get('config_source')=}")

# Inject Zookeeper operation statistics
kazoo_stats, kazoo_client_factory = build_kazoo_client_with_stats()
log.info(f"Using patched kazoo client factory {kazoo_client_factory}")
# TODO: create a new config instead of updating an existing one?
config.kazoo_client_factory = kazoo_client_factory
kazoo_stats = {}
_patch_config_for_kazoo_client_stats(config, kazoo_stats)

log.info(f"Creating AggregatorBackendImplementation with {config.aggregator_backends}")
_log.info(f"Creating AggregatorBackendImplementation with {config.aggregator_backends}")
backends = MultiBackendConnection.from_config(config)
backend_implementation = AggregatorBackendImplementation(backends=backends, config=config)

with TimingLogger(title="General capabilities", logger=log):
with TimingLogger(title="General capabilities", logger=_log):
backends.get_api_versions()
backend_implementation.file_formats()
backend_implementation.secondary_services.service_types()

with TimingLogger(title="Get full collection listing", logger=log):
with TimingLogger(title="Get full collection listing", logger=_log):
collections_metadata = backend_implementation.catalog.get_all_metadata()

with TimingLogger(title="Get per collection metadata", logger=log):
with TimingLogger(title="Get per collection metadata", logger=_log):
collection_ids = [m["id"] for m in collections_metadata]
for c, collection_id in enumerate(collection_ids):
log.info(f"get collection {c+1}/{len(collection_ids)} {collection_id}")
_log.info(f"get collection {c+1}/{len(collection_ids)} {collection_id}")
backend_implementation.catalog.get_collection_metadata(collection_id=collection_id)

with TimingLogger(title="Get merged processes", logger=log):
with TimingLogger(title="Get merged processes", logger=_log):
backend_implementation.processing.get_merged_process_metadata()

log.info(f"Zookeeper stats: {kazoo_stats}")
_log.info(f"Zookeeper stats: {kazoo_stats}")


if __name__ == "__main__":
setup_logging(
config=get_aggregator_logging_config(
context="background-task",
handler_default_level="DEBUG",
def _patch_config_for_kazoo_client_stats(config: AggregatorConfig, stats: dict):
def kazoo_client_factory(**kwargs):
_log.info(f"AttrStatsProxy-wrapping KazooClient with {kwargs=}")
zk = KazooClient(**kwargs)
return AttrStatsProxy(
target=zk,
to_track=["start", "stop", "create", "get", "set"],
stats=stats,
)
)
prime_caches(config=Path(__file__).parent.parent.parent / "conf/aggregator.dev.py")

_log.info(f"Patching config with {kazoo_client_factory=}")
# TODO: create a new config instead of updating an existing one?
config.kazoo_client_factory = kazoo_client_factory


if __name__ == "__main__":
main(config=Path(__file__).parent.parent.parent.parent / "conf/aggregator.dev.py")
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from openeo_aggregator.background import AttrStatsProxy
from openeo_aggregator.background.prime_caches import AttrStatsProxy


class TestAttrStatsProxy:
Expand Down

0 comments on commit 8224458

Please sign in to comment.