Skip to content

Commit

Permalink
fix(v3.6.x): fix and refine logs uploading (#280)
Browse files Browse the repository at this point in the history
This PR fixes and removes the unwanted logs upload interval, refines the implementation of logs uploading.

For the refinement side, the logging upload thread now has proper exit logic implemented. At otaclient exits, the logging uploader thread can gracefully exit now.
  • Loading branch information
Bodong-Yang authored Apr 2, 2024
1 parent c2e7f76 commit bc83e31
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 27 deletions.
69 changes: 42 additions & 27 deletions otaclient/app/log_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,40 @@

from __future__ import annotations

import atexit
import logging
import os
import time
import weakref
import yaml
from queue import Queue, Empty
from queue import Queue
from threading import Thread
from urllib.parse import urljoin
from typing import MutableMapping

import requests

from otaclient import otaclient_package_name
from .configs import config as cfg

_logging_running = True
_logging_upload_thread: MutableMapping[
Thread, Queue[str | None]
] = weakref.WeakKeyDictionary()


def _python_exit():
"""Let the log upload thread exit at python exit."""
global _logging_running
_logging_running = False

# unblock the thread
for t, q in _logging_upload_thread.items():
q.put_nowait(None)
t.join(timeout=3)


atexit.register(_python_exit)


# NOTE: EcuInfo imports this log_setting so independent get_ecu_id are required.
def get_ecu_id():
Expand All @@ -49,16 +70,9 @@ def get_logger(name: str, loglevel: int) -> logging.Logger:
class _LogTeeHandler(logging.Handler):
"""Implementation of teeing local logs to a remote otaclient-iot-logger server."""

def __init__(
self,
upload_interval: int = 30,
max_entries_per_upload: int = 512,
max_backlog: int = 2048,
) -> None:
def __init__(self, max_backlog: int = 2048) -> None:
super().__init__()
self._queue: Queue[str] = Queue(maxsize=max_backlog)
self._max_entries_per_upload = max_entries_per_upload
self._upload_interval = upload_interval
self._queue: Queue[str | None] = Queue(maxsize=max_backlog)

def emit(self, record: logging.LogRecord) -> None:
try:
Expand All @@ -67,29 +81,30 @@ def emit(self, record: logging.LogRecord) -> None:
pass

def start_upload_thread(self, endpoint_url: str):
_queue, _max_per_upload, _interval = (
self._queue,
self._max_entries_per_upload,
self._upload_interval,
)
_queue = self._queue

def _thread_main():
_session = requests.Session()
global _logging_running

while True:
for _ in range(_max_per_upload):
try:
_entry = _queue.get_nowait()
_session.post(endpoint_url, data=_entry)
except Empty:
break
except Exception:
pass
time.sleep(_interval)
while _logging_running:
entry = _queue.get()
if entry is None:
return # stop signal
if not entry:
continue # skip uploading empty log line

try:
_session.post(endpoint_url, data=entry, timeout=3)
except Exception:
pass

_thread = Thread(target=_thread_main, daemon=True)
_thread.start()
return _thread

# register the logging upload thread
global _logging_upload_thread
_logging_upload_thread[_thread] = _queue


def configure_logging(loglevel: int, *, ecu_id: str):
Expand Down
42 changes: 42 additions & 0 deletions tests/test_log_setting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright 2022 TIER IV, INC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

import logging

from otaclient.app import log_setting

MODULE = log_setting.__name__
logger = logging.getLogger(__name__)


def test_server_logger():
test_log_msg = "emit one logging entry"

# ------ setup test ------ #
_handler = log_setting._LogTeeHandler()
logger.addHandler(_handler)

# ------ execution ------ #
logger.info(test_log_msg)

# ------ clenaup ------ #
logger.removeHandler(_handler)

# ------ check result ------ #
_queue = _handler._queue
_log = _queue.get_nowait()
assert _log == test_log_msg

0 comments on commit bc83e31

Please sign in to comment.