Skip to content

Commit

Permalink
Update shared/camera to address a couple of issues, update allow_setu…
Browse files Browse the repository at this point in the history
…p default to True, fix Python compat in module and more. Bump version.
  • Loading branch information
cjavad committed Feb 14, 2025
1 parent 4c5a977 commit c86beb5
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 32 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "simplyprint-ws-client"
version = "1.0.1rc7"
version = "1.0.1rc8"
license = "AGPL-3.0-or-later"
authors = ["SimplyPrint <[email protected]>"]
description = "SimplyPrint Websocket Client"
Expand All @@ -25,7 +25,7 @@ typing-extensions = "*"

aiohttp = [
# Python 3.8 support dropped in 3.11.0
{ version = "^3.11.11", python = ">=3.9" },
{ version = "^3.11.12", python = ">=3.9" },
{ version = "^3.10.11", python = "<3.9" }
]

Expand Down
2 changes: 1 addition & 1 deletion simplyprint_ws_client/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class DefaultClient(Client[TConfig], ABC):
"""Default prioritized message handling."""

async def send_ping(self) -> None:
if not self.printer.intervals.use("ping"):
if not self.printer.intervals.is_ready("ping"):
return

self.printer.latency.ping_now()
Expand Down
2 changes: 1 addition & 1 deletion simplyprint_ws_client/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ClientSettings:
event_loop_backend: EventLoopBackend = EventLoopBackend.ASYNCIO
development: bool = False
config_manager_t: ConfigManagerType = ConfigManagerType.MEMORY
allow_setup: bool = False
allow_setup: bool = True
tick_rate = 1.0
reconnect_timeout = 5.0
sentry_dsn: Optional[str] = None
Expand Down
3 changes: 1 addition & 2 deletions simplyprint_ws_client/shared/camera/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,5 @@ class ReceivedFrame(NamedTuple):


Response = Union[
ReceivedFrame,
...
ReceivedFrame
]
1 change: 1 addition & 0 deletions simplyprint_ws_client/shared/camera/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def _refresh_timer(self):

if self.pause_timeout:
self._pause_timer = threading.Timer(self.pause_timeout, self.stop)
self._pause_timer.daemon = True
self._pause_timer.start()

def _read_frame(self):
Expand Down
15 changes: 13 additions & 2 deletions simplyprint_ws_client/shared/camera/handle.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from typing import TYPE_CHECKING, List
import time
from typing import TYPE_CHECKING, List, Tuple, Optional

from .base import FrameT
from .commands import Response, PollCamera, StartCamera, \
Expand All @@ -16,6 +17,8 @@ class CameraHandle(StoppableInterface):

_waiters: List[asyncio.Future]
_frame_time_window: List[float]
_last_poll_time: float
_cached_frame: Optional[Tuple[float, FrameT]] = None

def __init__(self, pool: 'CameraPool', camera_id: int):
self.pool = pool
Expand All @@ -34,6 +37,8 @@ def on_response(self, res: Response):
if len(self._frame_time_window) > 10:
self._frame_time_window.pop(0)

self._cached_frame = (res.time, res.data)

while self._waiters:
fut = self._waiters.pop(0)

Expand All @@ -43,8 +48,14 @@ def on_response(self, res: Response):
loop = fut.get_loop()
loop.call_soon_threadsafe(fut.set_result, res.data)

async def receive_frame(self) -> FrameT:
async def receive_frame(self, allow_cached=False) -> FrameT:
# Old frame
if allow_cached and self._cached_frame is not None:
return self._cached_frame[1]

# New frame
self.pool.submit_request(PollCamera(self.id))
self._last_poll_time = time.time()
loop = asyncio.get_running_loop()
fut = loop.create_future()
self._waiters.append(fut)
Expand Down
25 changes: 12 additions & 13 deletions simplyprint_ws_client/shared/camera/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ class ClientCameraMixin(Client):
_camera_pool: Optional[CameraPool] = None
_camera_uri: Optional[URL] = None
_camera_handle: Optional[CameraHandle] = None
_camera_pause_timeout: Optional[int] = None
_stream_lock: CancelableLock
_stream_setup: asyncio.Event

def initialize_camera_mixin(
self,
camera_pool: Optional[CameraPool] = None,
pause_timeout: Optional[int] = None,
**_kwargs
):
self._camera_pool = camera_pool
self._stream_lock = CancelableLock()
self._stream_setup = asyncio.Event()
self._camera_pause_timeout = pause_timeout

def set_camera_uri(self, uri: Optional[URL] = None):
if self._camera_pool is None:
Expand All @@ -46,24 +49,21 @@ def set_camera_uri(self, uri: Optional[URL] = None):

# Create a new camera handle
if self._camera_uri and self._camera_pool:
self._camera_handle = self._camera_pool.create(self._camera_uri)
self._camera_handle = self._camera_pool.create(self._camera_uri, pause_timeout=self._camera_pause_timeout)
self.event_loop.call_soon_threadsafe(self._stream_setup.set)

def __del__(self):
if self._camera_handle:
self._camera_handle.stop()
self.set_camera_uri(None)

async def on_stream_on(self):
await self._stream_setup.wait()

