From a1af88939fea972e6efe8908474f94079c7d1fe9 Mon Sep 17 00:00:00 2001 From: Jevgeni Kiski Date: Tue, 17 Sep 2024 15:25:43 +0300 Subject: [PATCH 1/4] Get rid of ThreadQueueInterface in Text interfaces #500 --- .pre-commit-config.yaml | 6 +- paradox/interfaces/text/core.py | 11 +- paradox/interfaces/text/gsm.py | 142 ++++++++++---------------- paradox/interfaces/text/pushbullet.py | 31 +++--- paradox/interfaces/text/signal.py | 22 ++-- paradox/lib/async_message_manager.py | 14 +-- tests/interfaces/test_gsm.py | 117 +++++++++++++++++++++ tests/pai.conf | 2 + tests/test_async_queue.py | 64 +++++------- 9 files changed, 239 insertions(+), 170 deletions(-) create mode 100644 tests/interfaces/test_gsm.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4099e599..b7b4822b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,13 +16,13 @@ repos: - id: trailing-whitespace - repo: https://github.com/asottile/pyupgrade - rev: v3.15.2 + rev: v3.17.0 hooks: - id: pyupgrade args: ["--py37-plus"] - repo: https://github.com/psf/black - rev: 24.4.0 + rev: 24.8.0 hooks: - id: black args: @@ -35,7 +35,7 @@ repos: - id: isort - repo: https://github.com/PyCQA/flake8 - rev: 7.0.0 + rev: 7.1.1 hooks: - id: flake8 additional_dependencies: [flake8-bugbear] diff --git a/paradox/interfaces/text/core.py b/paradox/interfaces/text/core.py index 6d6f9041..2408f6ce 100644 --- a/paradox/interfaces/text/core.py +++ b/paradox/interfaces/text/core.py @@ -2,14 +2,14 @@ from paradox.config import config as cfg from paradox.event import Event, EventLevel, Notification -from paradox.interfaces import ThreadQueueInterface +from paradox.interfaces import AsyncInterface from paradox.lib import ps from paradox.lib.event_filter import EventFilter, EventTagFilter, LiveEventRegexpFilter logger = logging.getLogger("PAI").getChild(__name__) -class AbstractTextInterface(ThreadQueueInterface): +class AbstractTextInterface(AsyncInterface): """Interface Class using any Text interface""" def __init__(self, alarm, event_filter: EventFilter, min_level=EventLevel.INFO): @@ -20,12 +20,7 @@ def __init__(self, alarm, event_filter: EventFilter, min_level=EventLevel.INFO): self.min_level = min_level self.alarm = alarm - def stop(self): - super().stop() - - def _run(self): - super()._run() - + async def run(self): ps.subscribe(self.handle_panel_event, "events") ps.subscribe(self.handle_notify, "notifications") diff --git a/paradox/interfaces/text/gsm.py b/paradox/interfaces/text/gsm.py index 3201817e..36ae8724 100644 --- a/paradox/interfaces/text/gsm.py +++ b/paradox/interfaces/text/gsm.py @@ -1,11 +1,7 @@ -# -*- coding: utf-8 -*- - import asyncio -import datetime import json import logging import os -from concurrent import futures import serial_asyncio @@ -24,22 +20,16 @@ class SerialConnectionProtocol(ConnectionProtocol): def __init__(self, handler: ConnectionHandler): - super(SerialConnectionProtocol, self).__init__(handler) - self.buffer = b"" - self.loop = asyncio.get_event_loop() + super().__init__(handler) self.last_message = b"" - def connection_made(self, transport): - super(SerialConnectionProtocol, self).connection_made(transport) - self.handler.on_connection() - async def send_message(self, message): self.last_message = message self.transport.write(message + b"\r\n") def data_received(self, recv_data): self.buffer += recv_data - logger.debug("BUFFER: {}".format(self.buffer)) + logger.debug(f"BUFFER: {self.buffer}") while len(self.buffer) >= 0: r = self.buffer.find(b"\r\n") # not found @@ -61,25 +51,22 @@ def data_received(self, recv_data): if self.last_message == frame: self.last_message = b"" elif len(frame) > 0: - self.loop.create_task(self.handler.on_message(frame)) # Callback + self.handler.on_message(frame) # Callback def connection_lost(self, exc): logger.error("The serial port was closed") - self.buffer = b"" self.last_message = b"" - super(SerialConnectionProtocol, self).connection_lost(exc) + super().connection_lost(exc) class SerialCommunication(ConnectionHandler): - def __init__(self, loop, port, baud=9600, timeout=5, recv_callback=None): + def __init__(self, port, baud=9600, timeout=5): self.port_path = port self.baud = baud self.connected_future = None - self.recv_callback = recv_callback - self.loop = loop + self.recv_callback = None self.connected = False self.connection = None - asyncio.set_event_loop(loop) self.queue = asyncio.Queue() def clear(self): @@ -96,14 +83,12 @@ def on_connection(self): self.connected = True def on_message(self, message: bytes): - logger.debug("M->I: {}".format(message)) + logger.debug(f"M->I: {message}") if self.recv_callback is not None: - return asyncio.get_event_loop().call_soon( - self.recv_callback(message) - ) # Callback + self.recv_callback(message) # Callback else: - return self.queue.put_nowait(message) + self.queue.put_nowait(message) def set_recv_callback(self, callback): self.recv_callback = callback @@ -120,23 +105,23 @@ def make_protocol(self): return SerialConnectionProtocol(self) async def write(self, message, timeout=15): - logger.debug("I->M: {}".format(message)) + logger.debug(f"I->M: {message}") if self.connection is not None: await self.connection.send_message(message) - return await asyncio.wait_for(self.queue.get(), timeout=5, loop=self.loop) + return await asyncio.wait_for(self.queue.get(), timeout=5) async def read(self, timeout=5): if self.connection is not None: return await asyncio.wait_for(self.queue.get(), timeout=timeout) async def connect(self): - logger.info("Connecting to serial port {}".format(self.port_path)) + logger.info(f"Connecting to serial port {self.port_path}") - self.connected_future = self.loop.create_future() - self.loop.call_later(5, self.open_timeout) + self.connected_future = asyncio.get_event_loop().create_future() + asyncio.get_event_loop().call_later(5, self.open_timeout) _, self.connection = await serial_asyncio.create_serial_connection( - self.loop, self.make_protocol, self.port_path, self.baud + asyncio.get_event_loop(), self.make_protocol, self.port_path, self.baud ) return await self.connected_future @@ -156,73 +141,64 @@ def __init__(self, alarm): self.port = None self.modem_connected = False - self.loop = asyncio.new_event_loop() self.message_cmt = None def stop(self): - """ Stops the GSM Interface Thread""" - self.stop_running.set() - - self.loop.stop() + """Stops the GSM Interface""" super().stop() + logger.debug("GSM Stopped. TODO: Implement a proper stop") - logger.debug("GSM Stopped") - - def write(self, message: str, expected: str = None) -> None: + async def write(self, message: str, expected: str = None) -> None: r = b"" while r != expected: - r = self.loop.run_until_complete(self.port.write(message)) + r = await self.port.write(message) data = b"" if r == b"ERROR": - raise Exception("Got error from modem: {}".format(r)) + raise Exception(f"Got error from modem: {r}") while r != expected: - r = self.loop.run_until_complete(self.port.read()) + r = await self.port.read() data += r + b"\n" - def connect(self): - logger.info( - "Using {} at {} baud".format(cfg.GSM_MODEM_PORT, cfg.GSM_MODEM_BAUDRATE) - ) + async def connect(self): + logger.info(f"Using {cfg.GSM_MODEM_PORT} at {cfg.GSM_MODEM_BAUDRATE} baud") try: if not os.path.exists(cfg.GSM_MODEM_PORT): - logger.error("Modem port ({}) not found".format(cfg.GSM_MODEM_PORT)) + logger.error(f"Modem port ({cfg.GSM_MODEM_PORT}) not found") return False self.port = SerialCommunication( - self.loop, cfg.GSM_MODEM_PORT, cfg.GSM_MODEM_BAUDRATE, 5 + cfg.GSM_MODEM_PORT, cfg.GSM_MODEM_BAUDRATE, 5 ) - except: - logger.exception( - "Could not open port {} for GSM modem".format(cfg.GSM_MODEM_PORT) - ) + except Exception: + logger.exception(f"Could not open port {cfg.GSM_MODEM_PORT} for GSM modem") return False self.port.set_recv_callback(None) - result = self.loop.run_until_complete(self.port.connect()) + result = await self.port.connect() if not result: logger.exception("Could not connect to GSM modem") return False try: - self.write(b"AT", b"OK") # Init - self.write(b"ATE0", b"OK") # Disable Echo - self.write(b"AT+CMEE=2", b"OK") # Increase verbosity - self.write(b"AT+CMGF=1", b"OK") # SMS Text mode - self.write(b"AT+CFUN=1", b"OK") # Enable modem - self.write( + await self.write(b"AT", b"OK") # Init + await self.write(b"ATE0", b"OK") # Disable Echo + await self.write(b"AT+CMEE=2", b"OK") # Increase verbosity + await self.write(b"AT+CMGF=1", b"OK") # SMS Text mode + await self.write(b"AT+CFUN=1", b"OK") # Enable modem + await self.write( b"AT+CNMI=1,2,0,0,0", b"OK" ) # SMS received only when modem enabled, Use +CMT with SMS, No Status Report, - self.write(b"AT+CUSD=1", b"OK") # Enable result code presentation + await self.write(b"AT+CUSD=1", b"OK") # Enable result code presentation - except futures.TimeoutError as e: + except asyncio.TimeoutError: logger.error("No reply from modem") return False - except: + except Exception: logger.exception("Modem connect error") return False @@ -234,18 +210,14 @@ def connect(self): self.modem_connected = True return True - def _run(self): - super(GSMTextInterface, self)._run() + async def run(self): + await super().run() - while not self.modem_connected and not self.stop_running.isSet(): - if not self.connect(): + while not self.modem_connected: + if not await self.connect(): logger.warning("Could not connect to modem") - self.stop_running.wait(5) - - self.loop.run_forever() - - self.stop_running.wait() + await asyncio.sleep(5) async def data_received(self, data: str) -> bool: logger.debug(f"Data Received: {data}") @@ -262,21 +234,18 @@ async def data_received(self, data: str) -> bool: return True - def handle_message(self, timestamp: str, source: str, message: str) -> None: - """ Handle GSM message. It should be a command """ + async def handle_message(self, timestamp: str, source: str, message: str) -> None: + """Handle GSM message. It should be a command""" - logger.debug("Received: {} {} {}".format(timestamp, source, message)) + logger.debug(f"Received: {timestamp} {source} {message}") if source in cfg.GSM_CONTACTS: - future = asyncio.run_coroutine_threadsafe( - self.handle_command(message), self.alarm.work_loop - ) - ret = future.result(10) + ret = await self.handle_command(message) - m = "GSM {}: {}".format(source, ret) + m = f"GSM {source}: {ret}" logger.info(m) else: - m = "GSM {} (UNK): {}".format(source, message) + m = f"GSM {source} (UNK): {message}" logger.warning(m) self.send_message(m, EventLevel.INFO) @@ -284,7 +253,7 @@ def handle_message(self, timestamp: str, source: str, message: str) -> None: Notification(sender=self.name, message=m, level=EventLevel.INFO) ) - def send_message(self, message: str, level: EventLevel) -> None: + async def send_message(self, message: str, level: EventLevel) -> None: if self.port is None: logger.warning("GSM not available when sending message") return @@ -293,12 +262,9 @@ def send_message(self, message: str, level: EventLevel) -> None: data = b'AT+CMGS="%b"\x0d%b\x1a' % (dst.encode(), message.encode()) try: - future = asyncio.run_coroutine_threadsafe( - self.port.write(data), self.loop - ) - result = future.result() - logger.debug("SMS result: {}".format(result)) - except: + result = await self.port.write(data) + logger.debug(f"SMS result: {result}") + except Exception: logger.exception("ERROR sending SMS") def process_cmt(self, header: str, text: str) -> None: @@ -308,8 +274,8 @@ def process_cmt(self, header: str, text: str) -> None: tokens = json.loads(f"[{header[idx:]}]", strict=False) - logger.debug("On {}, {} sent {}".format(tokens[2], tokens[0], text)) - self.handle_message(tokens[2], tokens[0], text) + logger.debug(f"On {tokens[2]}, {tokens[0]} sent {text}") + asyncio.create_task(self.handle_message(tokens[2], tokens[0], text)) def process_cusd(self, message: str) -> None: idx = message.find(" ") diff --git a/paradox/interfaces/text/pushbullet.py b/paradox/interfaces/text/pushbullet.py index 07183030..47c972e9 100644 --- a/paradox/interfaces/text/pushbullet.py +++ b/paradox/interfaces/text/pushbullet.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- import asyncio import json import logging @@ -23,7 +22,7 @@ class PushBulletWSClient(WebSocketBaseClient): name = "pushbullet" def __init__(self, interface, url): - """ Initializes the PB WS Client""" + """Initializes the PB WS Client""" super().__init__(url) self.pb = Pushbullet(cfg.PUSHBULLET_KEY) @@ -31,7 +30,7 @@ def __init__(self, interface, url): self.interface = interface self.device = None - for i, device in enumerate(self.pb.devices): + for _, device in enumerate(self.pb.devices): if device.nickname == cfg.PUSHBULLET_DEVICE: logger.debug("Device found") self.device = device @@ -45,12 +44,12 @@ def stop(self): self.manager.stop() def handshake_ok(self): - """ Callback trigger when connection succeeded""" + """Callback trigger when connection succeeded""" logger.info("Handshake OK") self.manager.add(self) self.manager.start() for chat in self.pb.chats: - logger.debug("Associated contacts: {}".format(chat)) + logger.debug(f"Associated contacts: {chat}") # Receiving pending messages self.received_message(json.dumps({"type": "tickle", "subtype": "push"})) @@ -58,12 +57,12 @@ def handshake_ok(self): self.send_message("Active") def received_message(self, message): - """ Handle Pushbullet message. It should be a command """ - logger.debug("Received Message {}".format(message)) + """Handle Pushbullet message. It should be a command""" + logger.debug(f"Received Message {message}") try: message = json.loads(str(message)) - except: + except Exception: logger.exception("Unable to parse message") return @@ -107,11 +106,11 @@ def received_message(self, message): ) def unhandled_error(self, error): - logger.error("{}".format(error)) + logger.error(f"{error}") try: self.terminate() - except: + except Exception: logger.exception("Closing Pushbullet WS") self.close() @@ -129,7 +128,7 @@ def send_message(self, msg, dstchat=None): if chat.email in cfg.PUSHBULLET_CONTACTS: try: self.pb.push_note(cfg.PUSHBULLET_DEVICE, msg, chat=chat) - except: + except Exception: logger.exception("Sending message") time.sleep(5) @@ -148,21 +147,21 @@ def __init__(self, alarm): self.name = PushBulletWSClient.name self.pb_ws = None - def _run(self): - super(PushbulletTextInterface, self)._run() + async def run(self): + await super().run() try: self.pb_ws = PushBulletWSClient( self, - "wss://stream.pushbullet.com/websocket/{}".format(cfg.PUSHBULLET_KEY), + f"wss://stream.pushbullet.com/websocket/{cfg.PUSHBULLET_KEY}", ) self.pb_ws.connect() - except: + except Exception: logger.exception("Could not connect to Pushbullet service") logger.info("Pushbullet Interface Started") def stop(self): - """ Stops the Pushbullet interface""" + """Stops the Pushbullet interface""" super().stop() if self.pb_ws is not None: self.pb_ws.stop() diff --git a/paradox/interfaces/text/signal.py b/paradox/interfaces/text/signal.py index 1bdcd28c..6ace577a 100644 --- a/paradox/interfaces/text/signal.py +++ b/paradox/interfaces/text/signal.py @@ -1,15 +1,16 @@ -# -*- coding: utf-8 -*- import asyncio import logging from gi.repository import GLib +from pydbus import SystemBus + from paradox.config import config as cfg from paradox.event import EventLevel, Notification + # Signal interface. # Only exposes critical status changes and accepts commands from paradox.interfaces.text.core import ConfiguredAbstractTextInterface from paradox.lib import ps -from pydbus import SystemBus logger = logging.getLogger("PAI").getChild(__name__) @@ -30,8 +31,7 @@ def __init__(self, alarm): self.loop = None def stop(self): - - """ Stops the Signal Interface Thread""" + """Stops the Signal Interface Thread""" if self.loop is not None: self.loop.quit() @@ -39,8 +39,8 @@ def stop(self): logger.debug("Signal Stopped") - def _run(self): - super(SignalTextInterface, self)._run() + async def run(self): + await super().run() bus = SystemBus() @@ -50,7 +50,7 @@ def _run(self): logger.debug("Signal Interface Running") - self.loop.run() + asyncio.get_event_loop().run_in_executor(None, self.loop.run) def send_message(self, message: str, level: EventLevel): if self.signal is None: @@ -59,11 +59,11 @@ def send_message(self, message: str, level: EventLevel): try: self.signal.sendMessage(str(message), [], cfg.SIGNAL_CONTACTS) - except: + except Exception: logger.exception("Signal send message") def handle_message(self, timestamp, source, groupID, message, attachments): - """ Handle Signal message. It should be a command """ + """Handle Signal message. It should be a command""" logger.debug( "Received Message {} {} {} {} {}".format( @@ -77,10 +77,10 @@ def handle_message(self, timestamp, source, groupID, message, attachments): ) ret = future.result(10) - m = "Signal {} : {}".format(source, ret) + m = f"Signal {source} : {ret}" logger.info(m) else: - m = "Signal {} (UNK): {}".format(source, message) + m = f"Signal {source} (UNK): {message}" logger.warning(m) self.send_message(m, EventLevel.INFO) diff --git a/paradox/lib/async_message_manager.py b/paradox/lib/async_message_manager.py index 5089a8c9..fbaa4f2e 100644 --- a/paradox/lib/async_message_manager.py +++ b/paradox/lib/async_message_manager.py @@ -26,13 +26,9 @@ def can_handle(self, data: Container) -> bool: class AsyncMessageManager: - def __init__(self, loop=None): + def __init__(self): super().__init__() - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop - self.handler_registry = HandlerRegistry() self.raw_handler_registry = HandlerRegistry() @@ -58,7 +54,11 @@ def deregister_handler(self, name): self.handler_registry.remove_by_name(name) def schedule_message_handling(self, message: Container): - return self.loop.create_task(self.handler_registry.handle(message)) + return asyncio.get_event_loop().create_task( + self.handler_registry.handle(message) + ) def schedule_raw_message_handling(self, message: Container): - return self.loop.create_task(self.raw_handler_registry.handle(message)) + return asyncio.get_event_loop().create_task( + self.raw_handler_registry.handle(message) + ) diff --git a/tests/interfaces/test_gsm.py b/tests/interfaces/test_gsm.py new file mode 100644 index 00000000..89406b38 --- /dev/null +++ b/tests/interfaces/test_gsm.py @@ -0,0 +1,117 @@ +import asyncio +from unittest import mock + +import pytest + +from paradox.interfaces.text.gsm import ( + GSMTextInterface, + SerialCommunication, + SerialConnectionProtocol, +) + + +@pytest.fixture +async def connected_serial_communication(): + port = "test_port" + baud = 9600 + timeout = 5 + comm = SerialCommunication(port, baud, timeout) + + assert comm.queue.empty() + + async def mocked_create_serial_connection(loop, protocol_factory, *args, **kwargs): + transport = mock.Mock() + protocol = comm.make_protocol() + asyncio.get_event_loop().call_soon(protocol.connection_made, transport) + return (transport, protocol) + + with mock.patch( + "serial_asyncio.create_serial_connection", + new_callable=mock.AsyncMock, + side_effect=mocked_create_serial_connection, + ): + asyncio.get_event_loop().call_soon(comm.on_connection) + result = await comm.connect() + assert result + + assert comm.connected + + return comm + + +# Test SerialConnectionProtocol class +@pytest.mark.asyncio +async def test_serial_connection_protocol(): + handler = mock.MagicMock() + protocol = SerialConnectionProtocol(handler) + + transport = mock.MagicMock() + protocol.connection_made(transport) + handler.on_connection.assert_called_once() + + message = b"test_message" + await protocol.send_message(message) + transport.write.assert_called_once_with(message + b"\r\n") + + recv_data = b"test_data\r\n" + protocol.data_received(recv_data) + handler.on_message.assert_called_once_with(b"test_data") + + exc = Exception("test_exception") + protocol.connection_lost(exc) + handler.on_connection_loss.assert_called_once_with() + + +# Test SerialCommunication class +@pytest.mark.asyncio +async def test_serial_communication(connected_serial_communication): + comm = connected_serial_communication + + write_message = b"write_message" + write_response_message = b"write_response_message" + read_message = b"read_message" + + asyncio.get_event_loop().call_soon(comm.on_message, write_response_message) + result = await comm.write(write_message) + assert result == write_response_message + + asyncio.get_event_loop().call_soon(comm.on_message, read_message) + await comm.read() + assert comm.queue.empty() + + callback = mock.MagicMock() + comm.set_recv_callback(callback) + assert comm.recv_callback == callback + comm.on_message(read_message) + callback.assert_called_once_with(read_message) + + +# Test GSMTextInterface class +@pytest.mark.asyncio +async def test_gsm_text_interface(connected_serial_communication): + alarm = mock.MagicMock() + event = asyncio.Event() + + async def control_partition(partition, command): + assert partition == "outside" + assert command == "arm" + event.set() + + return True + + interface = GSMTextInterface(alarm) + interface.port = connected_serial_communication + interface.modem_connected = True + + data = b"+CMT: test_data" + await interface.data_received(data) + assert interface.message_cmt == data.decode() + + # level = EventLevel.INFO + # await interface.send_message("bla", level) + + header = '+CMT: "+1234567890","","24/09/17,10:30:00+32"' + text = "partition outside arm" + alarm.control_partition.side_effect = control_partition + interface.process_cmt(header, text) + await asyncio.wait_for(event.wait(), timeout=0.1) diff --git a/tests/pai.conf b/tests/pai.conf index a8ccbf86..ba0793bd 100644 --- a/tests/pai.conf +++ b/tests/pai.conf @@ -1,3 +1,5 @@ # Just make Config class happy and use defaults. LOGGING_FILE=None + +GSM_CONTACTS = ["+1234567890"] diff --git a/tests/test_async_queue.py b/tests/test_async_queue.py index ac329101..ad0e8fa0 100644 --- a/tests/test_async_queue.py +++ b/tests/test_async_queue.py @@ -1,9 +1,9 @@ import asyncio import binascii - from unittest import mock -import pytest + from construct import Container +import pytest from paradox.hardware.evo.parsers import LiveEvent, ReadEEPROMResponse from paradox.lib.async_message_manager import AsyncMessageManager @@ -19,12 +19,11 @@ def print_beer(m): print("beer") -def test_event_handler(): +@pytest.mark.asyncio +async def test_event_handler(): eh = EventMessageHandler(print_beer) - loop = asyncio.get_event_loop() - mh = AsyncMessageManager(loop) - + mh = AsyncMessageManager() mh.register_handler(eh) assert 1 == len(mh.handler_registry) @@ -33,13 +32,13 @@ def test_event_handler(): message = LiveEvent.parse(payload) - coro = asyncio.ensure_future(mh.schedule_message_handling(message)) - loop.run_until_complete(coro) + await mh.schedule_message_handling(message) assert 1 == len(mh.handler_registry) -def test_event_handler_failure(): +@pytest.mark.asyncio +async def test_event_handler_failure(): # eeprom_request_bin = binascii.unhexlify('500800009f004037') eeprom_response_bin = binascii.unhexlify( "524700009f0041133e001e0e0400000000060a0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000121510010705004e85" @@ -48,26 +47,24 @@ def test_event_handler_failure(): eh = EventMessageHandler(print_beer) eh.handle = mock.MagicMock() - loop = asyncio.get_event_loop() - mh = AsyncMessageManager(loop) - + mh = AsyncMessageManager() mh.register_handler(eh) assert 1 == len(mh.handler_registry) message = ReadEEPROMResponse.parse(eeprom_response_bin) - coro = asyncio.ensure_future(mh.schedule_message_handling(message)) - loop.run_until_complete(coro) + coro = mh.schedule_message_handling(message) + result = await coro assert ( - coro.result() is None + result is None ) # failed to parse response message return None. Maybe needs to throw something. assert 1 == len(mh.handler_registry) eh.handle.assert_not_called() -def test_handler_two_messages(): +async def test_handler_two_messages(): def event_handler(message): print("event") @@ -80,28 +77,21 @@ async def get_eeprom_result(mhm): "524700009f0041133e001e0e0400000000060a0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000121510010705004e85" ) - loop = asyncio.get_event_loop() - mh = AsyncMessageManager(loop) + mh = AsyncMessageManager() event_handler = EventMessageHandler(event_handler) mh.register_handler(event_handler) # running - task_handle_wait = loop.create_task(asyncio.sleep(0.1)) - task_get_eeprom = loop.create_task(get_eeprom_result(mh)) + task_handle_wait = asyncio.create_task(asyncio.sleep(0.1)) + task_get_eeprom = asyncio.create_task(get_eeprom_result(mh)) task_handle_event1 = mh.schedule_message_handling( LiveEvent.parse(event_response_bin) ) mh.schedule_message_handling(ReadEEPROMResponse.parse(eeprom_response_bin)) - task_handle_event2 = mh.schedule_message_handling( - LiveEvent.parse(event_response_bin) - ) - - # assert 2 == len(mh.handlers) + mh.schedule_message_handling(LiveEvent.parse(event_response_bin)) - loop.run_until_complete( - asyncio.gather(task_handle_wait, task_get_eeprom) - ) + await asyncio.gather(task_handle_wait, task_get_eeprom) assert 1 == len(mh.handler_registry) @@ -111,7 +101,8 @@ async def get_eeprom_result(mhm): assert 1 == len(mh.handler_registry) -def test_handler_timeout(): +@pytest.mark.asyncio +async def test_handler_timeout(): def event_handler(message): print("event received") @@ -131,23 +122,22 @@ async def post_eeprom_message(mhm): ReadEEPROMResponse.parse(eeprom_response_bin) ) - loop = asyncio.get_event_loop() - mh = AsyncMessageManager(loop) + mh = AsyncMessageManager() # running - task_get_eeprom = loop.create_task(get_eeprom_result(mh)) - loop.create_task(post_eeprom_message(mh)) + task_get_eeprom = asyncio.create_task(get_eeprom_result(mh)) + asyncio.create_task(post_eeprom_message(mh)) assert 0 == len(mh.handler_registry) with pytest.raises(asyncio.TimeoutError): - loop.run_until_complete(task_get_eeprom) + await task_get_eeprom assert 0 == len(mh.handler_registry) # Also test EventMessageHandler - event_handler = EventMessageHandler(event_handler) - mh.register_handler(event_handler) + event_handler_instance = EventMessageHandler(event_handler) + mh.register_handler(event_handler_instance) event_response_bin = b"\xe2\xff\xad\x06\x14\x13\x01\x04\x0e\x10\x00\x01\x05\x00\x00\x00\x00\x00\x02Living room \x00\xcc" task_handle_event1 = mh.schedule_message_handling( @@ -156,6 +146,6 @@ async def post_eeprom_message(mhm): assert 1 == len(mh.handler_registry) - loop.run_until_complete(task_handle_event1) + await task_handle_event1 assert 1 == len(mh.handler_registry) From 3651314e9ac0812145c9650b5b431095273de07c Mon Sep 17 00:00:00 2001 From: Jevgeni Kiski Date: Tue, 17 Sep 2024 15:31:40 +0300 Subject: [PATCH 2/4] asyncio_default_fixture_loop_scope --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 86246907..83a2af82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,3 +22,4 @@ combine_as_imports = true [tool.pytest.ini_options] asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" From f1ba7f7a7f65c377fd9db5d8ffa0aadad4abaa5d Mon Sep 17 00:00:00 2001 From: Jevgeni Kiski Date: Tue, 17 Sep 2024 15:40:10 +0300 Subject: [PATCH 3/4] Get rid of loops --- paradox/connections/ip/connection.py | 10 ++++++---- paradox/connections/serial_connection.py | 15 ++++++--------- paradox/interfaces/text/signal.py | 10 +++++----- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/paradox/connections/ip/connection.py b/paradox/connections/ip/connection.py index 46f81265..f0fd7519 100644 --- a/paradox/connections/ip/connection.py +++ b/paradox/connections/ip/connection.py @@ -60,7 +60,7 @@ def __init__(self, host="127.0.0.1", port=10000): self.port = port async def _try_connect(self): - _, self._protocol = await self.loop.create_connection( + _, self._protocol = await asyncio.get_event_loop().create_connection( self._make_protocol, host=self.host, port=self.port ) @@ -90,7 +90,9 @@ def set_key(self, value): self._protocol.key = value def on_ip_message(self, container: Container): - return self.loop.create_task(self.ip_handler_registry.handle(container)) + return asyncio.get_event_loop().create_task( + self.ip_handler_registry.handle(container) + ) async def wait_for_ip_message(self, timeout=cfg.IO_TIMEOUT) -> Container: future = FutureHandler() @@ -115,7 +117,7 @@ def __init__( self.port = port async def _try_connect(self) -> None: - _, self._protocol = await self.loop.create_connection( + _, self._protocol = await asyncio.get_event_loop().create_connection( self._make_protocol, host=self.host, port=self.port ) @@ -146,7 +148,7 @@ def write(self, data: bytes): async def _try_connect(self) -> None: await self.stun_session.connect() - _, self._protocol = await self.loop.create_connection( + _, self._protocol = await asyncio.get_event_loop().create_connection( self._make_protocol, sock=self.stun_session.get_socket() ) diff --git a/paradox/connections/serial_connection.py b/paradox/connections/serial_connection.py index 537307a8..3234b084 100644 --- a/paradox/connections/serial_connection.py +++ b/paradox/connections/serial_connection.py @@ -1,13 +1,10 @@ -# -*- coding: utf-8 -*- - - +import asyncio import logging import os import stat -import typing -import serial_asyncio from serial import SerialException +import serial_asyncio from ..exceptions import SerialConnectionOpenFailed from .connection import Connection @@ -67,12 +64,12 @@ async def connect(self) -> bool: logger.error(f"Failed to update file {self.port_path} permissions") return False - self.connected_future = self.loop.create_future() - open_timeout_handler = self.loop.call_later(5, self.open_timeout) + self.connected_future = asyncio.get_event_loop().create_future() + open_timeout_handler = asyncio.get_event_loop().call_later(5, self.open_timeout) try: _, self._protocol = await serial_asyncio.create_serial_connection( - self.loop, self.make_protocol, self.port_path, self.baud + asyncio.get_event_loop(), self.make_protocol, self.port_path, self.baud ) return await self.connected_future @@ -81,7 +78,7 @@ async def connect(self) -> bool: raise SerialConnectionOpenFailed( "Connection to serial port failed" ) from e # PAICriticalException - except: + except Exception: logger.exception("Unable to connect to Serial") finally: open_timeout_handler.cancel() diff --git a/paradox/interfaces/text/signal.py b/paradox/interfaces/text/signal.py index 6ace577a..81f92e33 100644 --- a/paradox/interfaces/text/signal.py +++ b/paradox/interfaces/text/signal.py @@ -28,12 +28,12 @@ def __init__(self, alarm): ) self.signal = None - self.loop = None + self.glib_loop = None def stop(self): """Stops the Signal Interface Thread""" - if self.loop is not None: - self.loop.quit() + if self.glib_loop is not None: + self.glib_loop.quit() super().stop() @@ -46,11 +46,11 @@ async def run(self): self.signal = bus.get("org.asamk.Signal") self.signal.onMessageReceived = self.handle_message - self.loop = GLib.MainLoop() + self.glib_loop = GLib.MainLoop() logger.debug("Signal Interface Running") - asyncio.get_event_loop().run_in_executor(None, self.loop.run) + asyncio.get_event_loop().run_in_executor(None, self.glib_loop.run) def send_message(self, message: str, level: EventLevel): if self.signal is None: From 5df1caafe0f7680bb37f20564b98fb156dd29431 Mon Sep 17 00:00:00 2001 From: Jevgeni Kiski Date: Tue, 17 Sep 2024 15:51:01 +0300 Subject: [PATCH 4/4] Fix tests --- tests/connection/ip/test_bare_connection.py | 5 +++- .../connection/ip/test_local_ip_connection.py | 5 +++- .../connection/ip/test_stun_ip_connection.py | 23 +++++++++++-------- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/tests/connection/ip/test_bare_connection.py b/tests/connection/ip/test_bare_connection.py index 89165930..524aeefb 100644 --- a/tests/connection/ip/test_bare_connection.py +++ b/tests/connection/ip/test_bare_connection.py @@ -1,3 +1,4 @@ +import asyncio from unittest.mock import AsyncMock import pytest @@ -14,7 +15,9 @@ async def test_connect(mocker): protocol.is_active.return_value = True create_connection_mock = AsyncMock(return_value=(None, protocol)) - mocker.patch.object(connection.loop, "create_connection", create_connection_mock) + mocker.patch.object( + asyncio.get_event_loop(), "create_connection", create_connection_mock + ) assert connection.connected is False diff --git a/tests/connection/ip/test_local_ip_connection.py b/tests/connection/ip/test_local_ip_connection.py index 30e37792..4d84a3d9 100644 --- a/tests/connection/ip/test_local_ip_connection.py +++ b/tests/connection/ip/test_local_ip_connection.py @@ -1,3 +1,4 @@ +import asyncio from unittest.mock import AsyncMock import pytest @@ -15,7 +16,9 @@ async def test_connect(mocker): protocol.is_active.return_value = True create_connection_mock = AsyncMock(return_value=(None, protocol)) - mocker.patch.object(connection.loop, "create_connection", create_connection_mock) + mocker.patch.object( + asyncio.get_event_loop(), "create_connection", create_connection_mock + ) connect_command_execute = mocker.patch.object( IPModuleConnectCommand, "execute", AsyncMock() ) diff --git a/tests/connection/ip/test_stun_ip_connection.py b/tests/connection/ip/test_stun_ip_connection.py index 0595bc48..26593934 100644 --- a/tests/connection/ip/test_stun_ip_connection.py +++ b/tests/connection/ip/test_stun_ip_connection.py @@ -1,3 +1,4 @@ +import asyncio from unittest.mock import AsyncMock import pytest @@ -17,7 +18,9 @@ async def test_connect(mocker): protocol.is_active.return_value = True create_connection_mock = AsyncMock(return_value=(None, protocol)) - mocker.patch.object(connection.loop, "create_connection", create_connection_mock) + mocker.patch.object( + asyncio.get_event_loop(), "create_connection", create_connection_mock + ) connect_command_execute = mocker.patch.object( IPModuleConnectCommand, "execute", AsyncMock() ) @@ -74,7 +77,7 @@ async def assert_session_connect(mocker, session): "serial": "bf4c1fe4", "type": "HD77", "port": 54321, - "panelSerial": "0584b067" + "panelSerial": "0584b067", }, { "lastUpdate": "2021-05-07T15:41:19Z", @@ -85,7 +88,7 @@ async def assert_session_connect(mocker, session): "serial": "465e81a0", "type": "HD88", "port": 12345, - "panelSerial": "0584b067" + "panelSerial": "0584b067", }, { "lastUpdate": "2021-05-07T15:41:19Z", @@ -98,7 +101,7 @@ async def assert_session_connect(mocker, session): "panelSerial": "a72ed4bf", "xoraddr": "9a640069cda9b317", "API": None, - "ipAddress": "0.0.0.0" + "ipAddress": "0.0.0.0", }, { "lastUpdate": "2021-05-07T15:41:19Z", @@ -111,13 +114,13 @@ async def assert_session_connect(mocker, session): "panelSerial": "0584b067", "xoraddr": "c351472f48a5e1ba", "API": None, - "ipAddress": "0.0.0.0" - } + "ipAddress": "0.0.0.0", + }, ], "paid": 1, "daysLeft": 364, "sitePanelStatus": 1, - "email": "em@em.com" + "email": "em@em.com", } ] } @@ -130,6 +133,8 @@ def json(self): mocker.patch("requests.get").return_value = StubResponse() client = mocker.patch("paradox.lib.stun.StunClient") - client.return_value.receive_response.return_value = [{"attr_body": "abcdef", "name": "BEER"}] + client.return_value.receive_response.return_value = [ + {"attr_body": "abcdef", "name": "BEER"} + ] await session.connect() - return json_data \ No newline at end of file + return json_data