Skip to content

Commit

Permalink
Issue #74 add cli options for logging
Browse files Browse the repository at this point in the history
e.g. with rotating file

expanded test coverage quite a bit too
  • Loading branch information
soxofaan committed Sep 27, 2023
1 parent 4f7c6c1 commit f70c753
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 33 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,5 @@ temp*

# pytest basetemp root (e.g. for tmp_path fixture).
pytest-tmp

openeo_python.log*
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"requests",
"attrs",
"openeo>=0.17.0",
"openeo_driver>=0.67.0.dev",
"openeo_driver>=0.68.0.dev",
"flask~=2.0",
"gunicorn~=20.0",
"python-json-logger>=2.0.0",
Expand Down
10 changes: 8 additions & 2 deletions src/openeo_aggregator/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
"""
import logging
import os
from typing import Any
from pathlib import Path
from typing import Any, List, Optional, Union

import flask
import openeo_driver.views
from openeo_driver.config.load import ConfigGetter
from openeo_driver.util.logging import (
LOG_HANDLER_STDERR_JSON,
LOGGING_CONTEXT_FLASK,
get_logging_config,
setup_logging,
Expand Down Expand Up @@ -74,10 +76,13 @@ def agg_backends():


def get_aggregator_logging_config(
*,
context: str = LOGGING_CONTEXT_FLASK,
handler_default_level: str = "DEBUG",
root_handlers: Optional[List[str]] = None,
log_file: Optional[Union[str, Path]] = None,
) -> dict:
root_handlers = ["stderr_json"]
root_handlers = root_handlers or [LOG_HANDLER_STDERR_JSON]
if smart_bool(os.environ.get("OPENEO_AGGREGATOR_SIMPLE_LOGGING")):
root_handlers = None

Expand All @@ -94,6 +99,7 @@ def get_aggregator_logging_config(
"kazoo": {"level": "WARN"},
},
context=context,
log_file=log_file,
)


Expand Down
39 changes: 33 additions & 6 deletions src/openeo_aggregator/background/prime_caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
import functools
import logging
from pathlib import Path
from typing import Any, Optional, Sequence, Union
from typing import Any, List, Optional, Sequence, Union

from kazoo.client import KazooClient
from openeo.util import TimingLogger
from openeo_driver.util.logging import just_log_exceptions, setup_logging
from openeo_driver.util.logging import (
LOG_HANDLER_FILE_JSON,
LOG_HANDLER_ROTATING_FILE_JSON,
LOG_HANDLER_STDERR_BASIC,
LOG_HANDLER_STDERR_JSON,
just_log_exceptions,
setup_logging,
)

from openeo_aggregator.app import get_aggregator_logging_config
from openeo_aggregator.backend import AggregatorBackendImplementation
Expand Down Expand Up @@ -45,7 +52,7 @@ def __getattr__(self, name):
FAIL_MODE_WARN = "warn"


def main():
def main(args: Optional[List[str]] = None):
"""CLI entrypoint"""
cli = argparse.ArgumentParser()
cli.add_argument(
Expand All @@ -64,10 +71,29 @@ def main():
default=FAIL_MODE_FAILFAST,
help="Fail mode: fail fast or warn (just log failures but keep going if possible).",
)
cli.add_argument(
"--log-handler",
dest="log_handlers",
choices=[
LOG_HANDLER_STDERR_BASIC,
LOG_HANDLER_STDERR_JSON,
LOG_HANDLER_FILE_JSON,
LOG_HANDLER_ROTATING_FILE_JSON,
],
action="append",
help="Log handlers.",
)
cli.add_argument(
"--log-file",
help="Log file to use for (rotating) file log handlers.",
)

arguments = cli.parse_args()
arguments = cli.parse_args(args=args)

setup_logging(config=get_aggregator_logging_config(context="background-task"))
logging_config = get_aggregator_logging_config(
context="background-task", root_handlers=arguments.log_handlers, log_file=arguments.log_file
)
setup_logging(config=logging_config)
prime_caches(
config=arguments.config,
require_zookeeper_writes=arguments.require_zookeeper_writes,
Expand Down Expand Up @@ -130,9 +156,10 @@ def prime_caches(


def _patch_config_for_kazoo_client_stats(config: AggregatorConfig, stats: dict):
orig_kazoo_client_factory = config.kazoo_client_factory or KazooClient
def kazoo_client_factory(**kwargs):
_log.info(f"AttrStatsProxy-wrapping KazooClient with {kwargs=}")
zk = KazooClient(**kwargs)
zk = orig_kazoo_client_factory(**kwargs)
return AttrStatsProxy(
target=zk,
to_track=["start", "stop", "create", "get", "set"],
Expand Down
32 changes: 20 additions & 12 deletions src/openeo_aggregator/testing.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import collections
import dataclasses
import datetime
import itertools
import json
import pathlib
import time
from typing import Any, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from unittest import mock

import kazoo
Expand All @@ -15,23 +17,32 @@
from openeo_aggregator.utils import Clock


@dataclasses.dataclass
class DummyZnodeStat:
last_modified: float = dataclasses.field(default_factory=Clock.time)


class DummyKazooClient:
"""
Stand-in object for KazooClient for testing.
"""

def __init__(self):
self.state = "closed"
self.data = {}
self.data: Dict[str, Tuple[bytes, DummyZnodeStat]] = {}

def start(self):
def start(self, timeout: float = 15):
assert self.state == "closed"
self.state = "open"

def stop(self):
assert self.state == "open"
self.state = "closed"

@property
def connected(self):
return self.state == "open"

def _assert_open(self):
if not self.state == "open":
raise kazoo.exceptions.ConnectionClosedError("Connection has been closed")
Expand All @@ -46,7 +57,7 @@ def create(self, path: str, value, makepath: bool = False):
self.create(parent, b"", makepath=makepath)
else:
raise kazoo.exceptions.NoNodeError
self.data[path] = value
self.data[path] = value, DummyZnodeStat()

def exists(self, path):
self._assert_open()
Expand All @@ -56,7 +67,7 @@ def get(self, path):
self._assert_open()
if path not in self.data:
raise kazoo.exceptions.NoNodeError()
return (self.data[path], None)
return self.data[path]

def get_children(self, path):
self._assert_open()
Expand All @@ -69,22 +80,19 @@ def set(self, path, value, version=-1):
self._assert_open()
if path not in self.data:
raise kazoo.exceptions.NoNodeError()
self.data[path] = value
self.data[path] = (value, DummyZnodeStat())

def get_data_deserialized(self, drop_empty=False) -> dict:
"""Dump whole db as a dict, but deserialize values"""

def deserialize(b: bytes):
if b[:2] == b'{"' and b[-1:] == b"}":
# simple JSON format sniffing
if (b[:1], b[-1:]) in {(b"{", b"}"), (b"[", b"]")}:
return json.loads(b.decode("utf8"))
else:
return b.decode("utf8")

return {
k: deserialize(v)
for (k, v) in self.data.items()
if v or not drop_empty
}
return {k: deserialize(v) for (k, (v, stats)) in self.data.items() if v or not drop_empty}


def approx_now(abs=10):
Expand Down
151 changes: 142 additions & 9 deletions tests/background/test_prime_caches.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,38 @@
from openeo_aggregator.background.prime_caches import AttrStatsProxy, prime_caches
import json
import textwrap
from pathlib import Path
from typing import Any

import pytest
from openeo_driver.testing import DictSubSet

from openeo_aggregator.background.prime_caches import AttrStatsProxy, main, prime_caches
from openeo_aggregator.config import AggregatorConfig
from openeo_aggregator.testing import DummyKazooClient

FILE_FORMATS_JUST_GEOTIFF = {
"input": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}},
"output": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}},
}


@pytest.fixture
def config(backend1, backend2, backend1_id, backend2_id, zk_client) -> AggregatorConfig:
conf = AggregatorConfig()
conf.aggregator_backends = {
backend1_id: backend1,
backend2_id: backend2,
}
conf.kazoo_client_factory = lambda **kwargs: zk_client
conf.zookeeper_prefix = "/oa/"
conf.memoizer = {
"type": "zookeeper",
"config": {
"zk_hosts": "localhost:2181",
"default_ttl": 24 * 60 * 60,
},
}
return conf


class TestAttrStatsProxy:
Expand All @@ -18,16 +52,11 @@ def meh(self, x):
assert foo.stats == {"bar": 1}


def test_prime_caches_basic(config, backend1, backend2, requests_mock, mbldr, caplog):
def test_prime_caches_basic(config, backend1, backend2, requests_mock, mbldr, caplog, zk_client):
"""Just check that bare basics of `prime_caches` work."""
# TODO: check that (zookeeper) caches are actually updated/written.
just_geotiff = {
"input": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}},
"output": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}},
}
mocks = [
requests_mock.get(backend1 + "/file_formats", json=just_geotiff),
requests_mock.get(backend2 + "/file_formats", json=just_geotiff),
requests_mock.get(backend1 + "/file_formats", json=FILE_FORMATS_JUST_GEOTIFF),
requests_mock.get(backend2 + "/file_formats", json=FILE_FORMATS_JUST_GEOTIFF),
requests_mock.get(backend1 + "/collections", json=mbldr.collections("S2")),
requests_mock.get(backend1 + "/collections/S2", json=mbldr.collection("S2")),
requests_mock.get(backend2 + "/collections", json=mbldr.collections("S2")),
Expand All @@ -37,3 +66,107 @@ def test_prime_caches_basic(config, backend1, backend2, requests_mock, mbldr, ca
prime_caches(config=config)

assert all([m.call_count == 1 for m in mocks])

assert zk_client.get_data_deserialized() == DictSubSet(
{
"/oa/cache/CollectionCatalog/all": [
[DictSubSet({"id": "S2"})],
DictSubSet({"_jsonserde": DictSubSet()}),
],
"/oa/cache/CollectionCatalog/collection/S2": DictSubSet({"id": "S2"}),
"/oa/cache/Processing/all/1.1.0": DictSubSet({"load_collection": DictSubSet({"id": "load_collection"})}),
"/oa/cache/general/file_formats": FILE_FORMATS_JUST_GEOTIFF,
"/oa/cache/mbcon/api_versions": ["1.1.0"],
"/oa/cache/SecondaryServices/service_types": {
"service_types": {},
"supporting_backend_ids": [],
},
}
)


def _is_primitive_construct(data: Any) -> bool:
"""Consists only of Python primitives int, float, dict, list, str, ...?"""
if isinstance(data, dict):
return all(_is_primitive_construct(k) and _is_primitive_construct(v) for k, v in data.items())
elif isinstance(data, (list, tuple, set)):
return all(_is_primitive_construct(x) for x in data)
else:
return isinstance(data, (bool, int, float, str, bytes)) or data is None


def _get_primitive_config(config: AggregatorConfig) -> dict:
return {k: v for k, v in config.items() if _is_primitive_construct(v)}


def _build_config_file(config: AggregatorConfig, path: Path):
"""Best effort AggregatorConfig to config file conversion."""
path.write_text(
textwrap.dedent(
f"""
from openeo_aggregator.config import AggregatorConfig
config = AggregatorConfig({_get_primitive_config(config)})
"""
)
)


def test_prime_caches_main_basic(backend1, backend2, requests_mock, mbldr, caplog, tmp_path, backend1_id, backend2_id):
"""Just check that bare basics of `prime_caches` main work."""
mocks = [
requests_mock.get(backend1 + "/file_formats", json=FILE_FORMATS_JUST_GEOTIFF),
requests_mock.get(backend2 + "/file_formats", json=FILE_FORMATS_JUST_GEOTIFF),
requests_mock.get(backend1 + "/collections", json=mbldr.collections("S2")),
requests_mock.get(backend1 + "/collections/S2", json=mbldr.collection("S2")),
requests_mock.get(backend2 + "/collections", json=mbldr.collections("S2")),
requests_mock.get(backend2 + "/collections/S2", json=mbldr.collection("S2")),
]

# Construct config file
config = AggregatorConfig()
config.aggregator_backends = {
backend1_id: backend1,
backend2_id: backend2,
}
config_file = tmp_path / "conf.py"
_build_config_file(config, config_file)

main(args=["--config", str(config_file)])

assert all([m.call_count == 1 for m in mocks])


def test_prime_caches_main_logging(backend1, backend2, mbldr, caplog, tmp_path, backend1_id, backend2_id, pytester):
"""Run main in subprocess (so no request mocks, and probably a lot of failures) to see if logging setup works."""

config = AggregatorConfig()
config.aggregator_backends = {
backend1_id: backend1,
backend2_id: backend2,
}
config_file = tmp_path / "conf.py"
_build_config_file(config, config_file)

main(args=["--config", str(config_file)])
log_file = tmp_path / "agg.log"

result = pytester.run(
"openeo-aggregator-prime-caches",
"--config",
str(config_file),
"--log-handler",
"rotating_file_json",
"--log-file",
str(log_file),
)

assert result.outlines == []
assert result.errlines == []

with log_file.open("r") as f:
log_entries = [json.loads(line) for line in f]

assert any(log["message"] == f"Loading config from Python file {config_file}" for log in log_entries)
assert any(log["message"].startswith("Prime caches: start") for log in log_entries)
assert any(log["message"].startswith("Prime caches: fail") for log in log_entries)
assert any("cache miss" in log["message"] for log in log_entries)
3 changes: 3 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from openeo_aggregator.config import AggregatorConfig
from openeo_aggregator.testing import DummyKazooClient, MetadataBuilder

pytest_plugins = "pytester"


_DEFAULT_PROCESSES = [
"load_collection",
"load_result",
Expand Down
Loading

0 comments on commit f70c753

Please sign in to comment.