Skip to content

Commit

Permalink
* BUGFIX: Sending data in json serialized format
Browse files Browse the repository at this point in the history
  • Loading branch information
lewis-chambers committed Jun 4, 2024
1 parent 2b3562b commit 7d69d75
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/iotdevicesimulator/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async def run(self, oracle: Oracle, query: CosmosQuery,
self._instance_logger.warn(f"No data found.")
else:
self._instance_logger.debug(f"Cycle {self.cycle+1}/{self.max_cycles} Read data from: {row["DATE_TIME"]}")
message_connection.send_message(str(row), self.topic)
message_connection.send_message(row, self.topic)
self._instance_logger.info(f"Sent message to: {self.topic}")

self.cycle += 1
Expand Down
3 changes: 2 additions & 1 deletion src/iotdevicesimulator/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ async def main(config_path: str):
mqtt_connection,
oracle_config,
swarm_name="soilmet",
delay_start=True,
delay_start=False,
max_cycles=5,
max_sites=5,
sleep_time=30,
site_ids="MORLY",
)
await swarm.run()

Expand Down
73 changes: 33 additions & 40 deletions src/iotdevicesimulator/messaging/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
from awscrt.exceptions import AwsCrtError
from iotdevicesimulator.messaging.core import MessagingBaseClass
from iotdevicesimulator.messaging.utils import json_serial
import backoff
import logging

Expand All @@ -19,6 +20,9 @@ class IotCoreMQTTConnection(MessagingBaseClass):
connection: awscrt.mqtt.Connection | None = None
"""A connection to the MQTT endpoint."""

_instance_logger: logging.Logger
"""Logger handle used by instance."""

def __init__(
self,
endpoint: str,
Expand Down Expand Up @@ -116,90 +120,79 @@ def __init__(
on_connection_closed=self._on_connection_closed,
)

@staticmethod
def _on_connection_interrupted(connection, error, **kwargs): # pragma: no cover
self._instance_logger = logger.getChild(f"client-{client_id}")

def _on_connection_interrupted(
self, connection, error, **kwargs
): # pragma: no cover
"""Callback when connection accidentally lost."""
print("Connection interrupted. error: {}".format(error))
self._instance_logger.debug("Connection interrupted. error: {}".format(error))

@staticmethod
def _on_connection_resumed(
connection, return_code, session_present, **kwargs
self, connection, return_code, session_present, **kwargs
): # pragma: no cover
"""Callback when an interrupted connection is re-established."""

print(
self._instance_logger.debug(
"Connection resumed. return_code: {} session_present: {}".format(
return_code, session_present
)
)

@staticmethod
def _on_connection_success(connection, callback_data): # pragma: no cover
def _on_connection_success(self, connection, callback_data): # pragma: no cover
"""Callback when the connection successfully connects."""

assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
print(
self._instance_logger.debug(
"Connection Successful with return code: {} session present: {}".format(
callback_data.return_code, callback_data.session_present
)
)

@staticmethod
def _on_connection_failure(connection, callback_data): # pragma: no cover
def _on_connection_failure(self, connection, callback_data): # pragma: no cover
"""Callback when a connection attempt fails."""

assert isinstance(callback_data, mqtt.OnConnectionFailureData)
print("Connection failed with error code: {}".format(callback_data.error))
self._instance_logger.debug(
"Connection failed with error code: {}".format(callback_data.error)
)

@staticmethod
def _on_connection_closed(connection, callback_data): # pragma: no cover
def _on_connection_closed(self, connection, callback_data): # pragma: no cover
"""Callback when a connection has been disconnected or shutdown successfully"""
print("Connection closed\n")
self._instance_logger.debug("Connection closed\n")

@backoff.on_exception(backoff.expo, exception=AwsCrtError, logger=logger)
def _connect(self): # pragma: no cover
self._instance_logger.debug("Connecting to endpoint")
connect_future = self.connection.connect()
connect_future.result()

@backoff.on_exception(backoff.expo, exception=AwsCrtError, logger=logger)
def _disconnect(self): # pragma: no cover
self._instance_logger.debug("Disconnecting from endpoint")
disconnect_future = self.connection.disconnect()
disconnect_future.result()

def send_message(
self, message: str, topic: str, count: int = 1
) -> None: # pragma: no cover
def send_message(self, message: dict, topic: str) -> None: # pragma: no cover
"""Sends a message to the endpoint.
Args:
message: The message to send.
topic: MQTT topic to send message under.
count: How many times to repeat the message. If 0, it sends forever.
"""

if not message:
logging.error(f"No message to send for topic: {topic}")
return

self._connect()

# Publish message to server desired number of times.
# This step is skipped if message is blank.
# This step loops forever if count was set to 0.
if message:
if count == 0:
logger.info("Sending messages until program killed")
else:
logger.info("Sending {} message(s)".format(count))

publish_count = 1
while (publish_count <= count) or (count == 0):
message_text = "{} [{}]".format(message, publish_count)
message_json = json.dumps(message_text)
self.connection.publish(
topic=topic,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE,
)

if count > 1:
time.sleep(1)
publish_count += 1
payload = json.dumps(message, default=json_serial)
self.connection.publish(
topic=topic,
payload=payload,
qos=mqtt.QoS.AT_LEAST_ONCE,
)

self._disconnect()
10 changes: 10 additions & 0 deletions src/iotdevicesimulator/messaging/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from datetime import date, datetime


def json_serial(obj):
"""Serializes `obj` into a json object."""

if isinstance(obj, (datetime, date)):
return obj.isoformat(timespec="microseconds")

raise TypeError(f"Type {type(obj)} is not serializable.")
12 changes: 1 addition & 11 deletions src/iotdevicesimulator/scripts/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,7 @@
type=click.Path(exists=True),
help="Path to a logging config file. Uses default if not given.",
)
@click.option(
"--log-level",
type=click.Choice(
["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
),
help="Sets the level of logs outputted.",
)
def main(ctx: click.Context, log_config: Path, log_level: str):
def main(ctx: click.Context, log_config: Path):
"""Core group of the cli."""
ctx.ensure_object(dict)

Expand All @@ -35,9 +28,6 @@ def main(ctx: click.Context, log_config: Path, log_level: str):

logging.config.fileConfig(fname=log_config)

if log_level:
logging.getLogger().setLevel(log_level)


@main.group()
@click.pass_context
Expand Down
3 changes: 3 additions & 0 deletions src/iotdevicesimulator/swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ async def create(
self.oracle, CosmosSiteQuery[self.query.name]
)

if isinstance(site_ids, str):
site_ids = [site_ids]

self.sites = self._init_sites(
site_ids,
sleep_time=self.sleep_time,
Expand Down

0 comments on commit 7d69d75

Please sign in to comment.