From 3e16b59a0aba90de8a6ccec0d16727b3e865e4c8 Mon Sep 17 00:00:00 2001 From: Javad Asgari Shafique Date: Thu, 30 Jan 2025 22:21:13 +0100 Subject: [PATCH] Improve logging (use separate thread to write to files), don't mark data as changed twice for MP connections etc. --- example/__main__.py | 5 +- simplyprint_ws_client/core/client.py | 5 +- .../core/ws_protocol/connection.py | 2 +- .../core/ws_protocol/messages.py | 1 + .../events/event_bus_worker.py | 42 ++++-- .../shared/logging/__init__.py | 121 +++++++++++++++++- .../shared/logging/client_handler.py | 81 ------------ .../shared/logging/client_logger.py | 31 +---- .../shared/logging/client_name.py | 26 ++-- simplyprint_ws_client/shared/utils/slugify.py | 11 ++ 10 files changed, 184 insertions(+), 141 deletions(-) delete mode 100644 simplyprint_ws_client/shared/logging/client_handler.py create mode 100644 simplyprint_ws_client/shared/utils/slugify.py diff --git a/example/__main__.py b/example/__main__.py index 426d790..7144cb4 100644 --- a/example/__main__.py +++ b/example/__main__.py @@ -3,8 +3,7 @@ from simplyprint_ws_client.core.settings import ClientSettings from simplyprint_ws_client.core.ws_protocol.connection import ConnectionMode from simplyprint_ws_client.shared.cli.cli import ClientCli -from simplyprint_ws_client.shared.logging import ClientHandler -from simplyprint_ws_client.shared.sp.url_builder import SimplyPrintBackend +from simplyprint_ws_client.shared.logging import setup_logging from .virtual_client import VirtualClient, VirtualConfig if __name__ == "__main__": @@ -17,7 +16,7 @@ config_manager_t=ConfigManagerType.JSON, ) - ClientHandler.setup_logging(settings) + setup_logging(settings) app = ClientApp(settings) cli = ClientCli(app) cli.start_client = lambda: app.run_blocking() diff --git a/simplyprint_ws_client/core/client.py b/simplyprint_ws_client/core/client.py index cb54cba..3de9d4d 100644 --- a/simplyprint_ws_client/core/client.py +++ b/simplyprint_ws_client/core/client.py @@ -136,7 +136,7 @@ def __init__(self, config: PrinterConfig, *, event_loop_provider: Optional[Event self.printer.set_extruder_count(1) self.printer.set_nozzle_count(1) self.event_bus = EventBus(event_loop_provider=self) - self.logger = logging.getLogger(ClientName.from_client(self)) + self.logger = logging.getLogger(ClientName(self)) self._pending_action_backoff = ExponentialBackoff(10, 600, 3600) instrument(self) @@ -288,7 +288,8 @@ async def _on_multi_printer_added(self, msg: MultiPrinterAddedMsg): # A successful addition does not require a backoff. self._pending_action_backoff.reset() self.config.id = msg.data.pid - await self._on_connected_state() + self.state = State.CONNECTED + self.signal() @configure(ServerMsgType.REMOVE_CONNECTION, priority=1) async def _on_multi_printer_removed(self, msg: MultiPrinterRemovedMsg): diff --git a/simplyprint_ws_client/core/ws_protocol/connection.py b/simplyprint_ws_client/core/ws_protocol/connection.py index 13bf260..4db3a9f 100644 --- a/simplyprint_ws_client/core/ws_protocol/connection.py +++ b/simplyprint_ws_client/core/ws_protocol/connection.py @@ -131,7 +131,7 @@ def __init__( self, session: Optional[ClientSession] = None, hint: Optional[ConnectionHint] = None, - logger: logging.Logger = logging.getLogger(__name__), + logger: logging.Logger = logging.getLogger("simplyprint_ws"), **kwargs ): AsyncStoppable.__init__(self, **kwargs) diff --git a/simplyprint_ws_client/core/ws_protocol/messages.py b/simplyprint_ws_client/core/ws_protocol/messages.py index 701c1ed..adc4ed5 100644 --- a/simplyprint_ws_client/core/ws_protocol/messages.py +++ b/simplyprint_ws_client/core/ws_protocol/messages.py @@ -86,6 +86,7 @@ 'MeshDataMsg', 'LogsSentMsg', 'MaterialDataMsg', + 'NotificationDataMsg', ] from enum import StrEnum, IntEnum diff --git a/simplyprint_ws_client/events/event_bus_worker.py b/simplyprint_ws_client/events/event_bus_worker.py index c3889b0..4a91ead 100644 --- a/simplyprint_ws_client/events/event_bus_worker.py +++ b/simplyprint_ws_client/events/event_bus_worker.py @@ -1,12 +1,11 @@ import asyncio import logging -from abc import ABC +from abc import ABC, abstractmethod from queue import Queue, Empty -from typing import Union, Hashable, NamedTuple, Optional, Dict, Tuple, Any +from typing import Union, Hashable, NamedTuple, Optional, Dict, Tuple, Any, Coroutine from .emitter import TEvent, Emitter from .event_bus import EventBus -from ..shared.hardware.physical_machine import PhysicalMachine from ..shared.utils.stoppable import StoppableThread, AsyncStoppable, StoppableInterface @@ -19,21 +18,32 @@ class _EventQueueItem(NamedTuple): _TEventQueueValue = Optional[_EventQueueItem] -# Calculate max queue size based on memory constraints -# Base buffer of unprocessed events is 5000, for every additional 256MB of memory, add 100 to the buffer -_MAX_QUEUE_SIZE = 5000 + ((PhysicalMachine().total_memory() // (256 * 1024 * 1024)) * 100) +_MAX_QUEUE_SIZE = 10000 class EventBusWorker(Emitter[TEvent], StoppableInterface, ABC): event_bus: EventBus[TEvent] event_queue: Union[Queue[_TEventQueueValue], asyncio.Queue] + logger: logging.Logger = logging.getLogger(__name__) + maxsize: int - def __init__(self, event_bus: EventBus[TEvent]) -> None: + def __init__(self, event_bus: EventBus[TEvent], *args, maxsize=_MAX_QUEUE_SIZE, + logger: Optional[logging.Logger] = None, **kwargs) -> None: self.event_bus = event_bus + self.logger = logger or self.logger + self.maxsize = maxsize + + @abstractmethod + def emit_sync(self, event: Union[Hashable, TEvent], *args, **kwargs) -> Union[None, Coroutine[Any, Any, None]]: + ... + + @abstractmethod + def emit(self, event: Union[Hashable, TEvent], *args, **kwargs) -> Union[None, Coroutine[Any, Any, None]]: + ... def _full_warning(self): if self.event_queue.full(): - logging.warning( + self.logger.warning( f"Event queue worker is full, {self.event_queue.qsize()} events are pending!!! Expect degraded " f"performance.") @@ -52,9 +62,9 @@ def stop(self): class ThreadedEventBusWorker(EventBusWorker[TEvent], StoppableThread): def __init__(self, event_bus: EventBus[TEvent], *args, **kwargs): - EventBusWorker.__init__(self, event_bus) + EventBusWorker.__init__(self, event_bus, *args, **kwargs) StoppableThread.__init__(self, *args, **kwargs) - self.event_queue = Queue(maxsize=_MAX_QUEUE_SIZE) + self.event_queue = Queue(maxsize=self.maxsize) async def emit(self, event: Union[Hashable, TEvent], *args, **kwargs) -> None: if self.is_stopped(): @@ -79,20 +89,22 @@ def run(self): if item is None: break + self.event_queue.task_done() + try: if item.is_async: self.event_bus.emit_task(item.event, *item.args, **item.kwargs) else: self.event_bus.emit_sync(item.event, *item.args, **item.kwargs) except Exception as e: - logging.exception(f"Error while processing event {item.event}", exc_info=e) + self.logger.error(f"Error while processing event {item.event}", exc_info=e) class AsyncEventBusWorker(EventBusWorker[TEvent], AsyncStoppable): def __init__(self, event_bus: EventBus[TEvent], *args, **kwargs): - EventBusWorker.__init__(self, event_bus) + EventBusWorker.__init__(self, event_bus, *args, **kwargs) AsyncStoppable.__init__(self, *args, **kwargs) - self.event_queue = asyncio.Queue(maxsize=_MAX_QUEUE_SIZE) + self.event_queue = asyncio.Queue(maxsize=self.maxsize) async def emit(self, event: Union[Hashable, TEvent], *args, **kwargs) -> None: if self.is_stopped(): @@ -117,10 +129,12 @@ async def run(self): if item is None: break + self.event_queue.task_done() + try: if item.is_async: await self.event_bus.emit(item.event, *item.args, **item.kwargs) else: self.event_bus.emit_sync(item.event, *item.args, **item.kwargs) except Exception as e: - logging.exception(f"Error while processing event {item.event}", exc_info=e) + self.logger.error(f"Error while processing event {item.event}", exc_info=e) diff --git a/simplyprint_ws_client/shared/logging/__init__.py b/simplyprint_ws_client/shared/logging/__init__.py index b01de14..0afdc5f 100644 --- a/simplyprint_ws_client/shared/logging/__init__.py +++ b/simplyprint_ws_client/shared/logging/__init__.py @@ -1,3 +1,122 @@ -from .client_handler import ClientHandler +__all__ = [ + 'ClientFilesHandler', + 'ClientLogger', + 'ClientName', + 'setup_logging', + 'get_log_folder' +] + +import logging +import logging.handlers +import queue +from pathlib import Path +from typing import TYPE_CHECKING, Dict, ClassVar, Optional + from .client_logger import ClientLogger +from ..utils.slugify import slugify + +try: + from typing import Self +except ImportError: + from typing_extensions import Self + +from ...const import APP_DIRS + +if TYPE_CHECKING: + from ...core.app import ClientSettings + from .client_name import ClientName + + +def get_log_folder(name: ClientName) -> Path: + log_folder = APP_DIRS.user_log_path / name.ctx.unique_id + + if not log_folder.exists(): + log_folder.mkdir(parents=True, exist_ok=True) + + return log_folder + + +def create_file_handler(file: Path, *args, **kwargs): + handler = logging.handlers.RotatingFileHandler( + file, + *args, + maxBytes=30 * 1024 * 1024, + backupCount=3, + delay=True, + **kwargs + ) + + return handler + + +def create_root_handler(settings: 'ClientSettings') -> logging.Handler: + if not APP_DIRS.user_log_path.exists(): + APP_DIRS.user_log_path.mkdir(parents=True, exist_ok=True) + main_log_file = APP_DIRS.user_log_path / f"{slugify(settings.name)}.log" + return create_file_handler(main_log_file) + + +def setup_logging(settings: 'ClientSettings') -> callable: + """Setup logging based on client settings.""" + logging_queue = queue.SimpleQueue() + + logging.basicConfig( + level=logging.DEBUG, + handlers=[logging.handlers.QueueHandler(logging_queue)], + format='%(asctime)s.%(msecs)03d | %(levelname)s | %(name)s | %(message)s', + datefmt="%Y-%m-%d %H:%M:%S", + force=True, + ) + + stream_handler = logging.StreamHandler() + client_handler = ClientFilesHandler() + client_handler.set_default_handler(create_root_handler(settings)) + + if settings.development: + stream_handler.setLevel(logging.DEBUG) + else: + stream_handler.setLevel(logging.INFO) + + listener = logging.handlers.QueueListener(logging_queue, stream_handler, client_handler, respect_handler_level=True) + listener.start() + return listener.stop + + +class ClientFilesHandler(logging.Handler): + default_handler: ClassVar[Optional[logging.Handler]] = None + client_handlers: ClassVar[Dict[ClientName, logging.Handler]] = {} + + @classmethod + def set_default_handler(cls, handler: logging.Handler): + cls.default_handler = handler + + @classmethod + def register_client_name(cls, name: ClientName): + if name in cls.client_handlers: + return + + log_folder = get_log_folder(name) + log_name = slugify(name.peek() or "main") + log_file = log_folder / f"{log_name}.log" + cls.client_handlers[name] = create_file_handler(log_file) + + @classmethod + def deregister_client_name(cls, name: ClientName): + cls.client_handlers.pop(name, None) + + def emit(self, record): + return self.handle(record) + + def handle(self, record): + if not isinstance(record.name, ClientName) or record.name not in self.__class__.client_handlers: + if self.__class__.default_handler is not None: + self.__class__.default_handler.emit(record) + + return + + self.__class__.client_handlers[record.name].emit(record) + + +# When loading this module, set the default logger class to ClientLogger +logging.setLoggerClass(ClientLogger) diff --git a/simplyprint_ws_client/shared/logging/client_handler.py b/simplyprint_ws_client/shared/logging/client_handler.py deleted file mode 100644 index 666ee2f..0000000 --- a/simplyprint_ws_client/shared/logging/client_handler.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging -import re - -from logging.handlers import RotatingFileHandler -from pathlib import Path -from typing import TYPE_CHECKING, Dict - -try: - from typing import Self -except ImportError: - from typing_extensions import Self - -from ...const import APP_DIRS - -if TYPE_CHECKING: - from ...core.config import PrinterConfig - from .client_name import ClientName - from ...core.app import ClientSettings - - -class ClientHandler(RotatingFileHandler): - formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s', '%m-%d-%Y %H:%M:%S') - handlers: Dict[str, 'ClientHandler'] = {} - - @classmethod - def slugify(cls, name: str) -> str: - # Slugify the log name - name = re.sub(r'[^\w\s.-]', '', name) - name = re.sub(r'[\s_-]+', '-', name) - name = re.sub(r'^-+|-+$', '', name) - return name - - @classmethod - def get_log_folder(cls, config: 'PrinterConfig') -> Path: - log_folder = APP_DIRS.user_log_path / config.unique_id - - if not log_folder.exists(): - log_folder.mkdir(parents=True) - - return log_folder - - @classmethod - def from_client_name(cls, name: 'ClientName') -> Self: - log_folder = cls.get_log_folder(name.config) - - log_name = cls.slugify(name.peek() or "main") - log_file = log_folder / f"{log_name}.log" - - return cls._create_handler( - log_file, - maxBytes=30 * 1024 * 1024, - backupCount=3, - delay=True - ) - - @classmethod - def root_handler(cls, settings: 'ClientSettings') -> Self: - main_log_file = APP_DIRS.user_log_path / f"{cls.slugify(settings.name)}.log" - return cls._create_handler(main_log_file, maxBytes=30 * 1024 * 1024, backupCount=3, delay=True) - - @classmethod - def _create_handler(cls, file_path: Path, *args, **kwargs): - file_key = str(file_path) - - if file_key not in cls.handlers: - handler = cls.handlers[file_key] = cls(file_path, *args, **kwargs) - handler.setFormatter(cls.formatter) - - return cls.handlers[file_key] - - @classmethod - def setup_logging(cls, settings: 'ClientSettings'): - if not APP_DIRS.user_log_path.exists(): - APP_DIRS.user_log_path.mkdir(parents=True, exist_ok=True) - - logging.basicConfig( - level=logging.DEBUG, - format="[%(asctime)s] %(levelname)s %(name)s.%(funcName)s: %(message)s", - handlers=[logging.StreamHandler(), cls.root_handler(settings)], - force=True, - ) diff --git a/simplyprint_ws_client/shared/logging/client_logger.py b/simplyprint_ws_client/shared/logging/client_logger.py index 5b4555a..dfa1506 100644 --- a/simplyprint_ws_client/shared/logging/client_logger.py +++ b/simplyprint_ws_client/shared/logging/client_logger.py @@ -1,3 +1,5 @@ +__all__ = ["ClientLogger"] + import logging from typing import Union @@ -6,7 +8,6 @@ except ImportError: from typing_extensions import Self -from .client_handler import ClientHandler from .client_name import ClientName @@ -36,29 +37,5 @@ def getChild(self, suffix: str) -> Self: return logger def _initialize_client_logger(self): - # Remove the previous ClientHandler - # And add a new one based on the new name - - for handler in self.handlers: - if isinstance(handler, ClientHandler): - self.removeHandler(handler) - - self.addHandler(ClientHandler.from_client_name(self.name)) - - root_stream_handler = None - - for handler in self.root.handlers: - if not type(handler) is logging.StreamHandler: - continue - - root_stream_handler = handler - break - - if root_stream_handler: - self.addHandler(root_stream_handler) - - self.propagate = False - - -# When loading this module, set the default logger class to ClientLogger -logging.setLoggerClass(ClientLogger) + from . import ClientFilesHandler + ClientFilesHandler.register_client_name(self.name) diff --git a/simplyprint_ws_client/shared/logging/client_name.py b/simplyprint_ws_client/shared/logging/client_name.py index b9a6a53..52184d6 100644 --- a/simplyprint_ws_client/shared/logging/client_name.py +++ b/simplyprint_ws_client/shared/logging/client_name.py @@ -1,4 +1,6 @@ -from typing import TYPE_CHECKING, List, Optional +__all__ = ['ClientName'] + +from typing import TYPE_CHECKING, List, Optional, Union try: from typing import Self @@ -9,26 +11,30 @@ from ...core.client import Client from ...core.config import PrinterConfig + TClientContext = Union[Client, PrinterConfig] +else: + TClientContext = Union['Client', 'PrinterConfig'] + class ClientName(str): + ctx: TClientContext stack: List[str] - config: 'PrinterConfig' - def __new__(cls, config: 'PrinterConfig') -> str: - return super().__new__(cls, config.unique_id) + def __new__(cls, ctx: TClientContext) -> str: + return super().__new__(cls, ctx.unique_id) - def __init__(self, config: 'PrinterConfig') -> None: - self.config = config + def __init__(self, ctx: TClientContext) -> None: + self.ctx = ctx self.stack = [] def __str__(self) -> str: - return ".".join([self.config.unique_id] + self.stack) + return ".".join([self.ctx.unique_id] + self.stack) def __hash__(self) -> int: return hash(str(self)) def copy(self) -> Self: - return ClientName(self.config).push_all(self.stack) + return ClientName(self.ctx).push_all(self.stack) def push_all(self, names: List[str]) -> Self: for name in names: @@ -51,7 +57,3 @@ def peek(self) -> Optional[str]: def getChild(self, suffix: str) -> Self: return self.copy().push(suffix) - - @staticmethod - def from_client(client: 'Client') -> 'ClientName': - return ClientName(client.config) diff --git a/simplyprint_ws_client/shared/utils/slugify.py b/simplyprint_ws_client/shared/utils/slugify.py new file mode 100644 index 0000000..b2feb77 --- /dev/null +++ b/simplyprint_ws_client/shared/utils/slugify.py @@ -0,0 +1,11 @@ +__all__ = ['slugify'] + +import re + + +def slugify(name: str) -> str: + # Slugify the log name + name = re.sub(r'[^\w\s.-]', '', name) + name = re.sub(r'[\s_-]+', '-', name) + name = re.sub(r'^-+|-+$', '', name) + return name