import dataclasses
import datetime


@dataclasses.dataclass
class PahoStatus(object):
    """
    Debugging snapshot of the Paho client: call/callback counters plus a copy of
    selected internal client attributes.

    Every ``*_rc_codes`` / ``*_exceptions`` dictionary maps a value (an rc code, or the
    class name of an exception) to the number of times that value was recorded via
    `add_count_to_dict`.
    """

    # Value->count dictionary of rc codes returned from the `connect` method
    connect_rc_codes: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of rc codes passed into the `on_connect` handler
    on_connect_rc_codes: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of exceptions raised by the `connect` method
    connect_exceptions: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of rc codes returned from the `disconnect` method
    disconnect_rc_codes: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of rc codes passed into the `on_disconnect` handler
    on_disconnect_rc_codes: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of exceptions raised by the `disconnect` method
    disconnect_exceptions: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of rc codes returned from the `publish` method
    publish_rc_codes: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of exceptions raised by the `publish` method
    publish_exceptions: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of rc codes returned from the `subscribe` method
    subscribe_rc_codes: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of exceptions raised by the `subscribe` method
    subscribe_exceptions: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of rc codes returned from the `unsubscribe` method
    unsubscribe_rc_codes: dict = dataclasses.field(default_factory=dict)

    # Value->count dictionary of exceptions raised by the `unsubscribe` method
    unsubscribe_exceptions: dict = dataclasses.field(default_factory=dict)

    # Number of times the `on_message` handler fired
    count_message_received: int = 0

    # Calls to `subscribe` vs. SUBACK notifications received
    count_subscribe: int = 0
    count_suback: int = 0

    # Calls to `unsubscribe` vs. UNSUBACK notifications received
    count_unsubscribe: int = 0
    count_unsuback: int = 0

    # Calls to `publish` vs. PUBACK notifications received
    count_publish: int = 0
    count_puback: int = 0

    # Set to True once `shutdown` has been called on the transport
    shut_down: bool = False

    # The remaining fields are refreshed by the transport's `get_debug_status` method
    time_since_last_paho_traffic_in: str = ""
    time_since_last_paho_traffic_out: str = ""
    client_object_id: int = 0
    thread_name: str = ""
    # Stored as text: `get_debug_status` writes str(thread.is_alive()) or "No thread",
    # so this is a string field rather than a bool.
    thread_is_alive: str = ""
    # NOTE: the misspelled names below ("mesage", "pakcet") are kept as-is because they are
    # assigned by name elsewhere and appear as keys in the `to_dict` output.
    len_out_mesage_queue: int = 0
    len_in_message_queue: int = 0
    len_out_pakcet_queue: int = 0
    thread_terminate: bool = False
    paho_connection_state: int = 0

    def to_dict(self):
        """Return the status as a plain (recursively copied) dictionary."""
        return dataclasses.asdict(self)


@dataclasses.dataclass(order=True)
class PahoConfig(object):
    """Debugging snapshot of the configuration values held by the Paho client."""

    transport: str = ""
    protocol: str = ""
    keepalive: int = 0
    connect_timeout: int = 0
    reconnect_on_failure: bool = False
    reconnect_delay_min: int = 0
    reconnect_delay_max: int = 0
    host: str = ""
    port: int = 0
    proxy_args: dict = dataclasses.field(default_factory=dict)
    socket_class: str = ""
    socket_name: str = ""

    def to_dict(self):
        """Return the configuration as a plain (recursively copied) dictionary."""
        return dataclasses.asdict(self)


def add_count_to_dict(dikt, key):
    """
    Increment the occurrence count stored under `key` in `dikt`.

    :param dict dikt: Value->count dictionary, updated in place.
    :param key: The value to count. Exception instances are counted under the name of
        their class so that the dictionary stays small and serializable.
    """
    if isinstance(key, Exception):
        key = type(key).__name__
    dikt[key] = dikt.get(key, 0) + 1


def format_time_delta(s):
    """
    Format the time elapsed since timestamp `s` as a `datetime.timedelta` string.

    :param s: A timestamp taken from the same clock as `mqtt.time_func`
        (`mqtt` is expected to be this module's Paho client import -- confirm at file top).
        Any falsy value (None, 0) means "never happened".

    :returns: The elapsed time as text, or "infinity" if `s` is falsy.
    """
    if not s:
        return "infinity"
    return str(datetime.timedelta(seconds=mqtt.time_func() - s))
