Skip to content

Commit

Permalink
Move external functions and dataclasses to telemetry_utils
Browse files Browse the repository at this point in the history
  • Loading branch information
EliasJRH committed Feb 25, 2024
1 parent 62c7e2a commit a1ec7ce
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 258 deletions.
266 changes: 8 additions & 258 deletions modules/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@

from signal import signal, SIGTERM
from time import time, sleep
from typing import Any, TypeAlias, List
from typing import Any, TypeAlias

import modules.telemetry.json_packets as jsp
import modules.websocket.commands as wsc
from modules.misc.config import Config
from modules.telemetry.replay import TelemetryReplay
from modules.telemetry.sd_block import LoggingMetadataSpacerBlock
from modules.telemetry.superblock import SuperBlock, Flight
from modules.misc.config import Config
import modules.telemetry.v1.data_block as v1db
import modules.telemetry.telemetry_utils as tu
from modules.telemetry.telemetry_utils import (
mission_path,
get_filepath_for_proposed_name,
parse_rn2483_transmission,
ParsedBlock,
)
from modules.telemetry.telemetry_errors import MissionNotFoundError, AlreadyRecordingError, ReplayPlaybackError
from modules.telemetry.v1.block import PacketHeader, BlockHeader, DeviceAddress

# Types

Expand All @@ -45,154 +48,12 @@
logger = logging.getLogger(__name__)


# Helper functions
def mission_path(mission_name: str, missions_dir: Path, file_suffix: int = 0) -> Path:
"""Returns the path to the mission file with the matching mission name."""

return missions_dir.joinpath(f"{mission_name}{'' if file_suffix == 0 else f'_{file_suffix}'}.{MISSION_EXTENSION}")


def shutdown_sequence() -> None:
for child in active_children():
child.terminate()
exit(0)


def get_filepath_for_proposed_name(mission_name: str, missions_dir: Path) -> Path:
"""Obtains filepath for proposed name, with a maximum of giving a suffix 50 times before failing."""
file_suffix = 1
missions_filepath = mission_path(mission_name, missions_dir)

while missions_filepath.is_file() and file_suffix < FILE_CREATION_ATTEMPT_LIMIT:
missions_filepath = mission_path(mission_name, missions_dir, file_suffix)
file_suffix += 1

if file_suffix >= FILE_CREATION_ATTEMPT_LIMIT:
raise ValueError(f"Too many mission files already exist with name {mission_name}.")

return missions_filepath


@dataclass
class ParsedBlock:
"""Parsed block data from the telemetry process."""

# mission_time: int
block_name: str
block_header: BlockHeader
block_contents: v1db.DataBlock


@dataclass
class ParsedTransmission:
"""Parsed transmission data from the telemetry process."""

packet_header: PacketHeader
blocks: List[ParsedBlock]


def parse_radio_block(pkt_version: int, block_header: BlockHeader, hex_block_contents: str) -> ParsedBlock:
"""
Parses telemetry payload blocks from either parsed packets or stored replays. Block contents are a hex string.
"""

# Working with hex strings until this point.
# Hex/Bytes Demarcation point
logger.debug(
f"Parsing v{pkt_version} type {block_header.message_type} subtype {block_header.message_subtype} contents: {hex_block_contents}"
)
block_bytes: bytes = bytes.fromhex(hex_block_contents)

try:
block_subtype = v1db.DataBlockSubtype(block_header.message_subtype)
block_contents = v1db.DataBlock.parse(block_subtype, block_bytes)
block_name = block_subtype.name.lower()

logger.debug(f"Data block parsed with mission time {block_contents.mission_time}")
logger.info(str(block_contents))

# if block == DataBlockSubtype.STATUS:
# self.status.rocket = jsp.RocketData.from_data_block(block)
# return

# return ParsedBlock(block.mission_time, block_name, block_header, block)
return ParsedBlock(block_name, block_header, block_contents)

