diff --git a/otaclient/app/log_setting.py b/otaclient/app/log_setting.py index cebb67a3c..5d1b4a26c 100644 --- a/otaclient/app/log_setting.py +++ b/otaclient/app/log_setting.py @@ -15,19 +15,40 @@ from __future__ import annotations +import atexit import logging import os -import time +import weakref import yaml -from queue import Queue, Empty +from queue import Queue from threading import Thread from urllib.parse import urljoin +from typing import MutableMapping import requests from otaclient import otaclient_package_name from .configs import config as cfg +_logging_running = True +_logging_upload_thread: MutableMapping[ + Thread, Queue[str | None] +] = weakref.WeakKeyDictionary() + + +def _python_exit(): + """Let the log upload thread exit at python exit.""" + global _logging_running + _logging_running = False + + # unblock the thread + for t, q in _logging_upload_thread.items(): + q.put_nowait(None) + t.join(timeout=3) + + +atexit.register(_python_exit) + # NOTE: EcuInfo imports this log_setting so independent get_ecu_id are required. def get_ecu_id(): @@ -49,16 +70,9 @@ def get_logger(name: str, loglevel: int) -> logging.Logger: class _LogTeeHandler(logging.Handler): """Implementation of teeing local logs to a remote otaclient-iot-logger server.""" - def __init__( - self, - upload_interval: int = 30, - max_entries_per_upload: int = 512, - max_backlog: int = 2048, - ) -> None: + def __init__(self, max_backlog: int = 2048) -> None: super().__init__() - self._queue: Queue[str] = Queue(maxsize=max_backlog) - self._max_entries_per_upload = max_entries_per_upload - self._upload_interval = upload_interval + self._queue: Queue[str | None] = Queue(maxsize=max_backlog) def emit(self, record: logging.LogRecord) -> None: try: @@ -67,29 +81,30 @@ def emit(self, record: logging.LogRecord) -> None: pass def start_upload_thread(self, endpoint_url: str): - _queue, _max_per_upload, _interval = ( - self._queue, - self._max_entries_per_upload, - self._upload_interval, - ) + _queue = self._queue def _thread_main(): _session = requests.Session() + global _logging_running - while True: - for _ in range(_max_per_upload): - try: - _entry = _queue.get_nowait() - _session.post(endpoint_url, data=_entry) - except Empty: - break - except Exception: - pass - time.sleep(_interval) + while _logging_running: + entry = _queue.get() + if entry is None: + return # stop signal + if not entry: + continue # skip uploading empty log line + + try: + _session.post(endpoint_url, data=entry, timeout=3) + except Exception: + pass _thread = Thread(target=_thread_main, daemon=True) _thread.start() - return _thread + + # register the logging upload thread + global _logging_upload_thread + _logging_upload_thread[_thread] = _queue def configure_logging(loglevel: int, *, ecu_id: str): diff --git a/tests/test_log_setting.py b/tests/test_log_setting.py new file mode 100644 index 000000000..16e65c3eb --- /dev/null +++ b/tests/test_log_setting.py @@ -0,0 +1,42 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import logging + +from otaclient.app import log_setting + +MODULE = log_setting.__name__ +logger = logging.getLogger(__name__) + + +def test_server_logger(): + test_log_msg = "emit one logging entry" + + # ------ setup test ------ # + _handler = log_setting._LogTeeHandler() + logger.addHandler(_handler) + + # ------ execution ------ # + logger.info(test_log_msg) + + # ------ clenaup ------ # + logger.removeHandler(_handler) + + # ------ check result ------ # + _queue = _handler._queue + _log = _queue.get_nowait() + assert _log == test_log_msg