diff --git a/pydfuutil/__main__.py b/pydfuutil/__main__.py index 0610e01..c9539eb 100644 --- a/pydfuutil/__main__.py +++ b/pydfuutil/__main__.py @@ -574,7 +574,7 @@ def main() -> None: print(f"v{__version__}") # parse options - args = parser.parse_args(sys.argv) + args = parser.parse_args() dif: dfu.DfuIf = dfu.DfuIf() file = dfu_file.DFUFile(None) mode = Mode.NONE diff --git a/pydfuutil/dfu.py b/pydfuutil/dfu.py index d9fced7..1174ff5 100644 --- a/pydfuutil/dfu.py +++ b/pydfuutil/dfu.py @@ -1,16 +1,32 @@ """ -low-level DFU message sending routines (part of dfu-programmer). +Low-level DFU communication routines (part of dfu-programmer). (C) 2023 Yaroshenko Dmytro (https://github.com/o-murphy) + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """ -import inspect +# import inspect +import sys from dataclasses import dataclass from enum import IntEnum, IntFlag import usb.util from pydfuutil.logger import logger - +from pydfuutil.portable import milli_sleep +from pydfuutil.usb_dfu import FuncDescriptor _logger = logger.getChild(__name__.rsplit('.', maxsplit=1)[-1]) @@ -38,19 +54,7 @@ def to_string(self): return state_to_string(self) -_STATES_NAMES = { - State.APP_IDLE: 'appIDLE', - State.APP_DETACH: 'appDETACH', - State.DFU_IDLE: 'dfuIDLE', - State.DFU_DOWNLOAD_SYNC: 'dfuDNLOAD-SYNC', - State.DFU_DOWNLOAD_BUSY: 'dfuDNBUSY', - State.DFU_DOWNLOAD_IDLE: 'dfuDNLOAD-IDLE', - State.DFU_MANIFEST_SYNC: 'dfuMANIFEST-SYNC', - State.DFU_MANIFEST: 'dfuMANIFEST', - State.DFU_MANIFEST_WAIT_RESET: 'dfuMANIFEST-WAIT-RESET', - State.DFU_UPLOAD_IDLE: 'dfuUPLOAD-IDLE', - State.DFU_ERROR: 'dfuERROR', -} + class Status(IntEnum): @@ -79,29 +83,6 @@ def to_string(self): return status_to_string(self) -_DFU_STATUS_NAMES = { - Status.OK: "No error condition is present", - Status.ERROR_TARGET: "File is not targeted for use by this device", - Status.ERROR_FILE: "File is for this device but fails some vendor-specific test", - Status.ERROR_WRITE: "Device is unable to write memory", - Status.ERROR_ERASE: "Memory erase function failed", - Status.ERROR_CHECK_ERASED: "Memory erase check failed", - Status.ERROR_PROG: "Program memory function failed", - Status.ERROR_VERIFY: "Programmed memory failed verification", - Status.ERROR_ADDRESS: "Cannot program memory due to received address that is out of range", - Status.ERROR_NOTDONE: "Received DNLOAD with wLength = 0, " - "but device does not think that it has all data yet", - Status.ERROR_FIRMWARE: "Device's firmware is corrupt. " - "It cannot return to run-time (non-DFU) operations", - Status.ERROR_VENDOR: "iString indicates a vendor specific error", - Status.ERROR_USBR: "Device detected unexpected USB reset signalling", - Status.ERROR_POR: "Device detected unexpected power on reset", - Status.ERROR_UNKNOWN: "Something went wrong, but the device does not know what it was", - Status.ERROR_STALLEDPKT: "Device stalled an unexpected request" - -} - - class Command(IntEnum): """Dfu commands""" DETACH = 0 @@ -124,12 +105,16 @@ class Mode(IntFlag): IFF_ALT = 0x1000 IFF_DEVNUM = 0x2000 IFF_PATH = 0x4000 + # DFU_IFF_DFU = 0x0001 /* DFU Mode, (not Runtime) */ + # DFU_IFF_ALT = 0x0002 /* Multiple alternate settings */ @dataclass(frozen=True) class StatusRetVal: """ Converts dfu_get_status result bytes to applicable dataclass + This is based off of DFU_GETSTATUS + the data structure to be populated with the results """ # pylint: disable=invalid-name bStatus: Status = Status.ERROR_UNKNOWN @@ -170,10 +155,8 @@ def __int__(self): @dataclass class DfuIf: # pylint: disable=too-many-instance-attributes - """DFU Interface dataclass""" # pylint: disable=invalid-name - vendor: int = None product: int = None bcdDevice: int = None @@ -187,6 +170,11 @@ class DfuIf: # pylint: disable=too-many-instance-attributes flags: [Mode, int] = 0 count: int = None dev: usb.core.Device = None + quirks: int = None + bwPollTimeout: int = 0 + serial_name: str = "" + usb_dfu_func_desc: FuncDescriptor = None + @property def device_ids(self) -> dict: @@ -225,6 +213,10 @@ def get_state(self) -> State: """Binds self to dfu.get_state()""" return get_state(self.dev, self.interface) + def abort_to_idle(self): + """Binds self to dfu.abort_to_idle()""" + return abort_to_idle(self) + def init(timeout: int) -> None: """ @@ -243,18 +235,18 @@ def init(timeout: int) -> None: raise ValueError(f"dfu_init: Invalid timeout value {timeout}") -def verify_init() -> int: - """ - Verifies provided TIMEOUT and DEBUG_LEVEL - NOTE: (function: typing.Callable) not needed cause python can get it from stack - :raise ValueError with caller function name - :return: 0 - """ - caller = inspect.stack()[0][3] - if INVALID_DFU_TIMEOUT == TIMEOUT: - if 0 != DEBUG_LEVEL: - raise ValueError(f'"{caller}": dfu system not initialized properly.') - return 0 +# def verify_init() -> int: +# """ +# Verifies provided TIMEOUT and DEBUG_LEVEL +# NOTE: (function: typing.Callable) not needed cause python can get it from stack +# :raise ValueError with caller function name +# :return: 0 +# """ +# caller = inspect.stack()[0][3] +# if INVALID_DFU_TIMEOUT == TIMEOUT: +# if 0 != DEBUG_LEVEL: +# raise ValueError(f'"{caller}": dfu system not initialized properly.') +# return 0 def debug(level: int) -> None: @@ -271,24 +263,16 @@ def debug(level: int) -> None: def detach(device: usb.core.Device, interface: int, timeout: int) -> bytes: """ - - * DETACH Request (DFU Spec 1.0, Section 5.1) - * - * device - the usb_dev_handle to communicate with - * interface - the interface to communicate with - * timeout - the timeout in ms the USB device should wait for a pending - * USB reset before giving up and terminating the operation - * - * returns 0 or < 0 on error + DETACH Request (DFU Spec 1.0, Section 5.1) Sends to device command to switch it to DFU mode u have to free device and handle it again - :param device: usb.core.Device - :param interface: usb.core.Interface.bInterfaceNumber - :param timeout: timeout to dfu detach - :return: returns error code + :param device: the usb_dev_handle to communicate with + :param interface: the interface to communicate with + :param timeout: the timeout in ms the USB device should wait for a pending + :return: bytes or < 0 on error """ - verify_init() + # verify_init() _logger.debug('DETACH...') result = device.ctrl_transfer( bmRequestType=usb.util.ENDPOINT_OUT @@ -309,24 +293,16 @@ def download(device: usb.core.Device, transaction: int, data_or_length: [bytes, int]) -> int: """ - * DNLOAD Request (DFU Spec 1.0, Section 6.1.1) - * - * device - the usb_dev_handle to communicate with - * interface - the interface to communicate with - * length - the total number of bytes to transfer to the USB - * device - must be less than wTransferSize - * data - the data to transfer - * - * returns the number of bytes written or < 0 on error + DNLOAD Request (DFU Spec 1.0, Section 6.1.1) Download data to special page of DFU device - :param device: usb.core.Device - :param interface: usb.core.interface.bInterfaceNumber + :param device: the usb_dev_handle to communicate with + :param interface: the interface to communicate with :param transaction: start page int(total_data_size/xfer_size) - :param data_or_length: page size bytes(xfer_size) or xfer_size + :param data_or_length: the data to transfer :return: downloaded data or error code in bytes """ - verify_init() + # verify_init() _logger.debug('DFU_DOWNLOAD...') result = device.ctrl_transfer( @@ -349,24 +325,16 @@ def upload(device: usb.core.Device, transaction: int, data_or_length: [bytes, int]) -> bytes: """ - * UPLOAD Request (DFU Spec 1.0, Section 6.2) - * - * device - the usb_dev_handle to communicate with - * interface - the interface to communicate with - * length - the maximum number of bytes to receive from the USB - * device - must be less than wTransferSize - * data - the buffer to put the received data in - * - * returns the number of bytes received or < 0 on error + UPLOAD Request (DFU Spec 1.0, Section 6.2) Uploads data from special page of DFU device - :param device: usb.core.Device - :param interface: usb.core.Interface.bInterfaceNumber + :param device: the usb_dev_handle to communicate with + :param interface: the interface to communicate with :param transaction: start page int(total_data_size/xfer_size) - :param data_or_length: page size bytes(xfer_size) or xfer_size - :return: uploaded data or error code in bytes + :param data_or_length: the buffer to put the received data in + :return: uploaded bytes or < 0 on error """ - verify_init() + # verify_init() _logger.debug('UPLOAD...') result = device.ctrl_transfer( @@ -387,20 +355,14 @@ def upload(device: usb.core.Device, def get_status(device: usb.core.Device, interface: int) -> StatusRetVal: """ - * GETSTATUS Request (DFU Spec 1.0, Section 6.1.2) - * - * device - the usb_dev_handle to communicate with - * interface - the interface to communicate with - * status - the data structure to be populated with the results - * - * return the number of bytes read in or < 0 on an error + GETSTATUS Request (DFU Spec 1.0, Section 6.1.2) Returns DFU interface status - :param device: usb.core.Device - :param interface: usb.core.Interface.bInterfaceNumber - :return: error code and _STATUS [Container, dict] object + :param device: the usb_dev_handle to communicate with + :param interface: the interface to communicate with + :return: StatusRetVal """ - verify_init() + # verify_init() _logger.debug('DFU_GET_STATUS...') length = 6 @@ -425,19 +387,14 @@ def get_status(device: usb.core.Device, interface: int) -> StatusRetVal: def clear_status(device: usb.core.Device, interface: int) -> int: """ - * CLRSTATUS Request (DFU Spec 1.0, Section 6.1.3) - * - * device - the usb_dev_handle to communicate with - * interface - the interface to communicate with - * - * return 0 or < 0 on an error + CLRSTATUS Request (DFU Spec 1.0, Section 6.1.3) Clears DFU interface status - :param device: usb.core.Device - :param interface: usb.core.Interface.bInterfaceNumber - :return: error code + :param device: the usb_dev_handle to communicate with + :param interface: the interface to communicate with + :return: return 0 or < 0 on an error """ - verify_init() + # verify_init() _logger.debug('CLEAR_STATUS...') result = device.ctrl_transfer( @@ -457,22 +414,14 @@ def clear_status(device: usb.core.Device, interface: int) -> int: def get_state(device: usb.core.Device, interface: int) -> [State, int]: """ - * GETSTATE Request (DFU Spec 1.0, Section 6.1.5) - * - * device - the usb_dev_handle to communicate with - * interface - the interface to communicate with - * length - the maximum number of bytes to receive from the USB - * device - must be less than wTransferSize - * data - the buffer to put the received data in - * - * returns the state or < 0 on error + GETSTATE Request (DFU Spec 1.0, Section 6.1.5) Returns DFU interface state - :param device: usb.core.Device - :param interface: usb.core.Interface.bInterfaceNumber - :return: dfu state or error code + :param device: the usb_dev_handle to communicate with + :param interface: the interface to communicate with + :return: returns the state or < 0 on error """ - verify_init() + # verify_init() length = 1 result = device.ctrl_transfer( @@ -495,19 +444,14 @@ def get_state(device: usb.core.Device, interface: int) -> [State, int]: def abort(device: usb.core.Device, interface: int) -> int: """ - * ABORT Request (DFU Spec 1.0, Section 6.1.4) - * - * device - the usb_dev_handle to communicate with - * interface - the interface to communicate with - * - * returns 0 or < 0 on an error + ABORT Request (DFU Spec 1.0, Section 6.1.4) Aborts DFU command - :param device: usb.core.Device - :param interface: usb.core.Interface.bInterfaceNumber - :return: error code + :param device: the usb_dev_handle to communicate with + :param interface: the interface to communicate with + :return: returns 0 or < 0 on an error """ - verify_init() + # verify_init() _logger.debug('ABORT...') result = device.ctrl_transfer( @@ -548,6 +492,57 @@ def status_to_string(status: int) -> [str, None]: return None +_STATES_NAMES = { + State.APP_IDLE: 'appIDLE', + State.APP_DETACH: 'appDETACH', + State.DFU_IDLE: 'dfuIDLE', + State.DFU_DOWNLOAD_SYNC: 'dfuDNLOAD-SYNC', + State.DFU_DOWNLOAD_BUSY: 'dfuDNBUSY', + State.DFU_DOWNLOAD_IDLE: 'dfuDNLOAD-IDLE', + State.DFU_MANIFEST_SYNC: 'dfuMANIFEST-SYNC', + State.DFU_MANIFEST: 'dfuMANIFEST', + State.DFU_MANIFEST_WAIT_RESET: 'dfuMANIFEST-WAIT-RESET', + State.DFU_UPLOAD_IDLE: 'dfuUPLOAD-IDLE', + State.DFU_ERROR: 'dfuERROR', +} + + +_DFU_STATUS_NAMES = { + Status.OK: "No error condition is present", + Status.ERROR_TARGET: "File is not targeted for use by this device", + Status.ERROR_FILE: "File is for this device but fails some vendor-specific test", + Status.ERROR_WRITE: "Device is unable to write memory", + Status.ERROR_ERASE: "Memory erase function failed", + Status.ERROR_CHECK_ERASED: "Memory erase check failed", + Status.ERROR_PROG: "Program memory function failed", + Status.ERROR_VERIFY: "Programmed memory failed verification", + Status.ERROR_ADDRESS: "Cannot program memory due to received address that is out of range", + Status.ERROR_NOTDONE: "Received DNLOAD with wLength = 0, " + "but device does not think that it has all data yet", + Status.ERROR_FIRMWARE: "Device's firmware is corrupt. " + "It cannot return to run-time (non-DFU) operations", + Status.ERROR_VENDOR: "iString indicates a vendor specific error", + Status.ERROR_USBR: "Device detected unexpected USB reset signalling", + Status.ERROR_POR: "Device detected unexpected power on reset", + Status.ERROR_UNKNOWN: "Something went wrong, but the device does not know what it was", + Status.ERROR_STALLEDPKT: "Device stalled an unexpected request" +} + + +def abort_to_idle(dif: DfuIf): + if dif.abort() < 0: + _logger.error("Error sending dfu abort request") + sys.exit(1) + if (ret := int(dst := dif.get_status())) < 0: + _logger.error("Error during abort get_status") + sys.exit(1) + if dst.bState != State.DFU_IDLE: + _logger.error("Failed to enter idle state on abort") + sys.exit(1) + milli_sleep(dst.bwPollTimeout) + return ret + + # global definitions DEBUG: int = 0 diff --git a/pydfuutil/dfu_file.py b/pydfuutil/dfu_file.py index a88daba..e293b20 100644 --- a/pydfuutil/dfu_file.py +++ b/pydfuutil/dfu_file.py @@ -1,11 +1,30 @@ """ Checks for, parses and generates a DFU suffix +Load or store DFU files including suffix and prefix (C) 2023 Yaroshenko Dmytro (https://github.com/o-murphy) + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """ import io import os +import struct +import sys +import warnings from dataclasses import dataclass, field +from enum import Enum from construct import (Struct, Const, ByteSwapped, Default, Int32ub, Int16ub, Int8sb, @@ -18,6 +37,9 @@ _logger = logger.getChild(__name__.rsplit('.', maxsplit=1)[-1]) DFU_SUFFIX_LENGTH = 16 +LMDFU_PREFIX_LENGTH = 8 +LPCDFU_PREFIX_LENGTH = 16 +STDIN_CHUNK_SIZE = 65536 crc32_table = [ 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, @@ -76,25 +98,67 @@ )) -def crc32_byte(accum: int, delta: int): - """ - Calculate a 32-bit CRC - """ - return crc32_table[(accum ^ delta) & 0xff] ^ (accum >> 8) +@dataclass +class DFUFile: # pylint: disable=too-many-instance-attributes, invalid-name + """Class to store DFU file data""" + name: [str, None] + file_p: io.FileIO = None + size: int = 0 + dwCRC: int = 0 + suffix_len: int = 0 + bcdDFU: int = 0 + idVendor: int = 0xffff # wildcard value + idProduct: int = 0xffff # wildcard value + bcdDevice: int = 0xffff # wildcard value + + def parse_dfu_suffix(self) -> int: + """Bind parse_dfu_suffix to DFUFile instance""" + return parse_dfu_suffix(self) + + def generate_dfu_suffix(self) -> int: + """Bind generate_dfu_suffix to DFUFile instance""" + return generate_dfu_suffix(self) @dataclass -class DFUFile: # pylint: disable=too-many-instance-attributes, invalid-name +class DFUFileSize: + total: int + prefix: int + suffix: int + + +class SuffixReq(Enum): + NO_SUFFIX = 0 + NEEDS_SUFFIX = 1 + MAYBE_SUFFIX = 2 + + +class PrefixReq(Enum): + NO_SUFFIX = 0 + NEEDS_SUFFIX = 1 + MAYBE_SUFFIX = 2 + + +class PrefixType(Enum): + ZERO_PREFIX = 0 + LMDFU_PREFIX = 1 + LPCDFU_UNENCRYPTED_PREFIX = 2 + + +@dataclass +class DFUFile011: # pylint: disable=too-many-instance-attributes, invalid-name """Class to store DFU file data""" name: [str, None] - file_p: io.FileIO = field(default=None) - size: int = field(default=0) - dwCRC: int = field(default=0) - suffix_len: int = field(default=0) - bcdDFU: int = field(default=0) - idVendor: int = field(default=0xffff) # wildcard value - idProduct: int = field(default=0xffff) # wildcard value - bcdDevice: int = field(default=0xffff) # wildcard value + firmware: [bytearray, bytes] = field(default_factory=bytearray) + file_p: io.FileIO = None + size: DFUFileSize = None + lmdfu_address: int = 0 + prefix_type: PrefixType = None + dwCRC: int = 0 + bcdDFU: int = 0 + idVendor: int = 0xffff # wildcard value + idProduct: int = 0xffff # wildcard value + bcdDevice: int = 0xffff # wildcard value def parse_dfu_suffix(self) -> int: """Bind parse_dfu_suffix to DFUFile instance""" @@ -105,12 +169,251 @@ def generate_dfu_suffix(self) -> int: return generate_dfu_suffix(self) +def crc32_byte(accum: int, delta: int): + """ + Calculate a 32-bit CRC + """ + return crc32_table[(accum ^ delta) & 0xff] ^ (accum >> 8) + + +def probe_prefix(file: DFUFile011): + prefix = file.firmware + + if file.size.total < LMDFU_PREFIX_LENGTH: + return 1 + if prefix[0] == 0x01 and prefix[1] == 0x00: + payload_len = ((prefix[7] << 24) | (prefix[6] << 16) + | (prefix[5] << 8) | prefix[4]) + expected_payload_len = file.size.total - LMDFU_PREFIX_LENGTH - file.size.suffix + if payload_len != expected_payload_len: + return 1 + file.prefix_type = PrefixType.LMDFU_PREFIX + file.size.prefix = LMDFU_PREFIX_LENGTH + file.lmdfu_address = 1024 * ((prefix[3] << 8) | prefix[2]) + + elif ((prefix[0] & 0x3f) == 0x1a) and ((prefix[1] & 0x3f) == 0x3f): + file.prefix_type = PrefixType.LPCDFU_UNENCRYPTED_PREFIX + file.size.prefix = LPCDFU_PREFIX_LENGTH + if file.size.prefix + file.size.suffix > file.size.total: + return 1 + return 0 + + +def write_crc(f: DFUFile.file_p, crc: int, buf: [bytes, bytearray], size: int) -> int: + # compute CRC + for x in range(0, size): + crc = crc32_byte(crc, buf[x]) + + # write data + if f.write(buf, size) != size: + _logger.error(f"Could not write {size} bytes to {f}") + + return crc + + +def load_file(file: DFUFile011, check_suffix: SuffixReq, check_prefix: PrefixReq): + + file.size.prefix = 0 + file.size.suffix = 0 + + # default values, if no valid suffix is found + file.bcdDFU = 0 + file.idVendor = 0xffff # wildcard value + file.idProduct = 0xffff # wildcard value + file.bcdDevice = 0xffff # wildcard value + + # default values, if no valid prefix is found + file.lmdfu_address = 0 + + file.firmware = None + + if file.name and file.name != '-': + + if sys.stdin.isatty(): + print("Please provide input via stdin.") + return None + + # Check if the platform is Windows + if sys.platform.startswith("win"): + import msvcrt + # Set stdin to binary mode + msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) + + file.firmware = bytearray() + read_bytes = sys.stdin.buffer.read(STDIN_CHUNK_SIZE) + file.firmware += read_bytes + file.size.total = len(read_bytes) + + while len(read_bytes) == STDIN_CHUNK_SIZE: + read_bytes = sys.stdin.buffer.read(STDIN_CHUNK_SIZE) + file.firmware += read_bytes + file.size.total += len(read_bytes) + + _logger.debug(f"Read {file.size.total} bytes from stdin") + + # Never require suffix when reading from stdin + check_suffix = SuffixReq.MAYBE_SUFFIX + else: + with open(file.name, 'rb') as file.file_p: + file.firmware = file.file_p.read() + file.size.total = len(file.firmware) + + # TODO + raise NotImplementedError + # # Check for possible DFU file suffix by trying to parse one + # if check_suffix != SuffixReq.NO_SUFFIX: + # dfusuffix = file.firmware[-16:] if file.size['total'] >= 16 else None + # missing_suffix = False + # reason = None + # + # if not dfusuffix: + # reason = "File too short for DFU suffix" + # missing_suffix = True + # elif dfusuffix[10:13] != b'DFU': + # reason = "Invalid DFU suffix signature" + # missing_suffix = True + # else: + # crc = 0xffffffff + # for byte in file.firmware[:-16]: + # crc = crc32_byte(crc, byte) + # + # file.dwCRC = struct.unpack(' file.size.total: + # raise ValueError("Invalid DFU suffix length") + # + # file.idVendor = struct.unpack('> 8 + lmdfu_prefix[4] = len_ & 0xff + lmdfu_prefix[5] = (len_ >> 8) & 0xff + lmdfu_prefix[6] = (len_ >> 16) & 0xff + lmdfu_prefix[7] = (len_ >> 24) + + crc = write_crc(file.file_p, crc, lmdfu_prefix, LMDFU_PREFIX_LENGTH) + + if file.prefix_type == PrefixType.LPCDFU_UNENCRYPTED_PREFIX: + lpcdfu_prefix = bytearray(LPCDFU_PREFIX_LENGTH) + + # Payload is firmware and prefix rounded to 512 bytes + len_ = (file.size.total - file.size.suffix + 511) // 512 + + lpcdfu_prefix[0] = 0x1a # Unencypted + lpcdfu_prefix[1] = 0x3f # Reserved + lpcdfu_prefix[2] = (len_ & 0xff) + lpcdfu_prefix[3] = (len_ >> 8) & 0xff + for i in range(12, LPCDFU_PREFIX_LENGTH): + lpcdfu_prefix[i] = 0xff + + crc = write_crc(file.file_p, crc, lpcdfu_prefix, LPCDFU_PREFIX_LENGTH) + + # write firmware binary + crc = write_crc(file.file_p, crc, file.firmware[file.size.prefix:], + file.size.total + file.size.prefix + file.size.suffix) + + # write suffix, if any + if write_suffix: + dfusuffix = bytearray(DFU_SUFFIX_LENGTH) + + dfusuffix[0] = file.bcdDevice & 0xff + dfusuffix[1] = file.bcdDevice >> 8 + dfusuffix[2] = file.idProduct & 0xff + dfusuffix[3] = file.idProduct >> 8 + dfusuffix[4] = file.idVendor & 0xff + dfusuffix[5] = file.idVendor >> 8 + dfusuffix[6] = file.bcdDFU & 0xff + dfusuffix[7] = file.bcdDFU >> 8 + dfusuffix[8] = ord('U') + dfusuffix[9] = ord('F') + dfusuffix[10] = ord('D') + dfusuffix[11] = DFU_SUFFIX_LENGTH + + crc = write_crc(file.file_p, crc, dfusuffix, DFU_SUFFIX_LENGTH - 4) + + dfusuffix[12] = crc + dfusuffix[13] = crc >> 8 + dfusuffix[14] = crc >> 16 + dfusuffix[15] = crc >> 24 + + crc = write_crc(file.file_p, crc, dfusuffix[12:], 4) + + except OSError as err: + _logger.debug(err) + raise OSError(f"Could not open file {file.name} for writing") + + +def show_suffix_and_prefix(file: DFUFile011) -> None: + + if file.size.prefix == LPCDFU_PREFIX_LENGTH: + print(f"The file {file.name} contains a TI Stellaris " + f"DFU prefix with the following properties:") + print(f"Address:\t0x{file.lmdfu_address:08x}") + elif file.size.prefix == LPCDFU_PREFIX_LENGTH: + prefix = file.firmware + size_kib = prefix[2] >> 1 | prefix[3] << 7 + print(f"The file {file.name} contains a NXP unencrypted " + f"LPC DFU prefix with the following properties:") + print(f"Size:\t{size_kib:5} kiB") + elif file.size.prefix != 0: + print(f"The file {file.name} contains an unknown prefix") + + if file.size.suffix > 0: + print(f"The file {file.name} contains a DFU suffix with the following properties:") + print(f"BCD device:\t0x{file.bcdDevice:04X}") + print(f"Product ID:\t0x{file.idProduct:04X}") + print(f"Vendor ID:\t0x{file.idVendor:04X}") + print(f"BCD DFU:\t0x{file.bcdDFU:04X}") + print(f"Length:\t\t{file.size.suffix}") + print(f"CRC:\t\t0x{file.dwCRC:08X}") + + def parse_dfu_suffix(file: DFUFile) -> int: """ reads the file_p and name member, fills in all others + FIXME: deprecated :param file: :return: 0 if no DFU suffix, positive if valid DFU suffix, negative on file read error """ + warnings.warn("parse_dfu_suffix is deprecated", FutureWarning) crc = 0xffffffff dfu_suffix = bytearray([0] * DFU_SUFFIX_LENGTH) @@ -178,6 +481,7 @@ def generate_dfu_suffix(file: DFUFile) -> int: :param file: :return: positive on success, negative on errors """ + warnings.warn("generate_dfu_suffix is deprecated", FutureWarning) file.size = 0 file.dwCRC = 0xffffffff diff --git a/pydfuutil/dfu_load.py b/pydfuutil/dfu_load.py index ec24f3f..383ff1f 100644 --- a/pydfuutil/dfu_load.py +++ b/pydfuutil/dfu_load.py @@ -1,8 +1,31 @@ """ +DFU transfer routines (C) 2023 Yaroshenko Dmytro (https://github.com/o-murphy) + +This is supposed to be a general DFU implementation, as specified in the +USB DFU 1.0 and 1.1 specification. + +The code was originally intended to interface with a USB device running the +"sam7dfu" firmware (see https://www.openpcd.org/) on an AT91SAM7 processor. + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """ import logging +import usb + from pydfuutil import dfu from pydfuutil.dfu_file import DFUFile from pydfuutil.logger import logger @@ -16,13 +39,13 @@ def do_upload(dif: dfu.DfuIf, xfer_size: int, file: DFUFile = None, - total_size: int = -1) -> [int, bytes]: + expected_size: int = -1) -> [int, bytes]: """ Uploads data from DFU device from special page :param dif: dfu.dfu_if :param xfer_size: chunk size :param file: optional - DFUFile object - :param total_size: optional - total bytes expected to be uploaded + :param expected_size: optional - total bytes expected to be uploaded :return: uploaded bytes or error code """ @@ -33,41 +56,55 @@ def do_upload(dif: dfu.DfuIf, buf = bytearray(xfer_size) with Progress() as progress: - progress_total = total_size if total_size >= 0 else None + progress_total = expected_size if expected_size >= 0 else None progress.start_task( description="Starting upload", total=progress_total ) + # ret = 0 # need there? + try: + while True: + rc = dif.upload(transaction, buf) + + if len(rc) < 0: + _logger.error("Error during upload") + ret = rc + break - while True: - rc = dif.upload(transaction, buf) + if file: + write_rc = file.file_p.write(rc) + # TODO: replace to dfu_file_write_crc + + if write_rc < len(rc): + _logger.error(f'Short file write: {write_rc}') + ret = total_bytes + break + + total_bytes += len(rc) + + if total_bytes < 0: + raise IOError("Received too many bytes (wraparound)") - if len(rc) < 0: - ret = rc - break - if file: - write_rc = file.file_p.write(rc) + transaction += 1 + progress.update(advance=len(rc), description="Uploading...") - if write_rc < len(rc): - _logger.error(f'Short file write: {write_rc}') + # last block, return + if (len(rc) < xfer_size) or (total_bytes >= expected_size >= 0): ret = total_bytes break - total_bytes += len(rc) + progress.update(description='Upload finished!') - transaction += 1 - progress.update(advance=len(rc), description="Uploading...") + _logger.debug(f"Received a total of {total_bytes} bytes") - # last block, return - if (len(rc) < xfer_size) or (total_bytes >= total_size >= 0): - ret = total_bytes - break + if expected_size != 0 and total_bytes != expected_size: + _logger.warning("Unexpected number of bytes uploaded from device") - progress.update(description='Upload finished!') - - _logger.debug(f"Received a total of {total_bytes} bytes") - return ret + return ret + except IOError as e: + _logger.error(e) + return -1 # pylint: disable=too-many-branches @@ -76,20 +113,24 @@ def do_dnload(dif: dfu.DfuIf, xfer_size: int, file: DFUFile, quirks: int) -> int :param dif: DfuIf instance :param xfer_size: transaction size :param file: DFUFile instance - :param quirks: quirks + :param quirks: quirks TODO: replace to DFUFile011 with quirks verbose: is verbose useless cause of using python's logging :return: """ bytes_sent = 0 + # # TODO: new one + # buf = file.firmware + # expected_size = file.size.total - file.size.suffix; + # bytes_sent = 0 buf = bytearray(xfer_size) _logger.info("Copying data from PC to DFU device") - _logger.info("Starting download: ") + try: with Progress() as progress: - total_size = file.size - file.suffix_len + total_size = file.size - file.suffix_len # TODO: replace to expected_size progress.start_task( description="Starting download", total=total_size if total_size >= 0 else None @@ -157,13 +198,12 @@ def do_dnload(dif: dfu.DfuIf, xfer_size: int, file: DFUFile, quirks: int) -> int if status.bState == dfu.State.DFU_IDLE: _logger.info("Done!") + return bytes_sent except IOError as err: _logger.error(err) return -1 - return bytes_sent - def init() -> None: """Init dfu_load props""" diff --git a/pydfuutil/exceptions.py b/pydfuutil/exceptions.py index 3aa0520..ee0c7fe 100644 --- a/pydfuutil/exceptions.py +++ b/pydfuutil/exceptions.py @@ -12,6 +12,10 @@ class GeneralError(Exception): exit_code = 1 +class UsbIOError(GeneralError): + exit_code = 1 + + class GeneralWarning(GeneralError): """ Usually indicates a general warning diff --git a/pydfuutil/usb_dfu.py b/pydfuutil/usb_dfu.py index 95dc70f..c2e8863 100644 --- a/pydfuutil/usb_dfu.py +++ b/pydfuutil/usb_dfu.py @@ -5,6 +5,20 @@ This ought to be compliant to the USB DFU Spec 1.0 as available from https://www.usb.org/developers/devclass_docs/usbdfu10.pdf + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """ from dataclasses import dataclass from enum import IntEnum @@ -50,6 +64,7 @@ class UsbReqDfu(IntEnum): ABORT = 0x06 +# DFU_GETSTATUS bStatus values (Section 6.1.2, DFU Rev 1.1) class DFUStatus(IntEnum): """Dfu statuses""" OK = 0x00 diff --git a/pyproject.toml b/pyproject.toml index 69d0224..a6f5a7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta" [project] name = "pydfuutil" -version = "0.0.2" +version = "0.0.3" authors = [ { name="o-murphy", email="thehelixpg@gmail.com" }, ] diff --git a/tests/test_dfu.py b/tests/test_dfu.py index ebbe84a..e337d37 100644 --- a/tests/test_dfu.py +++ b/tests/test_dfu.py @@ -18,9 +18,8 @@ def test_str(self): self.assertEqual(dfu.state_to_string(dfu.State.APP_IDLE), "appIDLE") self.assertEqual(dfu.State.APP_IDLE.to_string(), "appIDLE") - @patch("pydfuutil.dfu.verify_init") @patch("usb.core.find") - def test_detach(self, mock_find, mock_verify_init): + def test_detach(self, mock_find): # Mocking the device mock_device = MagicMock() @@ -36,7 +35,6 @@ def test_detach(self, mock_find, mock_verify_init): result = dfu.detach(mock_device, interface, timeout) # Assertions - mock_verify_init.assert_called_once() mock_device.ctrl_transfer.assert_called_once_with( bmRequestType=0x21 | 0x01, bRequest=dfu.Command.DETACH, # Assuming Command.DETACH is 23 @@ -47,9 +45,8 @@ def test_detach(self, mock_find, mock_verify_init): ) self.assertEqual(result, 1) # Assuming success returns 1 - @patch("pydfuutil.dfu.verify_init") @patch("usb.core.find") - def test_upload(self, mock_find, mock_verify_init): + def test_upload(self, mock_find): # Mocking the device mock_device = MagicMock() @@ -66,7 +63,6 @@ def test_upload(self, mock_find, mock_verify_init): result = dfu.upload(mock_device, interface, transaction, data) # Assertions - mock_verify_init.assert_called_once() mock_device.ctrl_transfer.assert_called_once_with( bmRequestType=0xa1, bRequest=dfu.Command.UPLOAD, # Assuming Command.UPLOAD @@ -77,9 +73,8 @@ def test_upload(self, mock_find, mock_verify_init): ) self.assertEqual(result, data) # Assuming success returns bytes(10) - @patch("pydfuutil.dfu.verify_init") @patch("usb.core.find") - def test_dwnload(self, mock_find, mock_verify_init): + def test_dwnload(self, mock_find): # Mocking the device mock_device = MagicMock() @@ -96,7 +91,6 @@ def test_dwnload(self, mock_find, mock_verify_init): result = dfu.download(mock_device, interface, transaction, data) # Assertions - mock_verify_init.assert_called_once() mock_device.ctrl_transfer.assert_called_once_with( bmRequestType=0x21, bRequest=dfu.Command.DNLOAD, # Assuming Command.DNLOAD @@ -107,9 +101,8 @@ def test_dwnload(self, mock_find, mock_verify_init): ) self.assertEqual(result, len(data)) # Assuming success returns 10 - @patch("pydfuutil.dfu.verify_init") @patch("usb.core.find") - def test_get_status(self, mock_find, mock_verify_init): + def test_get_status(self, mock_find): # Mocking the device mock_device = MagicMock() @@ -126,7 +119,6 @@ def test_get_status(self, mock_find, mock_verify_init): status = dfu.get_status(mock_device, interface) # Assertions - mock_verify_init.assert_called_once() mock_device.ctrl_transfer.assert_called_once_with( bmRequestType=0xa1, bRequest=dfu.Command.GETSTATUS, # Assuming Command.GETSTATUS @@ -137,9 +129,8 @@ def test_get_status(self, mock_find, mock_verify_init): ) self.assertEqual(status.bState, 0) # Assuming success returns 0 - @patch("pydfuutil.dfu.verify_init") @patch("usb.core.find") - def test_clear_status(self, mock_find, mock_verify_init): + def test_clear_status(self, mock_find): # Mocking the device mock_device = MagicMock() @@ -155,7 +146,6 @@ def test_clear_status(self, mock_find, mock_verify_init): result = dfu.clear_status(mock_device, interface) # Assertions - mock_verify_init.assert_called_once() mock_device.ctrl_transfer.assert_called_once_with( bmRequestType=0x21, bRequest=dfu.Command.CLRSTATUS, # Assuming Command.CLRSTATUS @@ -166,9 +156,8 @@ def test_clear_status(self, mock_find, mock_verify_init): ) self.assertEqual(result, 0) # Assuming success returns 0 - @patch("pydfuutil.dfu.verify_init") @patch("usb.core.find") - def test_abort(self, mock_find, mock_verify_init): + def test_abort(self, mock_find): # Mocking the device mock_device = MagicMock() @@ -184,7 +173,6 @@ def test_abort(self, mock_find, mock_verify_init): result = dfu.abort(mock_device, interface) # Assertions - mock_verify_init.assert_called_once() mock_device.ctrl_transfer.assert_called_once_with( bmRequestType=0x21, bRequest=dfu.Command.ABORT, # Assuming Command.ABORT diff --git a/tests/test_dfu_load.py b/tests/test_dfu_load.py index f795020..6eb7d12 100644 --- a/tests/test_dfu_load.py +++ b/tests/test_dfu_load.py @@ -21,7 +21,7 @@ def test_dfu_load_do_upload(self): result = do_upload(dif, xfer_size, file, total_size) # Assertions - self.assertEqual(result, total_size) # Assuming total_size bytes are received + self.assertEqual(result, total_size) # Assuming expected_size bytes are received def test_dfu_load_do_dnload(self):