From ea373ac773e6ad8840545d1e14b8abf43e155824 Mon Sep 17 00:00:00 2001 From: gluap Date: Thu, 4 Jan 2024 00:20:00 +0100 Subject: [PATCH] validate acknowledge messages. --- pyduofern/duofern_stick.py | 80 ++++++++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 12 deletions(-) diff --git a/pyduofern/duofern_stick.py b/pyduofern/duofern_stick.py index c43ff4a..cfd9c8c 100644 --- a/pyduofern/duofern_stick.py +++ b/pyduofern/duofern_stick.py @@ -29,9 +29,13 @@ import logging import os import os.path +import random import tempfile import threading import time +from dataclasses import dataclass +from queue import Queue, Empty +from typing import Dict import serial import serial.tools.list_ports @@ -61,7 +65,8 @@ def hex(stuff): duoRemotePair = "0D0006010000000000000000000000000000yyyyyy01" -MIN_MESSAGE_INTERVAL_MILLIS = 50 +MIN_MESSAGE_INTERVAL_MILLIS = 25 +RESEND_SECONDS = (2,4) def refresh_serial_connection(function): def new_funtion(*args, **kwargs): @@ -119,7 +124,7 @@ def __init__(self, system_code=None, config_file_json=None, duofern_parser=None, self.pairing = False self.unpairing = False - self.write_queue = [] + self.write_queue = Queue() if not ephemeral: self.config['system_code'] = self.system_code self._dump_config() @@ -162,7 +167,7 @@ def _initialize_recording(self): def _initialize(self, **kwargs): # pragma: no cover raise NotImplementedError("need to use an implementation of the Duofernstick") - def _simple_write(self, **kwargs): # pragma: no cover + def _simple_write(self, *args, **kwargs): # pragma: no cover raise NotImplementedError("need to use an implementation of the Duofernstick") def send(self, msg, **kwargs): # pragma: no cover @@ -182,6 +187,9 @@ def process_message(self, message): recorder.write("received {}\n".format(message)) recorder.flush() if message[0:2] == '81': + if hasattr(self, "unacknowledged"): + if message[-14:-2] in self.unacknowledged: + del self.unacknowledged[message[-14:-2]] return if message[0:4] == '0602': logger.info("got pairing reply") @@ -248,11 +256,6 @@ def set_name(self, id, name): self._initialize() self.initialized=1 - def handle_write_queue(self): - if len(self.write_queue) > 0: - tosend = self.write_queue.pop() - logger.info("sending {} from write queue, {} msgs left in queue".format(tosend, len(self.write_queue))) - self._simple_write(tosend) def stop_pair(self): self.send(duoStopPair) @@ -437,6 +440,13 @@ async def handshake(self): self.initialized = True +@dataclass +class WaitingMessage: + """Class for keeping track of an item in inventory.""" + message: str + next: datetime.datetime + retries: int = 5 + class DuofernStickThreaded(DuofernStick, threading.Thread): def __init__(self, serial_port=None, *args, **kwargs): super().__init__(*args, **kwargs) @@ -456,6 +466,9 @@ def __init__(self, serial_port=None, *args, **kwargs): self.running = False self.last_send = datetime.datetime.now() + self.rewrite_queue = Queue() + self.unacknowledged: Dict[str,WaitingMessage] = {} + def _read_answer(self, some_string): # ReadAnswer """read an answer...""" logger.debug("should read {}".format(some_string)) @@ -569,6 +582,23 @@ def _simple_write(self, string_to_write): # SimpleWrite self.serial_connection.open() self.serial_connection.write(data_to_write) + def handle_write_queue(self): + try: + tosend = self.write_queue.get(block=False, timeout=None) + logger.info("sending {} from write queue, {} msgs left in queue".format(tosend, self.write_queue.qsize())) + self._simple_write(tosend) + self.unacknowledged[tosend[-14:-2]] = WaitingMessage(tosend, datetime.datetime.now()+datetime.timedelta(seconds=random.uniform(*RESEND_SECONDS))) + except Empty: + pass + + def handle_rewrite_queue(self): + try: + tosend = self.rewrite_queue.get(block=False, timeout=None) + logger.info("SENDING {} from REwrite queue, {} msgs left in queue".format(tosend, self.rewrite_queue.qsize())) + self._simple_write(tosend) + except Empty: + pass + def command(self, *args, **kwargs): if self.recording: with open(self.record_filename, "a") as recorder: @@ -579,12 +609,15 @@ def add_serial_and_send(self, msg): message = msg.replace("zzzzzz", "6f" + self.system_code) logger.debug("sending {}".format(message)) self.send(message) - logger.debug("added {} to write queue".format(message)) def run(self): self.running = True self._initialize() + last_resend_check = datetime.datetime.now() + toggle = False while self.running: + toggle = not toggle + self.serial_connection.timeout = .05 if not self.serial_connection.isOpen(): self.serial_connection.open() @@ -600,8 +633,31 @@ def run(self): if in_data != duoACK: self._simple_write(duoACK) self.serial_connection.timeout = 1 - if (len(self.write_queue) > 0) and ((datetime.datetime.now() - self.last_send) >= datetime.timedelta(milliseconds=MIN_MESSAGE_INTERVAL_MILLIS)): - self.handle_write_queue() + if not self.write_queue.empty() or not self.rewrite_queue.empty() and ( + (datetime.datetime.now() - self.last_send) >= datetime.timedelta(milliseconds=MIN_MESSAGE_INTERVAL_MILLIS)): + if toggle: + self.handle_write_queue() + else: + self.handle_rewrite_queue() + + if datetime.datetime.now() - last_resend_check > datetime.timedelta(seconds=0.1): + self.handle_resends() + last_resend_check = datetime.datetime.now() + + def handle_resends(self): + logger.debug(self.unacknowledged) + done = set() + t = datetime.datetime.now() + for k in self.unacknowledged.keys(): + if self.unacknowledged[k].retries == 0: + done.add(k) + elif self.unacknowledged[k].next < t: + self.unacknowledged[k].next = t + datetime.timedelta(seconds=random.uniform(*RESEND_SECONDS)) + self.unacknowledged[k].retries -= 1 + self.rewrite_queue.put(self.unacknowledged[k].message) + for d in done: + del self.unacknowledged[d] + def stop(self): self.running = False @@ -617,6 +673,6 @@ def unpair(self, timeout=10): def send(self, msg, **kwargs): logger.debug("sending {}".format(msg)) - self.write_queue.append(msg) + self.write_queue.put_nowait(msg) logger.debug("added {} to write queue".format(msg)) return