except NotImplementedError:
logger.warning(
f"Block parsing for type {block_header.message_type}, with subtype {block_header.message_subtype} not implemented!"
)
except ValueError:
logger.error("Invalid data block subtype")


def parse_rn2483_transmission(data: str, config: Config) -> ParsedTransmission:
"""Parses RN2483 Packets and extracts our telemetry payload blocks"""
# List of parsed blocks
parsed_blocks: list[ParsedBlock] = []

# Extract the packet header
data = data.strip() # Sometimes some extra whitespace
logger.debug(f"Full data string: {data}")
pkt_hdr = PacketHeader.from_hex(data[:32])

if len(pkt_hdr) <= 32: # If this packet nothing more than just the header
logger.info(f"{pkt_hdr}")

blocks = data[32:] # Remove the packet header

if not is_valid_packet_header(pkt_hdr, config.approved_callsigns): # Return immediately if packet header is invalid
return

# Parse through all blocks
while blocks != "":
# Parse block header
logger.debug(f"Blocks: {blocks}")
logger.debug(f"Block header: {blocks[:8]}")
block_header = BlockHeader.from_hex(blocks[:8])

# Select block contents
block_len = len(block_header) * 2 # Convert length in bytes to length in hex symbols
block_contents = blocks[8:block_len]
logger.debug(f"Block info: {block_header}")

# Block Header Validity
if not block_header.valid:
logger.error("Block header contains invalid block type values, skipping block")
blocks = blocks[block_len:]
continue

# Check if message is destined for ground station for processing
if block_header.destination in [DeviceAddress.GROUND_STATION, DeviceAddress.MULTICAST]:
parsed_blocks.append(
parse_radio_block(pkt_hdr.version, block_header, block_contents)
) # Append parsed block to list
else:
logger.warning("Invalid destination address")

# Remove the data we processed from the whole set, and move onto the next data block
blocks = blocks[block_len:]
return ParsedTransmission(pkt_hdr, parsed_blocks)


def is_valid_packet_header(pkt_hdr: PacketHeader, approved_callsigns: dict[str, str]) -> bool:
"""Validates the packet header"""

# Ensure packet is from an approved call sign
if pkt_hdr.callsign in approved_callsigns:
logger.info(f"Incoming packet from {pkt_hdr.callsign} ({approved_callsigns.get(pkt_hdr.callsign)})")
else:
logger.warning(f"Incoming packet from unauthorized call sign {pkt_hdr.callsign}")
return False

# Ensure packet version compatibility
if pkt_hdr.version < SUPPORTED_ENCODING_VERSION:
logger.error(f"This version of ground station does not support encoding below {SUPPORTED_ENCODING_VERSION}")
return False

return True


