Skip to content

Commit

Permalink
TopicMerge error handling and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
klpanagi committed Jan 19, 2025
1 parent 0f06665 commit 6f816f3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
21 changes: 19 additions & 2 deletions commlib/aggregation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import functools
import logging
from typing import Any, Dict, List
from commlib.connection import BaseConnectionParameters
from commlib.node import Node

aggregation_logger = None


class TopicMerge:
Expand All @@ -17,6 +19,16 @@ def __init__(self, broker_params: BaseConnectionParameters,
self.node = Node(node_name="TopicMerge",
connection_params=self.broker_params,
debug=False, heartbeats=False)
@classmethod
def logger(cls) -> logging.Logger:
global aggregation_logger
if aggregation_logger is None:
aggregation_logger = logging.getLogger(__name__)
return aggregation_logger

@property
def log(self):
return self.logger()

def create_subscriptions(self):
for topic in self.input_topics:
Expand All @@ -33,8 +45,13 @@ def create_publisher(self):
def on_msg_internal(self, processors: Dict[str, callable],
payload: Dict[str, Any], topic: str):
for proc in processors:
payload = proc(payload)
self.pub.publish(topic=self.output_topic, msg=payload)
try:
payload = proc(payload)
self.pub.publish(topic=self.output_topic, msg=payload)
self.log.info(f"Processed message: {payload}")
except Exception as e:
self.log.error(f"Error processing message: {e}")
continue

def start(self):
self.create_publisher()
Expand Down
11 changes: 3 additions & 8 deletions examples/topic_merge/topic_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@
from commlib.aggregation import TopicMerge


def on_message(msg):
# pass
print(f"Received front sonar data: {msg}")


if __name__ == "__main__":
if len(sys.argv) < 2:
broker = "redis"
Expand All @@ -30,10 +25,10 @@ def on_message(msg):
sys.exit(1)
conn_params = ConnectionParameters()

input_topics = ["goaldsl.*.event"]
output_topic = "merged_goal_event_topic"
input_topics = ["goaldsl.1.event"]
output_topic = "goaldsl.1.event"
processors = {
"goaldsl.*.event": [
"goaldsl.1.event": [
lambda msg: {
"position": {
"x": msg["x"], "y": msg["y"], "z": 0
Expand Down

0 comments on commit 6f816f3

Please sign in to comment.