Skip to content

Commit

Permalink
Merge pull request #190 from vladak/recv_timeout_vs_keep_alive
Browse files Browse the repository at this point in the history
honor recv_timeout in _sock_exact_recv() and ping()
  • Loading branch information
dhalbert authored May 20, 2024
2 parents d412e9a + 5814fd0 commit 53b1412
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 5 deletions.
12 changes: 7 additions & 5 deletions adafruit_minimqtt/adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def ping(self) -> list[int]:
self._connected()
self.logger.debug("Sending PINGREQ")
self._sock.send(MQTT_PINGREQ)
ping_timeout = self.keep_alive
ping_timeout = self._recv_timeout
stamp = self.get_monotonic_time()
self._last_msg_sent_timestamp = stamp
rc, rcs = None, []
Expand All @@ -624,7 +624,9 @@ def ping(self) -> list[int]:
if rc:
rcs.append(rc)
if self.get_monotonic_time() - stamp > ping_timeout:
raise MMQTTException("PINGRESP not returned from broker.")
raise MMQTTException(
f"PINGRESP not returned from broker within {ping_timeout} seconds."
)
return rcs

# pylint: disable=too-many-branches, too-many-statements
Expand Down Expand Up @@ -1090,7 +1092,7 @@ def _sock_exact_recv(
to_read = bufsize - recv_len
if to_read < 0:
raise MMQTTException(f"negative number of bytes to read: {to_read}")
read_timeout = timeout if timeout is not None else self.keep_alive
read_timeout = timeout if timeout is not None else self._recv_timeout
mv = mv[recv_len:]
while to_read > 0:
recv_len = self._sock.recv_into(mv, to_read)
Expand All @@ -1101,7 +1103,7 @@ def _sock_exact_recv(
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
)
else: # ESP32SPI Impl.
# This will timeout with socket timeout (not keepalive timeout)
# This will time out with socket timeout (not receive timeout).
rc = self._sock.recv(bufsize)
if not rc:
self.logger.debug("_sock_exact_recv timeout")
Expand All @@ -1111,7 +1113,7 @@ def _sock_exact_recv(
# or raise exception if wait longer than read_timeout
to_read = bufsize - len(rc)
assert to_read >= 0
read_timeout = self.keep_alive
read_timeout = self._recv_timeout
while to_read > 0:
recv = self._sock.recv(to_read)
to_read -= len(recv)
Expand Down
56 changes: 56 additions & 0 deletions tests/test_recv_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# SPDX-FileCopyrightText: 2024 Vladimír Kotal
#
# SPDX-License-Identifier: Unlicense

"""receive timeout tests"""

import socket
import time
from unittest import TestCase, main
from unittest.mock import Mock

import adafruit_minimqtt.adafruit_minimqtt as MQTT


class RecvTimeout(TestCase):
"""This class contains tests for receive timeout handling."""

def test_recv_timeout_vs_keepalive(self) -> None:
"""verify that receive timeout as used via ping() is different to keep alive timeout"""

for side_effect in [lambda ret_buf, buf_size: 0, socket.timeout]:
with self.subTest():
host = "127.0.0.1"

recv_timeout = 4
keep_alive = recv_timeout * 2
mqtt_client = MQTT.MQTT(
broker=host,
socket_pool=socket,
connect_retries=1,
socket_timeout=recv_timeout // 2,
recv_timeout=recv_timeout,
keep_alive=keep_alive,
)

# Create a mock socket that will accept anything and return nothing.
socket_mock = Mock()
socket_mock.recv_into = Mock(side_effect=side_effect)
# pylint: disable=protected-access
mqtt_client._sock = socket_mock

mqtt_client._connected = lambda: True
start = time.monotonic()
with self.assertRaises(MQTT.MMQTTException):
mqtt_client.ping()

# Verify the mock interactions.
socket_mock.send.assert_called_once()
socket_mock.recv_into.assert_called()

now = time.monotonic()
assert recv_timeout <= (now - start) < keep_alive


if __name__ == "__main__":
main()

0 comments on commit 53b1412

Please sign in to comment.