MQTT client object and assign all necessary event handler callbacks. @@ -176,6 +278,8 @@ def on_connect(client, userdata, flags, rc): this = self_weakref() logger.info("connected with result code: {}".format(rc)) + add_count_to_dict(this._paho_status.on_connect_rc_codes, rc) + if rc: # i.e. if there is an error if this.on_mqtt_connection_failure_handler: try: @@ -204,6 +308,9 @@ def on_disconnect(client, userdata, rc): this = self_weakref() logger.info("disconnected with result code: {}".format(rc)) + if this: + add_count_to_dict(this._paho_status.on_disconnect_rc_codes, rc) + cause = None if rc: # i.e. if there is an error logger.debug("".join(traceback.format_stack())) @@ -239,6 +346,7 @@ def on_disconnect(client, userdata, rc): def on_subscribe(client, userdata, mid, granted_qos): this = self_weakref() logger.info("suback received for {}".format(mid)) + this._paho_status.count_suback += 1 # subscribe failures are returned from the subscribe() call. This is just # a notification that a SUBACK was received, so there is no failure case here this._op_manager.complete_operation(OperationType.SUBSCRIBE, mid) @@ -246,6 +354,7 @@ def on_subscribe(client, userdata, mid, granted_qos): def on_unsubscribe(client, userdata, mid): this = self_weakref() logger.info("UNSUBACK received for {}".format(mid)) + this._paho_status.count_unsuback += 1 # unsubscribe failures are returned from the unsubscribe() call. This is just # a notification that a SUBACK was received, so there is no failure case here this._op_manager.complete_operation(OperationType.UNSUBSCRIBE, mid) @@ -253,6 +362,7 @@ def on_unsubscribe(client, userdata, mid): def on_publish(client, userdata, mid): this = self_weakref() logger.info("payload published for {}".format(mid)) + this._paho_status.count_puback += 1 # publish failures are returned from the publish() call. 
This is just # a notification that a PUBACK was received, so there is no failure case here this._op_manager.complete_operation(OperationType.PUBLISH, mid) @@ -260,6 +370,7 @@ def on_publish(client, userdata, mid): def on_message(client, userdata, mqtt_message): this = self_weakref() logger.info("message received on {}".format(mqtt_message.topic)) + this._paho_status.count_message_received += 1 if this.on_mqtt_message_received_handler: try: @@ -362,6 +473,7 @@ def _create_ssl_context(self): def shutdown(self): """Shut down the transport. This is (currently) irreversible.""" + self._paho_status.shut_down = True # Remove the disconnect handler from Paho. We don't want to trigger any events in response # to the shutdown and confuse the higher level layers of code. Just end it. self._mqtt_client.on_disconnect = None @@ -405,7 +517,9 @@ def connect(self, password=None): rc = self._mqtt_client.connect( host=self._hostname, port=8883, keepalive=self._keep_alive ) + add_count_to_dict(self._paho_status.connect_rc_codes, rc) except socket.error as e: + add_count_to_dict(self._paho_status.connect_exceptions, e) self._force_transport_disconnect_and_cleanup() # Only this type will raise a special error @@ -428,6 +542,7 @@ def connect(self, password=None): raise exceptions.ConnectionFailedError() from e except Exception as e: + add_count_to_dict(self._paho_status.connect_exceptions, e) self._force_transport_disconnect_and_cleanup() raise exceptions.ProtocolClientError("Unexpected Paho failure during connect") from e @@ -451,6 +566,7 @@ def disconnect(self, clear_inflight=False): try: rc = self._mqtt_client.disconnect() except Exception as e: + add_count_to_dict(self._paho_status.disconnect_exceptions, e) raise exceptions.ProtocolClientError("Unexpected Paho failure during disconnect") from e finally: self._mqtt_client.loop_stop() @@ -459,6 +575,8 @@ def disconnect(self, clear_inflight=False): logger.debug("in paho thread. 
nulling _thread") self._mqtt_client._thread = None + add_count_to_dict(self._paho_status.disconnect_rc_codes, rc) + logger.debug("_mqtt_client.disconnect returned rc={}".format(rc)) if rc: # This could result in ConnectionDroppedError or ProtocolClientError @@ -488,14 +606,18 @@ def subscribe(self, topic, qos=1, callback=None): :raises: ProtocolClientError if there is some other client error. :raises: NoConnectionError if the client isn't actually connected. """ + self._paho_status.count_subscribe += 1 logger.info("subscribing to {} with qos {}".format(topic, qos)) try: (rc, mid) = self._mqtt_client.subscribe(topic, qos=qos) - except ValueError: + except ValueError as e: + add_count_to_dict(self._paho_status.subscribe_exceptions, e) raise except Exception as e: + add_count_to_dict(self._paho_status.subscribe_exceptions, e) raise exceptions.ProtocolClientError("Unexpected Paho failure during subscribe") from e logger.debug("_mqtt_client.subscribe returned rc={}".format(rc)) + add_count_to_dict(self._paho_status.subscribe_rc_codes, rc) if rc: # This could result in ConnectionDroppedError or ProtocolClientError raise _create_error_from_rc_code(rc) @@ -513,16 +635,20 @@ def unsubscribe(self, topic, callback=None): :raises: ProtocolClientError if there is some other client error. :raises: NoConnectionError if the client isn't actually connected. 
""" + self._paho_status.count_unsubscribe += 1 logger.info("unsubscribing from {}".format(topic)) try: (rc, mid) = self._mqtt_client.unsubscribe(topic) - except ValueError: + except ValueError as e: + add_count_to_dict(self._paho_status.unsubscribe_exceptions, e) raise except Exception as e: + add_count_to_dict(self._paho_status.unsubscribe_exceptions, e) raise exceptions.ProtocolClientError( "Unexpected Paho failure during unsubscribe" ) from e logger.debug("_mqtt_client.unsubscribe returned rc={}".format(rc)) + add_count_to_dict(self._paho_status.unsubscribe_rc_codes, rc) if rc: # This could result in ConnectionDroppedError or ProtocolClientError raise _create_error_from_rc_code(rc) @@ -546,16 +672,18 @@ def publish(self, topic, payload, qos=1, callback=None): :raises: ConnectionDroppedError if connection is dropped during execution. :raises: ProtocolClientError if there is some other client error. """ + self._paho_status.count_publish += 1 logger.info("publishing on {}".format(topic)) try: (rc, mid) = self._mqtt_client.publish(topic=topic, payload=payload, qos=qos) - except ValueError: - raise - except TypeError: + except (ValueError, TypeError) as e: + add_count_to_dict(self._paho_status.publish_exceptions, e) raise except Exception as e: + add_count_to_dict(self._paho_status.publish_exceptions, e) raise exceptions.ProtocolClientError("Unexpected Paho failure during publish") from e logger.debug("_mqtt_client.publish returned rc={}".format(rc)) + add_count_to_dict(self._paho_status.publish_rc_codes, rc) if rc: # Even though Paho returns a rc code indicating an error, it still stores the message # and will publish on connect, so this isn't really a failure - it just hangs. 
def get_debug_status(self):
    """
    Return an informational status object that describes the current state of the Paho transport.

    The counters on the object are accumulated by the transport as it runs; this method
    refreshes the fields that mirror internal attributes of the Paho client and then
    returns the transport's own status object (not a copy).

    :returns: The `PahoStatus` object owned by this transport.
    """
    client = self._mqtt_client
    status = self._paho_status

    status.time_since_last_paho_traffic_in = format_time_delta(client._last_msg_in)
    status.time_since_last_paho_traffic_out = format_time_delta(client._last_msg_out)

    status.client_object_id = id(client)

    # Read `_thread` exactly once. `disconnect` sets it to None, so re-reading it between
    # the "is there a thread?" check and the attribute access could fail.
    thread = client._thread
    status.thread_name = thread.name if thread else "None"
    status.thread_is_alive = str(thread.is_alive()) if thread else "No thread"

    status.len_out_mesage_queue = len(client._out_messages)
    status.len_in_message_queue = len(client._in_messages)
    status.len_out_pakcet_queue = len(client._out_packet)
    status.thread_terminate = client._thread_terminate
    status.paho_connection_state = client._state

    return status