if not self._camera_handle:
return
await self._stream_setup.wait()

self._camera_handle.start()

async def on_stream_off(self):
if not self._camera_handle:
return
await self._stream_setup.wait()

self._camera_handle.pause()
self._stream_lock.cancel()
Expand All @@ -72,14 +72,13 @@ async def on_test_webcam(self):
await self.on_webcam_snapshot(WebcamSnapshotDemandData())

async def on_webcam_snapshot(self, data: WebcamSnapshotDemandData, retries: int = 3, retry_timeout=5):
await self._stream_setup.wait()

# Drop event if no camera is set up.
if not self._camera_handle:
return
await self._stream_setup.wait()

is_snapshot_event = data.id is not None

# Block until the camera is ready.
frame = await self._camera_handle.receive_frame()
frame = await self._camera_handle.receive_frame(allow_cached=is_snapshot_event)

# Empty frame or none.
if not frame:
Expand All @@ -93,7 +92,7 @@ async def on_webcam_snapshot(self, data: WebcamSnapshotDemandData, retries: int
return

# Capture snapshot events and send them to the API
if data.id is not None:
if is_snapshot_event:
await SimplyPrintApi.post_snapshot(data.id, frame, endpoint=data.endpoint)
self.logger.debug(f"Posted snapshot to API with id {data.id}")
return
Expand Down
37 changes: 26 additions & 11 deletions simplyprint_ws_client/shared/camera/pool.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import logging
import multiprocessing
import sys
Expand All @@ -8,7 +9,7 @@

from yarl import URL

from .base import BaseCameraProtocol
from .base import BaseCameraProtocol, FrameT
from .commands import Request, CreateCamera, Response, PollCamera, \
StartCamera, StopCamera, DeleteCamera, ReceivedFrame
from .controller import CameraController
Expand Down Expand Up @@ -44,7 +45,7 @@ def on_request(self, req: Request):
if isinstance(req, CreateCamera):
with self:
self.instances[req.id] = CameraController(
lambda frame: self.response_queue.put(ReceivedFrame(req.id, time.monotonic(), frame)),
functools.partial(self._send_frame, req.id),
protocol=req.protocol,
pause_timeout=req.pause_timeout,
)
Expand Down Expand Up @@ -76,6 +77,11 @@ def on_request(self, req: Request):
except Exception as e:
logging.debug("Error", exc_info=e)

def _send_frame(self, camera_id: int, frame: Optional[FrameT]):
res = ReceivedFrame(camera_id, time.time(), frame)
self.response_queue.put(res)
logging.debug(f"Sent frame to %s with size %s", camera_id, len(frame) if frame is not None else 0)

def run(self):
try:
self._run()
Expand All @@ -89,7 +95,7 @@ def _run(self):
self.instances = {}

logging.basicConfig(
level=logging.DEBUG,
level=logging.INFO,
format="%(asctime)s [%(process)d] %(message)s", datefmt="%H:%M:%S",
handlers=[logging.StreamHandler(stream=sys.stdout)]
)
Expand Down Expand Up @@ -124,11 +130,12 @@ def __init__(self, pool_size=0, **kwargs):
ProcessStoppable.__init__(self, **kwargs)
Synchronized.__init__(self)

self.processes = [self._create_worker_process() for _ in
range(pool_size or (multiprocessing.cpu_count() - 1))]
self.processes = []
self.protocols = []
self.allocations = {}

self.pool_size = pool_size or multiprocessing.cpu_count()

def _create_worker_process(self):
return CameraWorkerProcess(daemon=True, parent_stoppable=self)

Expand Down Expand Up @@ -179,15 +186,13 @@ def pool_size(self, value):

self.allocations.pop(uuid, None)

process.stop()
process.command_queue.put(None)
process.response_queue.put(None)
self._stop_process(process)

def _start_process(self, process: CameraWorkerProcess):
if process.is_alive() or process.is_stopped():
return

with self:
if process.is_alive() or process.is_stopped():
return

process.start()

process.thread = threading.Thread(
Expand All @@ -197,6 +202,16 @@ def _start_process(self, process: CameraWorkerProcess):
)
process.thread.start()

@staticmethod
def _stop_process(process: CameraWorkerProcess):
with self:
if process.is_stopped():
return

process.stop()
process.command_queue.put(None)
process.response_queue.put(None)

def _next_process_idx(self):
with self:
pool_size = self._pool_size()
Expand Down
28 changes: 28 additions & 0 deletions tests/test_client_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import unittest

from simplyprint_ws_client.core.app import ClientApp
from simplyprint_ws_client.core.client import DefaultClient
from simplyprint_ws_client.core.config import PrinterConfig
from simplyprint_ws_client.core.settings import ClientSettings


class TestClientApp(unittest.TestCase):
def test_virtual_client(self):
client_settings = ClientSettings(
DefaultClient,
PrinterConfig,
camera_workers=0,
)

config = PrinterConfig.get_new()

client_app = ClientApp(client_settings)

client = client_app.add(config)

client_app.run_detached()
client_app.wait(2)

self.assertIsNotNone(client.config.short_id)

client_app.stop()

0 comments on commit c86beb5

Please sign in to comment.