-
Notifications
You must be signed in to change notification settings - Fork 383
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feedback requested: Python SDK config and status object for debugging #1082
base: v3
Are you sure you want to change the base?
Changes from all commits
46d5f9d
86d6df6
2b29f1d
ee0c9c9
8372fb7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,8 @@ | |
from . import transport_exceptions as exceptions | ||
from enum import Enum | ||
import socks | ||
import dataclasses | ||
import datetime | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
@@ -49,6 +51,104 @@ | |
} | ||
|
||
|
||
@dataclasses.dataclass | ||
class PahoStatus(object): | ||
connect_rc_codes: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of rc codes returned from the `connect` method""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is.... super nitpicky... (but also kind of not) but you aren't supposed to use """ to make comments anywhere except a docstring. As I recall, they are interpreted differently by the runtime, because the """ is a string literal, wheras a # comment just gets completely ignored. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, not nitpicky. These string literals end up taking space at runtime, so the distinction is important. I did it this way on purpose. Even though docstrings on attributes are technically not part of the Python standard, some tools will pick these up (if you put them after the attribute definition). I think PyCharm will pick these up, and maybe some versions of Sphynx. |
||
|
||
on_connect_rc_codes: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of rc codes passed into the `on_connect` handler""" | ||
|
||
connect_exceptions: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of exceptions raised by the `connect` method""" | ||
|
||
disconnect_rc_codes: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of rc codes returned from the `disconnect` method""" | ||
|
||
on_disconnect_rc_codes: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of rc codes passed into the `on_disconnect` handler""" | ||
|
||
disconnect_exceptions: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of exceptions raised by the `disconnect` method""" | ||
|
||
publish_rc_codes: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of rc codes returned from the `publish` method""" | ||
|
||
publish_exceptions: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of exceptions raised by the `publish` method""" | ||
|
||
subscribe_rc_codes: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of rc codes returned from the `subscribe` method""" | ||
|
||
subscribe_exceptions: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of exceptions raised by the `subscribe` method""" | ||
|
||
unsubscribe_rc_codes: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of rc codes returned from the `unsubscribe` method""" | ||
|
||
unsubscribe_exceptions: dict = dataclasses.field(default_factory=dict) | ||
"""Value->count dictionary of exceptions raised by the `unsubscribe` method""" | ||
|
||
count_message_received: int = 0 | ||
|
||
count_subscribe: int = 0 | ||
count_suback: int = 0 | ||
|
||
count_unsubscribe: int = 0 | ||
count_unsuback: int = 0 | ||
|
||
count_publish: int = 0 | ||
count_puback: int = 0 | ||
|
||
shut_down: bool = False | ||
|
||
time_since_last_paho_traffic_in: str = "" | ||
time_since_last_paho_traffic_out: str = "" | ||
client_object_id: int = 0 | ||
thread_name: str = "" | ||
thread_is_alive: bool = False | ||
len_out_mesage_queue: int = 0 | ||
len_in_message_queue: int = 0 | ||
len_out_pakcet_queue: int = 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Spelling? |
||
thread_terminate: bool = False | ||
paho_connection_state: int = 0 | ||
|
||
def to_dict(self): | ||
return dataclasses.asdict(self) | ||
|
||
|
||
@dataclasses.dataclass(order=True) | ||
class PahoConfig(object): | ||
transport: str = "" | ||
protocol: str = "" | ||
keepalive: int = 0 | ||
connect_timeout: int = 0 | ||
reconnect_on_failure: bool = False | ||
reconnect_delay_min: int = 0 | ||
reconnect_delay_max: int = 0 | ||
host: str = "" | ||
port: int = 0 | ||
proxy_args: dict = dataclasses.field(default_factory=dict) | ||
socket_class: str = "" | ||
socket_name: str = "" | ||
|
||
def to_dict(self): | ||
return dataclasses.asdict(self) | ||
|
||
|
||
def add_count_to_dict(dikt, key): | ||
if isinstance(key, Exception): | ||
key = type(key).__name__ | ||
dikt[key] = dikt.get(key, 0) + 1 | ||
|
||
|
||
def format_time_delta(s): | ||
if s: | ||
return str(datetime.timedelta(seconds=mqtt.time_func() - s)) | ||
else: | ||
return "infinity" | ||
|
||
|
||
def _create_error_from_connack_rc_code(rc): | ||
""" | ||
Given a paho CONNACK rc code, return an Exception that can be raised | ||
|
@@ -131,6 +231,8 @@ def __init__( | |
|
||
self._mqtt_client = self._create_mqtt_client() | ||
|
||
self._paho_status = PahoStatus() | ||
|
||
def _create_mqtt_client(self): | ||
""" | ||
Create the MQTT client object and assign all necessary event handler callbacks. | ||
|
@@ -176,6 +278,8 @@ def on_connect(client, userdata, flags, rc): | |
this = self_weakref() | ||
logger.info("connected with result code: {}".format(rc)) | ||
|
||
add_count_to_dict(this._paho_status.on_connect_rc_codes, rc) | ||
|
||
if rc: # i.e. if there is an error | ||
if this.on_mqtt_connection_failure_handler: | ||
try: | ||
|
@@ -204,6 +308,9 @@ def on_disconnect(client, userdata, rc): | |
this = self_weakref() | ||
logger.info("disconnected with result code: {}".format(rc)) | ||
|
||
if this: | ||
add_count_to_dict(this._paho_status.on_disconnect_rc_codes, rc) | ||
|
||
cause = None | ||
if rc: # i.e. if there is an error | ||
logger.debug("".join(traceback.format_stack())) | ||
|
@@ -239,27 +346,31 @@ def on_disconnect(client, userdata, rc): | |
def on_subscribe(client, userdata, mid, granted_qos): | ||
this = self_weakref() | ||
logger.info("suback received for {}".format(mid)) | ||
this._paho_status.count_suback += 1 | ||
# subscribe failures are returned from the subscribe() call. This is just | ||
# a notification that a SUBACK was received, so there is no failure case here | ||
this._op_manager.complete_operation(OperationType.SUBSCRIBE, mid) | ||
|
||
def on_unsubscribe(client, userdata, mid): | ||
this = self_weakref() | ||
logger.info("UNSUBACK received for {}".format(mid)) | ||
this._paho_status.count_unsuback += 1 | ||
# unsubscribe failures are returned from the unsubscribe() call. This is just | ||
# a notification that a SUBACK was received, so there is no failure case here | ||
this._op_manager.complete_operation(OperationType.UNSUBSCRIBE, mid) | ||
|
||
def on_publish(client, userdata, mid): | ||
this = self_weakref() | ||
logger.info("payload published for {}".format(mid)) | ||
this._paho_status.count_puback += 1 | ||
# publish failures are returned from the publish() call. This is just | ||
# a notification that a PUBACK was received, so there is no failure case here | ||
this._op_manager.complete_operation(OperationType.PUBLISH, mid) | ||
|
||
def on_message(client, userdata, mqtt_message): | ||
this = self_weakref() | ||
logger.info("message received on {}".format(mqtt_message.topic)) | ||
this._paho_status.count_message_received += 1 | ||
|
||
if this.on_mqtt_message_received_handler: | ||
try: | ||
|
@@ -362,6 +473,7 @@ def _create_ssl_context(self): | |
|
||
def shutdown(self): | ||
"""Shut down the transport. This is (currently) irreversible.""" | ||
self._paho_status.shut_down = True | ||
# Remove the disconnect handler from Paho. We don't want to trigger any events in response | ||
# to the shutdown and confuse the higher level layers of code. Just end it. | ||
self._mqtt_client.on_disconnect = None | ||
|
@@ -405,7 +517,9 @@ def connect(self, password=None): | |
rc = self._mqtt_client.connect( | ||
host=self._hostname, port=8883, keepalive=self._keep_alive | ||
) | ||
add_count_to_dict(self._paho_status.connect_rc_codes, rc) | ||
except socket.error as e: | ||
add_count_to_dict(self._paho_status.connect_exceptions, e) | ||
self._force_transport_disconnect_and_cleanup() | ||
|
||
# Only this type will raise a special error | ||
|
@@ -428,6 +542,7 @@ def connect(self, password=None): | |
raise exceptions.ConnectionFailedError() from e | ||
|
||
except Exception as e: | ||
add_count_to_dict(self._paho_status.connect_exceptions, e) | ||
self._force_transport_disconnect_and_cleanup() | ||
|
||
raise exceptions.ProtocolClientError("Unexpected Paho failure during connect") from e | ||
|
@@ -451,6 +566,7 @@ def disconnect(self, clear_inflight=False): | |
try: | ||
rc = self._mqtt_client.disconnect() | ||
except Exception as e: | ||
add_count_to_dict(self._paho_status.disconnect_exceptions, e) | ||
raise exceptions.ProtocolClientError("Unexpected Paho failure during disconnect") from e | ||
finally: | ||
self._mqtt_client.loop_stop() | ||
|
@@ -459,6 +575,8 @@ def disconnect(self, clear_inflight=False): | |
logger.debug("in paho thread. nulling _thread") | ||
self._mqtt_client._thread = None | ||
|
||
add_count_to_dict(self._paho_status.disconnect_rc_codes, rc) | ||
|
||
logger.debug("_mqtt_client.disconnect returned rc={}".format(rc)) | ||
if rc: | ||
# This could result in ConnectionDroppedError or ProtocolClientError | ||
|
@@ -488,14 +606,18 @@ def subscribe(self, topic, qos=1, callback=None): | |
:raises: ProtocolClientError if there is some other client error. | ||
:raises: NoConnectionError if the client isn't actually connected. | ||
""" | ||
self._paho_status.count_subscribe += 1 | ||
logger.info("subscribing to {} with qos {}".format(topic, qos)) | ||
try: | ||
(rc, mid) = self._mqtt_client.subscribe(topic, qos=qos) | ||
except ValueError: | ||
except ValueError as e: | ||
add_count_to_dict(self._paho_status.subscribe_exceptions, e) | ||
raise | ||
except Exception as e: | ||
add_count_to_dict(self._paho_status.subscribe_exceptions, e) | ||
raise exceptions.ProtocolClientError("Unexpected Paho failure during subscribe") from e | ||
logger.debug("_mqtt_client.subscribe returned rc={}".format(rc)) | ||
add_count_to_dict(self._paho_status.subscribe_rc_codes, rc) | ||
if rc: | ||
# This could result in ConnectionDroppedError or ProtocolClientError | ||
raise _create_error_from_rc_code(rc) | ||
|
@@ -513,16 +635,20 @@ def unsubscribe(self, topic, callback=None): | |
:raises: ProtocolClientError if there is some other client error. | ||
:raises: NoConnectionError if the client isn't actually connected. | ||
""" | ||
self._paho_status.count_unsubscribe += 1 | ||
logger.info("unsubscribing from {}".format(topic)) | ||
try: | ||
(rc, mid) = self._mqtt_client.unsubscribe(topic) | ||
except ValueError: | ||
except ValueError as e: | ||
add_count_to_dict(self._paho_status.unsubscribe_exceptions, e) | ||
raise | ||
except Exception as e: | ||
add_count_to_dict(self._paho_status.unsubscribe_exceptions, e) | ||
raise exceptions.ProtocolClientError( | ||
"Unexpected Paho failure during unsubscribe" | ||
) from e | ||
logger.debug("_mqtt_client.unsubscribe returned rc={}".format(rc)) | ||
add_count_to_dict(self._paho_status.unsubscribe_rc_codes, rc) | ||
if rc: | ||
# This could result in ConnectionDroppedError or ProtocolClientError | ||
raise _create_error_from_rc_code(rc) | ||
|
@@ -546,16 +672,18 @@ def publish(self, topic, payload, qos=1, callback=None): | |
:raises: ConnectionDroppedError if connection is dropped during execution. | ||
:raises: ProtocolClientError if there is some other client error. | ||
""" | ||
self._paho_status.count_publish += 1 | ||
logger.info("publishing on {}".format(topic)) | ||
try: | ||
(rc, mid) = self._mqtt_client.publish(topic=topic, payload=payload, qos=qos) | ||
except ValueError: | ||
raise | ||
except TypeError: | ||
except (ValueError, TypeError) as e: | ||
add_count_to_dict(self._paho_status.publish_exceptions, e) | ||
raise | ||
except Exception as e: | ||
add_count_to_dict(self._paho_status.publish_exceptions, e) | ||
raise exceptions.ProtocolClientError("Unexpected Paho failure during publish") from e | ||
logger.debug("_mqtt_client.publish returned rc={}".format(rc)) | ||
add_count_to_dict(self._paho_status.publish_rc_codes, rc) | ||
if rc: | ||
# Even though Paho returns a rc code indicating an error, it still stores the message | ||
# and will publish on connect, so this isn't really a failure - it just hangs. | ||
|
@@ -565,6 +693,59 @@ def publish(self, topic, payload, qos=1, callback=None): | |
raise _create_error_from_rc_code(rc) | ||
self._op_manager.establish_operation(OperationType.PUBLISH, mid, callback) | ||
|
||
def get_debug_status(self): | ||
""" | ||
Return an infomrational status object that describes the current state of the Paho transport | ||
""" | ||
|
||
self._paho_status.time_since_last_paho_traffic_in = format_time_delta( | ||
self._mqtt_client._last_msg_in | ||
) | ||
self._paho_status.time_since_last_paho_traffic_out = format_time_delta( | ||
self._mqtt_client._last_msg_out | ||
) | ||
|
||
self._paho_status.client_object_id = id(self._mqtt_client) | ||
self._paho_status.thread_name = ( | ||
self._mqtt_client._thread.name if self._mqtt_client._thread else "None" | ||
) | ||
self._paho_status.thread_is_alive = ( | ||
str(self._mqtt_client._thread.is_alive()) if self._mqtt_client._thread else "No thread" | ||
) | ||
self._paho_status.len_out_mesage_queue = len(self._mqtt_client._out_messages) | ||
self._paho_status.len_in_message_queue = len(self._mqtt_client._in_messages) | ||
self._paho_status.len_out_pakcet_queue = len(self._mqtt_client._out_packet) | ||
self._paho_status.thread_terminate = self._mqtt_client._thread_terminate | ||
self._paho_status.paho_connection_state = self._mqtt_client._state | ||
|
||
return self._paho_status | ||
|
||
def get_debug_config(self): | ||
""" | ||
Return an infomrational status object that describes the configuration of the Paho transport | ||
""" | ||
config = PahoConfig() | ||
|
||
config.transport = self._mqtt_client._transport | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel as though accessing convention-private attributes of Paho is dangerous. There is no guarantee they don't change in an update to Paho, which could break anyone using our library until we make an update. Furthermore, that risk doesn't really seem worth it for a logging utility. I could be convinced of the utility if we were to build some exception handling in to protect against this possibility, but even still, seems dicey. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that it's dangerous. I also think it's super useful -- at least some of these are. Using getattr() with a default would be much safer. |
||
config.protocol = str(self._mqtt_client._protocol) | ||
config.keepalive = self._mqtt_client._keepalive | ||
config.connect_timeout = self._mqtt_client._connect_timeout | ||
config.reconnect_on_failure = self._mqtt_client._reconnect_on_failure | ||
config.reconnect_delay_min = self._mqtt_client._reconnect_min_delay | ||
config.reconnect_delay_max = self._mqtt_client._reconnect_max_delay | ||
config.host = self._mqtt_client._host | ||
config.port = self._mqtt_client._port | ||
config.proxy_args = self._mqtt_client._proxy | ||
config.socket_class = type(self._mqtt_client.socket()).__name__ | ||
config.socket_name = ( | ||
str(self._mqtt_client.socket().getsockname()) | ||
if self._mqtt_client.socket() | ||
and getattr(self._mqtt_client.socket(), "getsockname", None) | ||
else "No socket name" | ||
) | ||
|
||
return config | ||
|
||
|
||
class OperationType(Enum): | ||
PUBLISH = "PUBLISH" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,28 +64,39 @@ def transport_to_port(transport): | |
) | ||
|
||
|
||
def disconnect_output_port(disconnect_type, transport, host): | ||
def disconnect_output_port(disconnect_type, transport, host=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great change. Was thinking of doing this myself. |
||
""" | ||
Disconnect the port for a given transport. disconnect_type can either be "DROP" to drop | ||
packets sent to that port, or it can be "REJECT" to reject packets sent to that port. | ||
""" | ||
# sudo -n iptables -A OUTPUT -p tcp --dport 8883 --destination 20.21.22.23 -j DROP | ||
ip = get_ip(host) | ||
port = transport_to_port(transport) | ||
run_shell_command( | ||
"{}iptables -A OUTPUT -p tcp --dport {} --destination {} -j {}".format( | ||
get_sudo_prefix(), port, ip, disconnect_type | ||
|
||
if host: | ||
ip = get_ip(host) | ||
run_shell_command( | ||
"{}iptables -A OUTPUT -p tcp --dport {} --destination {} -j {}".format( | ||
get_sudo_prefix(), port, ip, disconnect_type | ||
) | ||
) | ||
else: | ||
run_shell_command( | ||
"{}iptables -A OUTPUT -p tcp --dport {} -j {}".format( | ||
get_sudo_prefix(), port, disconnect_type | ||
) | ||
) | ||
) | ||
|
||
|
||
def reconnect_all(transport, host): | ||
def reconnect_all(transport, host=None): | ||
""" | ||
Reconnect all disconnects for this host and transport. Effectively, clean up | ||
anything that this module may have done. | ||
""" | ||
if not sys.platform.startswith("win"): | ||
ip = get_ip(host) | ||
if host: | ||
ip = get_ip(host) | ||
else: | ||
ip = "" | ||
port = transport_to_port(transport) | ||
for disconnect_type in all_disconnect_types: | ||
# sudo -n iptables -L OUTPUT -n -v --line-numbers | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
logs/**/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this requires Python 3.7, there should be an update to the
setup.py
to exclude Python 3.6 (and a removal of the associated classifier)