def get_debug_config(self):
    """
    Return an informational object that describes the configuration of the Paho transport.

    :returns: A new `PahoConfig` object populated from internal attributes of the Paho client.
    """
    client = self._mqtt_client
    config = PahoConfig()

    config.transport = client._transport
    config.protocol = str(client._protocol)
    config.keepalive = client._keepalive
    config.connect_timeout = client._connect_timeout
    config.reconnect_on_failure = client._reconnect_on_failure
    config.reconnect_delay_min = client._reconnect_min_delay
    config.reconnect_delay_max = client._reconnect_max_delay
    config.host = client._host
    config.port = client._port
    config.proxy_args = client._proxy

    # Fetch the socket once so that the class name, the presence check, and the
    # getsockname() call all refer to the same object.
    sock = client.socket()
    config.socket_class = type(sock).__name__
    getsockname = getattr(sock, "getsockname", None) if sock else None
    config.socket_name = str(getsockname()) if getsockname else "No socket name"

    return config
""" if not sys.platform.startswith("win"): - ip = get_ip(host) + if host: + ip = get_ip(host) + else: + ip = "" port = transport_to_port(transport) for disconnect_type in all_disconnect_types: # sudo -n iptables -L OUTPUT -n -v --line-numbers diff --git a/longhaul/.gitignore b/longhaul/.gitignore new file mode 100644 index 000000000..9d5389b73 --- /dev/null +++ b/longhaul/.gitignore @@ -0,0 +1 @@ +logs/**/* diff --git a/longhaul/longhaul.py b/longhaul/longhaul.py new file mode 100644 index 000000000..20491e268 --- /dev/null +++ b/longhaul/longhaul.py @@ -0,0 +1,541 @@ +# st ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +import os +import sys +import asyncio +import logging +import logging.handlers +import functools +import json +import random +import dataclasses +import time +import datetime +import gc +import collections +import glob +from blessings import Terminal +from azure.iot.device.aio import IoTHubDeviceClient +from azure.iot.device import Message, X509 + +DEVICE_ID = os.environ["IOTHUB_DEVICE_ID"] + +USE_WEBSOCKETS = True if os.environ.get("IOTHUB_WEBSOCKETS", False) else False + +# Maximum number of seconds between reconnect retries +MAX_WAIT_TIME_BETWEEN_RECONNECT_ATTEMPTS = 20 + +# How long to sleep between telemetry sends +MESSAGE_SEND_SLEEP_TIME = 1 + +# How often do we start taking heap snapshots, in seconds +HEAP_HISTORY_STARTING_INTERVAL = 10 + +# How many heap counts do we keep in the history? 
HEAP_HISTORY_LENGTH = 4

# Interval, in seconds, for updating the display
DISPLAY_INTERVAL = 1

# Interval for rotating logs, in seconds
LOG_ROTATION_INTERVAL = 3600

# How many logs to keep before recycling
LOG_BACKUP_COUNT = 6

# Directory for storing log files
LOG_DIRECTORY = "./logs/{}".format(DEVICE_ID)

# Prepare the log directory. Rotated backups carry a timestamp suffix after ".log",
# so match "*.log*" to clear them too; "*.log" alone would leave them behind forever.
os.makedirs(LOG_DIRECTORY, exist_ok=True)
for filename in glob.glob("{}/*.log*".format(LOG_DIRECTORY)):
    os.remove(filename)


log_formatter = logging.Formatter(
    "%(asctime)s %(levelname)-5s (%(threadName)s) %(filename)s:%(funcName)s():%(message)s"
)


def _make_rotating_log_handler(basename, level):
    """Create a time-rotated handler writing to LOG_DIRECTORY/<basename>.log at `level`."""
    handler = logging.handlers.TimedRotatingFileHandler(
        filename="{}/{}.log".format(LOG_DIRECTORY, basename),
        when="S",
        interval=LOG_ROTATION_INTERVAL,
        backupCount=LOG_BACKUP_COUNT,
    )
    handler.setLevel(level=level)
    handler.setFormatter(log_formatter)
    return handler


paho_log_handler = _make_rotating_log_handler("paho", logging.DEBUG)
info_log_handler = _make_rotating_log_handler("info", logging.INFO)
debug_log_handler = _make_rotating_log_handler("debug", logging.DEBUG)

# The longhaul log is not rotated: it holds this script's own status output.
longhaul_log_handler = logging.FileHandler(filename="{}/longhaul.log".format(LOG_DIRECTORY))
longhaul_log_handler.setLevel(level=logging.DEBUG)
longhaul_log_handler.setFormatter(log_formatter)

# Everything goes to the info and debug logs (each handler filters by its own level).
root_logger = logging.getLogger()
root_logger.setLevel(level=logging.DEBUG)
root_logger.addHandler(info_log_handler)
root_logger.addHandler(debug_log_handler)

# Records from the "paho" logger are additionally written to their own file.
paho_logger = logging.getLogger("paho")
paho_logger.addHandler(paho_log_handler)


logger = logging.getLogger(__name__)
logger.addHandler(longhaul_log_handler)

term = Terminal()

# Clock used for every timestamp in this script. The original comment says this copies
# Paho's choice so that time deltas work. `time.monotonic` is always available on any
# interpreter that can import `dataclasses`, so no fallback is needed.
time_func = time.monotonic


def format_time_delta(s):
    """
    Format the time elapsed since timestamp `s` (a `time_func` value) as text.

    :returns: str(datetime.timedelta), or "infinity" if `s` is falsy (i.e. never set).
    """
    if not s:
        return "infinity"
    return str(datetime.timedelta(seconds=time_func() - s))


@dataclasses.dataclass
class HeapHistoryItem(object):
    """One heap snapshot: when it was taken and how many objects gc was tracking."""

    time: str
    object_count: int


@dataclasses.dataclass(order=True)
class HeapHistoryStatus(object):
    """Schedule and recent history of heap snapshots."""

    # Seconds between snapshots
    snapshot_interval: int
    # `time_func` timestamp at which the next snapshot is due
    next_heap_snapshot: float
    # List of HeapHistoryItem objects
    history: list

    def __init__(self):
        super().__init__()
        self.snapshot_interval = HEAP_HISTORY_STARTING_INTERVAL
        self.next_heap_snapshot = time_func() + self.snapshot_interval
        self.history = []


@dataclasses.dataclass(order=True)
class ReconnectStatus(object):
    """Connection bookkeeping maintained by the reconnect loop and connection handler."""

    connect_loop_status: str = "new"
    pipeline_connection_status: str = ""

    connect_count: int = 0
    disconnect_count: int = 0
    # NOTE: misspelled name kept as-is because it is an output key of dataclasses.asdict
    max_disconencted_time: int = 0

    # `time_func` timestamps; 0 means "never"
    last_connect_time: float = 0
    last_disconnect_time: float = 0
    time_since_last_connect: str = ""
    time_since_last_disconnect: str = ""


@dataclasses.dataclass(order=True)
class ExceptionStatus(object):
    """Exception-class-name -> count dictionaries for connect and send failures."""

    connect_exceptions: dict = dataclasses.field(default_factory=dict)
    send_exceptions: dict = dataclasses.field(default_factory=dict)


@dataclasses.dataclass(order=True)
class IoTHubClientConfig(object):
    """Snapshot of the IoTHub client's pipeline configuration, for display."""

    client_class: str = ""
    server_verification_cert: bool = False
    gateway_hostname: str = ""
    websockets: bool = False
    cipher: str = ""
    product_info: str = ""
    proxy_options: dict = dataclasses.field(default_factory=dict)
    sastoken_ttl: int = 0
    keep_alive: int = 0
    connection_retry: bool = False
    connection_retry_interval: int = 0
    device_id: str = ""
    module_id: str = ""
    x509: bool = False
    sastoken_class: str = ""
    sastoken_signing_mechanism_class: str = ""


