From 262a7140e6e614832f042ec00916bd0b42519376 Mon Sep 17 00:00:00 2001 From: Alex R Date: Fri, 2 Apr 2021 21:18:43 +0000 Subject: [PATCH] OTA DFU with gatt-python lib (#9) * initial changes * working * remove deprecated code * Update README.md pexpect no longer needed --- README.md | 1 - src/ble_dfu.py | 219 ++++++++++++++++++ src/meson.build | 3 +- src/ota/README.md | 118 ---------- src/ota/ble_legacy_dfu_controller.py | 291 ------------------------ src/ota/ble_secure_dfu_controller.py | 323 --------------------------- src/ota/dfu.py | 188 ---------------- src/ota/nrf_ble_dfu_controller.py | 263 ---------------------- src/window.py | 70 +++--- 9 files changed, 259 insertions(+), 1217 deletions(-) create mode 100644 src/ble_dfu.py delete mode 100644 src/ota/README.md delete mode 100644 src/ota/ble_legacy_dfu_controller.py delete mode 100644 src/ota/ble_secure_dfu_controller.py delete mode 100755 src/ota/dfu.py delete mode 100644 src/ota/nrf_ble_dfu_controller.py diff --git a/README.md b/README.md index b18248c..480745d 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,6 @@ Dependancies sudo pacman -S meson python-pip base-devel bluez bluez-utils pip3 install gatt pip3 install dbus-python -pip3 install pexpect ``` Build/Install diff --git a/src/ble_dfu.py b/src/ble_dfu.py new file mode 100644 index 0000000..d98bb8e --- /dev/null +++ b/src/ble_dfu.py @@ -0,0 +1,219 @@ +from array import array +import gatt +import os +from .util import * +import math + +class InfiniTimeDFU(gatt.Device): + # Class constants + UUID_DFU_SERVICE = "00001530-1212-efde-1523-785feabcd123" + UUID_CTRL_POINT = "00001531-1212-efde-1523-785feabcd123" + UUID_PACKET = "00001532-1212-efde-1523-785feabcd123" + UUID_VERSION = "00001534-1212-efde-1523-785feabcd123" + + def __init__(self, mac_address, manager, window, firmware_path, datfile_path, verbose): + self.firmware_path = firmware_path + self.datfile_path = datfile_path + self.target_mac = mac_address + self.window = window + self.verbose = verbose + self.current_step = 0 + self.pkt_receipt_interval = 10 + self.pkt_payload_size = 20 + self.done = False + self.packet_recipt_count = 0 + + super().__init__(mac_address, manager) + + def input_setup(self): + """Bin: read binfile into bin_array""" + print( + "preparing " + + os.path.split(self.firmware_path)[1] + + " for " + + self.target_mac + ) + + if self.firmware_path == None: + raise Exception("input invalid") + + name, extent = os.path.splitext(self.firmware_path) + + if extent == ".bin": + self.bin_array = array("B", open(self.firmware_path, "rb").read()) + + self.image_size = len(self.bin_array) + print("Binary image size: %d" % self.image_size) + print( + "Binary CRC32: %d" % crc32_unsigned(array_to_hex_string(self.bin_array)) + ) + return + raise Exception("input invalid") + + def connect_succeeded(self): + super().connect_succeeded() + print("[%s] Connected" % (self.mac_address)) + + def connect_failed(self, error): + super().connect_failed(error) + print("[%s] Connection failed: %s" % (self.mac_address, str(error))) + + def disconnect_succeeded(self): + super().disconnect_succeeded() + print("[%s] Disconnected" % (self.mac_address)) + + def characteristic_enable_notifications_succeeded(self, characteristic): + if self.verbose and characteristic.uuid == self.UUID_CTRL_POINT: + print("Notification Enable succeeded for Control Point Characteristic") + self.step_one() + + def characteristic_write_value_succeeded(self, characteristic): + if self.verbose and characteristic.uuid == self.UUID_CTRL_POINT: + print( + "Characteristic value was written successfully for Control Point Characteristic" + ) + if self.verbose and characteristic.uuid == self.UUID_PACKET: + print( + "Characteristic value was written successfully for Packet Characteristic" + ) + if self.current_step == 1: + self.step_two() + + if self.current_step == 3: + self.step_four() + + if self.current_step == 5: + self.step_six() + + if self.current_step == 6: + print("Begin DFU") + self.step_seven() + + def characteristic_value_updated(self, characteristic, value): + if self.verbose: + if characteristic.uuid == self.UUID_CTRL_POINT: + print( + "Characteristic value was updated for Control Point Characteristic" + ) + if characteristic.uuid == self.UUID_PACKET: + print("Characteristic value was updated for Packet Characteristic") + print("New value is:", value) + + if array_to_hex_string(value)[2:-2] == "01": + self.step_three() + + if array_to_hex_string(value)[2:-2] == "02": + self.step_five() + + if array_to_hex_string(value)[0:2] == "11": + self.packet_recipt_count += 1 + print("[INFO ] receipt count", str(self.packet_recipt_count)) + if self.done != True: + self.i += self.pkt_payload_size + self.step_seven() + + if array_to_hex_string(value)[2:-2] == "04": + self.step_nine() + + def services_resolved(self): + super().services_resolved() + + print("[%s] Resolved services" % (self.mac_address)) + ble_dfu_serv = next(s for s in self.services if s.uuid == self.UUID_DFU_SERVICE) + self.ctrl_point_char = next( + c for c in ble_dfu_serv.characteristics if c.uuid == self.UUID_CTRL_POINT + ) + self.packet_char = next( + c for c in ble_dfu_serv.characteristics if c.uuid == self.UUID_PACKET + ) + + if self.verbose: + print("[INFO ] Enabling notifications for Control Point Characteristic") + self.ctrl_point_char.enable_notifications() + + def step_one(self): + self.current_step = 1 + if self.verbose: + print( + "[INFO ] Sending ('Start DFU' (0x01), 'Application' (0x04)) to DFU Control Point" + ) + self.ctrl_point_char.write_value(bytearray.fromhex("01 04")) + + def step_two(self): + self.current_step = 2 + if self.verbose: + print("[INFO ] Sending Image size to the DFU Packet characteristic") + x = len(self.bin_array) + hex_size_array_lsb = uint32_to_bytes_le(x) + zero_pad_array_le(hex_size_array_lsb, 8) + self.packet_char.write_value(bytearray(hex_size_array_lsb)) + print("[INFO ] Waiting for Image Size notification") + + def step_three(self): + self.current_step = 3 + if self.verbose: + print("[INFO ] Sending 'INIT DFU' + Init Packet Command") + self.ctrl_point_char.write_value(bytearray.fromhex("02 00")) + + def step_four(self): + self.current_step = 4 + if self.verbose: + print("[INFO ] Sending the Init image (DAT)") + self.packet_char.write_value(bytearray(self.get_init_bin_array())) + if self.verbose: + print("[INFO ] Send 'INIT DFU' + Init Packet Complete Command") + self.ctrl_point_char.write_value(bytearray.fromhex("02 01")) + print("[INFO ] Waiting for INIT DFU notification") + + def step_five(self): + self.current_step = 5 + if self.verbose: + print("Setting pkt receipt notification interval") + self.ctrl_point_char.write_value(bytearray.fromhex("08 0A")) + + def step_six(self): + self.current_step = 6 + if self.verbose: + print( + "[INFO ] Send 'RECEIVE FIRMWARE IMAGE' command to set DFU in firmware receive state" + ) + self.ctrl_point_char.write_value(bytearray.fromhex("03")) + self.segment_count = 0 + self.i = 0 + self.segment_total = int( + math.ceil(self.image_size / float(self.pkt_payload_size)) + ) + + def step_seven(self): + self.current_step = 7 + # Send bin_array contents as as series of packets (burst mode). + # Each segment is pkt_payload_size bytes long. + # For every pkt_receipt_interval sends, wait for notification. + segment = self.bin_array[self.i : self.i + self.pkt_payload_size] + self.packet_char.write_value(segment) + self.segment_count += 1 + if self.segment_count == self.segment_total: + self.done = True + self.step_eight() + elif (self.segment_count % self.pkt_receipt_interval) != 0: + self.i += self.pkt_payload_size + self.step_seven() + else: + if self.verbose: + print("[INFO ] Waiting for Packet Reciept Notifiation") + + def step_eight(self): + self.current_step = 8 + print("[INFO ] Sending Validate command") + self.ctrl_point_char.write_value(bytearray.fromhex("04")) + + def step_nine(self): + self.current_step = 9 + print("[INFO ] Activate and reset") + self.ctrl_point_char.write_value(bytearray.fromhex("05")) + self.window.show_complete() + + def get_init_bin_array(self): + # Open the DAT file and create array of its contents + init_bin_array = array("B", open(self.datfile_path, "rb").read()) + return init_bin_array diff --git a/src/meson.build b/src/meson.build index ca50aa2..e6c4d02 100644 --- a/src/meson.build +++ b/src/meson.build @@ -30,9 +30,8 @@ siglo_sources = [ 'main.py', 'window.py', 'bluetooth.py', - 'ota/ble_legacy_dfu_controller.py', + 'ble_dfu.py', 'ota/util.py', - 'ota/nrf_ble_dfu_controller.py', 'ota/unpacker.py', ] diff --git a/src/ota/README.md b/src/ota/README.md deleted file mode 100644 index 3397db4..0000000 --- a/src/ota/README.md +++ /dev/null @@ -1,118 +0,0 @@ -# Python nRF5 OTA DFU Controller - -So... this is my fork of dingara's fork of astronomer80's fork of -foldedtoad's Python OTA DFU utility. - -My own contribution is little more than a brute force conversion to -python3. It is sparsely tested so there are likely to be a few -remaining bytes versus string bugs remaining in the places I didn't test -. I used it primarily as part of -[wasp-os](https://github.com/daniel-thompson/wasp-os) as a way to -deliver OTA updates to nRF52-based smart watches, especially the -[Pine64 PineTime](https://www.pine64.org/pinetime/). - -## What does it do? - -This is a Python program that uses `gatttool` (provided with the Linux BlueZ driver) to achieve Over The Air (OTA) Device Firmware Updates (DFU) to a Nordic Semiconductor nRF5 (either nRF51 or nRF52) device via Bluetooth Low Energy (BLE). - -### Main features: - -* Perform OTA DFU to an nRF5 peripheral without an external USB BLE dongle. -* Ability to detect if the peripheral is running in application mode or bootloader, and automatically switch if needed (buttonless). -* Support for both Legacy (SDK <= 11) and Secure (SDK >= 12) bootloader. - -Before using this utility the nRF5 peripheral device needs to be programmed with a DFU bootloader (see Nordic Semiconductor documentation/examples for instructions on that). - -## Prerequisites - -* BlueZ 5.4 or above -* Python 3.6 -* Python `pexpect` module (available via pip) -* Python `intelhex` module (available via pip) - -## Firmware Build Requirement - -* Your nRF5 peripheral firmware build method will produce a firmware file ending with either `*.hex` or `*.bin`. -* Your nRF5 firmware build method will produce an Init file ending with `.dat`. -* The typical naming convention is `application.bin` and `application.dat`, but this utility will accept other names. - -## Generating init files - -### Legacy bootloader - -Use the `gen_dat` application (you need to compile it with `gcc gen_dat.c -o gen_dat` on first run) to generate a `.dat` file from your `.bin` file. Example: - - ./gen_dat application.bin application.dat - -Note: The `gen_dat` utility expects a `.bin` file input, so you'll get Cyclic Redundancy Check (CRC) errors during DFU using a `.dat` file generated from a `.hex` file. - -An alternative is to use `nrfutil` from Nordic Semiconductor, but I've found this method to be easier. You may need to edit the `gen_dat` source to fit your specific application. - -### Secure bootloader - -You need to use `nrfutil` to generate firmware packages for the new secure bootloader (SDK > 12) as the package needs to be signed with a private/public key pair. Note that the bootloader will need to be programmed with the corresponding public key. See the [nrfutil repo](https://github.com/NordicSemiconductor/pc-nrfutil) for details. - -Note: I've had problems with the pip version of `nrfutil`. I recommend [installing from source](https://github.com/NordicSemiconductor/pc-nrfutil#running-and-installing-from-source) instead. - -## Usage - -There are two ways to specify firmware files for this utility. Either by specifying both the `.hex` or `.bin` file with the `.dat` file, or more easily by the `.zip` file, which contains both the hex and dat files. - -The new `.zip` file form is encouraged by Nordic, but the older hex/bin + dat file methods should still work. - -## Usage Examples - - > sudo ./dfu.py -f ~/application.hex -d ~/application.dat -a CD:E3:4A:47:1C:E4 - -or: - - > sudo ./dfu.py -z ~/application.zip -a CD:E3:4A:47:1C:E4 - -You can use the `hcitool lescan` to figure out the address of a DFU target, for example: - - $ sudo hcitool -i hci0 lescan - LE Scan ... - CD:E3:4A:47:1C:E4 - CD:E3:4A:47:1C:E4 (unknown) - - -## Example Output - - ================================ - == == - == DFU Server == - == == - ================================ - - Sending file application.bin to CD:E3:4A:47:1C:E4 - bin array size: 60788 - Checking DFU State... - Board needs to switch in DFU mode - Switching to DFU mode - Enable Notifications in DFU mode - Sending hex file size - Waiting for Image Size notification - Waiting for INIT DFU notification - Begin DFU - Progress: |xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx| 100.0% Complete (60788 of 60788 bytes) - - Upload complete in 0 minutes and 14 seconds - segments sent: 3040 - Waiting for DFU complete notification - Waiting for Firmware Validation notification - Activate and reset - DFU Server done - -## TODO: - -* Implement link-loss procedure for Legacy Controller. -* Update example output in readme. -* Add makefile examples. -* More code cleanup. - -## Info & References - -* [Nordic Legacy DFU Service](http://infocenter.nordicsemi.com/topic/com.nordic.infocenter.sdk5.v11.0.0/bledfu_transport_bleservice.html?cp=4_0_3_4_3_1_4_1) -* [Nordic Legacy DFU sequence diagrams](http://infocenter.nordicsemi.com/topic/com.nordic.infocenter.sdk5.v11.0.0/bledfu_transport_bleprofile.html?cp=4_0_3_4_3_1_4_0_1_6#ota_profile_pkt_rcpt_notif) -* [Nordic Secure DFU bootloader](http://infocenter.nordicsemi.com/topic/com.nordic.infocenter.sdk5.v12.2.0/lib_dfu_transport_ble.html?cp=4_0_1_3_5_2_2) -* [nrfutil](https://github.com/NordicSemiconductor/pc-nrfutil) diff --git a/src/ota/ble_legacy_dfu_controller.py b/src/ota/ble_legacy_dfu_controller.py deleted file mode 100644 index 756b771..0000000 --- a/src/ota/ble_legacy_dfu_controller.py +++ /dev/null @@ -1,291 +0,0 @@ -import math -import pexpect -import time - -from array import array -from .util import * - -from .nrf_ble_dfu_controller import NrfBleDfuController - -verbose = False - -class Procedures: - START_DFU = 1 - INITIALIZE_DFU = 2 - RECEIVE_FIRMWARE_IMAGE = 3 - VALIDATE_FIRMWARE = 4 - ACTIVATE_IMAGE_AND_RESET = 5 - RESET_SYSTEM = 6 - REPORT_RECEIVED_IMAGE_SIZE = 7 - PRN_REQUEST = 8 - RESPONSE = 16 - PACKET_RECEIPT_NOTIFICATION = 17 - - string_map = { - START_DFU : "START_DFU", - INITIALIZE_DFU : "INITIALIZE_DFU", - RECEIVE_FIRMWARE_IMAGE : "RECEIVE_FIRMWARE_IMAGE", - VALIDATE_FIRMWARE : "VALIDATE_FIRMWARE", - ACTIVATE_IMAGE_AND_RESET : "ACTIVATE_IMAGE_AND_RESET", - RESET_SYSTEM : "RESET_SYSTEM", - REPORT_RECEIVED_IMAGE_SIZE : "REPORT_RECEIVED_IMAGE_SIZE", - PRN_REQUEST : "PACKET_RECEIPT_NOTIFICATION_REQUEST", - RESPONSE : "RESPONSE", - PACKET_RECEIPT_NOTIFICATION : "PACKET_RECEIPT_NOTIFICATION", - } - - @staticmethod - def to_string(proc): - return Procedures.string_map[proc] - - @staticmethod - def from_string(proc_str): - return int(proc_str, 16) - -class Responses: - SUCCESS = 1 - INVALID_STATE = 2 - NOT_SUPPORTED = 3 - DATA_SIZE_EXCEEDS_LIMITS = 4 - CRC_ERROR = 5 - OPERATION_FAILED = 6 - - string_map = { - SUCCESS : "SUCCESS", - INVALID_STATE : "INVALID_STATE", - NOT_SUPPORTED : "NOT_SUPPORTED", - DATA_SIZE_EXCEEDS_LIMITS : "DATA_SIZE_EXCEEDS_LIMITS", - CRC_ERROR : "CRC_ERROR", - OPERATION_FAILED : "OPERATION_FAILED", - } - - @staticmethod - def to_string(res): - return Responses.string_map[res] - - @staticmethod - def from_string(res_str): - return int(res_str, 16) - - -class BleDfuControllerLegacy(NrfBleDfuController): - # Class constants - UUID_CONTROL_POINT = "00001531-1212-efde-1523-785feabcd123" - UUID_PACKET = "00001532-1212-efde-1523-785feabcd123" - UUID_VERSION = "00001534-1212-efde-1523-785feabcd123" - - # Constructor inherited from abstract base class - - # -------------------------------------------------------------------------- - # Start the firmware update process - # -------------------------------------------------------------------------- - def start(self, verbose=False): - (_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT) - (_, self.data_handle, _) = self._get_handles(self.UUID_PACKET) - - self.pkt_receipt_interval = 10 - - if verbose: - print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle)) - print('Packet handle: 0x%04x' % (self.data_handle)) - - # Subscribe to notifications from Control Point characteristic - if verbose: print("Enabling notifications") - self._enable_notifications(self.ctrlpt_cccd_handle) - - # Send 'START DFU' + Application Command - if verbose: print("Sending START_DFU") - self._dfu_send_command(Procedures.START_DFU, [0x04]) - - # Transmit binary image size - # Need to pad the byte array with eight zero bytes - # (because that's what the bootloader is expecting...) - hex_size_array_lsb = uint32_to_bytes_le(len(self.bin_array)) - zero_pad_array_le(hex_size_array_lsb, 8) - self._dfu_send_data(hex_size_array_lsb) - - # Wait for response to Image Size - print("Waiting for Image Size notification") - self._wait_and_parse_notify() - - # Send 'INIT DFU' + Init Packet Command - self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x00]) - - # Transmit the Init image (DAT). - self._dfu_send_init() - - # Send 'INIT DFU' + Init Packet Complete Command - self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x01]) - - print("Waiting for INIT DFU notification") - # Wait for INIT DFU notification (indicates flash erase completed) - self._wait_and_parse_notify() - - # Set the Packet Receipt Notification interval - if verbose: print("Setting pkt receipt notification interval") - prn = uint16_to_bytes_le(self.pkt_receipt_interval) - self._dfu_send_command(Procedures.PRN_REQUEST, prn) - - # Send 'RECEIVE FIRMWARE IMAGE' command to set DFU in firmware receive state. - self._dfu_send_command(Procedures.RECEIVE_FIRMWARE_IMAGE) - - # Send bin_array contents as as series of packets (burst mode). - # Each segment is pkt_payload_size bytes long. - # For every pkt_receipt_interval sends, wait for notification. - segment_count = 0 - segment_total = int(math.ceil(self.image_size/float(self.pkt_payload_size))) - time_start = time.time() - last_send_time = time.time() - print("Begin DFU") - for i in range(0, self.image_size, self.pkt_payload_size): - segment = self.bin_array[i:i + self.pkt_payload_size] - self._dfu_send_data(segment) - segment_count += 1 - - # print "segment #{} of {}, dt = {}".format(segment_count, segment_total, time.time() - last_send_time) - # last_send_time = time.time() - - if (segment_count == segment_total): - print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50) - - duration = time.time() - time_start - print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60))) - if verbose: print("segments sent: {}".format(segment_count)) - - print("Waiting for DFU complete notification") - # Wait for DFU complete notification - self._wait_and_parse_notify() - - elif (segment_count % self.pkt_receipt_interval) == 0: - (proc, res, pkts) = self._wait_and_parse_notify() - - # TODO: Check pkts == segment_count * pkt_payload_size - - if res != Responses.SUCCESS: - raise Exception("bad notification status: {}".format(Responses.to_string(res))) - - print_progress(pkts, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50) - - # Send Validate Command - self._dfu_send_command(Procedures.VALIDATE_FIRMWARE) - - print("Waiting for Firmware Validation notification") - # Wait for Firmware Validation notification - self._wait_and_parse_notify() - - # Wait a bit for copy on the peer to be finished - time.sleep(1) - - # Send Activate and Reset Command - print("Activate and reset") - self._dfu_send_command(Procedures.ACTIVATE_IMAGE_AND_RESET) - - # -------------------------------------------------------------------------- - # Check if the peripheral is running in bootloader (DFU) or application mode - # Returns True if the peripheral is in DFU mode - # -------------------------------------------------------------------------- - def check_DFU_mode(self): - if verbose: print("Checking DFU State...") - - cmd = 'char-read-uuid %s' % self.UUID_VERSION - - if verbose: print(cmd) - - self.ble_conn.sendline(cmd) - - # Skip two rows - try: - res = self.ble_conn.expect('handle:.*', timeout=10) - # res = self.ble_conn.expect('handle:', timeout=10) - except pexpect.TIMEOUT as e: - print("State timeout") - except: - pass - - return self.ble_conn.after.find(b'value: 08 00')!=-1 - - def switch_to_dfu_mode(self): - (_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT) - - # Enable notifications - cmd = 'char-write-req 0x%02x %02x' % (bl_cccd_handle, 1) - if verbose: print(cmd) - self.ble_conn.sendline(cmd) - - # Reset the board in DFU mode. After reset the board will be disconnected - cmd = 'char-write-req 0x%02x 0104' % (bl_value_handle) - if verbose: print(cmd) - self.ble_conn.sendline(cmd) - - time.sleep(0.5) - - #print "Send 'START DFU' + Application Command" - #self._dfu_state_set(0x0104) - - # Reconnect the board. - #ret = self.scan_and_connect() - #if verbose: print("Connected " + str(ret)) - - #return ret - return 1 - - - # -------------------------------------------------------------------------- - # Parse notification status results - # -------------------------------------------------------------------------- - def _dfu_parse_notify(self, notify): - if len(notify) < 3: - print("notify data length error") - return None - - if verbose: print(notify) - - dfu_notify_opcode = Procedures.from_string(notify[0]) - - if dfu_notify_opcode == Procedures.RESPONSE: - - dfu_procedure = Procedures.from_string(notify[1]) - dfu_response = Responses.from_string(notify[2]) - - procedure_str = Procedures.to_string(dfu_procedure) - response_str = Responses.to_string(dfu_response) - - if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, response_str)) - - return (dfu_procedure, dfu_response) - - if dfu_notify_opcode == Procedures.PACKET_RECEIPT_NOTIFICATION: - receipt = bytes_to_uint32_le(notify[1:5]) - return (dfu_notify_opcode, Responses.SUCCESS, receipt) - - # -------------------------------------------------------------------------- - # Wait for a notification and parse the response - # -------------------------------------------------------------------------- - def _wait_and_parse_notify(self): - if verbose: print("Waiting for notification") - notify = self._dfu_wait_for_notify() - - if notify is None: - raise Exception("No notification received") - - if verbose: print("Parsing notification") - - result = self._dfu_parse_notify(notify) - if result[1] != Responses.SUCCESS: - raise Exception("Error in {} procedure, reason: {}".format( - Procedures.to_string(result[0]), - Responses.to_string(result[1]))) - - return result - - #-------------------------------------------------------------------------- - # Send the Init info (*.dat file contents) to peripheral device. - #-------------------------------------------------------------------------- - def _dfu_send_init(self): - if verbose: print("dfu_send_init") - - # Open the DAT file and create array of its contents - init_bin_array = array('B', open(self.datfile_path, 'rb').read()) - - # Transmit Init info - self._dfu_send_data(init_bin_array) diff --git a/src/ota/ble_secure_dfu_controller.py b/src/ota/ble_secure_dfu_controller.py deleted file mode 100644 index 7065458..0000000 --- a/src/ota/ble_secure_dfu_controller.py +++ /dev/null @@ -1,323 +0,0 @@ -import math -import pexpect -import time - -from array import array -from util import * - -from nrf_ble_dfu_controller import NrfBleDfuController - -verbose = False - -class Procedures: - CREATE = 0x01 - SET_PRN = 0x02 - CALC_CHECKSUM = 0x03 - EXECUTE = 0x04 - SELECT = 0x06 - RESPONSE = 0x60 - - PARAM_COMMAND = 0x01 - PARAM_DATA = 0x02 - - string_map = { - CREATE : "CREATE", - SET_PRN : "SET_PRN", - CALC_CHECKSUM : "CALC_CHECKSUM", - EXECUTE : "EXECUTE", - SELECT : "SELECT", - RESPONSE : "RESPONSE", - } - - @staticmethod - def to_string(proc): - return Procedures.string_map[proc] - - @staticmethod - def from_string(proc_str): - return int(proc_str, 16) - -class Results: - INVALID_CODE = 0x00 - SUCCESS = 0x01 - OPCODE_NOT_SUPPORTED = 0x02 - INVALID_PARAMETER = 0x03 - INSUFF_RESOURCES = 0x04 - INVALID_OBJECT = 0x05 - UNSUPPORTED_TYPE = 0x07 - OPERATION_NOT_PERMITTED = 0x08 - OPERATION_FAILED = 0x0A - - string_map = { - INVALID_CODE : "INVALID_CODE", - SUCCESS : "SUCCESS", - OPCODE_NOT_SUPPORTED : "OPCODE_NOT_SUPPORTED", - INVALID_PARAMETER : "INVALID_PARAMETER", - INSUFF_RESOURCES : "INSUFFICIENT_RESOURCES", - INVALID_OBJECT : "INVALID_OBJECT", - UNSUPPORTED_TYPE : "UNSUPPORTED_TYPE", - OPERATION_NOT_PERMITTED : "OPERATION_NOT_PERMITTED", - OPERATION_FAILED : "OPERATION_FAILED", - } - - @staticmethod - def to_string(res): - return Results.string_map[res] - - @staticmethod - def from_string(res_str): - return int(res_str, 16) - - -class BleDfuControllerSecure(NrfBleDfuController): - # Class constants - UUID_BUTTONLESS = '8e400001-f315-4f60-9fb8-838830daea50' - UUID_CONTROL_POINT = '8ec90001-f315-4f60-9fb8-838830daea50' - UUID_PACKET = '8ec90002-f315-4f60-9fb8-838830daea50' - - # Constructor inherited from abstract base class - - # -------------------------------------------------------------------------- - # Start the firmware update process - # -------------------------------------------------------------------------- - def start(self): - (_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT) - (_, self.data_handle, _) = self._get_handles(self.UUID_PACKET) - - if verbose: - print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle)) - print('Packet handle: 0x%04x' % (self.data_handle)) - - # Subscribe to notifications from Control Point characteristic - self._enable_notifications(self.ctrlpt_cccd_handle) - - # Set the Packet Receipt Notification interval - prn = uint16_to_bytes_le(self.pkt_receipt_interval) - self._dfu_send_command(Procedures.SET_PRN, prn) - - self._dfu_send_init() - - self._dfu_send_image() - - # -------------------------------------------------------------------------- - # Check if the peripheral is running in bootloader (DFU) or application mode - # Returns True if the peripheral is in DFU mode - # -------------------------------------------------------------------------- - def check_DFU_mode(self): - print("Checking DFU State...") - - self.ble_conn.sendline('characteristics') - - dfu_mode = False - - try: - self.ble_conn.expect([self.UUID_BUTTONLESS], timeout=2) - except pexpect.TIMEOUT as e: - dfu_mode = True - - return dfu_mode - - def switch_to_dfu_mode(self): - (_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_BUTTONLESS) - - self._enable_notifications(bl_cccd_handle) - - # Reset the board in DFU mode. After reset the board will be disconnected - cmd = 'char-write-req 0x%04x 01' % (bl_value_handle) - self.ble_conn.sendline(cmd) - - # Wait some time for board to reboot - time.sleep(0.5) - - # Increase the mac address by one and reconnect - self.target_mac_increase(1) - return self.scan_and_connect() - - # -------------------------------------------------------------------------- - # Parse notification status results - # -------------------------------------------------------------------------- - def _dfu_parse_notify(self, notify): - if len(notify) < 3: - print("notify data length error") - return None - - if verbose: print(notify) - - dfu_notify_opcode = Procedures.from_string(notify[0]) - if dfu_notify_opcode == Procedures.RESPONSE: - - dfu_procedure = Procedures.from_string(notify[1]) - dfu_result = Results.from_string(notify[2]) - - procedure_str = Procedures.to_string(dfu_procedure) - result_str = Results.to_string(dfu_result) - - # if verbose: print "opcode: {0}, proc: {1}, res: {2}".format(dfu_notify_opcode, procedure_str, result_str) - if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, result_str)) - - # Packet Receipt notifications are sent in the exact same format - # as responses to the CALC_CHECKSUM procedure. - if(dfu_procedure == Procedures.CALC_CHECKSUM and dfu_result == Results.SUCCESS): - offset = bytes_to_uint32_le(notify[3:7]) - crc32 = bytes_to_uint32_le(notify[7:11]) - - return (dfu_procedure, dfu_result, offset, crc32) - - elif(dfu_procedure == Procedures.SELECT and dfu_result == Results.SUCCESS): - max_size = bytes_to_uint32_le(notify[3:7]) - offset = bytes_to_uint32_le(notify[7:11]) - crc32 = bytes_to_uint32_le(notify[11:15]) - - return (dfu_procedure, dfu_result, max_size, offset, crc32) - - else: - return (dfu_procedure, dfu_result) - - # -------------------------------------------------------------------------- - # Wait for a notification and parse the response - # -------------------------------------------------------------------------- - def _wait_and_parse_notify(self): - if verbose: print("Waiting for notification") - notify = self._dfu_wait_for_notify() - - if notify is None: - raise Exception("No notification received") - - if verbose: print("Parsing notification") - - result = self._dfu_parse_notify(notify) - if result[1] != Results.SUCCESS: - raise Exception("Error in {} procedure, reason: {}".format( - Procedures.to_string(result[0]), - Results.to_string(result[1]))) - - return result - - # -------------------------------------------------------------------------- - # Send the Init info (*.dat file contents) to peripheral device. - # -------------------------------------------------------------------------- - def _dfu_send_init(self): - if verbose: print("dfu_send_init") - - # Open the DAT file and create array of its contents - init_bin_array = array('B', open(self.datfile_path, 'rb').read()) - init_size = len(init_bin_array) - init_crc = 0; - - # Select command - self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_COMMAND]); - (proc, res, max_size, offset, crc32) = self._wait_and_parse_notify() - - if offset != init_size or crc32 != init_crc: - if offset == 0 or offset > init_size: - # Create command - self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_COMMAND] + uint32_to_bytes_le(init_size)) - res = self._wait_and_parse_notify() - - segment_count = 0 - segment_total = int(math.ceil(init_size/float(self.pkt_payload_size))) - - for i in range(0, init_size, self.pkt_payload_size): - segment = init_bin_array[i:i + self.pkt_payload_size] - self._dfu_send_data(segment) - segment_count += 1 - - if (segment_count % self.pkt_receipt_interval) == 0: - (proc, res, offset, crc32) = self._wait_and_parse_notify() - - if res != Results.SUCCESS: - raise Exception("bad notification status: {}".format(Results.to_string(res))) - - # Calculate CRC - self._dfu_send_command(Procedures.CALC_CHECKSUM) - self._wait_and_parse_notify() - - # Execute command - self._dfu_send_command(Procedures.EXECUTE) - self._wait_and_parse_notify() - - print("Init packet successfully transfered") - - # -------------------------------------------------------------------------- - # Send the Firmware image to peripheral device. - # -------------------------------------------------------------------------- - def _dfu_send_image(self): - if verbose: print("dfu_send_image") - - # Select Data Object - self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_DATA]) - (proc, res, max_size, offset, crc32) = self._wait_and_parse_notify() - - # Split the firmware into multiple objects - num_objects = int(math.ceil(self.image_size / float(max_size))) - print("Max object size: %d, num objects: %d, offset: %d, total size: %d" % (max_size, num_objects, offset, self.image_size)) - - time_start = time.time() - last_send_time = time.time() - - obj_offset = (offset/max_size)*max_size - while(obj_offset < self.image_size): - # print "\nSending object {} of {}".format(obj_offset/max_size+1, num_objects) - obj_offset += self._dfu_send_object(obj_offset, max_size) - - # Image uploaded successfully, update the progress bar - print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50) - - duration = time.time() - time_start - print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60))) - - # -------------------------------------------------------------------------- - # Send a single data object of given size and offset. - # -------------------------------------------------------------------------- - def _dfu_send_object(self, offset, obj_max_size): - if offset != self.image_size: - if offset == 0 or offset >= obj_max_size or crc32 != crc32_unsigned(self.bin_array[0:offset]): - # Create Data Object - size = min(obj_max_size, self.image_size - offset) - self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_DATA] + uint32_to_bytes_le(size)) - self._wait_and_parse_notify() - - segment_count = 0 - segment_total = int(math.ceil(min(obj_max_size, self.image_size-offset)/float(self.pkt_payload_size))) - - segment_begin = offset - segment_end = min(offset+obj_max_size, self.image_size) - - for i in range(segment_begin, segment_end, self.pkt_payload_size): - num_bytes = min(self.pkt_payload_size, segment_end - i) - segment = self.bin_array[i:i + num_bytes] - self._dfu_send_data(segment) - segment_count += 1 - - # print "j: {} i: {}, end: {}, bytes: {}, size: {} segment #{} of {}".format( - # offset, i, segment_end, num_bytes, self.image_size, segment_count, segment_total) - - if (segment_count % self.pkt_receipt_interval) == 0: - try: - (proc, res, offset, crc32) = self._wait_and_parse_notify() - except e: - # Likely no notification received, need to re-transmit object - return 0 - - if res != Results.SUCCESS: - raise Exception("bad notification status: {}".format(Results.to_string(res))) - - if crc32 != crc32_unsigned(self.bin_array[0:offset]): - # Something went wrong, need to re-transmit this object - return 0 - - print_progress(offset, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50) - - # Calculate CRC - self._dfu_send_command(Procedures.CALC_CHECKSUM) - (proc, res, offset, crc32) = self._wait_and_parse_notify() - if(crc32 != crc32_unsigned(self.bin_array[0:offset])): - # Need to re-transmit object - return 0 - - # Execute command - self._dfu_send_command(Procedures.EXECUTE) - self._wait_and_parse_notify() - - # If everything executed correctly, return amount of bytes transfered - return obj_max_size diff --git a/src/ota/dfu.py b/src/ota/dfu.py deleted file mode 100755 index 207962d..0000000 --- a/src/ota/dfu.py +++ /dev/null @@ -1,188 +0,0 @@ -#!/usr/bin/env python3 -""" ------------------------------------------------------------------------------- - DFU Server for Nordic nRF51 based systems. - Conforms to nRF51_SDK 11.0 BLE_DFU requirements. ------------------------------------------------------------------------------- -""" -import os, re -import sys -import optparse -import time -import math -import traceback - -from unpacker import Unpacker - -from ble_secure_dfu_controller import BleDfuControllerSecure -from ble_legacy_dfu_controller import BleDfuControllerLegacy - -def main(): - - init_msg = """ - ================================ - == == - == DFU Server == - == == - ================================ - """ - - # print "DFU Server start" - print(init_msg) - - try: - parser = optparse.OptionParser(usage='%prog -f -a \n\nExample:\n\tdfu.py -f application.hex -d application.dat -a cd:e3:4a:47:1c:e4', - version='0.5') - - parser.add_option('-a', '--address', - action='store', - dest="address", - type="string", - default=None, - help='DFU target address.' - ) - - parser.add_option('-f', '--file', - action='store', - dest="hexfile", - type="string", - default=None, - help='hex file to be uploaded.' - ) - - parser.add_option('-d', '--dat', - action='store', - dest="datfile", - type="string", - default=None, - help='dat file to be uploaded.' - ) - - parser.add_option('-z', '--zip', - action='store', - dest="zipfile", - type="string", - default=None, - help='zip file to be used.' - ) - - parser.add_option('--secure', - action='store_true', - dest='secure_dfu', - default=True, - help='Use secure bootloader (Nordic SDK > 12)' - ) - - parser.add_option('--legacy', - action='store_false', - dest='secure_dfu', - help='Use secure bootloader (Nordic SDK < 12)' - ) - - options, args = parser.parse_args() - - except Exception as e: - print(e) - print("For help use --help") - sys.exit(2) - - try: - - ''' Validate input parameters ''' - - if not options.address: - parser.print_help() - exit(2) - - unpacker = None - hexfile = None - datfile = None - - if options.zipfile != None: - - if (options.hexfile != None) or (options.datfile != None): - print("Conflicting input directives") - exit(2) - - unpacker = Unpacker() - #print options.zipfile - try: - hexfile, datfile = unpacker.unpack_zipfile(options.zipfile) - except Exception as e: - print("ERR") - print(e) - pass - - else: - if (not options.hexfile) or (not options.datfile): - parser.print_help() - exit(2) - - if not os.path.isfile(options.hexfile): - print("Error: Hex file doesn't exist") - exit(2) - - if not os.path.isfile(options.datfile): - print("Error: DAT file doesn't exist") - exit(2) - - hexfile = options.hexfile - datfile = options.datfile - - - ''' Start of Device Firmware Update processing ''' - - if options.secure_dfu: - ble_dfu = BleDfuControllerSecure(options.address.upper(), hexfile, datfile) - else: - ble_dfu = BleDfuControllerLegacy(options.address.upper(), hexfile, datfile) - - # Initialize inputs - ble_dfu.input_setup() - - # Connect to peer device. Assume application mode. - if ble_dfu.scan_and_connect(): - if not ble_dfu.check_DFU_mode(): - print("Need to switch to DFU mode") - success = ble_dfu.switch_to_dfu_mode() - if not success: - print("Couldn't reconnect") - else: - # The device might already be in DFU mode (MAC + 1) - ble_dfu.target_mac_increase(1) - - # Try connection with new address - print("Couldn't connect, will try DFU MAC") - if not ble_dfu.scan_and_connect(): - raise Exception("Can't connect to device") - - ble_dfu.start() - - # Disconnect from peer device if not done already and clean up. - ble_dfu.disconnect() - - except Exception as e: - # print traceback.format_exc() - print("Exception at line {}: {}".format(sys.exc_info()[2].tb_lineno, e)) - pass - - except: - pass - - # If Unpacker for zipfile used then delete Unpacker - if unpacker != None: - unpacker.delete() - - print("DFU Server done") - -""" ------------------------------------------------------------------------------- - ------------------------------------------------------------------------------- -""" -if __name__ == '__main__': - - # Do not litter the world with broken .pyc files. - sys.dont_write_bytecode = True - - main() diff --git a/src/ota/nrf_ble_dfu_controller.py b/src/ota/nrf_ble_dfu_controller.py deleted file mode 100644 index d4b8970..0000000 --- a/src/ota/nrf_ble_dfu_controller.py +++ /dev/null @@ -1,263 +0,0 @@ -import os -import pexpect -import re - -from abc import ABCMeta, abstractmethod -from array import array -from .util import * - -verbose = False - -class NrfBleDfuController(object, metaclass=ABCMeta): - ctrlpt_handle = 0 - ctrlpt_cccd_handle = 0 - data_handle = 0 - - pkt_receipt_interval = 10 - pkt_payload_size = 20 - - # -------------------------------------------------------------------------- - # Start the firmware update process - # -------------------------------------------------------------------------- - @abstractmethod - def start(self): - pass - - # -------------------------------------------------------------------------- - # Check if the peripheral is running in bootloader (DFU) or application mode - # Returns True if the peripheral is in DFU mode - # -------------------------------------------------------------------------- - @abstractmethod - def check_DFU_mode(self): - pass - - @abstractmethod - # -------------------------------------------------------------------------- - # Switch from application to bootloader (DFU) - # -------------------------------------------------------------------------- - def switch_to_dfu_mode(self): - pass - - # -------------------------------------------------------------------------- - # Parse notification status results - # -------------------------------------------------------------------------- - @abstractmethod - def _dfu_parse_notify(self, notify): - pass - - # -------------------------------------------------------------------------- - # Wait for a notification and parse the response - # -------------------------------------------------------------------------- - @abstractmethod - def _wait_and_parse_notify(self): - pass - - def __init__(self, target_mac, firmware_path, datfile_path): - self.target_mac = target_mac - - self.firmware_path = firmware_path - self.datfile_path = datfile_path - - self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % target_mac) - self.ble_conn.delaybeforesend = 0 - - # -------------------------------------------------------------------------- - # Start the firmware update process - # -------------------------------------------------------------------------- - def start(self): - (_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT) - (_, self.data_handle, _) = self._get_handles(self.UUID_PACKET) - - if verbose: - print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle)) - print('Packet handle: 0x%04x' % (self.data_handle)) - - # Subscribe to notifications from Control Point characteristic - self._enable_notifications(self.ctrlpt_cccd_handle) - - # Set the Packet Receipt Notification interval - prn = uint16_to_bytes_le(self.pkt_receipt_interval) - self._dfu_send_command(Procedures.SET_PRN, prn) - - self._dfu_send_init() - - self._dfu_send_image() - - # -------------------------------------------------------------------------- - # Initialize: - # Hex: read and convert hexfile into bin_array - # Bin: read binfile into bin_array - # -------------------------------------------------------------------------- - def input_setup(self): - print("Sending file " + os.path.split(self.firmware_path)[1] + " to " + self.target_mac) - - if self.firmware_path == None: - raise Exception("input invalid") - - name, extent = os.path.splitext(self.firmware_path) - - if extent == ".bin": - self.bin_array = array('B', open(self.firmware_path, 'rb').read()) - - self.image_size = len(self.bin_array) - print("Binary imge size: %d" % self.image_size) - print("Binary CRC32: %d" % crc32_unsigned(array_to_hex_string(self.bin_array))) - - return - - if extent == ".hex": - intelhex = IntelHex(self.firmware_path) - self.bin_array = intelhex.tobinarray() - self.image_size = len(self.bin_array) - print("bin array size: ", self.image_size) - return - - raise Exception("input invalid") - - # -------------------------------------------------------------------------- - # Perform a scan and connect via gatttool. - # Will return True if a connection was established, False otherwise - # -------------------------------------------------------------------------- - def scan_and_connect(self, timeout=2): - if verbose: print("scan_and_connect") - - print("Connecting to %s" % (self.target_mac)) - - try: - self.ble_conn.expect('\[LE\]>', timeout=timeout) - except pexpect.TIMEOUT as e: - return False - - self.ble_conn.sendline('connect') - - try: - res = self.ble_conn.expect('.*Connection successful.*', timeout=timeout) - except pexpect.TIMEOUT as e: - return False - - return True - - # -------------------------------------------------------------------------- - # Disconnect from the peripheral and close the gatttool connection - # -------------------------------------------------------------------------- - def disconnect(self): - self.ble_conn.sendline('exit') - self.ble_conn.close() - - def target_mac_increase(self, inc): - self.target_mac = uint_to_mac_string(mac_string_to_uint(self.target_mac) + inc) - - # Re-start gatttool with the new address - self.disconnect() - self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % self.target_mac) - self.ble_conn.delaybeforesend = 0 - - # -------------------------------------------------------------------------- - # Fetch handles for a given UUID. - # Will return a three-tuple: (char handle, value handle, CCCD handle) - # Will raise an exception if the UUID is not found - # -------------------------------------------------------------------------- - def _get_handles(self, uuid): - self.ble_conn.before = "" - self.ble_conn.sendline('characteristics') - - try: - self.ble_conn.expect([uuid], timeout=10) - handles = re.findall(b'.*handle: (0x....),.*char value handle: (0x....)', self.ble_conn.before) - (handle, value_handle) = handles[-1] - except pexpect.TIMEOUT as e: - raise Exception("UUID not found: {}".format(uuid)) - - return (int(handle, 16), int(value_handle, 16), int(value_handle, 16)+1) - - # -------------------------------------------------------------------------- - # Wait for notification to arrive. - # Example format: "Notification handle = 0x0019 value: 10 01 01" - # -------------------------------------------------------------------------- - def _dfu_wait_for_notify(self): - while True: - if verbose: print("dfu_wait_for_notify") - - if not self.ble_conn.isalive(): - print("connection not alive") - return None - - try: - index = self.ble_conn.expect('Notification handle = .*? \r\n', timeout=30) - - except pexpect.TIMEOUT: - # - # The gatttool does not report link-lost directly. - # The only way found to detect it is monitoring the prompt '[CON]' - # and if it goes to '[ ]' this indicates the connection has - # been broken. - # In order to get a updated prompt string, issue an empty - # sendline(''). If it contains the '[ ]' string, then - # raise an exception. Otherwise, if not a link-lost condition, - # continue to wait. - # - self.ble_conn.sendline('') - string = self.ble_conn.before - if '[ ]' in string: - print('Connection lost! ') - raise Exception('Connection Lost') - return None - - if index == 0: - after = self.ble_conn.after - hxstr = after.split()[3:] - handle = int(float.fromhex(hxstr[0].decode('UTF-8'))) - return hxstr[2:] - - else: - print("unexpeced index: {0}".format(index)) - return None - - # -------------------------------------------------------------------------- - # Send a procedure + any parameters required - # -------------------------------------------------------------------------- - def _dfu_send_command(self, procedure, params=[]): - if verbose: print('_dfu_send_command') - - cmd = 'char-write-req 0x%04x %02x' % (self.ctrlpt_handle, procedure) - cmd += array_to_hex_string(params) - - if verbose: print(cmd) - - self.ble_conn.sendline(cmd) - - # Verify that command was successfully written - try: - res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10) - except pexpect.TIMEOUT as e: - print("State timeout") - - # -------------------------------------------------------------------------- - # Send an array of bytes - # -------------------------------------------------------------------------- - def _dfu_send_data(self, data): - cmd = 'char-write-cmd 0x%04x' % (self.data_handle) - cmd += ' ' - cmd += array_to_hex_string(data) - - if verbose: print(cmd) - - self.ble_conn.sendline(cmd) - - # -------------------------------------------------------------------------- - # Enable notifications from the Control Point Handle - # -------------------------------------------------------------------------- - def _enable_notifications(self, cccd_handle): - if verbose: print('_enable_notifications') - - cmd = 'char-write-req 0x%04x %s' % (cccd_handle, '0100') - - if verbose: print(cmd) - - self.ble_conn.sendline(cmd) - - # Verify that command was successfully written - try: - res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10) - except pexpect.TIMEOUT as e: - print("State timeout") diff --git a/src/window.py b/src/window.py index 97d7302..756098d 100644 --- a/src/window.py +++ b/src/window.py @@ -1,8 +1,7 @@ - import threading from gi.repository import Gtk, GObject from .bluetooth import InfiniTimeDevice -from .ble_legacy_dfu_controller import BleDfuControllerLegacy +from .ble_dfu import InfiniTimeDFU from .unpacker import Unpacker # calls f on another thread @@ -20,12 +19,14 @@ def do_call(): error = err GObject.idle_add(lambda: on_done(result, error)) - thread = threading.Thread(target = do_call) + + thread = threading.Thread(target=do_call) thread.start() -@Gtk.Template(resource_path='/org/gnome/siglo/window.ui') + +@Gtk.Template(resource_path="/org/gnome/siglo/window.ui") class SigloWindow(Gtk.ApplicationWindow): - __gtype_name__ = 'SigloWindow' + __gtype_name__ = "SigloWindow" info_scan_pass = Gtk.Template.Child() scan_fail_box = Gtk.Template.Child() scan_pass_box = Gtk.Template.Child() @@ -46,9 +47,15 @@ def done_scanning(self, manager): self.manager = manager scan_result = manager.get_scan_result() self.bt_spinner.set_visible(False) - if (scan_result): + if scan_result: self.main_info.set_text("Done Scanning...Success") - self.info_scan_pass.set_text(manager.alias + " Found!\n\nAdapter Name: "+ manager.adapter_name +"\nMac Address: " + manager.get_mac_address()) + self.info_scan_pass.set_text( + manager.alias + + " Found!\n\nAdapter Name: " + + manager.adapter_name + + "\nMac Address: " + + manager.get_mac_address() + ) self.scan_pass_box.set_visible(True) else: self.main_info.set_text("Done Scanning...Failed") @@ -68,7 +75,9 @@ def rescan_button_clicked(self, widget): def sync_time_button_clicked(self, widget): if self.manager is not None: print("Sync Time button clicked...") - device = InfiniTimeDevice(manager=self.manager, mac_address=self.manager.get_mac_address()) + device = InfiniTimeDevice( + manager=self.manager, mac_address=self.manager.get_mac_address() + ) device.connect() self.main_info.set_text("InfiniTime Sync... Success!") self.scan_pass_box.set_visible(False) @@ -89,46 +98,45 @@ def ota_cancel_button_clicked(self, widget): @Gtk.Template.Callback() def flash_it_button_clicked(self, widget): + self.main_info.set_text("Updating Firmware...") + self.bt_spinner.set_visible(True) + self.ota_picked_box.set_visible(False) + self.sync_time_button.set_visible(False) unpacker = Unpacker() try: - hexfile, datfile = unpacker.unpack_zipfile(self.ota_file) + binfile, datfile = unpacker.unpack_zipfile(self.ota_file) except Exception as e: print("ERR") print(e) pass - self.ble_dfu = BleDfuControllerLegacy(self.manager.get_mac_address(), hexfile, datfile) + self.ble_dfu = InfiniTimeDFU( + mac_address=self.manager.get_mac_address(), + manager=self.manager, + window = self, + firmware_path=binfile, + datfile_path=datfile, + verbose=False, + ) self.ble_dfu.input_setup() + self.ble_dfu.connect() + + # async_call(self.slow_load, self.slow_complete) - # Connect to peer device. Assume application mode. - if self.ble_dfu.scan_and_connect(): - if not self.ble_dfu.check_DFU_mode(): - print("Need to switch to DFU mode") - success = self.ble_dfu.switch_to_dfu_mode() - if not success: - print("Couldn't reconnect") - else: - # The device might already be in DFU mode (MAC + 1) - self.ble_dfu.target_mac_increase(1) - - # Try connection with new address - print("Couldn't connect, will try DFU MAC") - if not self.ble_dfu.scan_and_connect(): - raise Exception("Can't connect to device") - - async_call(self.slow_load, self.slow_complete) - def slow_complete(self, results, errors): # Disconnect from peer device if not done already and clean up. self.ble_dfu.disconnect() self.main_info.set_text("OTA Update Complete") self.bt_spinner.set_visible(False) self.sync_time_button.set_visible(True) - + + def show_complete(self): + self.main_info.set_text("OTA Update Complete") + self.bt_spinner.set_visible(False) + self.sync_time_button.set_visible(True) + def slow_load(self): self.main_info.set_text("Updating Firmware...") self.bt_spinner.set_visible(True) self.ota_picked_box.set_visible(False) self.sync_time_button.set_visible(False) self.ble_dfu.start() - -