Skip to content

Commit

Permalink
Improve logging (use separate thread to write to files), don't mark d…
Browse files Browse the repository at this point in the history
…ata as changed twice for MP connections etc.
  • Loading branch information
cjavad committed Jan 30, 2025
1 parent e3b0c06 commit 3e16b59
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 141 deletions.
5 changes: 2 additions & 3 deletions example/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
from simplyprint_ws_client.core.settings import ClientSettings
from simplyprint_ws_client.core.ws_protocol.connection import ConnectionMode
from simplyprint_ws_client.shared.cli.cli import ClientCli
from simplyprint_ws_client.shared.logging import ClientHandler
from simplyprint_ws_client.shared.sp.url_builder import SimplyPrintBackend
from simplyprint_ws_client.shared.logging import setup_logging
from .virtual_client import VirtualClient, VirtualConfig

if __name__ == "__main__":
Expand All @@ -17,7 +16,7 @@
config_manager_t=ConfigManagerType.JSON,
)

ClientHandler.setup_logging(settings)
setup_logging(settings)
app = ClientApp(settings)
cli = ClientCli(app)
cli.start_client = lambda: app.run_blocking()
Expand Down
5 changes: 3 additions & 2 deletions simplyprint_ws_client/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def __init__(self, config: PrinterConfig, *, event_loop_provider: Optional[Event
self.printer.set_extruder_count(1)
self.printer.set_nozzle_count(1)
self.event_bus = EventBus(event_loop_provider=self)
self.logger = logging.getLogger(ClientName.from_client(self))
self.logger = logging.getLogger(ClientName(self))
self._pending_action_backoff = ExponentialBackoff(10, 600, 3600)
instrument(self)

Expand Down Expand Up @@ -288,7 +288,8 @@ async def _on_multi_printer_added(self, msg: MultiPrinterAddedMsg):
# A successful addition does not require a backoff.
self._pending_action_backoff.reset()
self.config.id = msg.data.pid
await self._on_connected_state()
self.state = State.CONNECTED
self.signal()

@configure(ServerMsgType.REMOVE_CONNECTION, priority=1)
async def _on_multi_printer_removed(self, msg: MultiPrinterRemovedMsg):
Expand Down
2 changes: 1 addition & 1 deletion simplyprint_ws_client/core/ws_protocol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def __init__(
self,
session: Optional[ClientSession] = None,
hint: Optional[ConnectionHint] = None,
logger: logging.Logger = logging.getLogger(__name__),
logger: logging.Logger = logging.getLogger("simplyprint_ws"),
**kwargs
):
AsyncStoppable.__init__(self, **kwargs)
Expand Down
1 change: 1 addition & 0 deletions simplyprint_ws_client/core/ws_protocol/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
'MeshDataMsg',
'LogsSentMsg',
'MaterialDataMsg',
'NotificationDataMsg',
]

from enum import StrEnum, IntEnum
Expand Down
42 changes: 28 additions & 14 deletions simplyprint_ws_client/events/event_bus_worker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import asyncio
import logging
from abc import ABC
from abc import ABC, abstractmethod
from queue import Queue, Empty
from typing import Union, Hashable, NamedTuple, Optional, Dict, Tuple, Any
from typing import Union, Hashable, NamedTuple, Optional, Dict, Tuple, Any, Coroutine

from .emitter import TEvent, Emitter
from .event_bus import EventBus
from ..shared.hardware.physical_machine import PhysicalMachine
from ..shared.utils.stoppable import StoppableThread, AsyncStoppable, StoppableInterface


Expand All @@ -19,21 +18,32 @@ class _EventQueueItem(NamedTuple):

_TEventQueueValue = Optional[_EventQueueItem]

# Calculate max queue size based on memory constraints
# Base buffer of unprocessed events is 5000, for every additional 256MB of memory, add 100 to the buffer
_MAX_QUEUE_SIZE = 5000 + ((PhysicalMachine().total_memory() // (256 * 1024 * 1024)) * 100)
_MAX_QUEUE_SIZE = 10000


class EventBusWorker(Emitter[TEvent], StoppableInterface, ABC):
event_bus: EventBus[TEvent]
event_queue: Union[Queue[_TEventQueueValue], asyncio.Queue]
logger: logging.Logger = logging.getLogger(__name__)
maxsize: int

def __init__(self, event_bus: EventBus[TEvent]) -> None:
def __init__(self, event_bus: EventBus[TEvent], *args, maxsize=_MAX_QUEUE_SIZE,
logger: Optional[logging.Logger] = None, **kwargs) -> None:
self.event_bus = event_bus
self.logger = logger or self.logger
self.maxsize = maxsize

@abstractmethod
def emit_sync(self, event: Union[Hashable, TEvent], *args, **kwargs) -> Union[None, Coroutine[Any, Any, None]]:
...

@abstractmethod
def emit(self, event: Union[Hashable, TEvent], *args, **kwargs) -> Union[None, Coroutine[Any, Any, None]]:
...

def _full_warning(self):
if self.event_queue.full():
logging.warning(
self.logger.warning(
f"Event queue worker is full, {self.event_queue.qsize()} events are pending!!! Expect degraded "
f"performance.")

Expand All @@ -52,9 +62,9 @@ def stop(self):

class ThreadedEventBusWorker(EventBusWorker[TEvent], StoppableThread):
def __init__(self, event_bus: EventBus[TEvent], *args, **kwargs):
EventBusWorker.__init__(self, event_bus)
EventBusWorker.__init__(self, event_bus, *args, **kwargs)
StoppableThread.__init__(self, *args, **kwargs)
self.event_queue = Queue(maxsize=_MAX_QUEUE_SIZE)
self.event_queue = Queue(maxsize=self.maxsize)

async def emit(self, event: Union[Hashable, TEvent], *args, **kwargs) -> None:
if self.is_stopped():
Expand All @@ -79,20 +89,22 @@ def run(self):
if item is None:
break

self.event_queue.task_done()

try:
if item.is_async:
self.event_bus.emit_task(item.event, *item.args, **item.kwargs)
else:
self.event_bus.emit_sync(item.event, *item.args, **item.kwargs)
except Exception as e:
logging.exception(f"Error while processing event {item.event}", exc_info=e)
self.logger.error(f"Error while processing event {item.event}", exc_info=e)


class AsyncEventBusWorker(EventBusWorker[TEvent], AsyncStoppable):
def __init__(self, event_bus: EventBus[TEvent], *args, **kwargs):
EventBusWorker.__init__(self, event_bus)
EventBusWorker.__init__(self, event_bus, *args, **kwargs)
AsyncStoppable.__init__(self, *args, **kwargs)
self.event_queue = asyncio.Queue(maxsize=_MAX_QUEUE_SIZE)
self.event_queue = asyncio.Queue(maxsize=self.maxsize)

async def emit(self, event: Union[Hashable, TEvent], *args, **kwargs) -> None:
if self.is_stopped():
Expand All @@ -117,10 +129,12 @@ async def run(self):
if item is None:
break

self.event_queue.task_done()

try:
if item.is_async:
await self.event_bus.emit(item.event, *item.args, **item.kwargs)
else:
self.event_bus.emit_sync(item.event, *item.args, **item.kwargs)
except Exception as e:
logging.exception(f"Error while processing event {item.event}", exc_info=e)
self.logger.error(f"Error while processing event {item.event}", exc_info=e)
121 changes: 120 additions & 1 deletion simplyprint_ws_client/shared/logging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,122 @@
from .client_handler import ClientHandler
__all__ = [
'ClientFilesHandler',
'ClientLogger',
'ClientName',
'setup_logging',
'get_log_folder'
]

import logging
import logging.handlers
import queue
from pathlib import Path
from typing import TYPE_CHECKING, Dict, ClassVar, Optional

from .client_logger import ClientLogger
from ..utils.slugify import slugify

try:
from typing import Self
except ImportError:
from typing_extensions import Self

from ...const import APP_DIRS

if TYPE_CHECKING:
from ...core.app import ClientSettings

from .client_name import ClientName


def get_log_folder(name: ClientName) -> Path:
log_folder = APP_DIRS.user_log_path / name.ctx.unique_id

if not log_folder.exists():
log_folder.mkdir(parents=True, exist_ok=True)

return log_folder


def create_file_handler(file: Path, *args, **kwargs):
handler = logging.handlers.RotatingFileHandler(
file,
*args,
maxBytes=30 * 1024 * 1024,
backupCount=3,
delay=True,
**kwargs
)

return handler


def create_root_handler(settings: 'ClientSettings') -> logging.Handler:
if not APP_DIRS.user_log_path.exists():
APP_DIRS.user_log_path.mkdir(parents=True, exist_ok=True)
main_log_file = APP_DIRS.user_log_path / f"{slugify(settings.name)}.log"
return create_file_handler(main_log_file)


def setup_logging(settings: 'ClientSettings') -> callable:
"""Setup logging based on client settings."""
logging_queue = queue.SimpleQueue()

logging.basicConfig(
level=logging.DEBUG,
handlers=[logging.handlers.QueueHandler(logging_queue)],
format='%(asctime)s.%(msecs)03d | %(levelname)s | %(name)s | %(message)s',
datefmt="%Y-%m-%d %H:%M:%S",
force=True,
)

stream_handler = logging.StreamHandler()
client_handler = ClientFilesHandler()
client_handler.set_default_handler(create_root_handler(settings))

if settings.development:
stream_handler.setLevel(logging.DEBUG)
else:
stream_handler.setLevel(logging.INFO)

listener = logging.handlers.QueueListener(logging_queue, stream_handler, client_handler, respect_handler_level=True)
listener.start()
return listener.stop


class ClientFilesHandler(logging.Handler):
default_handler: ClassVar[Optional[logging.Handler]] = None
client_handlers: ClassVar[Dict[ClientName, logging.Handler]] = {}

@classmethod
def set_default_handler(cls, handler: logging.Handler):
cls.default_handler = handler

@classmethod
def register_client_name(cls, name: ClientName):
if name in cls.client_handlers:
return

log_folder = get_log_folder(name)
log_name = slugify(name.peek() or "main")
log_file = log_folder / f"{log_name}.log"
cls.client_handlers[name] = create_file_handler(log_file)

@classmethod
def deregister_client_name(cls, name: ClientName):
cls.client_handlers.pop(name, None)

def emit(self, record):
return self.handle(record)

def handle(self, record):
if not isinstance(record.name, ClientName) or record.name not in self.__class__.client_handlers:
if self.__class__.default_handler is not None:
self.__class__.default_handler.emit(record)

return

self.__class__.client_handlers[record.name].emit(record)


# When loading this module, set the default logger class to ClientLogger
logging.setLoggerClass(ClientLogger)
81 changes: 0 additions & 81 deletions simplyprint_ws_client/shared/logging/client_handler.py

This file was deleted.

Loading

0 comments on commit 3e16b59

Please sign in to comment.