Skip to content

Commit

Permalink
reformat all files with new black config
Browse files Browse the repository at this point in the history
  • Loading branch information
drunsinn committed Oct 7, 2023
1 parent 7d01c7e commit 65d8709
Show file tree
Hide file tree
Showing 21 changed files with 210 additions and 658 deletions.
407 changes: 108 additions & 299 deletions pyLSV2/client.py

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions pyLSV2/dat_cls.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,9 +1150,7 @@ def normalized_name(self) -> str:


class ScopeSignalData:
def __init__(
self, channel: int, signal: int, offset: int, factor: float, unit: str
):
def __init__(self, channel: int, signal: int, offset: int, factor: float, unit: str):
self._channel = channel
self._signal = signal
self._offset = offset
Expand Down
25 changes: 6 additions & 19 deletions pyLSV2/low_level_com.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ def __init__(self, hostname: str, port: int = 19000, timeout: float = 15.0):
try:
self._host_ip = socket.gethostbyname(hostname)
except socket.gaierror:
logging.error(
"there was an error getting the IP for the hostname %s", hostname
)
logging.error("there was an error getting the IP for the hostname %s", hostname)
raise

self._port = self.DEFAULT_PORT
Expand Down Expand Up @@ -182,10 +180,7 @@ def telegram(
telegram,
)
if len(telegram) >= self.buffer_size:
raise OverflowError(
"telegram to long for set current buffer size: %d >= %d"
% (len(telegram), self.buffer_size)
)
raise OverflowError("telegram to long for set current buffer size: %d >= %d" % (len(telegram), self.buffer_size))

