diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..aa0cdbc --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +ignore = E203 \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 49eaced..bac89ac 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -24,7 +24,10 @@ jobs: python -m pytest - name: Run convention tests run: | - python -m flake8 modi_firmware_updater tests --ignore E265,W503,W504 + python -m flake8 modi_firmware_updater tests --ignore E203,W503,W504 + - name: Run import sort check + run: | + python -m isort modi_firmware_updater tests --diff build-macos: name: macOS Test runs-on: macos-latest @@ -46,7 +49,10 @@ jobs: python -m pytest - name: Run convention tests run: | - python -m flake8 modi_firmware_updater tests --ignore E265,W503,W504 + python -m flake8 modi_firmware_updater tests --ignore E203,W503,W504 + - name: Run import sort check + run: | + python -m isort modi_firmware_updater tests --diff build-windows: name: Windows Test runs-on: windows-latest @@ -68,5 +74,7 @@ jobs: python -m pytest - name: Run convention tests run: | - python -m flake8 modi_firmware_updater tests --ignore E265,W503,W504 - + python -m flake8 modi_firmware_updater tests --ignore E203,W503,W504 + - name: Run import sort check + run: | + python -m isort modi_firmware_updater tests --diff diff --git a/modi_firmware_updater/core/esp32_updater.py b/modi_firmware_updater/core/esp32_updater.py index 3a0b508..080b98a 100644 --- a/modi_firmware_updater/core/esp32_updater.py +++ b/modi_firmware_updater/core/esp32_updater.py @@ -1,19 +1,17 @@ - import io -import sys -import time import json -import serial -import zipfile import pathlib - +import sys +import time import urllib.request as ur - -from os import path +import zipfile +from base64 import b64decode, b64encode from io import open -from base64 import b64encode, b64decode +from os import path from urllib.error import URLError +import serial + from modi_firmware_updater.util.connection_util import list_modi_ports @@ -50,10 +48,13 @@ def __init__(self): super().__init__(modi_ports[0].device, timeout=0.1, baudrate=921600) print(f"Connecting to MODI network module at {modi_ports[0].device}") - self.__address = [0x1000, 0x8000, 0XD000, 0x10000, 0xD0000] + self.__address = [0x1000, 0x8000, 0xD000, 0x10000, 0xD0000] self.file_path = [ - 'bootloader.bin', 'partitions.bin', 'ota_data_initial.bin', - 'modi_ota_factory.bin', 'esp32.bin' + "bootloader.bin", + "partitions.bin", + "ota_data_initial.bin", + "modi_ota_factory.bin", + "esp32.bin", ] self.id = None self.version = None @@ -66,9 +67,7 @@ def set_ui(self, ui): self.ui = ui def update_firmware(self, force=False): - # is_up_to_date = False - - print('Turning interpreter off...') + print("Turning interpreter off...") self.write(b'{"c":160,"s":0,"d":18,"b":"AAMAAAAA","l":6}') self.update_in_progress = True @@ -80,23 +79,11 @@ def update_firmware(self, force=False): if not force and not self.ui: response = input( f"ESP version already up to date (v{self.version})." - f" Do you still want to proceed? [y/n]: ") - if 'y' not in response: + f" Do you still want to proceed? [y/n]: " + ) + if "y" not in response: return - # elif self.ui: - # is_up_to_date = True - # print(f"ESP32 is already up to date (v{self.version}).") - # if self.ui.is_english: - # self.ui.update_network_esp32.setText( - # "Network ESP32 is already up to date." - # ) - # else: - # self.ui.update_network_esp32.setText( - # "네트워크 모듈이 최신 버전입니다." - # ) - # time.sleep(2) - - # if not is_up_to_date: + print(f"Updating v{self.version} to v{self.__version_to_update}") firmware_buffer = self.__compose_binary_firmware() @@ -122,21 +109,17 @@ def update_firmware(self, force=False): if self.ui: self.ui.update_stm32_modules.setStyleSheet( - f'border-image: url({self.ui.active_path}); font-size: 16px' + f"border-image: url({self.ui.active_path}); font-size: 16px" ) self.ui.update_stm32_modules.setEnabled(True) self.ui.update_network_stm32.setStyleSheet( - f'border-image: url({self.ui.active_path}); font-size: 16px' + f"border-image: url({self.ui.active_path}); font-size: 16px" ) self.ui.update_network_stm32.setEnabled(True) if self.ui.is_english: - self.ui.update_network_esp32.setText( - "Update Network ESP32" - ) + self.ui.update_network_esp32.setText("Update Network ESP32") else: - self.ui.update_network_esp32.setText( - "네트워크 모듈 업데이트" - ) + self.ui.update_network_esp32.setText("네트워크 모듈 업데이트") def __device_ready(self): print("Redirecting connection to esp device...") @@ -144,17 +127,18 @@ def __device_ready(self): def __device_sync(self): print("Syncing the esp device...") - sync_pkt = self.__parse_pkt([ - 0x0, self.DEVICE_SYNC, 0x24, 0, 0, 0, 0, 0, 0x7, 0x7, 0x12, 0x20 - ] + 32 * [0x55]) + sync_pkt = self.__parse_pkt( + [0x0, self.DEVICE_SYNC, 0x24, 0, 0, 0, 0, 0, 0x7, 0x7, 0x12, 0x20] + + 32 * [0x55] + ) self.__send_pkt(sync_pkt, timeout=10, continuous=True) print("Sync Complete") def __flash_attach(self): print("Attaching flash to esp device..") - attach_pkt = self.__parse_pkt([ - 0x0, self.SPI_ATTACH_REQ, 0x8 - ] + 13 * [0]) + attach_pkt = self.__parse_pkt( + [0x0, self.SPI_ATTACH_REQ, 0x8] + 13 * [0] + ) self.__send_pkt(attach_pkt, timeout=10) print("Flash attach Complete") @@ -162,25 +146,30 @@ def __set_flash_param(self): print("Setting esp flash parameter...") param_data = [0] * 32 fl_id, total_size, block_size, sector_size, page_size, status_mask = ( - 0, 2 * 1024 * 1024, 64 * 1024, 4 * 1024, 256, 0xFFFF + 0, + 2 * 1024 * 1024, + 64 * 1024, + 4 * 1024, + 256, + 0xFFFF, ) param_data[1] = self.SPI_FLASH_SET param_data[2] = 0x18 - param_data[8:12] = int.to_bytes(fl_id, length=4, byteorder='little') + param_data[8:12] = int.to_bytes(fl_id, length=4, byteorder="little") param_data[12:16] = int.to_bytes( - total_size, length=4, byteorder='little' + total_size, length=4, byteorder="little" ) param_data[16:20] = int.to_bytes( - block_size, length=4, byteorder='little' + block_size, length=4, byteorder="little" ) param_data[20:24] = int.to_bytes( - sector_size, length=4, byteorder='little' + sector_size, length=4, byteorder="little" ) param_data[24:28] = int.to_bytes( - page_size, length=4, byteorder='little' + page_size, length=4, byteorder="little" ) param_data[28:32] = int.to_bytes( - status_mask, length=4, byteorder='little' + status_mask, length=4, byteorder="little" ) param_pkt = self.__parse_pkt(param_data) self.__send_pkt(param_pkt, timeout=10) @@ -189,8 +178,8 @@ def __set_flash_param(self): @staticmethod def __parse_pkt(data): pkt = bytes(data) - pkt = pkt.replace(b'\xdb', b'\xdb\xdd').replace(b'\xc0', b'\xdb\xdc') - pkt = b'\xc0' + pkt + b'\xc0' + pkt = pkt.replace(b"\xdb", b"\xdb\xdd").replace(b"\xc0", b"\xdb\xdc") + pkt = b"\xc0" + pkt + b"\xc0" return pkt @retry(Exception) @@ -221,22 +210,22 @@ def __send_pkt(self, pkt, wait=True, timeout=None, continuous=False): raise Exception("Timeout Expired!") def __read_slip(self): - slip_pkt = b'' - while slip_pkt != b'\xc0': + slip_pkt = b"" + while slip_pkt != b"\xc0": slip_pkt = self.read() - if slip_pkt == b'': - return b'' - slip_pkt += self.read_until(b'\xc0') + if slip_pkt == b"": + return b"" + slip_pkt += self.read_until(b"\xc0") return slip_pkt def __read_json(self): - json_pkt = b'' - while json_pkt != b'{': + json_pkt = b"" + while json_pkt != b"{": json_pkt = self.read() - if json_pkt == b'': - return '' + if json_pkt == b"": + return "" time.sleep(0.1) - json_pkt += self.read_until(b'}') + json_pkt += self.read_until(b"}") return json_pkt def __wait_for_json(self): @@ -248,65 +237,72 @@ def __wait_for_json(self): def __get_esp_id(self): json_msg = json.loads(self.__wait_for_json()) - while json_msg['c'] != 0: + while json_msg["c"] != 0: json_msg = json.loads(self.__wait_for_json()) - return json_msg['s'] + return json_msg["s"] def __get_esp_version(self): get_version_pkt = b'{"c":160,"s":25,"d":4095,"b":"AAAAAAAAAA==","l":8}' self.write(get_version_pkt) json_msg = json.loads(self.__wait_for_json()) init_time = time.time() - while json_msg['c'] != 0xA1: + while json_msg["c"] != 0xA1: self.write(get_version_pkt) json_msg = json.loads(self.__wait_for_json()) if time.time() - init_time > 1: return None - ver = b64decode(json_msg['b']).lstrip(b'\x00') - return ver.decode('ascii') + ver = b64decode(json_msg["b"]).lstrip(b"\x00") + return ver.decode("ascii") def __set_esp_version(self, version_text: str): print(f"Writing version info (v{version_text})") - version_byte = version_text.encode('ascii') - version_byte = b'\x00' * (8 - len(version_byte)) + version_byte - version_text = b64encode(version_byte).decode('utf8') - version_msg = '{' + f'"c":160,"s":24,"d":4095,' \ - f'"b":"{version_text}","l":8' + '}' - version_msg_enc = version_msg.encode('utf8') + version_byte = version_text.encode("ascii") + version_byte = b"\x00" * (8 - len(version_byte)) + version_byte + version_text = b64encode(version_byte).decode("utf8") + version_msg = ( + "{" + f'"c":160,"s":24,"d":4095,' + f'"b":"{version_text}","l":8' + "}" + ) + version_msg_enc = version_msg.encode("utf8") self.write(version_msg_enc) - while json.loads(self.__wait_for_json())['c'] != 0xA1: + while json.loads(self.__wait_for_json())["c"] != 0xA1: time.sleep(0.5) self.__boot_to_app() - self.write(version_msg.encode('utf8')) + self.write(version_msg.encode("utf8")) print("The version info has been set!!") def __compose_binary_firmware(self): - binary_firmware = b'' + binary_firmware = b"" for i, bin_path in enumerate(self.file_path): if self.ui: if i == 2: - if sys.platform.startswith('win'): - root_path = ( - pathlib.PurePosixPath( - pathlib.PurePath(__file__), - '..', '..', 'assets', 'firmware', 'esp32' - ) + if sys.platform.startswith("win"): + root_path = pathlib.PurePosixPath( + pathlib.PurePath(__file__), + "..", + "..", + "assets", + "firmware", + "esp32", ) else: root_path = path.join( path.dirname(__file__), - '..', 'assets', 'firmware', 'esp32' + "..", + "assets", + "firmware", + "esp32", ) elif i == 3: root_path = ( - 'https://download.luxrobo.com/modi-ota-firmware/' - 'ota.zip' + "https://download.luxrobo.com/modi-ota-firmware/" + "ota.zip" ) else: root_path = ( - 'https://download.luxrobo.com/modi-esp32-firmware/' - 'esp.zip' + "https://download.luxrobo.com/modi-esp32-firmware/" + "esp.zip" ) if i != 2: @@ -315,14 +311,14 @@ def __compose_binary_firmware(self): download_response = conn.read() except URLError: raise URLError( - 'Failed to download firmware. Check your internet.' + "Failed to download firmware. Check your internet." ) zip_content = zipfile.ZipFile( - io.BytesIO(download_response), 'r' + io.BytesIO(download_response), "r" ) bin_data = zip_content.read(bin_path) elif i == 2: - if sys.platform.startswith('win'): + if sys.platform.startswith("win"): firmware_path = pathlib.PurePosixPath( root_path, bin_path ) @@ -330,21 +326,20 @@ def __compose_binary_firmware(self): firmware_path = path.join(root_path, bin_path) if self.ui.installation: firmware_path = path.dirname(__file__).replace( - 'core', bin_path + "core", bin_path ) - with open(firmware_path, 'rb') as bin_file: + with open(firmware_path, "rb") as bin_file: bin_data = bin_file.read() else: root_path = path.join( - path.dirname(__file__), - '..', 'assets', 'firmware', 'esp32' + path.dirname(__file__), "..", "assets", "firmware", "esp32" ) firmware_path = path.join(root_path, bin_path) - with open(firmware_path, 'rb') as bin_file: + with open(firmware_path, "rb") as bin_file: bin_data = bin_file.read() binary_firmware += bin_data if i < len(self.__address) - 1: - binary_firmware += b'\xFF' * ( + binary_firmware += b"\xFF" * ( self.__address[i + 1] - self.__address[i] - len(bin_data) ) return binary_firmware @@ -352,19 +347,18 @@ def __compose_binary_firmware(self): def __get_latest_version(self): if self.ui: version_path = ( - 'https://download.luxrobo.com/modi-esp32-firmware/version.txt' + "https://download.luxrobo.com/modi-esp32-firmware/version.txt" ) version_info = None for line in ur.urlopen(version_path, timeout=5): - version_info = line.decode('utf-8').lstrip('v').rstrip('\n') + version_info = line.decode("utf-8").lstrip("v").rstrip("\n") else: root_path = path.join( - path.dirname(__file__), - '..', 'assets', 'firmware', 'esp32' + path.dirname(__file__), "..", "assets", "firmware", "esp32" ) - version_path = path.join(root_path, 'esp_version.txt') - with open(version_path, 'r') as version_file: - version_info = version_file.readline().lstrip('v').rstrip('\n') + version_path = path.join(root_path, "esp_version.txt") + with open(version_path, "r") as version_file: + version_info = version_file.readline().lstrip("v").rstrip("\n") return version_info def __erase_chunk(self, size, offset): @@ -372,16 +366,14 @@ def __erase_chunk(self, size, offset): erase_data = [0] * 24 erase_data[1] = self.ESP_FLASH_BEGIN erase_data[2] = 0x10 - erase_data[8:12] = int.to_bytes(size, length=4, byteorder='little') + erase_data[8:12] = int.to_bytes(size, length=4, byteorder="little") erase_data[12:16] = int.to_bytes( - num_blocks, length=4, byteorder='little' + num_blocks, length=4, byteorder="little" ) erase_data[16:20] = int.to_bytes( - self.ESP_FLASH_BLOCK, length=4, byteorder='little' - ) - erase_data[20:24] = int.to_bytes( - offset, length=4, byteorder='little' + self.ESP_FLASH_BLOCK, length=4, byteorder="little" ) + erase_data[20:24] = int.to_bytes(offset, length=4, byteorder="little") erase_pkt = self.__parse_pkt(erase_data) self.__send_pkt(erase_pkt, timeout=10) @@ -391,15 +383,15 @@ def __write_flash_block(self, data, seq_block): checksum = self.ESP_CHECKSUM_MAGIC block_data[1] = self.ESP_FLASH_DATA - block_data[2:4] = int.to_bytes(size + 16, length=2, byteorder='little') - block_data[8:12] = int.to_bytes(size, length=4, byteorder='little') + block_data[2:4] = int.to_bytes(size + 16, length=2, byteorder="little") + block_data[8:12] = int.to_bytes(size, length=4, byteorder="little") block_data[12:16] = int.to_bytes( - seq_block, length=4, byteorder='little' + seq_block, length=4, byteorder="little" ) for i in range(size): block_data[24 + i] = data[i] - checksum ^= (0xFF & data[i]) - block_data[4:8] = int.to_bytes(checksum, length=4, byteorder='little') + checksum ^= 0xFF & data[i] + block_data[4:8] = int.to_bytes(checksum, length=4, byteorder="little") block_pkt = self.__parse_pkt(block_data) self.__send_pkt(block_pkt) @@ -408,11 +400,11 @@ def __write_binary_firmware(self, binary_firmware: bytes, manager): num_blocks = len(binary_firmware) // self.ESP_FLASH_BLOCK + 1 while binary_firmware: if self.ESP_FLASH_CHUNK < len(binary_firmware): - chunk_queue.append(binary_firmware[:self.ESP_FLASH_CHUNK]) - binary_firmware = binary_firmware[self.ESP_FLASH_CHUNK:] + chunk_queue.append(binary_firmware[: self.ESP_FLASH_CHUNK]) + binary_firmware = binary_firmware[self.ESP_FLASH_CHUNK :] else: chunk_queue.append(binary_firmware[:]) - binary_firmware = b'' + binary_firmware = b"" blocks_downloaded = 0 print("Start uploading firmware data...") @@ -441,11 +433,11 @@ def __write_chunk(self, chunk, curr_seq, total_seq, manager): block_queue = [] while chunk: if self.ESP_FLASH_BLOCK < len(chunk): - block_queue.append(chunk[:self.ESP_FLASH_BLOCK]) - chunk = chunk[self.ESP_FLASH_BLOCK:] + block_queue.append(chunk[: self.ESP_FLASH_BLOCK]) + chunk = chunk[self.ESP_FLASH_BLOCK :] else: block_queue.append(chunk[:]) - chunk = b'' + chunk = b"" for seq, block in enumerate(block_queue): if manager: manager.status = self.__progress_bar(curr_seq + seq, total_seq) @@ -461,7 +453,7 @@ def __write_chunk(self, chunk, curr_seq, total_seq, manager): f"({int((curr_seq+seq)/total_seq*100)}%)" ) print( - f'\r{self.__progress_bar(curr_seq + seq, total_seq)}', end='' + f"\r{self.__progress_bar(curr_seq + seq, total_seq)}", end="" ) self.__write_flash_block(block, seq) return len(block_queue) diff --git a/modi_firmware_updater/core/gd32_updater.py b/modi_firmware_updater/core/gd32_updater.py index 2e96e80..47b0ba7 100644 --- a/modi_firmware_updater/core/gd32_updater.py +++ b/modi_firmware_updater/core/gd32_updater.py @@ -1,25 +1,21 @@ - import io -import sys -import time import json -import zipfile - +import sys import threading as th +import time import urllib.request as ur - -from os import path -from io import open +import zipfile from base64 import b64encode +from io import open +from os import path from urllib.error import URLError from modi_firmware_updater.util.connection_util import SerTask -from modi_firmware_updater.util.message_util import ( - unpack_data, decode_message, parse_message -) -from modi_firmware_updater.util.module_util import ( - Module, get_module_type_from_uuid -) +from modi_firmware_updater.util.message_util import (decode_message, + parse_message, + unpack_data) +from modi_firmware_updater.util.module_util import (Module, + get_module_type_from_uuid) def retry(exception_to_catch): @@ -48,7 +44,7 @@ class GD32FirmwareUpdater: ERASE_COMPLETE = 7 def __init__( - self, is_os_update=True, target_ids=(0xFFF, ), conn_type='ser' + self, is_os_update=True, target_ids=(0xFFF,), conn_type="ser" ): self.conn_type = conn_type self.update_network_base = False @@ -81,7 +77,7 @@ def request_network_id(self): def __assign_network_id(self, sid, data): module_uuid = unpack_data(data, (6, 1))[0] module_type = get_module_type_from_uuid(module_uuid) - if module_type == 'network': + if module_type == "network": self.network_id = sid def update_module_firmware(self, update_network_base=False): @@ -93,8 +89,8 @@ def update_module_firmware(self, update_network_base=False): if timeout <= 0: if not self.update_in_progress: print( - 'Could not retrieve network id, ' - 'broadcast id will be used instead.' + "Could not retrieve network id, " + "broadcast id will be used instead." ) self.network_id = 0xFFF break @@ -108,8 +104,8 @@ def update_module_firmware(self, update_network_base=False): """ if self.network_id != 0xFFF: print( - f'Sending a request to update firmware of network ' - f'({self.network_id})' + f"Sending a request to update firmware of network " + f"({self.network_id})" ) self.request_to_update_firmware( self.network_id, is_network=True @@ -131,10 +127,10 @@ def __open_conn(self): return SerTask() def reinitialize_serial_connection(self): - print('Temporally disconnecting the serial connection...') + print("Temporally disconnecting the serial connection...") self.close() - print('Re-init serial connection for the update, in 2 seconds...') + print("Re-init serial connection for the update, in 2 seconds...") time.sleep(2) self.__conn = self.__open_conn() self.__conn.open_conn() @@ -148,7 +144,7 @@ def reset_state(self, update_in_progress: bool = False) -> None: self.update_in_progress = False if not update_in_progress: - print('Make sure you have connected module(s) to update') + print("Make sure you have connected module(s) to update") print("Resetting firmware updater's state") self.modules_to_update = [] self.modules_updated = [] @@ -166,7 +162,7 @@ def request_to_update_firmware(self, module_id, is_network=False) -> None: module_id, Module.UPDATE_FIRMWARE, Module.PNP_OFF ) self.__conn.send_nowait(firmware_update_message) - print('Firmware update has been requested') + print("Firmware update has been requested") def check_to_update_firmware(self, module_id: int) -> None: firmware_update_ready_message = self.__set_module_state( @@ -185,8 +181,10 @@ def add_to_waitlist(self, module_id: int, module_type: str) -> None: if module_id == curr_module_id: return - print(f"Adding {module_type} ({module_id}) to waiting list..." - f"{' ' * 60}") + print( + f"Adding {module_type} ({module_id}) to waiting list..." + f"{' ' * 60}" + ) # Add the module to the waiting list module_elem = module_id, module_type @@ -203,8 +201,9 @@ def update_module(self, module_id: int, module_type: str) -> None: updater_thread.daemon = True updater_thread.start() - def update_response(self, response: bool, - is_error_response: bool = False) -> None: + def update_response( + self, response: bool, is_error_response: bool = False + ) -> None: if not is_error_response: self.response_flag = response else: @@ -215,30 +214,25 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: self.modules_updated.append((module_id, module_type)) # Init base root_path, utilizing local binary files - root_path = ( - path.join( - path.dirname(__file__), - '..', 'assets', 'firmware', 'gd32' - ) + root_path = path.join( + path.dirname(__file__), "..", "assets", "firmware", "gd32" ) if self.__is_os_update: if self.ui: if self.update_network_base: - root_path = ( - 'https://download.luxrobo.com/modi-network-os' - ) - zip_path = path.join(root_path, 'network.zip') - bin_path = 'network.bin' + root_path = "https://download.luxrobo.com/modi-network-os" + zip_path = path.join(root_path, "network.zip") + bin_path = "network.bin" else: root_path = ( - 'https://download.luxrobo.com/modi-skeleton-mobile' + "https://download.luxrobo.com/modi-skeleton-mobile" ) - zip_path = path.join(root_path, 'skeleton.zip') + zip_path = path.join(root_path, "skeleton.zip") bin_path = ( - path.join(f'skeleton/{module_type.lower()}.bin') - if module_type != 'env' else - path.join('skeleton/environment.bin') + path.join(f"skeleton/{module_type.lower()}.bin") + if module_type != "env" + else path.join("skeleton/environment.bin") ) try: @@ -249,12 +243,12 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: "Failed to download firmware. Check your internet." ) zip_content = zipfile.ZipFile( - io.BytesIO(download_response), 'r' + io.BytesIO(download_response), "r" ) bin_buffer = zip_content.read(bin_path) else: bin_path = path.join(root_path, f"{module_type.lower()}.bin") - with open(bin_path, 'rb') as bin_file: + with open(bin_path, "rb") as bin_file: bin_buffer = bin_file.read() # Init metadata of the bytes loaded @@ -271,7 +265,8 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: print( f"\rUpdating {module_type} ({module_id}) " f"{self.__progress_bar(page_begin, bin_end)} " - f"{progress}%", end='' + f"{progress}%", + end="", ) page_end = page_begin + page_size @@ -283,9 +278,11 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: # Erase page (send erase request and receive its response) erase_page_success = self.send_firmware_command( - oper_type="erase", module_id=module_id, crc_val=0, + oper_type="erase", + module_id=module_id, + crc_val=0, dest_addr=flash_memory_addr, - page_addr=page_begin + page_offset + page_addr=page_begin + page_offset, ) if not erase_page_success: page_begin -= page_size @@ -296,20 +293,22 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: if page_begin + curr_ptr >= bin_size: break - curr_data = curr_page[curr_ptr:curr_ptr + 8] + curr_data = curr_page[curr_ptr : curr_ptr + 8] checksum = self.send_firmware_data( module_id, seq_num=curr_ptr // 8, bin_data=curr_data, - crc_val=checksum + crc_val=checksum, ) self.__delay(0.002) # CRC on current page (send CRC request / receive CRC response) crc_page_success = self.send_firmware_command( - oper_type="crc", module_id=module_id, crc_val=checksum, + oper_type="crc", + module_id=module_id, + crc_val=checksum, dest_addr=flash_memory_addr, - page_addr=page_begin + page_offset + page_addr=page_begin + page_offset, ) if not crc_page_success: page_begin -= page_size @@ -320,25 +319,25 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: ) # Get version info from version_path, using appropriate methods - version_info, version_file = None, 'version.txt' + version_info, version_file = None, "version.txt" if self.ui: version_path = path.join(root_path, version_file) for line in ur.urlopen(version_path, timeout=5): - version_info = line.decode('utf-8').lstrip('v') + version_info = line.decode("utf-8").lstrip("v") else: if self.update_network_base: - version_file = 'base_' + version_file + version_file = "base_" + version_file version_path = path.join(root_path, version_file) with open(version_path) as version_file: - version_info = version_file.readline().lstrip('v').rstrip('\n') - version_digits = [int(digit) for digit in version_info.split('.')] + version_info = version_file.readline().lstrip("v").rstrip("\n") + version_digits = [int(digit) for digit in version_info.split(".")] """ Version number is formed by concatenating all three version bits e.g. 2.2.4 -> 010 00010 00000100 -> 0100 0010 0000 0100 """ version = ( - version_digits[0] << 13 | - version_digits[1] << 8 | - version_digits[2] + version_digits[0] << 13 + | version_digits[1] << 8 + | version_digits[2] ) # Set end-flash data to be sent at the end of the firmware update @@ -348,11 +347,11 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: end_flash_data[7] = (version >> 8) & 0xFF self.send_end_flash_data(module_type, module_id, end_flash_data) print( - f'Version info (v{version_info}) has been written to its firmware!' + f"Version info (v{version_info}) has been written to its firmware!" ) # Firmware update flag down, resetting used flags - print(f'Firmware update is done for {module_type} ({module_id})') + print(f"Firmware update is done for {module_type} ({module_id})") self.reset_state(update_in_progress=True) if self.modules_to_update: @@ -383,8 +382,9 @@ def __delay(span): return @staticmethod - def __set_network_state(destination_id: int, module_state: int, - pnp_state: int) -> str: + def __set_network_state( + destination_id: int, module_state: int, pnp_state: int + ) -> str: message = dict() message["c"] = 0xA4 @@ -401,8 +401,9 @@ def __set_network_state(destination_id: int, module_state: int, return json.dumps(message, separators=(",", ":")) @staticmethod - def __set_module_state(destination_id: int, module_state: int, - pnp_state: int) -> str: + def __set_module_state( + destination_id: int, module_state: int, pnp_state: int + ) -> str: message = dict() message["c"] = 0x09 @@ -419,16 +420,19 @@ def __set_module_state(destination_id: int, module_state: int, return json.dumps(message, separators=(",", ":")) # TODO: Use retry decorator here - def send_end_flash_data(self, module_type: str, module_id: int, - end_flash_data: bytearray) -> None: + def send_end_flash_data( + self, module_type: str, module_id: int, end_flash_data: bytearray + ) -> None: # Write end-flash data until success end_flash_success = False while not end_flash_success: # Erase page (send erase request and receive erase response) erase_page_success = self.send_firmware_command( - oper_type="erase", module_id=module_id, crc_val=0, - dest_addr=0x0800F800 + oper_type="erase", + module_id=module_id, + crc_val=0, + dest_addr=0x0800F800, ) # TODO: Remove magic number of dest_addr above, try using flash_mem if not erase_page_success: @@ -441,8 +445,10 @@ def send_end_flash_data(self, module_type: str, module_id: int, # CRC on current page (send CRC request and receive CRC response) crc_page_success = self.send_firmware_command( - oper_type="crc", module_id=module_id, crc_val=checksum, - dest_addr=0x0800F800 + oper_type="crc", + module_id=module_id, + crc_val=checksum, + dest_addr=0x0800F800, ) if not crc_page_success: continue @@ -450,8 +456,14 @@ def send_end_flash_data(self, module_type: str, module_id: int, end_flash_success = True # print(f"End flash is written for {module_type} ({module_id})") - def get_firmware_command(self, module_id: int, rot_stype: int, - rot_scmd: int, crc32: int, page_addr: int) -> str: + def get_firmware_command( + self, + module_id: int, + rot_stype: int, + rot_scmd: int, + crc32: int, + page_addr: int, + ) -> str: message = dict() message["c"] = 0x0D @@ -473,15 +485,16 @@ def get_firmware_command(self, module_id: int, rot_stype: int, crc32 >>= 8 crc32_and_page_addr_data[4 + i] = page_addr & 0xFF page_addr >>= 8 - message["b"] = b64encode( - bytes(crc32_and_page_addr_data) - ).decode("utf-8") + message["b"] = b64encode(bytes(crc32_and_page_addr_data)).decode( + "utf-8" + ) message["l"] = 8 return json.dumps(message, separators=(",", ":")) - def get_firmware_data(self, module_id: int, seq_num: int, - bin_data: bytes) -> str: + def get_firmware_data( + self, module_id: int, seq_num: int, bin_data: bytes + ) -> str: message = dict() message["c"] = 0x0B message["s"] = seq_num @@ -493,7 +506,7 @@ def get_firmware_data(self, module_id: int, seq_num: int, return json.dumps(message, separators=(",", ":")) def calc_crc32(self, data: bytes, crc: int) -> int: - crc ^= int.from_bytes(data, byteorder='little', signed=False) + crc ^= int.from_bytes(data, byteorder="little", signed=False) for _ in range(32): if crc & (1 << 31) != 0: @@ -509,9 +522,14 @@ def calc_crc64(self, data: bytes, checksum: int) -> int: checksum = self.calc_crc32(data[4:], checksum) return checksum - def send_firmware_command(self, oper_type: str, module_id: int, - crc_val: int, dest_addr: int, - page_addr: int = 0) -> bool: + def send_firmware_command( + self, + oper_type: str, + module_id: int, + crc_val: int, + dest_addr: int, + page_addr: int = 0, + ) -> bool: rot_scmd = 2 if oper_type == "erase" else 1 # Send firmware command request @@ -522,9 +540,12 @@ def send_firmware_command(self, oper_type: str, module_id: int, return self.receive_command_response() - def receive_command_response(self, response_delay: float = 0.001, - response_timeout: float = 5, - max_response_error_count: int = 75) -> bool: + def receive_command_response( + self, + response_delay: float = 0.001, + response_timeout: float = 5, + max_response_error_count: int = 75, + ) -> bool: # Receive firmware command response response_wait_time = 0 while not self.response_flag: @@ -547,8 +568,9 @@ def receive_command_response(self, response_delay: float = 0.001, self.response_flag = False return True - def send_firmware_data(self, module_id: int, seq_num: int, bin_data: bytes, - crc_val: int) -> int: + def send_firmware_data( + self, module_id: int, seq_num: int, bin_data: bytes, crc_val: int + ) -> int: # Send firmware data data_message = self.get_firmware_data( module_id, seq_num=seq_num, bin_data=bin_data @@ -583,7 +605,7 @@ def __handle_message(self): command = { 0x05: self.__assign_network_id, 0x0A: self.__update_warning, - 0x0C: self.__update_firmware_state + 0x0C: self.__update_firmware_state, }.get(ins) if command: diff --git a/modi_firmware_updater/core/stm32_updater.py b/modi_firmware_updater/core/stm32_updater.py index 89841cb..7598e4f 100644 --- a/modi_firmware_updater/core/stm32_updater.py +++ b/modi_firmware_updater/core/stm32_updater.py @@ -1,26 +1,23 @@ - import io -import sys -import time import json -import serial -import zipfile - +import sys import threading as th +import time import urllib.request as ur - -from os import path -from io import open +import zipfile from base64 import b64encode +from io import open +from os import path from urllib.error import URLError +import serial + from modi_firmware_updater.util.connection_util import SerTask, list_modi_ports -from modi_firmware_updater.util.message_util import ( - unpack_data, decode_message, parse_message -) -from modi_firmware_updater.util.module_util import ( - Module, get_module_type_from_uuid -) +from modi_firmware_updater.util.message_util import (decode_message, + parse_message, + unpack_data) +from modi_firmware_updater.util.module_util import (Module, + get_module_type_from_uuid) def retry(exception_to_catch): @@ -49,7 +46,7 @@ class STM32FirmwareUpdater: ERASE_COMPLETE = 7 def __init__( - self, is_os_update=True, target_ids=(0xFFF, ), conn_type='ser' + self, is_os_update=True, target_ids=(0xFFF,), conn_type="ser" ): self.conn_type = conn_type self.update_network_base = False @@ -75,7 +72,7 @@ def __del__(self): try: self.close() except serial.SerialException: - print('Magic del is called with an exception') + print("Magic del is called with an exception") def set_ui(self, ui): self.ui = ui @@ -88,7 +85,7 @@ def request_network_id(self): def __assign_network_id(self, sid, data): module_uuid = unpack_data(data, (6, 1))[0] module_type = get_module_type_from_uuid(module_uuid) - if module_type == 'network': + if module_type == "network": self.network_id = sid def update_module_firmware(self, update_network_base=False): @@ -101,8 +98,8 @@ def update_module_firmware(self, update_network_base=False): if timeout <= 0: if not self.update_in_progress: print( - 'Could not retrieve network id, ' - 'broadcast id will be used instead.' + "Could not retrieve network id, " + "broadcast id will be used instead." ) self.network_id = 0xFFF r_mode = 2 @@ -117,8 +114,8 @@ def update_module_firmware(self, update_network_base=False): """ if self.network_id != 0xFFF: print( - f'Sending a request to update firmware of network ' - f'({self.network_id})' + f"Sending a request to update firmware of network " + f"({self.network_id})" ) if not self.update_in_progress: self.request_to_update_firmware( @@ -152,23 +149,23 @@ def _reconnect_serial_connection(self, modi_num): break def reinitialize_serial_connection(self, reinit_mode=1): - if self.ui and self.update_network_base and reinit_mode == 2: - modi_num = len(list_modi_ports()) - self.ui.stream.thread_signal.connect(self.ui.popup) - self.ui.stream.thread_signal.emit(self) - th.Thread( - target=self._reconnect_serial_connection, - args=(modi_num,), - daemon=True - ).start() - else: - self.__reinitialize_serial_connection() + # if self.ui and self.update_network_base and reinit_mode == 2: + # modi_num = len(list_modi_ports()) + # self.ui.stream.thread_signal.connect(self.ui.popup) + # self.ui.stream.thread_signal.emit(self) + # th.Thread( + # target=self._reconnect_serial_connection, + # args=(modi_num,), + # daemon=True + # ).start() + # else: + self.__reinitialize_serial_connection() def __reinitialize_serial_connection(self): - print('Temporally disconnecting the serial connection...') + print("Temporally disconnecting the serial connection...") self.close() time.sleep(2) - print('Re-init serial connection for the update, in 2 seconds...') + print("Re-init serial connection for the update, in 2 seconds...") self.__conn = self.__open_conn() self.__conn.open_conn() self.__running = True @@ -181,7 +178,7 @@ def reset_state(self, update_in_progress: bool = False) -> None: self.update_in_progress = False if not update_in_progress: - print('Make sure you have connected module(s) to update') + print("Make sure you have connected module(s) to update") print("Resetting firmware updater's state") self.modules_to_update = [] self.modules_updated = [] @@ -201,7 +198,7 @@ def request_to_update_firmware( module_id, Module.UPDATE_FIRMWARE, Module.PNP_OFF ) self.__conn.send_nowait(firmware_update_message) - print('Firmware update has been requested') + print("Firmware update has been requested") def check_to_update_firmware(self, module_id: int) -> None: firmware_update_ready_message = self.__set_module_state( @@ -220,8 +217,10 @@ def add_to_waitlist(self, module_id: int, module_type: str) -> None: if module_id == curr_module_id: return - print(f"Adding {module_type} ({module_id}) to waiting list..." - f"{' ' * 60}") + print( + f"Adding {module_type} ({module_id}) to waiting list..." + f"{' ' * 60}" + ) # Add the module to the waiting list module_elem = module_id, module_type @@ -238,8 +237,9 @@ def update_module(self, module_id: int, module_type: str) -> None: updater_thread.daemon = True updater_thread.start() - def update_response(self, response: bool, - is_error_response: bool = False) -> None: + def update_response( + self, response: bool, is_error_response: bool = False + ) -> None: if not is_error_response: self.response_flag = response else: @@ -250,30 +250,23 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: self.modules_updated.append((module_id, module_type)) # Init base root_path, utilizing local binary files - root_path = ( - path.join( - path.dirname(__file__), - '..', 'assets', 'firmware', 'stm32' - ) + root_path = path.join( + path.dirname(__file__), "..", "assets", "firmware", "stm32" ) if self.__is_os_update: if self.ui: if self.update_network_base: - root_path = ( - 'https://download.luxrobo.com/modi-network-os' - ) - zip_path = root_path + '/network.zip' - bin_path = 'network.bin' + root_path = "https://download.luxrobo.com/modi-network-os" + zip_path = root_path + "/network.zip" + bin_path = "network.bin" else: - root_path = ( - 'https://download.luxrobo.com/modi-skeleton' - ) - zip_path = root_path + '/skeleton.zip' + root_path = "https://download.luxrobo.com/modi-skeleton" + zip_path = root_path + "/skeleton.zip" bin_path = ( - path.join(f'{module_type.lower()}/Base_module.bin') - if module_type != 'env' else - path.join('environment/Base_module.bin') + path.join(f"{module_type.lower()}/Base_module.bin") + if module_type != "env" + else path.join("environment/Base_module.bin") ) try: @@ -284,12 +277,12 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: "Failed to download firmware. Check your internet." ) zip_content = zipfile.ZipFile( - io.BytesIO(download_response), 'r' + io.BytesIO(download_response), "r" ) bin_buffer = zip_content.read(bin_path) else: - bin_path = path.join(root_path, f'{module_type.lower()}.bin') - with open(bin_path, 'rb') as bin_file: + bin_path = path.join(root_path, f"{module_type.lower()}.bin") + with open(bin_path, "rb") as bin_file: bin_buffer = bin_file.read() # Init metadata of the bytes loaded @@ -337,7 +330,8 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: print( f"\rUpdating {module_type} ({module_id}) " f"{self.__progress_bar(page_begin, bin_end)} " - f"{progress}%", end='' + f"{progress}%", + end="", ) page_end = page_begin + page_size @@ -349,9 +343,11 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: # Erase page (send erase request and receive its response) erase_page_success = self.send_firmware_command( - oper_type="erase", module_id=module_id, crc_val=0, + oper_type="erase", + module_id=module_id, + crc_val=0, dest_addr=flash_memory_addr, - page_addr=page_begin + page_offset + page_addr=page_begin + page_offset, ) if not erase_page_success: page_begin -= page_size @@ -362,20 +358,22 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: if page_begin + curr_ptr >= bin_size: break - curr_data = curr_page[curr_ptr:curr_ptr + 8] + curr_data = curr_page[curr_ptr : curr_ptr + 8] checksum = self.send_firmware_data( module_id, seq_num=curr_ptr // 8, bin_data=curr_data, - crc_val=checksum + crc_val=checksum, ) self.__delay(0.002) # CRC on current page (send CRC request / receive CRC response) crc_page_success = self.send_firmware_command( - oper_type="crc", module_id=module_id, crc_val=checksum, + oper_type="crc", + module_id=module_id, + crc_val=checksum, dest_addr=flash_memory_addr, - page_addr=page_begin + page_offset + page_addr=page_begin + page_offset, ) if not crc_page_success: page_begin -= page_size @@ -386,25 +384,25 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: ) # Get version info from version_path, using appropriate methods - version_info, version_file = None, 'version.txt' + version_info, version_file = None, "version.txt" if self.ui: - version_path = root_path + '/' + version_file + version_path = root_path + "/" + version_file for line in ur.urlopen(version_path, timeout=5): - version_info = line.decode('utf-8').lstrip('v').rstrip('\n') + version_info = line.decode("utf-8").lstrip("v").rstrip("\n") else: if self.update_network_base: - version_file = 'base_' + version_file - version_path = root_path + '/' + version_file + version_file = "base_" + version_file + version_path = root_path + "/" + version_file with open(version_path) as version_file: - version_info = version_file.readline().lstrip('v').rstrip('\n') - version_digits = [int(digit) for digit in version_info.split('.')] + version_info = version_file.readline().lstrip("v").rstrip("\n") + version_digits = [int(digit) for digit in version_info.split(".")] """ Version number is formed by concatenating all three version bits e.g. 2.2.4 -> 010 00010 00000100 -> 0100 0010 0000 0100 """ version = ( - version_digits[0] << 13 | - version_digits[1] << 8 | - version_digits[2] + version_digits[0] << 13 + | version_digits[1] << 8 + | version_digits[2] ) # Set end-flash data to be sent at the end of the firmware update @@ -414,11 +412,11 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: end_flash_data[7] = (version >> 8) & 0xFF self.send_end_flash_data(module_type, module_id, end_flash_data) print( - f'Version info (v{version_info}) has been written to its firmware!' + f"Version info (v{version_info}) has been written to its firmware!" ) # Firmware update flag down, resetting used flags - print(f'Firmware update is done for {module_type} ({module_id})') + print(f"Firmware update is done for {module_type} ({module_id})") self.reset_state(update_in_progress=True) if self.modules_to_update: @@ -444,8 +442,8 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: if self.ui: if self.update_network_base: self.ui.update_stm32_modules.setStyleSheet( - f'border-image: url({self.ui.active_path});' - 'font-size: 16px' + f"border-image: url({self.ui.active_path});" + "font-size: 16px" ) self.ui.update_stm32_modules.setEnabled(True) if self.ui.is_english: @@ -458,8 +456,8 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: ) else: self.ui.update_network_stm32.setStyleSheet( - f'border-image: url({self.ui.active_path});' - 'font-size: 16px' + f"border-image: url({self.ui.active_path});" + "font-size: 16px" ) self.ui.update_network_stm32.setEnabled(True) if self.ui.is_english: @@ -467,12 +465,10 @@ def __update_firmware(self, module_id: int, module_type: str) -> None: "Update STM32 Modules." ) else: - self.ui.update_stm32_modules.setText( - "모듈 초기화" - ) + self.ui.update_stm32_modules.setText("모듈 초기화") self.ui.update_network_esp32.setStyleSheet( - f'border-image: url({self.ui.active_path});' - 'font-size: 16px' + f"border-image: url({self.ui.active_path});" + "font-size: 16px" ) self.ui.update_network_esp32.setEnabled(True) @@ -484,8 +480,9 @@ def __delay(span): return @staticmethod - def __set_network_state(destination_id: int, module_state: int, - pnp_state: int) -> str: + def __set_network_state( + destination_id: int, module_state: int, pnp_state: int + ) -> str: message = dict() message["c"] = 0xA4 @@ -502,8 +499,9 @@ def __set_network_state(destination_id: int, module_state: int, return json.dumps(message, separators=(",", ":")) @staticmethod - def __set_module_state(destination_id: int, module_state: int, - pnp_state: int) -> str: + def __set_module_state( + destination_id: int, module_state: int, pnp_state: int + ) -> str: message = dict() message["c"] = 0x09 @@ -521,16 +519,19 @@ def __set_module_state(destination_id: int, module_state: int, # TODO: Use retry decorator here @retry(Exception) - def send_end_flash_data(self, module_type: str, module_id: int, - end_flash_data: bytearray) -> None: + def send_end_flash_data( + self, module_type: str, module_id: int, end_flash_data: bytearray + ) -> None: # Write end-flash data until success end_flash_success = False while not end_flash_success: # Erase page (send erase request and receive erase response) erase_page_success = self.send_firmware_command( - oper_type="erase", module_id=module_id, crc_val=0, - dest_addr=0x0801F800 + oper_type="erase", + module_id=module_id, + crc_val=0, + dest_addr=0x0801F800, ) # TODO: Remove magic number of dest_addr above, try using flash_mem if not erase_page_success: @@ -543,17 +544,25 @@ def send_end_flash_data(self, module_type: str, module_id: int, # CRC on current page (send CRC request and receive CRC response) crc_page_success = self.send_firmware_command( - oper_type="crc", module_id=module_id, crc_val=checksum, - dest_addr=0x0801F800 + oper_type="crc", + module_id=module_id, + crc_val=checksum, + dest_addr=0x0801F800, ) if not crc_page_success: continue end_flash_success = True - # print(f"End flash is written for {module_type} ({module_id})") - - def get_firmware_command(self, module_id: int, rot_stype: int, - rot_scmd: int, crc32: int, page_addr: int) -> str: + print(f"End flash is written for {module_type} ({module_id})") + + def get_firmware_command( + self, + module_id: int, + rot_stype: int, + rot_scmd: int, + crc32: int, + page_addr: int, + ) -> str: message = dict() message["c"] = 0x0D @@ -575,15 +584,16 @@ def get_firmware_command(self, module_id: int, rot_stype: int, crc32 >>= 8 crc32_and_page_addr_data[4 + i] = page_addr & 0xFF page_addr >>= 8 - message["b"] = b64encode( - bytes(crc32_and_page_addr_data) - ).decode("utf-8") + message["b"] = b64encode(bytes(crc32_and_page_addr_data)).decode( + "utf-8" + ) message["l"] = 8 return json.dumps(message, separators=(",", ":")) - def get_firmware_data(self, module_id: int, seq_num: int, - bin_data: bytes) -> str: + def get_firmware_data( + self, module_id: int, seq_num: int, bin_data: bytes + ) -> str: message = dict() message["c"] = 0x0B message["s"] = seq_num @@ -595,7 +605,7 @@ def get_firmware_data(self, module_id: int, seq_num: int, return json.dumps(message, separators=(",", ":")) def calc_crc32(self, data: bytes, crc: int) -> int: - crc ^= int.from_bytes(data, byteorder='little', signed=False) + crc ^= int.from_bytes(data, byteorder="little", signed=False) for _ in range(32): if crc & (1 << 31) != 0: @@ -611,9 +621,14 @@ def calc_crc64(self, data: bytes, checksum: int) -> int: checksum = self.calc_crc32(data[4:], checksum) return checksum - def send_firmware_command(self, oper_type: str, module_id: int, - crc_val: int, dest_addr: int, - page_addr: int = 0) -> bool: + def send_firmware_command( + self, + oper_type: str, + module_id: int, + crc_val: int, + dest_addr: int, + page_addr: int = 0, + ) -> bool: rot_scmd = 2 if oper_type == "erase" else 1 # Send firmware command request @@ -624,9 +639,12 @@ def send_firmware_command(self, oper_type: str, module_id: int, return self.receive_command_response() - def receive_command_response(self, response_delay: float = 0.001, - response_timeout: float = 5, - max_response_error_count: int = 75) -> bool: + def receive_command_response( + self, + response_delay: float = 0.001, + response_timeout: float = 5, + max_response_error_count: int = 75, + ) -> bool: # Receive firmware command response response_wait_time = 0 while not self.response_flag: @@ -649,8 +667,9 @@ def receive_command_response(self, response_delay: float = 0.001, self.response_flag = False return True - def send_firmware_data(self, module_id: int, seq_num: int, bin_data: bytes, - crc_val: int) -> int: + def send_firmware_data( + self, module_id: int, seq_num: int, bin_data: bytes, crc_val: int + ) -> int: # Send firmware data data_message = self.get_firmware_data( module_id, seq_num=seq_num, bin_data=bin_data @@ -685,7 +704,7 @@ def __handle_message(self): command = { 0x05: self.__assign_network_id, 0x0A: self.__update_warning, - 0x0C: self.__update_firmware_state + 0x0C: self.__update_firmware_state, }.get(ins) if command: diff --git a/modi_firmware_updater/gui_firmware_updater.py b/modi_firmware_updater/gui_firmware_updater.py index 0f6ac03..82b4ab2 100644 --- a/modi_firmware_updater/gui_firmware_updater.py +++ b/modi_firmware_updater/gui_firmware_updater.py @@ -1,23 +1,21 @@ +import logging import os +import pathlib import sys +import threading as th import time -import logging -import logging.handlers -import pathlib import traceback as tb -import threading as th -from PyQt5 import uic -from PyQt5 import QtWidgets, QtCore, QtGui +from PyQt5 import QtCore, QtGui, QtWidgets, uic from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot from PyQt5.QtWidgets import QDialog -from modi_firmware_updater.core.stm32_updater import STM32FirmwareUpdater from modi_firmware_updater.core.esp32_updater import ESP32FirmwareUpdater +from modi_firmware_updater.core.stm32_updater import STM32FirmwareUpdater class StdoutRedirect(QObject): - printOccur = pyqtSignal(str, str, name='print') + printOccur = pyqtSignal(str, str, name="print") def __init__(self): QObject.__init__(self, None) @@ -32,7 +30,7 @@ def stop(self): def start(self): sys.stdout.write = self.write - sys.stderr.write = lambda msg: self.write(msg, color='red') + sys.stderr.write = lambda msg: self.write(msg, color="red") def write(self, s, color="black"): sys.stdout.flush() @@ -43,37 +41,36 @@ def write(self, s, color="black"): @staticmethod def __is_redundant_line(line): return ( - line.startswith('\rUpdating') or - line.startswith('\rFirmware Upload: [') or - len(line) < 3 + line.startswith("\rUpdating") + or line.startswith("\rFirmware Upload: [") + or len(line) < 3 ) class PopupMessageBox(QtWidgets.QMessageBox): - def __init__(self, main_window, level): QtWidgets.QMessageBox.__init__(self) self.window = main_window self.setSizeGripEnabled(True) - self.setWindowTitle('System Message') + self.setWindowTitle("System Message") def error_popup(): self.setIcon(self.Icon.Warning) - self.setText('ERROR') + self.setText("ERROR") def warning_popup(): self.setIcon(self.Icon.Information) - self.setText('WARNING') - # restart_btn = self.addButton('Ok', self.ActionRole) + self.setText("WARNING") + self.addButton("Ok", self.ActionRole) # restart_btn.clicked.connect(self.restart_btn) func = { - 'error': error_popup, - 'warning': warning_popup, + "error": error_popup, + "warning": warning_popup, }.get(level) func() - close_btn = self.addButton('Exit', self.ActionRole) + close_btn = self.addButton("Exit", self.ActionRole) close_btn.clicked.connect(self.close_btn) # report_btn = self.addButton('Report Error', self.ActionRole) # report_btn.clicked.connect(self.report_btn) @@ -102,7 +99,7 @@ def event(self, e): textEdit.setMaximumWidth(MAXSIZE) textEdit.setSizePolicy( QtWidgets.QSizePolicy.Expanding, - QtWidgets.QSizePolicy.Expanding + QtWidgets.QSizePolicy.Expanding, ) return result @@ -136,97 +133,81 @@ class Form(QDialog): def __init__(self, installer=False): QDialog.__init__(self) self.logger = self.__init_logger() - # self.__excepthook = sys.excepthook - # sys.excepthook = self.__popup_excepthook - # th.excepthook = self.__popup_thread_excepthook + self.__excepthook = sys.excepthook + sys.excepthook = self.__popup_excepthook + th.excepthook = self.__popup_thread_excepthook self.err_list = list() self.is_popup = False if installer: - ui_path = os.path.join( - os.path.dirname(__file__), 'updater.ui' - ) - if sys.platform.startswith('win'): - self.component_path = ( - pathlib.PurePosixPath( - pathlib.PurePath(__file__), - '..' - ) + ui_path = os.path.join(os.path.dirname(__file__), "updater.ui") + if sys.platform.startswith("win"): + self.component_path = pathlib.PurePosixPath( + pathlib.PurePath(__file__), ".." ) else: - self.component_path = ( - os.path.dirname(__file__).replace( - 'util', '' - ) + self.component_path = os.path.dirname(__file__).replace( + "util", "" ) else: - ui_path = ( - os.path.join( - os.path.dirname(__file__), - 'assets', 'updater.ui' - ) + ui_path = os.path.join( + os.path.dirname(__file__), "assets", "updater.ui" ) - if sys.platform.startswith('win'): - self.component_path = ( - pathlib.PurePosixPath( - pathlib.PurePath(__file__), - '..', 'assets', 'component' - ) + if sys.platform.startswith("win"): + self.component_path = pathlib.PurePosixPath( + pathlib.PurePath(__file__), "..", "assets", "component" ) else: - self.component_path = ( - os.path.join( - os.path.dirname(__file__), - 'assets', 'component' - ) + self.component_path = os.path.join( + os.path.dirname(__file__), "assets", "component" ) self.ui = uic.loadUi(ui_path) - self.ui.setStyleSheet('background-color: white') + self.ui.setStyleSheet("background-color: white") self.ui.console.hide() self.ui.setFixedHeight(600) # Set LUXROBO logo image - logo_path = os.path.join(self.component_path, 'luxrobo_logo.png') + logo_path = os.path.join(self.component_path, "luxrobo_logo.png") qPixmapVar = QtGui.QPixmap() qPixmapVar.load(logo_path) self.ui.lux_logo.setPixmap(qPixmapVar) # Buttons image self.active_path = pathlib.PurePosixPath( - self.component_path, 'btn_frame_active.png' + self.component_path, "btn_frame_active.png" ) self.inactive_path = pathlib.PurePosixPath( - self.component_path, 'btn_frame_inactive.png' + self.component_path, "btn_frame_inactive.png" ) self.pressed_path = pathlib.PurePosixPath( - self.component_path, 'btn_frame_pressed.png' + self.component_path, "btn_frame_pressed.png" ) self.language_frame_path = pathlib.PurePosixPath( - self.component_path, 'lang_frame.png' + self.component_path, "lang_frame.png" ) self.language_frame_pressed_path = pathlib.PurePosixPath( - self.component_path, 'lang_frame_pressed.png' + self.component_path, "lang_frame_pressed.png" ) self.ui.update_network_esp32.setStyleSheet( - f'border-image: url({self.active_path}); font-size: 16px' + f"border-image: url({self.active_path}); font-size: 16px" ) self.ui.update_stm32_modules.setStyleSheet( - f'border-image: url({self.active_path}); font-size: 16px' + f"border-image: url({self.active_path}); font-size: 16px" ) self.ui.update_network_stm32.setStyleSheet( - f'border-image: url({self.active_path}); font-size: 16px' + f"border-image: url({self.active_path}); font-size: 16px" ) self.ui.translate_button.setStyleSheet( - f'border-image: url({self.language_frame_path}); font-size: 13px' + f"border-image: url({self.language_frame_path}); font-size: 13px" ) self.ui.devmode_button.setStyleSheet( - f'border-image: url({self.language_frame_path}); font-size: 13px' + f"border-image: url({self.language_frame_path}); font-size: 13px" ) - self.ui.console.setStyleSheet('font-size: 10px') + self.ui.console.setStyleSheet("font-size: 10px") - self.ui.setWindowTitle('MODI Firmware Updater') + self.ui.setWindowTitle("MODI Firmware Updater") # Redirect stdout to text browser (i.e. console in our UI) self.stdout = StdoutRedirect() @@ -262,8 +243,8 @@ def __init__(self, installer=False): self.ui.update_network_esp32.setDefault(False) # Print init status - time_now_str = time.strftime('[%Y/%m/%d@%X]', time.localtime()) - print(time_now_str + ' GUI MODI Firmware Updater has been started!') + time_now_str = time.strftime("[%Y/%m/%d@%X]", time.localtime()) + print(time_now_str + " GUI MODI Firmware Updater has been started!") # Set up field variables self.firmware_updater = None @@ -293,12 +274,10 @@ def update_network_esp32(self): if self.firmware_updater and self.firmware_updater.update_in_progress: return self.ui.update_network_esp32.setStyleSheet( - f'border-image: url({self.pressed_path}); font-size: 16px' + f"border-image: url({self.pressed_path}); font-size: 16px" ) self.ui.console.clear() - print( - 'ESP32 Firmware Updater has been initialized for esp update!' - ) + print("ESP32 Firmware Updater has been initialized for esp update!") th.Thread( target=self.__click_motion, args=(0, button_start), daemon=True ).start() @@ -312,12 +291,10 @@ def update_stm32_modules(self): if self.firmware_updater and self.firmware_updater.update_in_progress: return self.ui.update_stm32_modules.setStyleSheet( - f'border-image: url({self.pressed_path}); font-size: 16px' + f"border-image: url({self.pressed_path}); font-size: 16px" ) self.ui.console.clear() - print( - 'STM32 Firmware Updater has been initialized for module update!' - ) + print("STM32 Firmware Updater has been initialized for module update!") th.Thread( target=self.__click_motion, args=(1, button_start), daemon=True ).start() @@ -333,12 +310,10 @@ def update_network_stm32(self): if self.firmware_updater and self.firmware_updater.update_in_progress: return self.ui.update_network_stm32.setStyleSheet( - f'border-image: url({self.pressed_path}); font-size: 16px' + f"border-image: url({self.pressed_path}); font-size: 16px" ) self.ui.console.clear() - print( - 'STM32 Firmware Updater has been initialized for base update!' - ) + print("STM32 Firmware Updater has been initialized for base update!") th.Thread( target=self.__click_motion, args=(2, button_start), daemon=True ).start() @@ -347,15 +322,15 @@ def update_network_stm32(self): th.Thread( target=stm32_updater.update_module_firmware, args=(True,), - daemon=True + daemon=True, ).start() self.firmware_updater = stm32_updater def dev_mode_button(self): button_start = time.time() self.ui.devmode_button.setStyleSheet( - f'border-image: url({self.language_frame_pressed_path});' - 'font-size: 13px' + f"border-image: url({self.language_frame_pressed_path});" + "font-size: 13px" ) th.Thread( target=self.__click_motion, args=(3, button_start), daemon=True @@ -371,28 +346,29 @@ def dev_mode_button(self): def translate_button_text(self): button_start = time.time() self.ui.translate_button.setStyleSheet( - f'border-image: url({self.language_frame_pressed_path});' - 'font-size: 13px' + f"border-image: url({self.language_frame_pressed_path});" + "font-size: 13px" ) th.Thread( target=self.__click_motion, args=(4, button_start), daemon=True ).start() button_en = [ - 'Update Network ESP32', - 'Update STM32 Modules', - 'Update Network STM32', - 'Dev Mode', - '한국어', + "Update Network ESP32", + "Update STM32 Modules", + "Update Network STM32", + "Dev Mode", + "한국어", ] button_kr = [ - '네트워크 모듈 업데이트', - '모듈 초기화', - '네트워크 모듈 초기화', - '개발자 모드', - 'English', + "네트워크 모듈 업데이트", + "모듈 초기화", + "네트워크 모듈 초기화", + "개발자 모드", + "English", ] - appropriate_translation = \ + appropriate_translation = ( button_kr if self.button_in_english else button_en + ) self.button_in_english = not self.button_in_english self.ui.is_english = not self.ui.is_english for i, button in enumerate(self.buttons): @@ -403,13 +379,13 @@ def translate_button_text(self): # @staticmethod def __init_logger(): - logger = logging.getLogger('GUI MODI Firmware Updater Logger') + logger = logging.getLogger("GUI MODI Firmware Updater Logger") logger.setLevel(logging.DEBUG) formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) - file_handler = logging.FileHandler('gmfu.log') + file_handler = logging.FileHandler("gmfu.log") file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) @@ -419,7 +395,7 @@ def __popup_excepthook(self, exctype, value, traceback): self.__excepthook(exctype, value, traceback) if self.is_popup: return - self.popup = PopupMessageBox(self.ui, level='error') + self.popup = PopupMessageBox(self.ui, level="error") self.popup.setInformativeText(str(value)) self.popup.setDetailedText(str(tb.extract_tb(traceback))) self.is_popup = True @@ -439,14 +415,14 @@ def __thread_error_hook(self, err_msg): @pyqtSlot(object) def _thread_signal_hook(self): - self.thread_popup = PopupMessageBox( - self.ui, level='warning' - ) + self.thread_popup = PopupMessageBox(self.ui, level="warning") if self.button_in_english: - text = ('Reconnect network module and ' - 'click the button again please.') + text = ( + "Reconnect network module and " + "click the button again please." + ) else: - text = '네트워크 모듈을 재연결 후 버튼을 다시 눌러주십시오.' + text = "네트워크 모듈을 재연결 후 버튼을 다시 눌러주십시오." self.thread_popup.setInformativeText(text) self.is_popup = True @@ -457,18 +433,18 @@ def __click_motion(self, button_type, start_time): if button_type in [3, 4]: self.buttons[button_type].setStyleSheet( - f'border-image: url({self.language_frame_path});' - 'font-size: 13px' + f"border-image: url({self.language_frame_path});" + "font-size: 13px" ) else: self.buttons[button_type].setStyleSheet( - f'border-image: url({self.active_path}); font-size: 16px' + f"border-image: url({self.active_path}); font-size: 16px" ) for i, q_button in enumerate(self.buttons): if i in [button_type, 3, 4]: continue q_button.setStyleSheet( - f'border-image: url({self.inactive_path}); font-size: 16px' + f"border-image: url({self.inactive_path}); font-size: 16px" ) q_button.setEnabled(False) @@ -497,7 +473,6 @@ def __append_text_line(self, line): @staticmethod def __is_update_progress_line(line): - return ( - line.startswith('\rUpdating') or - line.startswith('\rFirmware Upload: [') + return line.startswith("\rUpdating") or line.startswith( + "\rFirmware Upload: [" ) diff --git a/modi_firmware_updater/util/connection_util.py b/modi_firmware_updater/util/connection_util.py index b21fd79..7adcd9c 100644 --- a/modi_firmware_updater/util/connection_util.py +++ b/modi_firmware_updater/util/connection_util.py @@ -1,16 +1,15 @@ import os import time +from abc import ABC, abstractmethod +from typing import List, Optional + import serial import serial.tools.list_ports as stl - -from abc import ABC, abstractmethod from serial.serialutil import SerialException from serial.tools.list_ports_common import ListPortInfo -from typing import List, Optional class ConnTask(ABC): - def __init__(self, verbose=False): self._bus = None self.verbose = verbose @@ -50,27 +49,28 @@ def wait(func): """Wait decorator Make sure this is attached to inherited send method """ + def decorator(self, pkt: str) -> None: init_time = time.perf_counter() func(self, pkt) while time.perf_counter() - init_time < 0.04: pass + return decorator class SerTask(ConnTask): - def __init__(self, verbose=False, port=None): print("Initiating serial connection...") super().__init__(verbose) self.__port = port - self.__json_buffer = b'' + self.__json_buffer = b"" # # Inherited Methods # def open_conn(self) -> None: - """ Open serial port + """Open serial port :return: None """ @@ -80,8 +80,10 @@ def open_conn(self) -> None: if self.__port: if self.__port not in map(lambda info: info.device, modi_ports): - raise SerialException(f"{self.__port} is not connected " - f"to a MODI network module.") + raise SerialException( + f"{self.__port} is not connected " + f"to a MODI network module." + ) else: try: self._bus = self.__init_serial(self.__port) @@ -109,36 +111,36 @@ def __init_serial(port): return ser def close_conn(self) -> None: - """ Close serial port + """Close serial port :return: None """ self._bus.close() def recv(self) -> Optional[str]: - """ Read serial message and put message to serial read queue + """Read serial message and put message to serial read queue :return: str """ buf_temp = self._bus.read_all() self.__json_buffer += buf_temp - idx = self.__json_buffer.find(b'{') + idx = self.__json_buffer.find(b"{") if idx < 0: - self.__json_buffer = b'' + self.__json_buffer = b"" return None self.__json_buffer = self.__json_buffer[idx:] - idx = self.__json_buffer.find(b'}') + idx = self.__json_buffer.find(b"}") if idx < 0: return None - json_pkt = self.__json_buffer[:idx + 1].decode('utf8') - self.__json_buffer = self.__json_buffer[idx + 1:] + json_pkt = self.__json_buffer[: idx + 1].decode("utf8") + self.__json_buffer = self.__json_buffer[idx + 1 :] if self.verbose: - print(f'recv: {json_pkt}') + print(f"recv: {json_pkt}") return json_pkt @ConnTask.wait def send(self, pkt: str, verbose=False) -> None: - """ Send json pkt + """Send json pkt :param pkt: Json pkt to send :type pkt: str @@ -146,12 +148,12 @@ def send(self, pkt: str, verbose=False) -> None: :type verbose: bool :return: None """ - self._bus.write(pkt.encode('utf8')) + self._bus.write(pkt.encode("utf8")) if self.verbose or verbose: - print(f'send: {pkt}') + print(f"send: {pkt}") def send_nowait(self, pkt: str, verbose=False) -> None: - """ Send json pkt + """Send json pkt :param pkt: Json pkt to send :type pkt: str @@ -159,9 +161,9 @@ def send_nowait(self, pkt: str, verbose=False) -> None: :type verbose: bool :return: None """ - self._bus.write(pkt.encode('utf8')) + self._bus.write(pkt.encode("utf8")) if self.verbose or verbose: - print(f'send: {pkt}') + print(f"send: {pkt}") def list_modi_ports() -> List[ListPortInfo]: @@ -173,13 +175,14 @@ def list_modi_ports() -> List[ListPortInfo]: def __is_modi_port(port): return ( (port.manufacturer and port.manufacturer.upper() == "LUXROBO") - or port.product in ( + or port.product + in ( "MODI Network Module", "MODI Network Module(BootLoader)", "STM32 Virtual ComPort", "STMicroelectronics Virtual COM Port", ) - or (port.vid == 0x2fde and port.pid == 0x2) + or (port.vid == 0x2FDE and port.pid == 0x2) or (port.vid == 0x483 and port.pid == 0x5740) ) diff --git a/modi_firmware_updater/util/message_util.py b/modi_firmware_updater/util/message_util.py index 413028d..f63653d 100644 --- a/modi_firmware_updater/util/message_util.py +++ b/modi_firmware_updater/util/message_util.py @@ -1,17 +1,20 @@ import json -from base64 import b64encode, b64decode +from base64 import b64decode, b64encode from typing import Tuple -def parse_message(command: int, source: int, destination: int, - byte_data: Tuple = - (None, None, None, None, None, None, None, None)): +def parse_message( + command: int, + source: int, + destination: int, + byte_data: Tuple = (None, None, None, None, None, None, None, None), +): message = dict() - message['c'] = command - message['s'] = source - message['d'] = destination - message['b'] = __encode_bytes(byte_data) - message['l'] = len(byte_data) + message["c"] = command + message["s"] = source + message["d"] = destination + message["b"] = __encode_bytes(byte_data) + message["l"] = len(byte_data) return json.dumps(message, separators=(",", ":")) @@ -33,36 +36,38 @@ def __encode_bytes(byte_data: Tuple): idx += 1 elif byte_data[idx] > 256: length = __extract_length(idx, byte_data) - data[idx: idx + length] = int.to_bytes( - byte_data[idx], byteorder='little', length=length, signed=True + data[idx : idx + length] = int.to_bytes( + byte_data[idx], byteorder="little", length=length, signed=True ) idx += length elif byte_data[idx] < 0: - data[idx: idx + 4] = int.to_bytes( - int(byte_data[idx]), byteorder='little', length=4, signed=True + data[idx : idx + 4] = int.to_bytes( + int(byte_data[idx]), byteorder="little", length=4, signed=True ) idx += 4 elif byte_data[idx] < 256: data[idx] = int(byte_data[idx]) idx += 1 - return b64encode(bytes(data)).decode('utf8') + return b64encode(bytes(data)).decode("utf8") def decode_message(message: str): message = json.loads(message) - command = message['c'] - source = message['s'] - destination = message['d'] - data = message['b'] - length = message['l'] + command = message["c"] + source = message["s"] + destination = message["d"] + data = message["b"] + length = message["l"] return command, source, destination, data, length def unpack_data(data: str, structure: Tuple = (1, 1, 1, 1, 1, 1, 1, 1)): - data = bytearray(b64decode(data.encode('utf8'))) + data = bytearray(b64decode(data.encode("utf8"))) idx = 0 result = [] for size in structure: - result.append(int.from_bytes(data[idx:idx + size], byteorder='little')) + result.append( + int.from_bytes(data[idx : idx + size], byteorder="little") + ) idx += size return result diff --git a/modi_firmware_updater/util/module_util.py b/modi_firmware_updater/util/module_util.py index 3cc2fe7..5505cea 100644 --- a/modi_firmware_updater/util/module_util.py +++ b/modi_firmware_updater/util/module_util.py @@ -1,7 +1,6 @@ - import time -from typing import Union from os import path +from typing import Union from modi_firmware_updater.util.message_util import parse_message @@ -13,21 +12,20 @@ def get_module_type_from_uuid(uuid): type_indicator = str(hexadecimal)[:4] module_type = { # Input modules - '2000': 'env', - '2010': 'gyro', - '2020': 'mic', - '2030': 'button', - '2040': 'dial', - '2050': 'ultrasonic', - '2060': 'ir', - + "2000": "env", + "2010": "gyro", + "2020": "mic", + "2030": "button", + "2040": "dial", + "2050": "ultrasonic", + "2060": "ir", # Output modules - '4000': 'display', - '4010': 'motor', - '4020': 'led', - '4030': 'speaker', + "4000": "display", + "4010": "motor", + "4020": "led", + "4030": "speaker", }.get(type_indicator) - return 'network' if module_type is None else module_type + return "network" if module_type is None else module_type class Module: @@ -58,7 +56,7 @@ def __init__(self, id_, uuid, conn_task): self.module_type = str() self._properties = dict() - self._topology = {'r': 0, 't': 0, 'l': 0, 'b': 0} + self._topology = {"r": 0, "t": 0, "l": 0, "b": 0} # sampling_rate = (100 - property_sampling_frequency) * 11, in ms self.prop_samp_freq = 91 @@ -99,8 +97,8 @@ def has_user_code(self): @property def version(self): version_string = "" - version_string += str(self.__version >> 13) + '.' - version_string += str(self.__version % (2 ** 13) >> 8) + '.' + version_string += str(self.__version >> 13) + "." + version_string += str(self.__version % (2 ** 13) >> 8) + "." version_string += str(self.__version % (2 ** 8)) return version_string @@ -122,16 +120,13 @@ def uuid(self) -> int: @property def is_up_to_date(self): - root_path = ( - path.join( - path.dirname(__file__), - '..', 'assets', 'firmware', 'stm32' - ) + root_path = path.join( + path.dirname(__file__), "..", "assets", "firmware", "stm32" ) - version_path = path.join(root_path, 'version.txt') + version_path = path.join(root_path, "version.txt") with open(version_path) as version_file: - version_info = version_file.readline().lstrip('v').rstrip('\n') - version_digits = [int(digit) for digit in version_info.split('.')] + version_info = version_file.readline().lstrip("v").rstrip("\n") + version_digits = [int(digit) for digit in version_info.split(".")] latest_version = ( version_digits[0] << 13 | version_digits[1] << 8 @@ -140,7 +135,7 @@ def is_up_to_date(self): return latest_version <= self.__version def _get_property(self, property_type: int) -> float: - """ Get module property value and request + """Get module property value and request :param property_type: Type of the requested property :type property_type: int @@ -158,9 +153,10 @@ def _get_property(self, property_type: int) -> float: return self._properties[property_type].value - def update_property(self, property_type: int, - property_value: float) -> None: - """ Update property value and time + def update_property( + self, property_type: int, property_value: float + ) -> None: + """Update property value and time :param property_type: Type of the updated property :type property_type: int @@ -172,9 +168,10 @@ def update_property(self, property_type: int, self._properties[property_type].value = property_value self._properties[property_type].last_update_time = time.time() - def __request_property(self, destination_id: int, - property_type: int) -> None: - """ Generate message for request property + def __request_property( + self, destination_id: int, property_type: int + ) -> None: + """Generate message for request property :param destination_id: Id of the destination module :type destination_id: int @@ -184,7 +181,9 @@ def __request_property(self, destination_id: int, """ self._properties[property_type].last_update_time = time.time() req_prop_msg = parse_message( - 0x03, 0, destination_id, - (property_type, None, self.prop_samp_freq, None) + 0x03, + 0, + destination_id, + (property_type, None, self.prop_samp_freq, None), ) self._conn.send(req_prop_msg) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9216134 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,2 @@ +[tool.black] +line-length = 79 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index b449694..23be2a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,6 @@ pyqt5 pyinstaller pytest mypy -flake8 \ No newline at end of file +flake8 +black +isort