From 31ad6aa7adf65c33e82555e36ddda78ce793590f Mon Sep 17 00:00:00 2001 From: GHAFHA <45609663+GHAFHA@users.noreply.github.com> Date: Wed, 24 Apr 2024 13:14:45 -0500 Subject: [PATCH 1/3] Refactor event parsing and add displacement calculations --- event.py | 14 +++++++------- function_app.py | 4 +++- models.py | 13 +++++++++---- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/event.py b/event.py index baa4468..ffe6496 100644 --- a/event.py +++ b/event.py @@ -2,21 +2,19 @@ import logging -class parser(): +class Parser(): def __init__(self, event: Event) -> None: self.event = event - - def parse(self)-> Event: - - + + def parse(self) -> Event: if self.event["name"] == "linpot": DataTransformer(self.event).handle_linpot() - name = self.event['name'] if self.event["name"] == "acclgyro": DataTransformer(self.event).handle_acclgyro() if self.event["name"] == "ecu": DataTransformer(self.event).handle_ecu() + class DataTransformer(): def __init__(self, event: Event): @@ -35,7 +33,9 @@ def handle_linpot(self) -> Event: RearLeft=self.event.fields['rear_left'], RearRight=self.event.fields['rear_right'] ) - linpot_event.calculate_displacements_mm() + displacement_in_mm = linpot_event.calculate_displacements_mm() + self.event.fields.update(displacement_in_mm) + except KeyError as e: logging.error("KeyError: %s", e) logging.error("Fields: %s", self.event.fields) diff --git a/function_app.py b/function_app.py index d5927e4..8bcc81e 100644 --- a/function_app.py +++ b/function_app.py @@ -1,5 +1,5 @@ from event import DataTransformer as DT -from event import parser as PS +from event import Parser from datetime import datetime import azure.functions as func import logging @@ -36,6 +36,8 @@ def eventhub_processor(azeventhub: func.EventHubEvent): # group all events that have the same event['tasg'] for event in events: + # Parse the event + Parser(event) tags = event['tags'] # Call frozen set because we can't hash a dictionary diff --git a/models.py b/models.py index a1db7c6..a1207a6 100644 --- a/models.py +++ b/models.py @@ -27,11 +27,16 @@ class LinpotEvent(BaseModel): rear_right: float def calculate_displacements_mm(self): - for key in ["Front Left", "Front Right", "Rear Left", "Rear Right"]: - # updating the lin pot values in place + + displacements = {} + linpots = ["front_left", "front_right", "rear_left", "rear_right"] + + for key in linpots: current_value = getattr(self, key) - new_value = -(current_value * LINPOT_CONVERSION_CONSTANT) + LINPOT_CONVERSION_OFFSET - setattr(self, key, new_value) + converted_to_mm = -(current_value * LINPOT_CONVERSION_CONSTANT) + LINPOT_CONVERSION_OFFSET + displacements[key] = converted_to_mm + + return displacements def calculate_wheel_loads(): pass From 0517e8a3df6208cebc24aebf398203e1a28e52c4 Mon Sep 17 00:00:00 2001 From: Squidtoon99 <49101235+Squidtoon99@users.noreply.github.com> Date: Mon, 29 Apr 2024 15:44:59 -0500 Subject: [PATCH 2/3] Refactor linpot event displacement calculation to use millimeters and update event fields --- event.py | 6 +++--- function_app.py | 3 +++ models.py | 15 ++++++++++----- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/event.py b/event.py index ffe6496..004bab0 100644 --- a/event.py +++ b/event.py @@ -33,13 +33,13 @@ def handle_linpot(self) -> Event: RearLeft=self.event.fields['rear_left'], RearRight=self.event.fields['rear_right'] ) - displacement_in_mm = linpot_event.calculate_displacements_mm() - self.event.fields.update(displacement_in_mm) + displacement_object = linpot_event.calculate_displacements_mm(self.event) + return displacement_object except KeyError as e: logging.error("KeyError: %s", e) logging.error("Fields: %s", self.event.fields) - return + return None def handle_acclgyro(self) -> Event: """ diff --git a/function_app.py b/function_app.py index b337175..e2749bf 100644 --- a/function_app.py +++ b/function_app.py @@ -12,6 +12,7 @@ import dns.resolver from can import Message from data_deserializer import MessageData +from event import DataTransformer, Parser dns.resolver.default_resolver=dns.resolver.Resolver(configure=False) dns.resolver.default_resolver.nameservers=['8.8.8.8'] @@ -49,6 +50,8 @@ def eventhub_processor(azeventhub: func.EventHubEvent): except Exception: # TODO: Make this a specific exception logging.error("[ECU] Error converting to dict: %s", msg) continue + elif event["tags"].get("source", "") == "linpot": + tags = event['tags'] # Call frozen set because we can't hash a dictionary diff --git a/models.py b/models.py index a1207a6..8abf25d 100644 --- a/models.py +++ b/models.py @@ -20,13 +20,17 @@ class LinpotEvent(BaseModel): ''' Class for keeping track of linpot event data ''' - # TODO rename the values for front left, front right, rear left, rear right and so on to not have spaces front_left: float front_right: float rear_left: float rear_right: float - def calculate_displacements_mm(self): + def calculate_displacements_mm(self, event: Event)-> Event: + """calcualtes displacements in mm from linpot values + + Returns: + dict: displacements in mm + """ displacements = {} linpots = ["front_left", "front_right", "rear_left", "rear_right"] @@ -36,9 +40,11 @@ def calculate_displacements_mm(self): converted_to_mm = -(current_value * LINPOT_CONVERSION_CONSTANT) + LINPOT_CONVERSION_OFFSET displacements[key] = converted_to_mm - return displacements + event.fields.update(displacements) + + return event - def calculate_wheel_loads(): + def calculate_wheel_loads(self): pass @@ -49,7 +55,6 @@ class AccelGyroEvent(BaseModel): def calculate_gforce(): pass - pass class ECUEvent(BaseModel): From 0946731eae5cef24c82f056c856863c44e31119c Mon Sep 17 00:00:00 2001 From: GHAFHA <45609663+GHAFHA@users.noreply.github.com> Date: Thu, 2 May 2024 00:18:11 -0500 Subject: [PATCH 3/3] Fix parsing and handling of events in event.py and function_app.py, new object --- data_deserializer.py | 2 +- event.py | 51 +++++++++++++++++++++++--------------------- function_app.py | 25 ++++++++++++---------- models.py | 7 ------ 4 files changed, 42 insertions(+), 43 deletions(-) diff --git a/data_deserializer.py b/data_deserializer.py index 40a17fa..f31a0ca 100644 --- a/data_deserializer.py +++ b/data_deserializer.py @@ -159,7 +159,7 @@ def wrapper(messsage: MessageData): if "data" in annotations: kwargs["data"] = messsage.data[ obj["start_position"] - - 1 : obj["start_position"] + - 1: obj["start_position"] + obj["length"] - 1 ] diff --git a/event.py b/event.py index ffe6496..0c30ec4 100644 --- a/event.py +++ b/event.py @@ -6,13 +6,16 @@ class Parser(): def __init__(self, event: Event) -> None: self.event = event - def parse(self) -> Event: - if self.event["name"] == "linpot": - DataTransformer(self.event).handle_linpot() - if self.event["name"] == "acclgyro": - DataTransformer(self.event).handle_acclgyro() - if self.event["name"] == "ecu": - DataTransformer(self.event).handle_ecu() + def parse(self) -> None: + try: + if self.event["name"] == "linpot": + DataTransformer(self.event).handle_linpot() + elif self.event["name"] == "accel": + DataTransformer(self.event).handle_acclgyro() + elif self.event["name"] == "gyro": + DataTransformer(self.event).handle_acclgyro() + except Exception as e: + logging.error("Error: %s, event not identifiable", e) class DataTransformer(): @@ -20,26 +23,33 @@ class DataTransformer(): def __init__(self, event: Event): self.event = event - def handle_linpot(self) -> Event: + def handle_linpot(self) -> LinpotEvent: """ apply displacement calculations to the event fields. this method is to handle all linpot transformations specifically """ - # for key in list of keys try: linpot_event = LinpotEvent( - FrontLeft=self.event.fields['front_left'], - FrontRight=self.event.fields['front_right'], - RearLeft=self.event.fields['rear_left'], - RearRight=self.event.fields['rear_right'] + fields={ + 'front_left': self.event.fields['front_left'], + 'front_right': self.event.fields['front_right'], + 'rear_left': self.event.fields['rear_left'], + 'rear_right': self.event.fields['rear_right'] + }, + name=self.event.name, + tags=self.event.tags, + timestamp=self.event.timestamp ) displacement_in_mm = linpot_event.calculate_displacements_mm() - self.event.fields.update(displacement_in_mm) - except KeyError as e: - logging.error("KeyError: %s", e) + for key, value in displacement_in_mm.items(): + linpot_event.fields[key] = value + + return linpot_event + + except Exception as e: + logging.error("Error: %s", e) logging.error("Fields: %s", self.event.fields) - return def handle_acclgyro(self) -> Event: """ @@ -47,10 +57,3 @@ def handle_acclgyro(self) -> Event: this method is to handle all acclgyro transformations specifically """ pass - - def handle_ecu(self) -> Event: - """ - apply ecu calculations to the event fields. - this method is to handle all ecu transformations specifically - """ - pass diff --git a/function_app.py b/function_app.py index b337175..92a5851 100644 --- a/function_app.py +++ b/function_app.py @@ -12,8 +12,8 @@ import dns.resolver from can import Message from data_deserializer import MessageData -dns.resolver.default_resolver=dns.resolver.Resolver(configure=False) -dns.resolver.default_resolver.nameservers=['8.8.8.8'] +dns.resolver.default_resolver = dns.resolver.Resolver(configure=False) +dns.resolver.default_resolver.nameservers = ['8.8.8.8'] app = func.FunctionApp() @@ -21,8 +21,8 @@ @app.function_name("EventHubTrigger1") @app.event_hub_message_trigger(arg_name="azeventhub", event_hub_name="metricforwarder", cardinality="many", - connection="metricsforward_metricmanager_EVENTHUB") -def eventhub_processor(azeventhub: func.EventHubEvent): + connection="metricsforward_metricmanager_EVENTHUB") +def eventhub_processor(azeventhub: func.EventHubEvent): events = [json.loads(event.get_body().decode('utf-8')) for event in azeventhub] logging.info("Processing %d events; first event: %s", len(events), json.dumps(events[0], indent=3)) @@ -34,6 +34,9 @@ def eventhub_processor(azeventhub: func.EventHubEvent): # group all events that have the same event['tasg'] for event in events: + + Parser(event).parse() + if event["tags"].get("source", "") == "ecu": arbitration_id = event["fields"].get("id", 0) raw_data = event["fields"].get("data", "") @@ -46,7 +49,7 @@ def eventhub_processor(azeventhub: func.EventHubEvent): msg_data = MessageData(msg) try: event["fields"] = json.loads(json.dumps(msg_data.to_dict(), default=str)) - except Exception: # TODO: Make this a specific exception + except Exception: # TODO: Make this a specific exception logging.error("[ECU] Error converting to dict: %s", msg) continue tags = event['tags'] @@ -70,8 +73,8 @@ def eventhub_processor(azeventhub: func.EventHubEvent): post_data = { "streams": [ { - "stream": {k:v for (k,v) in tags}, - "values": [ # this stupid conversion to nanoseconds. Who tf does logs in nanoseconds + "stream": {k: v for (k, v) in tags}, + "values": [ # this stupid conversion to nanoseconds. Who tf does logs in nanoseconds [str(dump["timestamp"] * 1000000), json.dumps(dump['fields'], default=str)] for dump in dumps ] } for tags, dumps in data.items() @@ -94,10 +97,10 @@ def eventhub_processor(azeventhub: func.EventHubEvent): documents = [] for doc in events: doc = { - "metadata": doc['tags'], - "timestamp": datetime.fromtimestamp(doc['timestamp'] / 1000), - **doc['fields'] - } + "metadata": doc['tags'], + "timestamp": datetime.fromtimestamp(doc['timestamp'] / 1000), + **doc['fields'] + } # validate timestamp because they suck if doc['timestamp'] > datetime.now() or doc['timestamp'] < datetime(2020, 1, 1): logging.error("Invalid timestamp: %s", doc['timestamp']) diff --git a/models.py b/models.py index a1207a6..69e1d30 100644 --- a/models.py +++ b/models.py @@ -50,10 +50,3 @@ def calculate_gforce(): pass pass - - -class ECUEvent(BaseModel): - ''' - Class for keeping track of ecu event data - ''' - pass