@dataclasses.dataclass(order=True)
class IoTHubClientStatus(object):
    """Snapshot of the IoTHub client's connection state, for display."""

    connection_state: str = ""
@dataclasses.dataclass(order=True)
class SendMessageStatus(object):
    """Counters and timestamps describing telemetry sends."""

    messages_sent: int = 0
    messages_queued: int = 0

    # `time_func` timestamp of the last successful send; 0 means "never"
    last_message_sent_time: float = 0
    time_since_last_message_sent: str = ""


@dataclasses.dataclass(order=True)
class ClientStatus(object):
    """Top-level status object that is logged and displayed by the display loop."""

    reconnect: ReconnectStatus
    exception: ExceptionStatus
    # Debug objects returned by the transport; None until the display loop fills them in
    paho_status: object
    paho_config: object
    iothub_client_status: IoTHubClientStatus
    iothub_client_config: IoTHubClientConfig
    send_message: SendMessageStatus
    heap_history: HeapHistoryStatus

    start_time: float = 0
    time_since_start: str = ""
    device_id: str = DEVICE_ID
    websockets: bool = USE_WEBSOCKETS

    def __init__(self):
        super().__init__()
        self.reconnect = ReconnectStatus()
        self.exception = ExceptionStatus()
        self.paho_status = None
        self.paho_config = None
        self.iothub_client_status = IoTHubClientStatus()
        self.iothub_client_config = IoTHubClientConfig()
        self.send_message = SendMessageStatus()
        self.heap_history = HeapHistoryStatus()