# Main class
class Telemetry(Process):
def __init__(
Expand Down Expand Up @@ -516,44 +377,6 @@ def recording_write_bytes(self, num_bytes: int, spacer: bool = False) -> None:
spacer_block = LoggingMetadataSpacerBlock(512 - (num_bytes % 512))
_ = self.mission_recording_file.write(spacer_block.to_bytes())

@DeprecationWarning
def parse_radio_block(self, version: int, block_type: int, block_subtype: int, contents: str) -> None:
"""
Parses telemetry payload blocks from either parsed packets or stored replays. Block contents are a hex string.
"""

# Working with hex strings until this point.
# Hex/Bytes Demarcation point
logger.debug(f"Parsing v{version} type {block_type} subtype {block_subtype} contents: {contents}")
block_contents: bytes = bytes.fromhex(contents)

try:
block = v1db.DataBlock.parse(v1db.DataBlockSubtype(block_subtype), block_contents)
# logger.debug(f"Data block parsed with mission time {block.mission_time}")
logger.info(str(block))

# Increase the last mission time
if block.mission_time > self.status.mission.last_mission_time:
self.status.mission.last_mission_time = block.mission_time

# if block == DataBlockSubtype.STATUS:
# self.status.rocket = jsp.RocketData.from_data_block(block)
# return

block_name = v1db.DataBlockSubtype(block_subtype).name.lower()
# Stores the last n packets into the telemetry data buffer
if self.telemetry.get(block_name) is None:
self.telemetry[block_name] = [dict(block)] # type:ignore
else:
self.telemetry[block_name].append(dict(block))
if len(self.telemetry[block_name]) > self.config.telemetry_buffer_size:
self.telemetry[block_name].pop(0)

except NotImplementedError:
logger.warning(f"Block parsing for type {block_type}, with subtype {block_subtype} not implemented!")
except ValueError:
logger.error("Invalid data block subtype")

def process_transmission(self, data: str) -> None:
"""Processes the incoming radio transmission data."""
# Parse the transmission
Expand All @@ -570,79 +393,6 @@ def process_transmission(self, data: str) -> None:
# buffer_length = len(self.mission_recording_buffer)
# self.recording_write_bytes(buffer_length - (buffer_length % 512))

# def parse_rn2483_transmission(self, data: str):
# """Parses RN2483 Packets and extracts our telemetry payload blocks"""

# # Extract the packet header
# data = data.strip() # Sometimes some extra whitespace
# logger.debug(f"Full data string: {data}")
# pkt_hdr = PacketHeader.from_hex(data[:32])

# if len(pkt_hdr) <= 32: # If this packet nothing more than just the header
# logger.info(f"{pkt_hdr}")

# blocks = data[32:] # Remove the packet header

# if not self.is_valid_packet_header(pkt_hdr):
# return

# # Parse through all blocks
# while blocks != "":
# # Parse block header
# logger.debug(f"Blocks: {blocks}")
# logger.debug(f"Block header: {blocks[:8]}")
# block_header = BlockHeader.from_hex(blocks[:8])

# # Select block contents
# block_len = len(block_header) * 2 # Convert length in bytes to length in hex symbols
# block_contents = blocks[8:block_len]
# logger.debug(f"Block info: {block_header}")

# # Block Header Validity
# if not block_header.valid:
# logger.error("Block header contains invalid block type values, skipping block")
# blocks = blocks[block_len:]
# continue

# # Check if message is destined for ground station for processing
# if block_header.destination in [DeviceAddress.GROUND_STATION, DeviceAddress.MULTICAST]:
# cur_block: ParsedBlock = parse_radio_block(pkt_hdr.version, block_header, block_contents)

# self.update(cur_block)

# # TODO UPDATE FOR V1
# # Write data to file when recording
# # if self.status.mission.recording:
# # logger.debug(f"Recording: {self.status.mission.recording}")
# # self.mission_recording_buffer += TelemetryDataBlock(block.subtype, data=block).to_bytes()
# # if len(self.mission_recording_buffer) >= 512:
# # buffer_length = len(self.mission_recording_buffer)
# # self.recording_write_bytes(buffer_length - (buffer_length % 512))
# else:
# logger.warning("Invalid destination address")

# # Remove the data we processed from the whole set, and move onto the next data block
# blocks = blocks[block_len:]

# def is_valid_packet_header(self, pkt_hdr: PacketHeader) -> bool:
# """Validates the packet header"""

# # Ensure packet is from an approved call sign
# if pkt_hdr.callsign in self.config.approved_callsigns:
# logger.info(
# f"Incoming packet from {pkt_hdr.callsign} ({self.config.approved_callsigns.get(pkt_hdr.callsign)})"
# )
# else:
# logger.warning(f"Incoming packet from unauthorized call sign {pkt_hdr.callsign}")
# return False

# # Ensure packet version compatibility
# if pkt_hdr.version < SUPPORTED_ENCODING_VERSION:
# logger.error(f"This version of ground station does not support encoding below {SUPPORTED_ENCODING_VERSION}")
# return False

# return True

def update(self, parsed_data: ParsedBlock) -> None:
"""Updates the telemetry buffer with the latest block data and latest mission time."""
if parsed_data.block_contents.mission_time > self.status.mission.last_mission_time:
Expand Down
Loading

0 comments on commit a1ec7ce

Please sign in to comment.