From 14fe7c424d0d88aba3bb263a0adad46fbaede543 Mon Sep 17 00:00:00 2001 From: Henrik Palmlund Wahlgren Date: Thu, 19 Aug 2021 10:51:00 +0200 Subject: [PATCH] Moved to attrs for messages and added protocol char. Fixes #23, #19 --- iec62056_21/messages.py | 172 +++++++++++++++------------------------- requirements.txt | 3 +- 2 files changed, 64 insertions(+), 111 deletions(-) diff --git a/iec62056_21/messages.py b/iec62056_21/messages.py index 49a1e47..7da6298 100644 --- a/iec62056_21/messages.py +++ b/iec62056_21/messages.py @@ -1,5 +1,6 @@ import re -import typing +from typing import * +import attr from iec62056_21.exceptions import Iec6205621ParseError, ValidationError from iec62056_21 import constants, utils @@ -40,6 +41,7 @@ def from_bytes(cls, bytes_data): return cls.from_representation(bytes_data.decode(constants.ENCODING)) +@attr.s(auto_attribs=True) class DataSet(Iec6205621Data): """ @@ -50,13 +52,9 @@ class DataSet(Iec6205621Data): EXCLUDE_CHARS = ["(", ")", "/", "!"] - def __init__(self, value: str, address: str = None, unit: str = None): - - # TODO: in programming mode, protocol mode C the value can be up to 128 chars - - self.address = address - self.value = value - self.unit = unit + value: str + address: Optional[str] = attr.ib(default=None) + unit: Optional[str] = attr.ib(default=None) def to_representation(self) -> str: if self.unit is not None and self.address is not None: @@ -70,7 +68,7 @@ def to_representation(self) -> str: return f"({self.value})" @classmethod - def from_representation(cls, data_set_string): + def from_representation(cls, data_set_string: str): just_value = regex_data_just_value.search(data_set_string) if just_value: @@ -92,30 +90,21 @@ def from_representation(cls, data_set_string): else: return cls(address=address, value=values_data, unit=None) - def __repr__(self): - return ( - f"{self.__class__.__name__}(" - f"value={self.value!r}, " - f"address={self.address!r}, " - f"unit={self.unit!r}" - f")" - ) - +@attr.s(auto_attribs=True) class DataLine(Iec6205621Data): """ A data line is a list of data sets. """ - def __init__(self, data_sets): - self.data_sets: typing.List[DataSet] = data_sets + data_sets: List[DataSet] def to_representation(self): sets_representation = [_set.to_representation() for _set in self.data_sets] return "".join(sets_representation) @classmethod - def from_representation(cls, string_data): + def from_representation(cls, string_data: str): """ Is a list of data sets id(value*unit)id(value*unit) need to split after each ")" @@ -132,20 +121,17 @@ def from_representation(cls, string_data): return cls(data_sets=data_sets) - def __repr__(self): - return f"{self.__class__.__name__}(" f"data_sets={self.data_sets!r}" f")" - +@attr.s(auto_attribs=True) class DataBlock(Iec6205621Data): """ A data block is a list of DataLines, each ended with a the line end characters \n\r """ - def __init__(self, data_lines): - self.data_lines = data_lines + data_lines: List[DataLine] - def to_representation(self): + def to_representation(self) -> str: lines_rep = [ (line.to_representation() + constants.LINE_END) for line in self.data_lines ] @@ -157,15 +143,12 @@ def from_representation(cls, string_data: str): data_lines = [DataLine.from_representation(line) for line in lines] return cls(data_lines) - def __repr__(self): - return f"{self.__class__.__name__}(data_lines={self.data_lines!r})" - +@attr.s(auto_attribs=True) class ReadoutDataMessage(Iec6205621Data): - def __init__(self, data_block): - self.data_block = data_block + data_block: DataBlock - def to_representation(self): + def to_representation(self) -> str: data = ( f"{constants.STX}{self.data_block.to_representation()}{constants.END_CHAR}" f"{constants.LINE_END}{constants.ETX}" @@ -186,27 +169,28 @@ def from_representation(cls, string_data: str): return cls(data_block=data_block) - def __repr__(self): - return f"{self.__class__.__name__}(data_block={self.data_block!r})" - +@attr.s(auto_attribs=True) class CommandMessage(Iec6205621Data): - allowed_commands = ["P", "W", "R", "E", "B"] - allowed_command_types = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"] + allowed_commands: ClassVar[List[str]] = ["P", "W", "R", "E", "B"] + allowed_command_types: ClassVar[List[str]] = [ + "0", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + ] + + command: str + command_type: str + data_set: Optional[DataSet] = attr.ib(default=None) - def __init__( - self, command: str, command_type: str, data_set: typing.Optional[DataSet] - ): - self.command = command - self.command_type = command_type - self.data_set = data_set - - if command not in self.allowed_commands: - raise ValueError(f"{command} is not an allowed command") - if command_type not in self.allowed_command_types: - raise ValueError(f"{command_type} is not an allowed command type") - - def to_representation(self): + def to_representation(self) -> str: header = f"{constants.SOH}{self.command}{self.command_type}" if self.data_set: body = f"{constants.STX}{self.data_set.to_representation()}{constants.ETX}" @@ -245,36 +229,28 @@ def for_single_write(cls, address, value): data_set = DataSet(value=value, address=address) return cls(command="W", command_type="1", data_set=data_set) - def __repr__(self): - return ( - f"{self.__class__.__name__}(" - f"command={self.command!r}, " - f"command_type={self.command_type!r}, " - f"data_set={self.data_set!r}" - f")" - ) - +@attr.s(auto_attribs=True) class AnswerDataMessage(Iec6205621Data): - def __init__(self, data_block): - self.data_block = data_block - self._data = None + + data_block: DataBlock + cached_data: Optional[List[DataSet]] = attr.ib(default=None, init=False) @property def data(self): - if not self._data: - self._get_all_data_sets() + if not self.cached_data: + self.get_all_data_sets() - return self._data + return self.cached_data - def _get_all_data_sets(self): + def get_all_data_sets(self): data_sets = list() for line in self.data_block.data_lines: - for set in line.data_sets: - data_sets.append(set) + for data_set in line.data_sets: + data_sets.append(data_set) - self._data = data_sets + self.cached_data = data_sets def to_representation(self): # TODO: this is not valid in case reading out partial blocks. @@ -295,15 +271,12 @@ def from_representation(cls, string_data): return cls(data_block=data_block) - def __repr__(self): - return f"{self.__class__.__name__}(data_block={self.data_block!r})" - +@attr.s(auto_attribs=True) class RequestMessage(Iec6205621Data): - def __init__(self, device_address=""): - self.device_address = device_address + device_address: str = attr.ib(default="") - def to_representation(self): + def to_representation(self) -> str: return ( f"{constants.START_CHAR}{constants.REQUEST_CHAR}{self.device_address}" f"{constants.END_CHAR}{constants.LINE_END}" @@ -314,44 +287,32 @@ def from_representation(cls, string_data): device_address = string_data[2:-3] return cls(device_address) - def __repr__(self): - return f"{self.__class__.__name__}(device_address={self.device_address!r})" - +@attr.s(auto_attribs=True) class AckOptionSelectMessage(Iec6205621Data): - """ - Only support protocol mode 0: Normal - """ + """""" - def __init__(self, baud_char, mode_char): - self.baud_char = baud_char - self.mode_char = mode_char + baud_char: str + mode_char: str + protocol_char: str = attr.ib(default="0") def to_representation(self): - return f"{constants.ACK}0{self.baud_char}{self.mode_char}{constants.LINE_END}" + return f"{constants.ACK}{self.protocol_char}{self.baud_char}{self.mode_char}{constants.LINE_END}" @classmethod def from_representation(cls, string_data): + protocol_char = string_data[1] baud_char = string_data[2] mode_char = string_data[3] - return cls(baud_char, mode_char) - - def __repr__(self): - return ( - f"{self.__class__.__name__}(" - f"baud_char={self.baud_char!r}, " - f"mode_char={self.mode_char!r}" - f")" - ) + return cls(baud_char, mode_char, protocol_char=protocol_char) +@attr.s(auto_attribs=True) class IdentificationMessage(Iec6205621Data): - def __init__( - self, identification: str, manufacturer: str, switchover_baudrate_char: str - ): - self.identification: str = identification - self.manufacturer: str = manufacturer - self.switchover_baudrate_char: str = switchover_baudrate_char + + identification: str + manufacturer: str + switchover_baudrate_char: str def to_representation(self): return ( @@ -366,12 +327,3 @@ def from_representation(cls, string_data): identification = string_data[6:-2] return cls(identification, manufacturer, switchover_baudrate_char) - - def __repr__(self): - return ( - f"{self.__class__.__name__}(" - f"identification={self.identification!r}, " - f"manufacturer={self.manufacturer!r}, " - f"switchover_baudrate_char={self.switchover_baudrate_char!r}" - f")" - ) diff --git a/requirements.txt b/requirements.txt index 346d49d..f371d18 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ -attrs==19.1.0 +attrs==21.2.0 +pyserial==3.5 #pytest pytest==4.4.1