def wrap_in_try_catch(func):
    """Decorate coroutine function `func` so that any Exception is logged, then re-raised."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            logger.error(
                "Exception in {}: {}".format(func.__name__, get_type_name(e)), exc_info=True
            )
            # Bare raise keeps the original traceback intact
            raise

    return wrapper


def get_type_name(e):
    """Return the class name of object `e`."""
    return type(e).__name__


async def _wait_for_first(*awaitables):
    """
    Run `awaitables` concurrently and return as soon as the first one finishes.

    The awaitables are wrapped in tasks explicitly (asyncio.wait no longer accepts bare
    coroutines). Tasks still running when this returns -- or when this coroutine is itself
    cancelled -- are cancelled. An exception raised by a finished task is re-raised.
    """
    tasks = [asyncio.ensure_future(awaitable) for awaitable in awaitables]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
    for task in done:
        task.result()


def get_transport_from_device_client(device_client):
    """Walk to the last stage of the client's MQTT pipeline and return its transport."""
    stage = device_client._mqtt_pipeline._pipeline
    while stage.next:
        stage = stage.next
    return stage.transport


def get_iothub_client_config(iothub_client):
    """Build an IoTHubClientConfig from the client's internal pipeline configuration."""
    internal_config_object = iothub_client._mqtt_pipeline._nucleus.pipeline_configuration
    sastoken = internal_config_object.sastoken
    config = IoTHubClientConfig()

    config.client_class = get_type_name(iothub_client)
    # Only report whether a cert/x509 object is present, never its contents
    config.server_verification_cert = bool(internal_config_object.server_verification_cert)
    config.gateway_hostname = internal_config_object.gateway_hostname
    config.websockets = internal_config_object.websockets
    config.cipher = str(internal_config_object.cipher)
    config.product_info = internal_config_object.product_info
    config.proxy_options = internal_config_object.proxy_options
    config.keep_alive = internal_config_object.keep_alive
    config.connection_retry = internal_config_object.connection_retry
    config.connection_retry_interval = internal_config_object.connection_retry_interval
    config.device_id = internal_config_object.device_id
    config.module_id = internal_config_object.module_id
    config.x509 = bool(internal_config_object.x509)
    config.sastoken_ttl = sastoken.ttl if sastoken else 0
    config.sastoken_class = get_type_name(sastoken) if sastoken else None
    config.sastoken_signing_mechanism_class = (
        get_type_name(sastoken._signing_mechanism)
        if sastoken and sastoken._signing_mechanism
        else None
    )

    return config


