From 70f963b9c7881afd8b88f921b73c5259d095e1ef Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Fri, 22 Mar 2024 18:59:35 -0400 Subject: [PATCH 01/26] maintain time.monotonic precision by using ns integer timestamps --- adafruit_minimqtt/adafruit_minimqtt.py | 95 ++++++++++++++++---------- 1 file changed, 60 insertions(+), 35 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 7ede955..d2ff2ab 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -51,7 +51,7 @@ from .matcher import MQTTMatcher -__version__ = "0.0.0+auto.0" +__version__ = "7.6.3" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git" # Client-specific variables @@ -181,7 +181,7 @@ def __init__( self._is_connected = False self._msg_size_lim = MQTT_MSG_SZ_LIM self._pid = 0 - self._last_msg_sent_timestamp: float = 0 + self._last_msg_sent_timestamp: int = 0 self.logger = NullLogger() """An optional logging attribute that can be set with with a Logger to enable debug logging.""" @@ -197,7 +197,8 @@ def __init__( self._username = username self._password = password if ( - self._password and len(password.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT + self._password and len(password.encode( + "utf-8")) > MQTT_TOPIC_LENGTH_LIMIT ): # [MQTT-3.1.3.5] raise MMQTTException("Password length is too large.") @@ -220,11 +221,12 @@ def __init__( self.client_id = client_id else: # assign a unique client_id - time_int = int(self.get_monotonic_time() * 100) % 1000 + time_int = (self.get_monotonic_time() % 10000000000) // 10000000 self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}" # generated client_id's enforce spec.'s length rules if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: - raise ValueError("MQTT Client ID must be between 1 and 23 bytes") + raise ValueError( + "MQTT Client ID must be between 1 and 23 bytes") # LWT self._lw_topic = None @@ -246,14 +248,22 @@ def __init__( def get_monotonic_time(self) -> float: """ - Provide monotonic time in seconds. Based on underlying implementation + Provide monotonic time in nanoseconds. Based on underlying implementation this might result in imprecise time, that will result in the library not being able to operate if running contiguously for more than 24 days or so. + Keeping timestamps in nanosecond ints from monotonic_ns should preserve precision. """ if self.use_monotonic_ns: - return time.monotonic_ns() / 1000000000 + return time.monotonic_ns() - return time.monotonic() + return int(time.monotonic() * 1000000000) + + def diff_ns(self, stamp): + """ + Taking timestamp differences using nanosecond ints before dividing + should maintain precision. + """ + return (self.get_monotonic_time() - stamp)/1000000000 def __enter__(self): return self @@ -307,7 +317,8 @@ def will_set( self.logger.debug("Setting last will properties") self._valid_qos(qos) if self._is_connected: - raise MMQTTException("Last Will should only be called before connect().") + raise MMQTTException( + "Last Will should only be called before connect().") if payload is None: payload = "" if isinstance(payload, (int, float, str)): @@ -331,7 +342,8 @@ def add_topic_callback(self, mqtt_topic: str, callback_method) -> None: If a callback is called for the topic, then any "on_message" callback will not be called. """ if mqtt_topic is None or callback_method is None: - raise ValueError("MQTT topic and callback method must both be defined.") + raise ValueError( + "MQTT topic and callback method must both be defined.") self._on_message_filtered[mqtt_topic] = callback_method def remove_topic_callback(self, mqtt_topic: str) -> None: @@ -379,7 +391,8 @@ def username_pw_set(self, username: str, password: Optional[str] = None) -> None """ if self._is_connected: - raise MMQTTException("This method must be called before connect().") + raise MMQTTException( + "This method must be called before connect().") self._username = username if password is not None: self._password = password @@ -508,7 +521,8 @@ def _connect( remaining_length += ( 2 + len(self._lw_topic.encode("utf-8")) + 2 + len(self._lw_msg) ) - var_header[7] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 + var_header[7] |= 0x4 | (self._lw_qos & 0x1) << 3 | ( + self._lw_qos & 0x2) << 3 var_header[7] |= self._lw_retain << 5 self._encode_remaining_length(fixed_header, remaining_length) @@ -544,7 +558,7 @@ def _connect( return result if op is None: - if self.get_monotonic_time() - stamp > self._recv_timeout: + if self.diff_ns(stamp) > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -601,7 +615,7 @@ def ping(self) -> list[int]: rc = self._wait_for_msg() if rc: rcs.append(rc) - if self.get_monotonic_time() - stamp > ping_timeout: + if self.diff_ns(stamp) > ping_timeout: raise MMQTTException("PINGRESP not returned from broker.") return rcs @@ -637,7 +651,8 @@ def publish( else: raise MMQTTException("Invalid message data type.") if len(msg) > MQTT_MSG_MAX_SZ: - raise MMQTTException(f"Message size larger than {MQTT_MSG_MAX_SZ} bytes.") + raise MMQTTException( + f"Message size larger than {MQTT_MSG_MAX_SZ} bytes.") assert ( 0 <= qos <= 1 ), "Quality of Service Level 2 is unsupported by this library." @@ -684,11 +699,12 @@ def publish( rcv_pid = rcv_pid_buf[0] << 0x08 | rcv_pid_buf[1] if self._pid == rcv_pid: if self.on_publish is not None: - self.on_publish(self, self.user_data, topic, rcv_pid) + self.on_publish( + self, self.user_data, topic, rcv_pid) return if op is None: - if self.get_monotonic_time() - stamp > self._recv_timeout: + if self.diff_ns(stamp) > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -728,8 +744,10 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N self.logger.debug("Sending SUBSCRIBE to broker...") fixed_header = bytearray([MQTT_SUB]) packet_length = 2 + (2 * len(topics)) + (1 * len(topics)) - packet_length += sum(len(topic.encode("utf-8")) for topic, qos in topics) - self._encode_remaining_length(fixed_header, remaining_length=packet_length) + packet_length += sum(len(topic.encode("utf-8")) + for topic, qos in topics) + self._encode_remaining_length( + fixed_header, remaining_length=packet_length) self.logger.debug(f"Fixed Header: {fixed_header}") self._sock.send(fixed_header) self._pid = self._pid + 1 if self._pid < 0xFFFF else 1 @@ -752,7 +770,7 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N while True: op = self._wait_for_msg() if op is None: - if self.get_monotonic_time() - stamp > self._recv_timeout: + if self.diff_ns(stamp) > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -809,7 +827,8 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: fixed_header = bytearray([MQTT_UNSUB]) packet_length = 2 + (2 * len(topics)) packet_length += sum(len(topic.encode("utf-8")) for topic in topics) - self._encode_remaining_length(fixed_header, remaining_length=packet_length) + self._encode_remaining_length( + fixed_header, remaining_length=packet_length) self.logger.debug(f"Fixed Header: {fixed_header}") self._sock.send(fixed_header) self._pid = self._pid + 1 if self._pid < 0xFFFF else 1 @@ -830,7 +849,7 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: stamp = self.get_monotonic_time() op = self._wait_for_msg() if op is None: - if self.get_monotonic_time() - stamp > self._recv_timeout: + if self.diff_ns(stamp) > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -842,7 +861,8 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: assert rc[1] == packet_id_bytes[0] and rc[2] == packet_id_bytes[1] for t in topics: if self.on_unsubscribe is not None: - self.on_unsubscribe(self, self.user_data, t, self._pid) + self.on_unsubscribe( + self, self.user_data, t, self._pid) self._subscribed_topics.remove(t) return @@ -860,7 +880,8 @@ def _recompute_reconnect_backoff(self) -> None: self._reconnect_timeout = 2**self._reconnect_attempt # pylint: disable=consider-using-f-string self.logger.debug( - "Reconnect timeout computed to {:.2f}".format(self._reconnect_timeout) + "Reconnect timeout computed to {:.2f}".format( + self._reconnect_timeout) ) if self._reconnect_timeout > self._reconnect_maximum_backoff: @@ -935,7 +956,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: while True: if ( - self.get_monotonic_time() - self._last_msg_sent_timestamp + self.diff_ns(self._last_msg_sent_timestamp) >= self.keep_alive ): # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server @@ -945,14 +966,15 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: rcs.extend(self.ping()) # ping() itself contains a _wait_for_msg() loop which might have taken a while, # so check here as well. - if self.get_monotonic_time() - stamp > timeout: - self.logger.debug(f"Loop timed out after {timeout} seconds") + if self.diff_ns(stamp) > timeout: + self.logger.debug( + f"Loop timed out after {timeout} seconds") break rc = self._wait_for_msg() if rc is not None: rcs.append(rc) - if self.get_monotonic_time() - stamp > timeout: + if self.diff_ns(stamp) > timeout: self.logger.debug(f"Loop timed out after {timeout} seconds") break @@ -960,7 +982,6 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]: # pylint: disable = too-many-return-statements - """Reads and processes network events. Return the packet type or None if there is nothing to be received. @@ -985,12 +1006,14 @@ def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]: # If we get here, it means that there is nothing to be received return None pkt_type = res[0] & MQTT_PKT_TYPE_MASK - self.logger.debug(f"Got message type: {hex(pkt_type)} pkt: {hex(res[0])}") + self.logger.debug( + f"Got message type: {hex(pkt_type)} pkt: {hex(res[0])}") if pkt_type == MQTT_PINGRESP: self.logger.debug("Got PINGRESP") sz = self._sock_exact_recv(1)[0] if sz != 0x00: - raise MMQTTException(f"Unexpected PINGRESP returned from broker: {sz}.") + raise MMQTTException( + f"Unexpected PINGRESP returned from broker: {sz}.") return pkt_type if pkt_type != MQTT_PUBLISH: @@ -1019,7 +1042,8 @@ def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]: # read message contents raw_msg = self._sock_exact_recv(sz) msg = raw_msg if self._use_binary_mode else str(raw_msg, "utf-8") - self.logger.debug("Receiving PUBLISH \nTopic: %s\nMsg: %s\n", topic, raw_msg) + self.logger.debug( + "Receiving PUBLISH \nTopic: %s\nMsg: %s\n", topic, raw_msg) self._handle_on_message(topic, msg) if res[0] & 0x06 == 0x02: pkt = bytearray(b"\x40\x02\0\0") @@ -1067,14 +1091,15 @@ def _sock_exact_recv( recv_len = self._sock.recv_into(rc, bufsize) to_read = bufsize - recv_len if to_read < 0: - raise MMQTTException(f"negative number of bytes to read: {to_read}") + raise MMQTTException( + f"negative number of bytes to read: {to_read}") read_timeout = timeout if timeout is not None else self.keep_alive mv = mv[recv_len:] while to_read > 0: recv_len = self._sock.recv_into(mv, to_read) to_read -= recv_len mv = mv[recv_len:] - if self.get_monotonic_time() - stamp > read_timeout: + if self.diff_ns(stamp) > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) @@ -1094,7 +1119,7 @@ def _sock_exact_recv( recv = self._sock.recv(to_read) to_read -= len(recv) rc += recv - if self.get_monotonic_time() - stamp > read_timeout: + if self.diff_ns(stamp) > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) From 89bf045b6069ffcfb1347fdef5f3d0dd00a00135 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Fri, 22 Mar 2024 19:08:35 -0400 Subject: [PATCH 02/26] formatted with Black --- adafruit_minimqtt/adafruit_minimqtt.py | 61 +++++++++----------------- 1 file changed, 20 insertions(+), 41 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index d2ff2ab..d902805 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -197,8 +197,7 @@ def __init__( self._username = username self._password = password if ( - self._password and len(password.encode( - "utf-8")) > MQTT_TOPIC_LENGTH_LIMIT + self._password and len(password.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT ): # [MQTT-3.1.3.5] raise MMQTTException("Password length is too large.") @@ -225,8 +224,7 @@ def __init__( self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}" # generated client_id's enforce spec.'s length rules if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: - raise ValueError( - "MQTT Client ID must be between 1 and 23 bytes") + raise ValueError("MQTT Client ID must be between 1 and 23 bytes") # LWT self._lw_topic = None @@ -263,7 +261,7 @@ def diff_ns(self, stamp): Taking timestamp differences using nanosecond ints before dividing should maintain precision. """ - return (self.get_monotonic_time() - stamp)/1000000000 + return (self.get_monotonic_time() - stamp) / 1000000000 def __enter__(self): return self @@ -317,8 +315,7 @@ def will_set( self.logger.debug("Setting last will properties") self._valid_qos(qos) if self._is_connected: - raise MMQTTException( - "Last Will should only be called before connect().") + raise MMQTTException("Last Will should only be called before connect().") if payload is None: payload = "" if isinstance(payload, (int, float, str)): @@ -342,8 +339,7 @@ def add_topic_callback(self, mqtt_topic: str, callback_method) -> None: If a callback is called for the topic, then any "on_message" callback will not be called. """ if mqtt_topic is None or callback_method is None: - raise ValueError( - "MQTT topic and callback method must both be defined.") + raise ValueError("MQTT topic and callback method must both be defined.") self._on_message_filtered[mqtt_topic] = callback_method def remove_topic_callback(self, mqtt_topic: str) -> None: @@ -391,8 +387,7 @@ def username_pw_set(self, username: str, password: Optional[str] = None) -> None """ if self._is_connected: - raise MMQTTException( - "This method must be called before connect().") + raise MMQTTException("This method must be called before connect().") self._username = username if password is not None: self._password = password @@ -521,8 +516,7 @@ def _connect( remaining_length += ( 2 + len(self._lw_topic.encode("utf-8")) + 2 + len(self._lw_msg) ) - var_header[7] |= 0x4 | (self._lw_qos & 0x1) << 3 | ( - self._lw_qos & 0x2) << 3 + var_header[7] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 var_header[7] |= self._lw_retain << 5 self._encode_remaining_length(fixed_header, remaining_length) @@ -651,8 +645,7 @@ def publish( else: raise MMQTTException("Invalid message data type.") if len(msg) > MQTT_MSG_MAX_SZ: - raise MMQTTException( - f"Message size larger than {MQTT_MSG_MAX_SZ} bytes.") + raise MMQTTException(f"Message size larger than {MQTT_MSG_MAX_SZ} bytes.") assert ( 0 <= qos <= 1 ), "Quality of Service Level 2 is unsupported by this library." @@ -699,8 +692,7 @@ def publish( rcv_pid = rcv_pid_buf[0] << 0x08 | rcv_pid_buf[1] if self._pid == rcv_pid: if self.on_publish is not None: - self.on_publish( - self, self.user_data, topic, rcv_pid) + self.on_publish(self, self.user_data, topic, rcv_pid) return if op is None: @@ -744,10 +736,8 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N self.logger.debug("Sending SUBSCRIBE to broker...") fixed_header = bytearray([MQTT_SUB]) packet_length = 2 + (2 * len(topics)) + (1 * len(topics)) - packet_length += sum(len(topic.encode("utf-8")) - for topic, qos in topics) - self._encode_remaining_length( - fixed_header, remaining_length=packet_length) + packet_length += sum(len(topic.encode("utf-8")) for topic, qos in topics) + self._encode_remaining_length(fixed_header, remaining_length=packet_length) self.logger.debug(f"Fixed Header: {fixed_header}") self._sock.send(fixed_header) self._pid = self._pid + 1 if self._pid < 0xFFFF else 1 @@ -827,8 +817,7 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: fixed_header = bytearray([MQTT_UNSUB]) packet_length = 2 + (2 * len(topics)) packet_length += sum(len(topic.encode("utf-8")) for topic in topics) - self._encode_remaining_length( - fixed_header, remaining_length=packet_length) + self._encode_remaining_length(fixed_header, remaining_length=packet_length) self.logger.debug(f"Fixed Header: {fixed_header}") self._sock.send(fixed_header) self._pid = self._pid + 1 if self._pid < 0xFFFF else 1 @@ -861,8 +850,7 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: assert rc[1] == packet_id_bytes[0] and rc[2] == packet_id_bytes[1] for t in topics: if self.on_unsubscribe is not None: - self.on_unsubscribe( - self, self.user_data, t, self._pid) + self.on_unsubscribe(self, self.user_data, t, self._pid) self._subscribed_topics.remove(t) return @@ -880,8 +868,7 @@ def _recompute_reconnect_backoff(self) -> None: self._reconnect_timeout = 2**self._reconnect_attempt # pylint: disable=consider-using-f-string self.logger.debug( - "Reconnect timeout computed to {:.2f}".format( - self._reconnect_timeout) + "Reconnect timeout computed to {:.2f}".format(self._reconnect_timeout) ) if self._reconnect_timeout > self._reconnect_maximum_backoff: @@ -955,10 +942,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: rcs = [] while True: - if ( - self.diff_ns(self._last_msg_sent_timestamp) - >= self.keep_alive - ): + if self.diff_ns(self._last_msg_sent_timestamp) >= self.keep_alive: # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server self.logger.debug( "KeepAlive period elapsed - requesting a PINGRESP from the server..." @@ -967,8 +951,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: # ping() itself contains a _wait_for_msg() loop which might have taken a while, # so check here as well. if self.diff_ns(stamp) > timeout: - self.logger.debug( - f"Loop timed out after {timeout} seconds") + self.logger.debug(f"Loop timed out after {timeout} seconds") break rc = self._wait_for_msg() @@ -1006,14 +989,12 @@ def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]: # If we get here, it means that there is nothing to be received return None pkt_type = res[0] & MQTT_PKT_TYPE_MASK - self.logger.debug( - f"Got message type: {hex(pkt_type)} pkt: {hex(res[0])}") + self.logger.debug(f"Got message type: {hex(pkt_type)} pkt: {hex(res[0])}") if pkt_type == MQTT_PINGRESP: self.logger.debug("Got PINGRESP") sz = self._sock_exact_recv(1)[0] if sz != 0x00: - raise MMQTTException( - f"Unexpected PINGRESP returned from broker: {sz}.") + raise MMQTTException(f"Unexpected PINGRESP returned from broker: {sz}.") return pkt_type if pkt_type != MQTT_PUBLISH: @@ -1042,8 +1023,7 @@ def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]: # read message contents raw_msg = self._sock_exact_recv(sz) msg = raw_msg if self._use_binary_mode else str(raw_msg, "utf-8") - self.logger.debug( - "Receiving PUBLISH \nTopic: %s\nMsg: %s\n", topic, raw_msg) + self.logger.debug("Receiving PUBLISH \nTopic: %s\nMsg: %s\n", topic, raw_msg) self._handle_on_message(topic, msg) if res[0] & 0x06 == 0x02: pkt = bytearray(b"\x40\x02\0\0") @@ -1091,8 +1071,7 @@ def _sock_exact_recv( recv_len = self._sock.recv_into(rc, bufsize) to_read = bufsize - recv_len if to_read < 0: - raise MMQTTException( - f"negative number of bytes to read: {to_read}") + raise MMQTTException(f"negative number of bytes to read: {to_read}") read_timeout = timeout if timeout is not None else self.keep_alive mv = mv[recv_len:] while to_read > 0: From 639160a1b0d930f6550cc7e9c7b1515cda74ba6b Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Fri, 22 Mar 2024 19:20:02 -0400 Subject: [PATCH 03/26] added _ns suffix to vars and func --- adafruit_minimqtt/adafruit_minimqtt.py | 56 +++++++++++++------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index d902805..cc2b835 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -181,7 +181,7 @@ def __init__( self._is_connected = False self._msg_size_lim = MQTT_MSG_SZ_LIM self._pid = 0 - self._last_msg_sent_timestamp: int = 0 + self._last_msg_sent_timestamp_ns: int = 0 self.logger = NullLogger() """An optional logging attribute that can be set with with a Logger to enable debug logging.""" @@ -220,7 +220,7 @@ def __init__( self.client_id = client_id else: # assign a unique client_id - time_int = (self.get_monotonic_time() % 10000000000) // 10000000 + time_int = (self.get_monotonic_ns_time() % 10000000000) // 10000000 self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}" # generated client_id's enforce spec.'s length rules if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: @@ -244,7 +244,7 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None - def get_monotonic_time(self) -> float: + def get_monotonic_ns_time(self) -> float: """ Provide monotonic time in nanoseconds. Based on underlying implementation this might result in imprecise time, that will result in the library @@ -256,12 +256,12 @@ def get_monotonic_time(self) -> float: return int(time.monotonic() * 1000000000) - def diff_ns(self, stamp): + def diff_ns(self, stamp_ns): """ Taking timestamp differences using nanosecond ints before dividing should maintain precision. """ - return (self.get_monotonic_time() - stamp) / 1000000000 + return (self.get_monotonic_ns_time() - stamp_ns) / 1000000000 def __enter__(self): return self @@ -534,9 +534,9 @@ def _connect( if self._username is not None: self._send_str(self._username) self._send_str(self._password) - self._last_msg_sent_timestamp = self.get_monotonic_time() + self._last_msg_sent_timestamp_ns = self.get_monotonic_ns_time() self.logger.debug("Receiving CONNACK packet from broker") - stamp = self.get_monotonic_time() + stamp_ns = self.get_monotonic_ns_time() while True: op = self._wait_for_msg() if op == 32: @@ -552,7 +552,7 @@ def _connect( return result if op is None: - if self.diff_ns(stamp) > self._recv_timeout: + if self.diff_ns(stamp_ns) > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -589,7 +589,7 @@ def disconnect(self) -> None: self._connection_manager.close_socket(self._sock) self._is_connected = False self._subscribed_topics = [] - self._last_msg_sent_timestamp = 0 + self._last_msg_sent_timestamp_ns = 0 if self.on_disconnect is not None: self.on_disconnect(self, self.user_data, 0) @@ -602,14 +602,14 @@ def ping(self) -> list[int]: self.logger.debug("Sending PINGREQ") self._sock.send(MQTT_PINGREQ) ping_timeout = self.keep_alive - stamp = self.get_monotonic_time() - self._last_msg_sent_timestamp = stamp + stamp_ns = self.get_monotonic_ns_time() + self._last_msg_sent_timestamp_ns = stamp_ns rc, rcs = None, [] while rc != MQTT_PINGRESP: rc = self._wait_for_msg() if rc: rcs.append(rc) - if self.diff_ns(stamp) > ping_timeout: + if self.diff_ns(stamp_ns) > ping_timeout: raise MMQTTException("PINGRESP not returned from broker.") return rcs @@ -678,11 +678,11 @@ def publish( self._sock.send(pub_hdr_fixed) self._sock.send(pub_hdr_var) self._sock.send(msg) - self._last_msg_sent_timestamp = self.get_monotonic_time() + self._last_msg_sent_timestamp_ns = self.get_monotonic_ns_time() if qos == 0 and self.on_publish is not None: self.on_publish(self, self.user_data, topic, self._pid) if qos == 1: - stamp = self.get_monotonic_time() + stamp_ns = self.get_monotonic_ns_time() while True: op = self._wait_for_msg() if op == 0x40: @@ -696,7 +696,7 @@ def publish( return if op is None: - if self.diff_ns(stamp) > self._recv_timeout: + if self.diff_ns(stamp_ns) > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -755,12 +755,12 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N self.logger.debug(f"SUBSCRIBING to topic {t} with QoS {q}") self.logger.debug(f"payload: {payload}") self._sock.send(payload) - stamp = self.get_monotonic_time() - self._last_msg_sent_timestamp = stamp + stamp_ns = self.get_monotonic_ns_time() + self._last_msg_sent_timestamp_ns = stamp_ns while True: op = self._wait_for_msg() if op is None: - if self.diff_ns(stamp) > self._recv_timeout: + if self.diff_ns(stamp_ns) > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -832,13 +832,13 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: for t in topics: self.logger.debug(f"UNSUBSCRIBING from topic {t}") self._sock.send(payload) - self._last_msg_sent_timestamp = self.get_monotonic_time() + self._last_msg_sent_timestamp_ns = self.get_monotonic_ns_time() self.logger.debug("Waiting for UNSUBACK...") while True: - stamp = self.get_monotonic_time() + stamp_ns = self.get_monotonic_ns_time() op = self._wait_for_msg() if op is None: - if self.diff_ns(stamp) > self._recv_timeout: + if self.diff_ns(stamp_ns) > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -938,11 +938,11 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: self._connected() self.logger.debug(f"waiting for messages for {timeout} seconds") - stamp = self.get_monotonic_time() + stamp_ns = self.get_monotonic_ns_time() rcs = [] while True: - if self.diff_ns(self._last_msg_sent_timestamp) >= self.keep_alive: + if self.diff_ns(self._last_msg_sent_timestamp_ns) >= self.keep_alive: # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server self.logger.debug( "KeepAlive period elapsed - requesting a PINGRESP from the server..." @@ -950,14 +950,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: rcs.extend(self.ping()) # ping() itself contains a _wait_for_msg() loop which might have taken a while, # so check here as well. - if self.diff_ns(stamp) > timeout: + if self.diff_ns(stamp_ns) > timeout: self.logger.debug(f"Loop timed out after {timeout} seconds") break rc = self._wait_for_msg() if rc is not None: rcs.append(rc) - if self.diff_ns(stamp) > timeout: + if self.diff_ns(stamp_ns) > timeout: self.logger.debug(f"Loop timed out after {timeout} seconds") break @@ -1063,7 +1063,7 @@ def _sock_exact_recv( :param float timeout: timeout, in seconds. Defaults to keep_alive :return: byte array """ - stamp = self.get_monotonic_time() + stamp_ns = self.get_monotonic_ns_time() if not self._backwards_compatible_sock: # CPython/Socketpool Impl. rc = bytearray(bufsize) @@ -1078,7 +1078,7 @@ def _sock_exact_recv( recv_len = self._sock.recv_into(mv, to_read) to_read -= recv_len mv = mv[recv_len:] - if self.diff_ns(stamp) > read_timeout: + if self.diff_ns(stamp_ns) > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) @@ -1098,7 +1098,7 @@ def _sock_exact_recv( recv = self._sock.recv(to_read) to_read -= len(recv) rc += recv - if self.diff_ns(stamp) > read_timeout: + if self.diff_ns(stamp_ns) > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) From 9af0e3f7882782033fd5f9b7e0a89e7fd8cab193 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Fri, 22 Mar 2024 19:24:18 -0400 Subject: [PATCH 04/26] reverted func name back to get_monotonic_time to pass build check --- adafruit_minimqtt/adafruit_minimqtt.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index cc2b835..5720361 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -220,7 +220,7 @@ def __init__( self.client_id = client_id else: # assign a unique client_id - time_int = (self.get_monotonic_ns_time() % 10000000000) // 10000000 + time_int = (self.get_monotonic_time() % 10000000000) // 10000000 self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}" # generated client_id's enforce spec.'s length rules if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: @@ -244,7 +244,7 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None - def get_monotonic_ns_time(self) -> float: + def get_monotonic_time(self) -> float: """ Provide monotonic time in nanoseconds. Based on underlying implementation this might result in imprecise time, that will result in the library @@ -261,7 +261,7 @@ def diff_ns(self, stamp_ns): Taking timestamp differences using nanosecond ints before dividing should maintain precision. """ - return (self.get_monotonic_ns_time() - stamp_ns) / 1000000000 + return (self.get_monotonic_time() - stamp_ns) / 1000000000 def __enter__(self): return self @@ -534,9 +534,9 @@ def _connect( if self._username is not None: self._send_str(self._username) self._send_str(self._password) - self._last_msg_sent_timestamp_ns = self.get_monotonic_ns_time() + self._last_msg_sent_timestamp_ns = self.get_monotonic_time() self.logger.debug("Receiving CONNACK packet from broker") - stamp_ns = self.get_monotonic_ns_time() + stamp_ns = self.get_monotonic_time() while True: op = self._wait_for_msg() if op == 32: @@ -602,7 +602,7 @@ def ping(self) -> list[int]: self.logger.debug("Sending PINGREQ") self._sock.send(MQTT_PINGREQ) ping_timeout = self.keep_alive - stamp_ns = self.get_monotonic_ns_time() + stamp_ns = self.get_monotonic_time() self._last_msg_sent_timestamp_ns = stamp_ns rc, rcs = None, [] while rc != MQTT_PINGRESP: @@ -678,11 +678,11 @@ def publish( self._sock.send(pub_hdr_fixed) self._sock.send(pub_hdr_var) self._sock.send(msg) - self._last_msg_sent_timestamp_ns = self.get_monotonic_ns_time() + self._last_msg_sent_timestamp_ns = self.get_monotonic_time() if qos == 0 and self.on_publish is not None: self.on_publish(self, self.user_data, topic, self._pid) if qos == 1: - stamp_ns = self.get_monotonic_ns_time() + stamp_ns = self.get_monotonic_time() while True: op = self._wait_for_msg() if op == 0x40: @@ -755,7 +755,7 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N self.logger.debug(f"SUBSCRIBING to topic {t} with QoS {q}") self.logger.debug(f"payload: {payload}") self._sock.send(payload) - stamp_ns = self.get_monotonic_ns_time() + stamp_ns = self.get_monotonic_time() self._last_msg_sent_timestamp_ns = stamp_ns while True: op = self._wait_for_msg() @@ -832,10 +832,10 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: for t in topics: self.logger.debug(f"UNSUBSCRIBING from topic {t}") self._sock.send(payload) - self._last_msg_sent_timestamp_ns = self.get_monotonic_ns_time() + self._last_msg_sent_timestamp_ns = self.get_monotonic_time() self.logger.debug("Waiting for UNSUBACK...") while True: - stamp_ns = self.get_monotonic_ns_time() + stamp_ns = self.get_monotonic_time() op = self._wait_for_msg() if op is None: if self.diff_ns(stamp_ns) > self._recv_timeout: @@ -938,7 +938,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: self._connected() self.logger.debug(f"waiting for messages for {timeout} seconds") - stamp_ns = self.get_monotonic_ns_time() + stamp_ns = self.get_monotonic_time() rcs = [] while True: @@ -1063,7 +1063,7 @@ def _sock_exact_recv( :param float timeout: timeout, in seconds. Defaults to keep_alive :return: byte array """ - stamp_ns = self.get_monotonic_ns_time() + stamp_ns = self.get_monotonic_time() if not self._backwards_compatible_sock: # CPython/Socketpool Impl. rc = bytearray(bufsize) From 120d8a908e472c005dd48dd1d183d0bf45e7cd0f Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Mon, 25 Mar 2024 14:35:55 -0400 Subject: [PATCH 05/26] retain imprecise get_monotonic_time, add precision get_monotonic_time_ns function --- adafruit_minimqtt/adafruit_minimqtt.py | 41 ++++++++++++++++---------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 5720361..3ee93ea 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -51,7 +51,7 @@ from .matcher import MQTTMatcher -__version__ = "7.6.3" +__version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git" # Client-specific variables @@ -220,7 +220,7 @@ def __init__( self.client_id = client_id else: # assign a unique client_id - time_int = (self.get_monotonic_time() % 10000000000) // 10000000 + time_int = (self.get_monotonic_time_ns() % 10000000000) // 10000000 self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}" # generated client_id's enforce spec.'s length rules if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: @@ -245,6 +245,17 @@ def __init__( self.on_unsubscribe = None def get_monotonic_time(self) -> float: + """ + Provide monotonic time in seconds. Based on underlying implementation + this might result in imprecise time, that will result in the library + not being able to operate if running contiguously for more than 24 days or so. + """ + if self.use_monotonic_ns: + return time.monotonic_ns() / 1000000000 + + return time.monotonic() + + def get_monotonic_time_ns(self) -> int: """ Provide monotonic time in nanoseconds. Based on underlying implementation this might result in imprecise time, that will result in the library @@ -256,12 +267,12 @@ def get_monotonic_time(self) -> float: return int(time.monotonic() * 1000000000) - def diff_ns(self, stamp_ns): + def diff_ns(self, stamp_ns) -> float: """ Taking timestamp differences using nanosecond ints before dividing - should maintain precision. + should maintain precision. Returns time difference in seconds. """ - return (self.get_monotonic_time() - stamp_ns) / 1000000000 + return (self.get_monotonic_time_ns() - stamp_ns) / 1000000000 def __enter__(self): return self @@ -534,9 +545,9 @@ def _connect( if self._username is not None: self._send_str(self._username) self._send_str(self._password) - self._last_msg_sent_timestamp_ns = self.get_monotonic_time() + self._last_msg_sent_timestamp_ns = self.get_monotonic_time_ns() self.logger.debug("Receiving CONNACK packet from broker") - stamp_ns = self.get_monotonic_time() + stamp_ns = self.get_monotonic_time_ns() while True: op = self._wait_for_msg() if op == 32: @@ -602,7 +613,7 @@ def ping(self) -> list[int]: self.logger.debug("Sending PINGREQ") self._sock.send(MQTT_PINGREQ) ping_timeout = self.keep_alive - stamp_ns = self.get_monotonic_time() + stamp_ns = self.get_monotonic_time_ns() self._last_msg_sent_timestamp_ns = stamp_ns rc, rcs = None, [] while rc != MQTT_PINGRESP: @@ -678,11 +689,11 @@ def publish( self._sock.send(pub_hdr_fixed) self._sock.send(pub_hdr_var) self._sock.send(msg) - self._last_msg_sent_timestamp_ns = self.get_monotonic_time() + self._last_msg_sent_timestamp_ns = self.get_monotonic_time_ns() if qos == 0 and self.on_publish is not None: self.on_publish(self, self.user_data, topic, self._pid) if qos == 1: - stamp_ns = self.get_monotonic_time() + stamp_ns = self.get_monotonic_time_ns() while True: op = self._wait_for_msg() if op == 0x40: @@ -755,7 +766,7 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N self.logger.debug(f"SUBSCRIBING to topic {t} with QoS {q}") self.logger.debug(f"payload: {payload}") self._sock.send(payload) - stamp_ns = self.get_monotonic_time() + stamp_ns = self.get_monotonic_time_ns() self._last_msg_sent_timestamp_ns = stamp_ns while True: op = self._wait_for_msg() @@ -832,10 +843,10 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: for t in topics: self.logger.debug(f"UNSUBSCRIBING from topic {t}") self._sock.send(payload) - self._last_msg_sent_timestamp_ns = self.get_monotonic_time() + self._last_msg_sent_timestamp_ns = self.get_monotonic_time_ns() self.logger.debug("Waiting for UNSUBACK...") while True: - stamp_ns = self.get_monotonic_time() + stamp_ns = self.get_monotonic_time_ns() op = self._wait_for_msg() if op is None: if self.diff_ns(stamp_ns) > self._recv_timeout: @@ -938,7 +949,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: self._connected() self.logger.debug(f"waiting for messages for {timeout} seconds") - stamp_ns = self.get_monotonic_time() + stamp_ns = self.get_monotonic_time_ns() rcs = [] while True: @@ -1063,7 +1074,7 @@ def _sock_exact_recv( :param float timeout: timeout, in seconds. Defaults to keep_alive :return: byte array """ - stamp_ns = self.get_monotonic_time() + stamp_ns = self.get_monotonic_time_ns() if not self._backwards_compatible_sock: # CPython/Socketpool Impl. rc = bytearray(bufsize) From 5378f8f69811130f93129dc4a6a28c33b24c6b94 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Mon, 25 Mar 2024 16:35:20 -0400 Subject: [PATCH 06/26] reverted timestamps back to using ticks_ms from adafruit_ticks library --- adafruit_minimqtt/adafruit_minimqtt.py | 75 +++++++++++--------------- 1 file changed, 30 insertions(+), 45 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 3ee93ea..21e154a 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -36,6 +36,7 @@ from random import randint from adafruit_connection_manager import get_connection_manager +from adafruit_ticks import ticks_ms, ticks_diff try: from typing import List, Optional, Tuple, Type, Union @@ -51,7 +52,7 @@ from .matcher import MQTTMatcher -__version__ = "0.0.0+auto.0" +__version__ = "7.6.3" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git" # Client-specific variables @@ -181,7 +182,7 @@ def __init__( self._is_connected = False self._msg_size_lim = MQTT_MSG_SZ_LIM self._pid = 0 - self._last_msg_sent_timestamp_ns: int = 0 + self._last_msg_sent_timestamp: int = 0 self.logger = NullLogger() """An optional logging attribute that can be set with with a Logger to enable debug logging.""" @@ -220,7 +221,7 @@ def __init__( self.client_id = client_id else: # assign a unique client_id - time_int = (self.get_monotonic_time_ns() % 10000000000) // 10000000 + time_int = int(ticks_ms() / 10) % 1000 self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}" # generated client_id's enforce spec.'s length rules if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: @@ -255,25 +256,6 @@ def get_monotonic_time(self) -> float: return time.monotonic() - def get_monotonic_time_ns(self) -> int: - """ - Provide monotonic time in nanoseconds. Based on underlying implementation - this might result in imprecise time, that will result in the library - not being able to operate if running contiguously for more than 24 days or so. - Keeping timestamps in nanosecond ints from monotonic_ns should preserve precision. - """ - if self.use_monotonic_ns: - return time.monotonic_ns() - - return int(time.monotonic() * 1000000000) - - def diff_ns(self, stamp_ns) -> float: - """ - Taking timestamp differences using nanosecond ints before dividing - should maintain precision. Returns time difference in seconds. - """ - return (self.get_monotonic_time_ns() - stamp_ns) / 1000000000 - def __enter__(self): return self @@ -545,9 +527,9 @@ def _connect( if self._username is not None: self._send_str(self._username) self._send_str(self._password) - self._last_msg_sent_timestamp_ns = self.get_monotonic_time_ns() + self._last_msg_sent_timestamp = ticks_ms() self.logger.debug("Receiving CONNACK packet from broker") - stamp_ns = self.get_monotonic_time_ns() + stamp = ticks_ms() while True: op = self._wait_for_msg() if op == 32: @@ -563,7 +545,7 @@ def _connect( return result if op is None: - if self.diff_ns(stamp_ns) > self._recv_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -600,7 +582,7 @@ def disconnect(self) -> None: self._connection_manager.close_socket(self._sock) self._is_connected = False self._subscribed_topics = [] - self._last_msg_sent_timestamp_ns = 0 + self._last_msg_sent_timestamp = 0 if self.on_disconnect is not None: self.on_disconnect(self, self.user_data, 0) @@ -613,14 +595,14 @@ def ping(self) -> list[int]: self.logger.debug("Sending PINGREQ") self._sock.send(MQTT_PINGREQ) ping_timeout = self.keep_alive - stamp_ns = self.get_monotonic_time_ns() - self._last_msg_sent_timestamp_ns = stamp_ns + stamp = ticks_ms() + self._last_msg_sent_timestamp = stamp rc, rcs = None, [] while rc != MQTT_PINGRESP: rc = self._wait_for_msg() if rc: rcs.append(rc) - if self.diff_ns(stamp_ns) > ping_timeout: + if ticks_diff(ticks_ms(), stamp): raise MMQTTException("PINGRESP not returned from broker.") return rcs @@ -689,11 +671,11 @@ def publish( self._sock.send(pub_hdr_fixed) self._sock.send(pub_hdr_var) self._sock.send(msg) - self._last_msg_sent_timestamp_ns = self.get_monotonic_time_ns() + self._last_msg_sent_timestamp = ticks_ms() if qos == 0 and self.on_publish is not None: self.on_publish(self, self.user_data, topic, self._pid) if qos == 1: - stamp_ns = self.get_monotonic_time_ns() + stamp = ticks_ms() while True: op = self._wait_for_msg() if op == 0x40: @@ -707,7 +689,7 @@ def publish( return if op is None: - if self.diff_ns(stamp_ns) > self._recv_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -766,12 +748,12 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N self.logger.debug(f"SUBSCRIBING to topic {t} with QoS {q}") self.logger.debug(f"payload: {payload}") self._sock.send(payload) - stamp_ns = self.get_monotonic_time_ns() - self._last_msg_sent_timestamp_ns = stamp_ns + stamp = ticks_ms() + self._last_msg_sent_timestamp = stamp while True: op = self._wait_for_msg() if op is None: - if self.diff_ns(stamp_ns) > self._recv_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -843,13 +825,13 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: for t in topics: self.logger.debug(f"UNSUBSCRIBING from topic {t}") self._sock.send(payload) - self._last_msg_sent_timestamp_ns = self.get_monotonic_time_ns() + self._last_msg_sent_timestamp = ticks_ms() self.logger.debug("Waiting for UNSUBACK...") while True: - stamp_ns = self.get_monotonic_time_ns() + stamp = ticks_ms() op = self._wait_for_msg() if op is None: - if self.diff_ns(stamp_ns) > self._recv_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -949,11 +931,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: self._connected() self.logger.debug(f"waiting for messages for {timeout} seconds") - stamp_ns = self.get_monotonic_time_ns() + stamp = ticks_ms() rcs = [] while True: - if self.diff_ns(self._last_msg_sent_timestamp_ns) >= self.keep_alive: + if ( + ticks_diff(ticks_ms(), self._last_msg_sent_timestamp) / 1000 + >= self.keep_alive + ): # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server self.logger.debug( "KeepAlive period elapsed - requesting a PINGRESP from the server..." @@ -961,14 +946,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: rcs.extend(self.ping()) # ping() itself contains a _wait_for_msg() loop which might have taken a while, # so check here as well. - if self.diff_ns(stamp_ns) > timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > timeout: self.logger.debug(f"Loop timed out after {timeout} seconds") break rc = self._wait_for_msg() if rc is not None: rcs.append(rc) - if self.diff_ns(stamp_ns) > timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > timeout: self.logger.debug(f"Loop timed out after {timeout} seconds") break @@ -1074,7 +1059,7 @@ def _sock_exact_recv( :param float timeout: timeout, in seconds. Defaults to keep_alive :return: byte array """ - stamp_ns = self.get_monotonic_time_ns() + stamp = ticks_ms() if not self._backwards_compatible_sock: # CPython/Socketpool Impl. rc = bytearray(bufsize) @@ -1089,7 +1074,7 @@ def _sock_exact_recv( recv_len = self._sock.recv_into(mv, to_read) to_read -= recv_len mv = mv[recv_len:] - if self.diff_ns(stamp_ns) > read_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) @@ -1109,7 +1094,7 @@ def _sock_exact_recv( recv = self._sock.recv(to_read) to_read -= len(recv) rc += recv - if self.diff_ns(stamp_ns) > read_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) From 51b055075030b103f11175d438a244abbdfa1eb4 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Mon, 25 Mar 2024 16:39:12 -0400 Subject: [PATCH 07/26] fix ping_timeout --- adafruit_minimqtt/adafruit_minimqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 21e154a..04c189f 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -602,7 +602,7 @@ def ping(self) -> list[int]: rc = self._wait_for_msg() if rc: rcs.append(rc) - if ticks_diff(ticks_ms(), stamp): + if ticks_diff(ticks_ms(), stamp) > ping_timeout * 1000: raise MMQTTException("PINGRESP not returned from broker.") return rcs From 5b0c8ccd03d26482818e00b3bd160891c486194c Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Tue, 26 Mar 2024 19:31:36 -0400 Subject: [PATCH 08/26] add adafruit_ticks to requirements.txt --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 2505288..afd7a2d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ Adafruit-Blinka Adafruit-Circuitpython-ConnectionManager +Adafruit-Ticks From 5ad6abbeb357b87e86056a26bd848510079e5938 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Tue, 26 Mar 2024 19:32:44 -0400 Subject: [PATCH 09/26] revert --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index afd7a2d..2505288 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,3 @@ Adafruit-Blinka Adafruit-Circuitpython-ConnectionManager -Adafruit-Ticks From bce70b29030a82cb573e7271b46beb17bcc0916a Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Tue, 26 Mar 2024 19:34:15 -0400 Subject: [PATCH 10/26] add adafruit_ticks to requirements.txt --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 2505288..11e0e03 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ Adafruit-Blinka Adafruit-Circuitpython-ConnectionManager +Adafruit-Ticks \ No newline at end of file From 92035c0720dbd7bedec8f4def918c173eb1ab6af Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Tue, 26 Mar 2024 19:38:47 -0400 Subject: [PATCH 11/26] try lower case in requirements.txt --- adafruit_minimqtt/adafruit_minimqtt.py | 2 +- requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 04c189f..5599cad 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -52,7 +52,7 @@ from .matcher import MQTTMatcher -__version__ = "7.6.3" +__version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git" # Client-specific variables diff --git a/requirements.txt b/requirements.txt index 11e0e03..94b8580 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,4 @@ Adafruit-Blinka Adafruit-Circuitpython-ConnectionManager -Adafruit-Ticks \ No newline at end of file +adafruit-ticks \ No newline at end of file From 1dd46527d0694c070867bec365fc310b06445330 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Tue, 26 Mar 2024 19:41:47 -0400 Subject: [PATCH 12/26] ok, adafruit-circuitpython-ticks --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 94b8580..1059d65 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,4 @@ Adafruit-Blinka Adafruit-Circuitpython-ConnectionManager -adafruit-ticks \ No newline at end of file +adafruit-circuitpython-ticks \ No newline at end of file From d5626ec6adcb20e1bfd35ea8625004fac6ff2ecc Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Tue, 26 Mar 2024 19:44:16 -0400 Subject: [PATCH 13/26] fix end of file error? --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 1059d65..8075f62 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,4 @@ Adafruit-Blinka Adafruit-Circuitpython-ConnectionManager -adafruit-circuitpython-ticks \ No newline at end of file +adafruit-circuitpython-ticks From 128e3f039524126b9f378fa100a4d978b2d77781 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Sun, 5 May 2024 20:16:51 -0400 Subject: [PATCH 14/26] purge last remnants of time.monotonic(), get rid of imprecise_time argument parameter --- adafruit_minimqtt/adafruit_minimqtt.py | 28 -------------------------- 1 file changed, 28 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 5599cad..3d530f0 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -124,8 +124,6 @@ class MQTT: This works with all callbacks but the "on_message" and those added via add_topic_callback(); for those, to get access to the user_data use the 'user_data' member of the MQTT object passed as 1st argument. - :param bool use_imprecise_time: on boards without time.monotonic_ns() one has to set - this to True in order to operate correctly over more than 24 days or so """ @@ -147,7 +145,6 @@ def __init__( socket_timeout: int = 1, connect_retries: int = 5, user_data=None, - use_imprecise_time: Optional[bool] = None, ) -> None: self._connection_manager = get_connection_manager(socket_pool) self._socket_pool = socket_pool @@ -156,20 +153,6 @@ def __init__( self._backwards_compatible_sock = False self._use_binary_mode = use_binary_mode - self.use_monotonic_ns = False - try: - time.monotonic_ns() - self.use_monotonic_ns = True - except AttributeError: - if use_imprecise_time: - self.use_monotonic_ns = False - else: - raise MMQTTException( # pylint: disable=raise-missing-from - "time.monotonic_ns() is not available. " - "Will use imprecise time however only if the" - "use_imprecise_time argument is set to True." - ) - if recv_timeout <= socket_timeout: raise MMQTTException( "recv_timeout must be strictly greater than socket_timeout" @@ -245,17 +228,6 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None - def get_monotonic_time(self) -> float: - """ - Provide monotonic time in seconds. Based on underlying implementation - this might result in imprecise time, that will result in the library - not being able to operate if running contiguously for more than 24 days or so. - """ - if self.use_monotonic_ns: - return time.monotonic_ns() / 1000000000 - - return time.monotonic() - def __enter__(self): return self From 830af789ce68018a2aebbbfc03ac2565d0d8d4f1 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Sun, 5 May 2024 20:24:52 -0400 Subject: [PATCH 15/26] remove test_loop dependence on mqtt.get_monotonic_time function --- tests/test_loop.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_loop.py b/tests/test_loop.py index 4d79b65..a40b66e 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -142,7 +142,8 @@ def test_loop_basic(self) -> None: time_before = time.monotonic() timeout = random.randint(3, 8) # pylint: disable=protected-access - mqtt_client._last_msg_sent_timestamp = mqtt_client.get_monotonic_time() + # mqtt_client._last_msg_sent_timestamp = mqtt_client.get_monotonic_time() + mqtt_client._last_msg_sent_timestamp = time.monotonic() rcs = mqtt_client.loop(timeout=timeout) time_after = time.monotonic() From 82d6f9d650595b7f163d808d61df62e58a8aed09 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Sun, 5 May 2024 20:41:35 -0400 Subject: [PATCH 16/26] timestamp uses ticks_ms --- tests/test_loop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_loop.py b/tests/test_loop.py index a40b66e..c8328b4 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -143,7 +143,7 @@ def test_loop_basic(self) -> None: timeout = random.randint(3, 8) # pylint: disable=protected-access # mqtt_client._last_msg_sent_timestamp = mqtt_client.get_monotonic_time() - mqtt_client._last_msg_sent_timestamp = time.monotonic() + mqtt_client._last_msg_sent_timestamp = mqtt_client.ticks_ms() rcs = mqtt_client.loop(timeout=timeout) time_after = time.monotonic() From a1eec26d1cc0c06c1a77f1426e89311b7f1c85be Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Sun, 5 May 2024 20:44:46 -0400 Subject: [PATCH 17/26] fix ticks_ms ref --- tests/test_loop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_loop.py b/tests/test_loop.py index c8328b4..4e4f526 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -143,7 +143,7 @@ def test_loop_basic(self) -> None: timeout = random.randint(3, 8) # pylint: disable=protected-access # mqtt_client._last_msg_sent_timestamp = mqtt_client.get_monotonic_time() - mqtt_client._last_msg_sent_timestamp = mqtt_client.ticks_ms() + mqtt_client._last_msg_sent_timestamp = MQTT.ticks_ms() rcs = mqtt_client.loop(timeout=timeout) time_after = time.monotonic() From e873f43fa970ee86198b8a6680bce6bda4016d12 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Sun, 5 May 2024 20:57:51 -0400 Subject: [PATCH 18/26] modify ping_timeout test for 3 res --- tests/test_loop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_loop.py b/tests/test_loop.py index 4e4f526..756e957 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -142,7 +142,6 @@ def test_loop_basic(self) -> None: time_before = time.monotonic() timeout = random.randint(3, 8) # pylint: disable=protected-access - # mqtt_client._last_msg_sent_timestamp = mqtt_client.get_monotonic_time() mqtt_client._last_msg_sent_timestamp = MQTT.ticks_ms() rcs = mqtt_client.loop(timeout=timeout) time_after = time.monotonic() @@ -224,7 +223,8 @@ def test_loop_ping_timeout(self): res = mqtt_client.loop(timeout=2 * keep_alive_timeout) assert time.monotonic() - start >= 2 * keep_alive_timeout assert len(mocket.sent) > 0 - assert len(res) == 2 + # assert len(res) == 2 + assert len(res) == 3 # not sure if 3 is ok assert set(res) == {int(0xD0)} # pylint: disable=no-self-use From 804d7f70bfc5059768ffbc41aebaa7c3e5f6e71e Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Sun, 5 May 2024 21:05:13 -0400 Subject: [PATCH 19/26] black formatting --- tests/test_loop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_loop.py b/tests/test_loop.py index 756e957..1ca6591 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -224,7 +224,7 @@ def test_loop_ping_timeout(self): assert time.monotonic() - start >= 2 * keep_alive_timeout assert len(mocket.sent) > 0 # assert len(res) == 2 - assert len(res) == 3 # not sure if 3 is ok + assert len(res) == 3 # not sure if 3 is ok assert set(res) == {int(0xD0)} # pylint: disable=no-self-use From b569ce7881f5fc3b4e2d5c5e2e3753e2c8b23c58 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Tue, 7 May 2024 18:39:12 -0400 Subject: [PATCH 20/26] Remove micropython from docs/conf.py --- docs/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/conf.py b/docs/conf.py index eb5696d..3b98995 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -26,7 +26,7 @@ # Uncomment the below if you use native CircuitPython modules such as # digitalio, micropython and busio. List the modules you use. Without it, the # autodoc module docs will fail to generate with a warning. -autodoc_mock_imports = ["micropython", "microcontroller", "random"] +autodoc_mock_imports = ["microcontroller", "random"] intersphinx_mapping = { From 89bd1e131ce07660c4bd356028c53000c3caf277 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Wed, 8 May 2024 20:13:05 -0400 Subject: [PATCH 21/26] make loop ping timeout test more robust --- tests/test_loop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_loop.py b/tests/test_loop.py index 1ca6591..ea3df5f 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -220,7 +220,7 @@ def test_loop_ping_timeout(self): mqtt_client._sock = mocket start = time.monotonic() - res = mqtt_client.loop(timeout=2 * keep_alive_timeout) + res = mqtt_client.loop(timeout=2 * keep_alive_timeout + recv_timeout) assert time.monotonic() - start >= 2 * keep_alive_timeout assert len(mocket.sent) > 0 # assert len(res) == 2 From ee32e5e32b65a53b2abcc442f3bdfcb0d38453c1 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Thu, 9 May 2024 18:06:06 -0400 Subject: [PATCH 22/26] cleaned up test_loop --- tests/test_loop.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_loop.py b/tests/test_loop.py index ea3df5f..6666d86 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -223,8 +223,7 @@ def test_loop_ping_timeout(self): res = mqtt_client.loop(timeout=2 * keep_alive_timeout + recv_timeout) assert time.monotonic() - start >= 2 * keep_alive_timeout assert len(mocket.sent) > 0 - # assert len(res) == 2 - assert len(res) == 3 # not sure if 3 is ok + assert len(res) == 3 assert set(res) == {int(0xD0)} # pylint: disable=no-self-use From 93b7b3e2b1bdeec1a761c474cd110d099420dd93 Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Sun, 12 May 2024 16:40:56 -0400 Subject: [PATCH 23/26] fixed up ping_timeout --- adafruit_minimqtt/adafruit_minimqtt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 3d530f0..3bc018d 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -574,8 +574,8 @@ def ping(self) -> list[int]: rc = self._wait_for_msg() if rc: rcs.append(rc) - if ticks_diff(ticks_ms(), stamp) > ping_timeout * 1000: - raise MMQTTException("PINGRESP not returned from broker.") + if ticks_diff(ticks_ms(), stamp) / 1000 > ping_timeout: + raise MMQTTException(f"PINGRESP not returned from broker within {ping_timeout} seconds.") return rcs # pylint: disable=too-many-branches, too-many-statements From 427225233daca44cd4da49c06f176b7573c897ce Mon Sep 17 00:00:00 2001 From: kevin-tritz Date: Sun, 12 May 2024 16:58:48 -0400 Subject: [PATCH 24/26] ok, reverted all of the black reformats and just formatted minimqtt.py --- adafruit_minimqtt/adafruit_minimqtt.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 3bc018d..702b403 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -575,7 +575,9 @@ def ping(self) -> list[int]: if rc: rcs.append(rc) if ticks_diff(ticks_ms(), stamp) / 1000 > ping_timeout: - raise MMQTTException(f"PINGRESP not returned from broker within {ping_timeout} seconds.") + raise MMQTTException( + f"PINGRESP not returned from broker within {ping_timeout} seconds." + ) return rcs # pylint: disable=too-many-branches, too-many-statements From 1ffbe5e508a7ddd8e7c793d66f96ad42263c31e8 Mon Sep 17 00:00:00 2001 From: foamyguy Date: Mon, 17 Jun 2024 09:54:29 -0500 Subject: [PATCH 25/26] fix for timeout test --- tests/test_recv_timeout.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_recv_timeout.py b/tests/test_recv_timeout.py index f65dc40..646aa7c 100644 --- a/tests/test_recv_timeout.py +++ b/tests/test_recv_timeout.py @@ -49,7 +49,7 @@ def test_recv_timeout_vs_keepalive(self) -> None: socket_mock.recv_into.assert_called() now = time.monotonic() - assert recv_timeout <= (now - start) < keep_alive + assert recv_timeout <= round(now - start, 2) <= keep_alive if __name__ == "__main__": From 2d562af9713d394d674eb57cb48c2edf715b9f38 Mon Sep 17 00:00:00 2001 From: foamyguy Date: Fri, 5 Jul 2024 12:05:34 -0500 Subject: [PATCH 26/26] tolerance value instead of rounding Co-authored-by: Dan Halbert --- tests/test_recv_timeout.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_recv_timeout.py b/tests/test_recv_timeout.py index 646aa7c..b291dd8 100644 --- a/tests/test_recv_timeout.py +++ b/tests/test_recv_timeout.py @@ -49,7 +49,7 @@ def test_recv_timeout_vs_keepalive(self) -> None: socket_mock.recv_into.assert_called() now = time.monotonic() - assert recv_timeout <= round(now - start, 2) <= keep_alive + assert recv_timeout <= (now - start) <= (keep_alive + 0.1) if __name__ == "__main__":