data_recived = bytearray()
try:
Expand All @@ -201,22 +196,16 @@ def telegram(
raise

if len(data_recived) > 0:
self._logger.debug(
"received block of data with length %d", len(data_recived)
)
self._logger.debug("received block of data with length %d", len(data_recived))
if len(data_recived) >= 8:
# read 4 bytes for response length
response_length = struct.unpack("!L", data_recived[0:4])[0]

# read 4 bytes for response type
self._last_lsv2_response = RSP(
data_recived[4:8].decode("utf-8", "ignore")
)
self._last_lsv2_response = RSP(data_recived[4:8].decode("utf-8", "ignore"))
else:
# response is less than 8 bytes long which is not enough space for package length and response message!
raise LSV2ProtocolException(
"response to short, less than 8 bytes: %s" % data_recived
)
raise LSV2ProtocolException("response to short, less than 8 bytes: %s" % data_recived)
else:
response_length = 0
self._last_lsv2_response = RSP.NONE
Expand All @@ -229,9 +218,7 @@ def telegram(
len(response_content) < response_length,
)
try:
response_content.extend(
self._tcpsock.recv(response_length - len(data_recived[8:]))
)
response_content.extend(self._tcpsock.recv(response_length - len(data_recived[8:])))
except Exception:
self._logger.error(
"something went wrong while waiting for more data to arrive. expected %d, received %d, content so far: %s",
Expand Down
20 changes: 5 additions & 15 deletions pyLSV2/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ def decode_system_parameters(result_set: bytearray) -> ld.SystemParameters:
elif message_length == 124:
info_list = struct.unpack("!14L8B8L2BH4B2L2HLL", result_set)
else:
raise LSV2DataException(
"unexpected length %s of message content %s" % (message_length, result_set)
)
raise LSV2DataException("unexpected length %s of message content %s" % (message_length, result_set))
sys_par = ld.SystemParameters()
sys_par.markers_start_address = info_list[0]
sys_par.number_of_markers = info_list[1]
Expand Down Expand Up @@ -105,9 +103,7 @@ def decode_system_information(data_set: bytearray) -> Union[bool, int]:
return data_set


def decode_file_system_info(
data_set: bytearray, control_type: ControlType = ControlType.UNKNOWN
) -> ld.FileEntry:
def decode_file_system_info(data_set: bytearray, control_type: ControlType = ControlType.UNKNOWN) -> ld.FileEntry:
"""
Decode result from file system entry
Expand Down Expand Up @@ -186,12 +182,8 @@ def decode_drive_info(data_set: bytearray) -> List[ld.DriveEntry]:
while (offset + fixed_length + 1) < len(data_set):
drive_entry = ld.DriveEntry()
drive_entry.unknown_0 = struct.unpack("!L", data_set[offset : offset + 4])[0]
drive_entry.unknown_1 = struct.unpack("!4s", data_set[offset + 4 : offset + 8])[
0
]
drive_entry.unknown_2 = struct.unpack("!L", data_set[offset + 8 : offset + 12])[
0
]
drive_entry.unknown_1 = struct.unpack("!4s", data_set[offset + 4 : offset + 8])[0]
drive_entry.unknown_2 = struct.unpack("!L", data_set[offset + 8 : offset + 12])[0]

if chr(data_set[offset + fixed_length]) == ":":
drive_entry.name = ba_to_ustr(data_set[offset + 12 : offset + 17])
Expand Down Expand Up @@ -235,9 +227,7 @@ def decode_tool_info(data_set: bytearray) -> ld.ToolInformation:
tool_info = ld.ToolInformation()
tool_info.number = struct.unpack("!L", data_set[0:4])[0]
tool_info.index = struct.unpack("!H", data_set[4:6])[0]
tool_info.axis = {0: "X", 1: "Y", 2: "Z"}.get(
struct.unpack("!H", data_set[6:8])[0], "unknown"
)
tool_info.axis = {0: "X", 1: "Y", 2: "Z"}.get(struct.unpack("!H", data_set[6:8])[0], "unknown")
if len(data_set) > 8:
tool_info.length = struct.unpack("<d", data_set[8:16])[0]
tool_info.radius = struct.unpack("<d", data_set[16:24])[0]
Expand Down
34 changes: 8 additions & 26 deletions pyLSV2/misc_scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,11 @@ def decode_signal_description(data_set: bytearray) -> List[ld.ScopeSignal]:
name_end += 1
channel_name = lm.ba_to_ustr(data_set[name_start:name_end])
if data_set[10:46] != bytearray(b"\x00" * 36):
raise LSV2DataException(
"unexpected data in channel description in bytes 10 to 45: %s"
% data_set[10:46]
)
raise LSV2DataException("unexpected data in channel description in bytes 10 to 45: %s" % data_set[10:46])
interval_value_1 = struct.unpack("!H", data_set[2:4])[0]
interval_value_2 = struct.unpack("!H", data_set[8:10])[0]
if interval_value_1 != interval_value_2:
raise LSV2DataException(
"error in decoding of channel description data: %s" % data_set
)
raise LSV2DataException("error in decoding of channel description data: %s" % data_set)

min_interval = interval_value_1
type_num = struct.unpack("!H", data_set[4:6])[0]
Expand All @@ -62,9 +57,7 @@ def decode_signal_description(data_set: bytearray) -> List[ld.ScopeSignal]:

channel_type = lc.ChannelType(type_num)
if not data_set[6:8] == bytearray(b"\x00\x00"):
raise LSV2DataException(
"unexpected values in bytes 6 and 7: %s" % data_set[6:8]
)
raise LSV2DataException("unexpected values in bytes 6 and 7: %s" % data_set[6:8])
if channel_type in [lc.ChannelType.TYPE1, lc.ChannelType.TYPE4]:
if len(data_set) not in [98, 106]:
raise LSV2DataException("unexpected length of data for channel type 1 or 4")
Expand Down Expand Up @@ -110,9 +103,7 @@ def decode_signal_description(data_set: bytearray) -> List[ld.ScopeSignal]:
return signals


def decode_signal_details(
signal_list: List[ld.ScopeSignal], data_set: bytearray
) -> List[ld.ScopeSignal]:
def decode_signal_details(signal_list: List[ld.ScopeSignal], data_set: bytearray) -> List[ld.ScopeSignal]:
"""
Decode the detailed description of a signal after selecting them
Expand All @@ -130,10 +121,7 @@ def split_dataset(data):
logger.debug("R_OP dataset has expected length")
for i, data_sub_set in enumerate(split_dataset(data_set)):
if data_sub_set[17:] != bytearray(b"?\x00\x00\x00\x00"):
raise Exception(
"unexpected data in signal details at position 17 %s"
% data_sub_set[17:]
)
raise Exception("unexpected data in signal details at position 17 %s" % data_sub_set[17:])

signal_list[i].unit = lm.ba_to_ustr(data_sub_set[0:10])
signal_list[i].factor = struct.unpack("<d", data_sub_set[10:18])[0]
Expand All @@ -147,9 +135,7 @@ def split_dataset(data):
signal_list[i].offset,
)
else:
logger.error(
"R_OP dataset has unexpected length %d of %s", len(data_set), data_set
)
logger.error("R_OP dataset has unexpected length %d of %s", len(data_set), data_set)
return signal_list


Expand All @@ -174,9 +160,7 @@ def decode_scope_reading(
sig_data_end = sig_data_start + sig_data_lenth

for signal in signal_list:
logger.debug(
"decode data for channel %d signal %d", signal.channel, signal.signal
)
logger.debug("decode data for channel %d signal %d", signal.channel, signal.signal)
sig_data = ld.ScopeSignalData(
channel=signal.channel,
signal=signal.signal,
Expand All @@ -191,9 +175,7 @@ def decode_scope_reading(

unpack_string = "!32l"
value_start = sig_data_start + 6
sig_data.data.extend(
struct.unpack(unpack_string, data_set[value_start:sig_data_end])
)
sig_data.data.extend(struct.unpack(unpack_string, data_set[value_start:sig_data_end]))

reading.add_dataset((sig_data))

Expand Down
77 changes: 18 additions & 59 deletions pyLSV2/table_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ def column_names(self) -> list:
"""list of columns used in this table"""
return self._columns

def append_column(
self, name: str, start: int, end: int, width: int = 0, empty_value=None
):
def append_column(self, name: str, start: int, end: int, width: int = 0, empty_value=None):
"""add column to the table format"""
self._columns.append(name)
if width == 0:
Expand Down Expand Up @@ -167,9 +165,7 @@ def update_column_format(self, name: str, parameters: dict):

def _get_column_names(self):
"""get list of columns used in this table"""
raise DeprecationWarning(
"Do not use this function anymore! Use ```column_names```"
)
raise DeprecationWarning("Do not use this function anymore! Use ```column_names```")

def append_row(self, row):
"""add a data entry to the table"""
Expand Down Expand Up @@ -204,14 +200,11 @@ def dump_native(self, file_path: pathlib.Path, renumber_column=None):
version_string = " Version:%s" % str(self._version)

with open(file_path, "w", encoding="ascii") as tfp:
tfp.write("BEGIN %s%s%s\n" %
(file_name, units_string, version_string))
tfp.write("BEGIN %s%s%s\n" % (file_name, units_string, version_string))

for column_name in self._columns:
if column_name not in self._column_format:
raise Exception(
"configuration is incomplete, missing definition for column {column_name:s}"
)
raise Exception("configuration is incomplete, missing definition for column {column_name:s}")
fixed_width = self._column_format[column_name]["width"]
format_string = "{0:<%d}" % fixed_width
tfp.write(format_string.format(column_name))
Expand All @@ -233,16 +226,9 @@ def dump_native(self, file_path: pathlib.Path, renumber_column=None):
"entry is missing optional column %s defined in output format, replace with empty value",
column_name,
)
tfp.write(
format_string.format(
self._column_format[column_name]["empty_value"]
)
)
tfp.write(format_string.format(self._column_format[column_name]["empty_value"]))
else:
raise Exception(
"entry is missing a value for column %s defined in the output format"
% column_name
)
raise Exception("entry is missing a value for column %s defined in the output format" % column_name)
tfp.write("\n")
row_counter += 1

Expand All @@ -254,8 +240,7 @@ def dump_csv(self, file_path: pathlib.Path, decimal_char: str = "."):
:param file_path: file location for csv file
"""
self._logger.debug(
"write table to csv, using decimal char '%s'", decimal_char)
self._logger.debug("write table to csv, using decimal char '%s'", decimal_char)

def localize_floats(row):
float_pattern = re.compile(r"^[+-]?\d+\.\d+$")
Expand All @@ -277,9 +262,7 @@ def localize_floats(row):
csv_writer.writerow(localize_floats(row))
self._logger.info("csv file saved successfully")

def find_string(
self, column_name: str, search_value: Union[str, re.Pattern]
) -> list:
def find_string(self, column_name: str, search_value: Union[str, re.Pattern]) -> list:
"""
search for string rows by string or pattern
returns list of lines that contain the search result
Expand All @@ -289,20 +272,12 @@ def find_string(
"""
search_results = []
if not column_name in self._columns:
self._logger.error(
"column with name %s not part of this table", column_name
)
self._logger.error("column with name %s not part of this table", column_name)
else:
if isinstance(search_value, (str,)):
search_results = [
itm for itm in self._content if search_value in itm[column_name]
]
search_results = [itm for itm in self._content if search_value in itm[column_name]]
elif isinstance(search_value, (re.Pattern,)):
search_results = [
itm
for itm in self._content
if search_value.match(itm[column_name]) is not None
]
search_results = [itm for itm in self._content if search_value.match(itm[column_name]) is not None]
return search_results

@staticmethod
Expand Down Expand Up @@ -333,10 +308,7 @@ def parse_table(table_path: pathlib.Path) -> "NCTable":
)

if header is None:
raise Exception(
"File has wrong format: incorrect header for file %s"
% table_path
)
raise Exception("File has wrong format: incorrect header for file %s" % table_path)

nctable.name = header.group("name").strip()
nctable.suffix = header.group("suffix")
Expand Down Expand Up @@ -414,11 +386,7 @@ def parse_table(table_path: pathlib.Path) -> "NCTable":

table_entry = {}
for column in nctable.column_names:
table_entry[column] = line[
nctable.get_column_start(column): nctable.get_column_end(
column
)
].strip()
table_entry[column] = line[nctable.get_column_start(column) : nctable.get_column_end(column)].strip()
nctable.append_row(table_entry)

logger.debug("Found %d entries", len(nctable.rows))
Expand All @@ -428,12 +396,8 @@ def parse_table(table_path: pathlib.Path) -> "NCTable":
for c_d in table_config["TableDescription"]["columns"]:
cfg_column_name = c_d["CfgColumnDescription"]["key"]
if cfg_column_name not in nctable.column_names:
raise Exception(
"found unexpected column %s" % cfg_column_name
)
if c_d["CfgColumnDescription"][
"width"
] != nctable.get_column_width(cfg_column_name):
raise Exception("found unexpected column %s" % cfg_column_name)
if c_d["CfgColumnDescription"]["width"] != nctable.get_column_width(cfg_column_name):
raise Exception(
"found difference in column width for colmun %s: %d : %d"
% (
Expand All @@ -442,9 +406,7 @@ def parse_table(table_path: pathlib.Path) -> "NCTable":
nctable.get_column_width(cfg_column_name),
)
)
nctable.update_column_format(
cfg_column_name, c_d["CfgColumnDescription"]
)
nctable.update_column_format(cfg_column_name, c_d["CfgColumnDescription"])

except UnicodeDecodeError:
logger.error("File has invalid utf-8 encoding")
Expand Down Expand Up @@ -508,8 +470,7 @@ def str_to_typed_value(value_string: str):
if isinstance(last_object, (list,)):
if ":=" in line:
parts = line.split(":=")
last_object.append(
{parts[0]: str_to_typed_value(parts[1])})
last_object.append({parts[0]: str_to_typed_value(parts[1])})
else:
last_object.append(line)

Expand Down Expand Up @@ -545,9 +506,7 @@ def from_json_format(file_path: pathlib.Path) -> "NCTable":
end=json_data["column_config"][column]["end"],
)
if "empty_value" in json_data["column_config"][column]:
nct.set_column_empty_value(
column, json_data["column_config"][column]["empty_value"]
)
nct.set_column_empty_value(column, json_data["column_config"][column]["empty_value"])
return nct

@staticmethod
Expand Down
Loading

0 comments on commit 65d8709

Please sign in to comment.