def get_iothub_client_status(iothub_client):
    """Build an IoTHubClientStatus from the client's internal connection state."""
    status = IoTHubClientStatus()
    nucleus = iothub_client._mqtt_pipeline._nucleus
    status.connection_state = str(nucleus.connection_state)
    return status


class Client(object):
    """
    Long-haul test application: keeps a device client connected, continuously queues and
    sends telemetry, and periodically logs/prints a status report.
    """

    async def init(self):
        """Create the queue, events, and status object used by the loops."""
        self.device_client = None

        self.outgoing_message_queue = asyncio.Queue()

        self.disconnected_event = asyncio.Event()
        self.connected_event = asyncio.Event()
        self.exit_app_event = asyncio.Event()

        # The app starts out disconnected
        self.disconnected_event.set()
        self.status = ClientStatus()

        self.first_connect = True

    @wrap_in_try_catch
    async def send_message_loop(self):
        """Send queued messages while connected; failed messages are put back on the queue."""
        send_status = self.status.send_message
        send_exceptions = self.status.exception.send_exceptions

        while True:
            await self.connected_event.wait()
            next_message = await self.outgoing_message_queue.get()
            send_status.messages_queued = self.outgoing_message_queue.qsize()
            try:
                await self.device_client.send_message(next_message)
            except Exception as e:
                t = get_type_name(e)
                send_exceptions[t] = send_exceptions.get(t, 0) + 1
                # Put the message back so it is retried later (it goes to the back of the queue)
                await self.outgoing_message_queue.put(next_message)
                send_status.messages_queued = self.outgoing_message_queue.qsize()
            else:
                send_status.last_message_sent_time = time_func()
                send_status.messages_sent += 1
                # TODO: queue here

    @wrap_in_try_catch
    async def queue_message_loop(self):
        """Queue one numbered telemetry message every MESSAGE_SEND_SLEEP_TIME seconds."""
        messageId = 0
        while True:
            messageId += 1
            message = Message(
                json.dumps(
                    {"messageId": messageId, "message": "This is message #{}".format(messageId)}
                )
            )
            await self.outgoing_message_queue.put(message)
            self.status.send_message.messages_queued = self.outgoing_message_queue.qsize()
            await asyncio.sleep(MESSAGE_SEND_SLEEP_TIME)
            if self.exit_app_event.is_set():
                return

    @wrap_in_try_catch
    async def reconnect_loop(self):
        """
        Whenever the client is disconnected, wait a random back-off (none for the very first
        connect) and then call connect(), unless the client reconnected by itself meanwhile.
        """
        reconnect_status = self.status.reconnect
        connect_exceptions = self.status.exception.connect_exceptions

        while True:
            # Sleep until we are disconnected (or asked to exit)
            await _wait_for_first(
                self.disconnected_event.wait(),
                self.exit_app_event.wait(),
            )

            if self.exit_app_event.is_set():
                reconnect_status.connect_loop_status = "exiting while connected"
                return

            if self.first_connect:
                sleep_time = 0
                self.first_connect = False
            else:
                sleep_time = random.random() * MAX_WAIT_TIME_BETWEEN_RECONNECT_ATTEMPTS

            reconnect_status.connect_loop_status = (
                "disconnected. waiting for {} seconds".format(round(sleep_time, 2))
            )

            # Back off, but wake up early if the connection comes back or we are asked to exit
            await _wait_for_first(
                self.connected_event.wait(),
                self.exit_app_event.wait(),
                asyncio.sleep(sleep_time),
            )

            if self.exit_app_event.is_set():
                reconnect_status.connect_loop_status = "exiting while disconnected"
                return

            if self.device_client.connected:
                reconnect_status.connect_loop_status = "connected"
                continue

            try:
                reconnect_status.connect_loop_status = "connecting"
                await self.device_client.connect()
                reconnect_status.connect_loop_status = "connected"
            except Exception as e:
                # Record the failure; the next pass through the loop retries
                t = get_type_name(e)
                reconnect_status.connect_loop_status = "connect exception {}".format(t)
                connect_exceptions[t] = connect_exceptions.get(t, 0) + 1

    @property
    def transport(self):
        """The transport object at the end of the device client's pipeline."""
        return get_transport_from_device_client(self.device_client)

    @wrap_in_try_catch
    async def display_loop(self):
        """Every DISPLAY_INTERVAL seconds, refresh the status, log it, and print it."""
        last_heap_counts = collections.Counter()

        while True:
            status = self.status

            status.time_since_start = format_time_delta(status.start_time)

            status.paho_config = self.transport.get_debug_config()
            status.paho_status = self.transport.get_debug_status()
            status.iothub_client_config = get_iothub_client_config(self.device_client)
            status.iothub_client_status = get_iothub_client_status(self.device_client)

            status.reconnect.time_since_last_connect = format_time_delta(
                status.reconnect.last_connect_time
            )
            status.reconnect.time_since_last_disconnect = format_time_delta(
                status.reconnect.last_disconnect_time
            )
            status.send_message.time_since_last_message_sent = format_time_delta(
                status.send_message.last_message_sent_time
            )

            heap_history = status.heap_history
            if time_func() >= heap_history.next_heap_snapshot:
                gc.collect(2)

                # Snapshots get exponentially further apart
                heap_history.snapshot_interval *= 2
                heap_history.next_heap_snapshot = time_func() + heap_history.snapshot_interval
                heap_history.history.append(
                    HeapHistoryItem(
                        time=str(datetime.datetime.now()),
                        object_count=len(gc.get_objects()),
                    )
                )
                # Only keep the most recent snapshots
                heap_history.history = heap_history.history[-HEAP_HISTORY_LENGTH:]

                logger.info(
                    "Current Status: {}".format(json.dumps(dataclasses.asdict(status), indent=2))
                )

                # Log how the per-type object counts changed since the previous snapshot
                heap_counts = collections.Counter(type(x).__name__ for x in gc.get_objects())
                changes = collections.Counter(heap_counts)
                changes.subtract(last_heap_counts)
                delta = {name: count for name, count in changes.items() if count != 0}

                logger.info("Heap Delta: {}".format(json.dumps(delta, indent=2)))
                last_heap_counts = heap_counts

            print(term.clear())
            # After the first 50 seconds, drop the static config from the on-screen output.
            # (Both values are repopulated at the top of the next iteration.)
            if time_func() - status.start_time > 50:
                status.paho_config = None
                status.iothub_client_config = None

            print(json.dumps(dataclasses.asdict(status), indent=2))

            await _wait_for_first(
                self.exit_app_event.wait(),
                asyncio.sleep(DISPLAY_INTERVAL),
            )

            if self.exit_app_event.is_set():
                return

    @wrap_in_try_catch
    async def handle_connection_state_change(self):
        """Connection-state handler: update counters and flip the connected/disconnected events."""
        reconnect_status = self.status.reconnect
        if self.device_client.connected:
            reconnect_status.connect_count += 1
            reconnect_status.last_connect_time = time_func()
            reconnect_status.pipeline_connection_status = "connected"
            self.disconnected_event.clear()
            self.connected_event.set()
        else:
            reconnect_status.disconnect_count += 1
            reconnect_status.last_disconnect_time = time_func()
            reconnect_status.pipeline_connection_status = "disconnected"
            self.disconnected_event.set()
            self.connected_event.clear()

    async def main(self):
        """Create the device client, run all loops until one fails, then shut down."""
        # Make sure this was run with `python -X dev longhaul.py`.
        if not sys.flags.dev_mode:
            print("please re-run with -X dev command line arguments")
            sys.exit(1)

        await self.init()

        self.status.start_time = time_func()

        if "IOTHUB_DEVICE_CERT" in os.environ:
            self.device_client = IoTHubDeviceClient.create_from_x509_certificate(
                device_id=os.environ["IOTHUB_DEVICE_ID"],
                hostname=os.environ["IOTHUB_HOSTNAME"],
                x509=X509(
                    cert_file=os.environ["IOTHUB_DEVICE_CERT"],
                    key_file=os.environ["IOTHUB_DEVICE_KEY"],
                ),
                websockets=USE_WEBSOCKETS,
            )
        else:
            conn_str = os.getenv("IOTHUB_DEVICE_CONNECTION_STRING")
            self.device_client = IoTHubDeviceClient.create_from_connection_string(
                conn_str, websockets=USE_WEBSOCKETS
            )
        self.device_client.on_connection_state_change = self.handle_connection_state_change

        # asyncio.wait requires tasks/futures, so wrap each loop coroutine explicitly
        tasks = [
            asyncio.ensure_future(loop_coroutine)
            for loop_coroutine in (
                self.send_message_loop(),
                self.queue_message_loop(),
                self.reconnect_loop(),
                self.display_loop(),
            )
        ]
        # Pre-set so the cleanup below is well-defined even if the wait itself is interrupted
        pending = set(tasks)

        try:
            done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
            await asyncio.gather(*done)
        except Exception as e:
            logger.error("Exception in main loop: {}".format(get_type_name(e)))
        finally:
            logger.warning("Exiting app")
            self.exit_app_event.set()
            logger.info("Waiting for all coroutines to exit")
            try:
                if pending:
                    # Give the loops 5 seconds to notice the exit event, then cancel whatever
                    # is left (e.g. a loop blocked on an event that will never be set).
                    _, unfinished = await asyncio.wait(pending, timeout=5)
                    for task in unfinished:
                        task.cancel()
                    await asyncio.gather(*pending, return_exceptions=True)
            finally:
                # Always shut the client down, even if waiting for the loops failed
                await self.device_client.shutdown()


if __name__ == "__main__":
    asyncio.run(Client().main())
+iptables.disconnect_output_port("REJECT", "mqtt") +iptables.disconnect_output_port("REJECT", "mqttws") diff --git a/scripts/packet_restore b/scripts/packet_restore new file mode 100755 index 000000000..db8fbd41d --- /dev/null +++ b/scripts/packet_restore @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +from dev_utils import iptables + +iptables.reconnect_all("mqtt") +iptables.reconnect_all("mqttws")