diff --git a/recipes-app/iot2050-firmware-update/files/iot2050-firmware-update.tmpl b/recipes-app/iot2050-firmware-update/files/iot2050-firmware-update.tmpl index 6249be580..5595eb1e6 100755 --- a/recipes-app/iot2050-firmware-update/files/iot2050-firmware-update.tmpl +++ b/recipes-app/iot2050-firmware-update/files/iot2050-firmware-update.tmpl @@ -24,32 +24,28 @@ Example of update.conf.json: { "firmware": [ { - "description": "[optional] bla bla bla", - "name": "pg1-basic-vx.y.z.bin", - "version": "[optional] whatever", + "description": "IOT2050 PG1 Bootloader Release V01.01.01", + "name": "iot2050-pg1-image-boot.bin", + "version": "V01.01.01", + "type": "uboot", "target_boards": [ + "SIMATIC IOT2050-BASIC", "SIMATIC IOT2050 Basic", - "SIMATIC IOT2050-ADVANCED" + "SIMATIC IOT2050-ADVANCED", + "SIMATIC IOT2050 Advanced" ] }, { - "description": "[optional] bla bla bla", - "name": "pg2-advanced-vx.y.z.bin", - "version": "[optional] whatever", - "target_boards": "SIMATIC IOT2050 Advanced PG2", - "target_os": [ - { - "type": "[optional] Example Image", - "key": "BUILD_ID", - "min_version": "V01.02.02" - }, - { - "type": "[optional] Industrial OS", - "key": "VERSION_ID", - "min_version": "3.1.1" - } + "description": "IOT2050 PG2 Bootloader Release V01.01.01", + "name": "iot2050-pg2-image-boot.bin", + "version": "V01.01.01", + "type": "uboot", + "target_boards": [ + "SIMATIC IOT2050 Basic PG2", + "SIMATIC IOT2050 Advanced PG2", + "SIMATIC IOT2050 Advanced SM" ] - } + }, ], "target_os": [ { @@ -104,63 +100,76 @@ preserved list from cli, this would not use the preserved env variable in the `suggest_preserved_uboot_env` node. """ -import sys -import os +import argparse import fcntl +import hashlib +import io +import json +import os +import sys +import shutil import struct -import argparse +import subprocess +import time import tarfile -import json import textwrap -from types import SimpleNamespace as Namespace -import subprocess import tempfile -import hashlib -import io +from abc import ABC, abstractmethod +from ctypes import * from enum import Enum from io import StringIO -import shutil +from progress.bar import Bar +from threading import Thread, Event +from types import SimpleNamespace as Namespace + +class ErrorCode(Enum): + """The ErrorCode class describes the return codes""" + SUCCESS = 0 + ROLLBACK_SUCCESS = 1 + INVALID_ARG = 2 + BACKUP_FAILED = 3 + ROLLBACK_FAILED = 4 + FLASHING_FAILED = 5 + CANCELED = 6 + INVALID_FIRMWARE = 7 + FAILED = 8 -BOLD = "\033[1m" -ENDC = "\033[0m" class UpgradeError(Exception): - def __init__(self, ErrorInfo): + def __init__(self, ErrorInfo, code=ErrorCode.FAILED.value): super().__init__(self) - self.erroinfo = ErrorInfo + self.err = ErrorInfo + self.code = code def __str__(self): - return self.erroinfo + return repr(self) -class UpgradeRollbackError(Exception): - def __init__(self, ErrorInfo): - super().__init__(self) - self.erroinfo = ErrorInfo - def __str__(self): - return self.erroinfo +class Firmware(): + """The Firmware class represents flash base operations for all flashes""" + def __init__(self, firmware): + try: + self.firmware = open(firmware, "rb") + except (IOError, TypeError): + self.firmware = firmware -class ErrorCode(Enum): - SUCCESS = 0 - ROLLBACK_SUCCESS = 1 - INVALID_ARG = 2 - BACKUP_FAILED = 3 - ROLLBACK_FAILED = 4 - FLASHING_FAILED = 5 - CANCELED = 6 - INVALID_FIRMWARE = 7 + def __del__(self): + try: + self.firmware.close() + except Exception: + pass -class FirmwareUpdate(object): - def __init__(self, *args): - self.firmware = args[0] - self.back_fw_path = os.path.join("".join(args[1]), ".rollback_fw") - self.rollback_fw_tar = os.path.join(self.back_fw_path, - 'rollback_backup_fw.tar') - self.backup_fw_name = 'rollback_fw.bin' - self.firmware_len = 0 + @abstractmethod + def write(self): + """An Firmware can be written to flash""" + + @abstractmethod + def read(self): + """An Firmware can be read out from flash""" - @staticmethod - def get_path_type_value(path): + +class MtdDevice(): + def __get_path_type_value(self, path): """get the path value""" try: with open(path, "r") as f: @@ -168,8 +177,7 @@ class FirmwareUpdate(object): except IOError as e: raise UpgradeError("Reading {} failed: {}".format(path, e.strerror)) - @staticmethod - def flash_erase(dev, start, nbytes): + def __erase(self, dev, start, nbytes): """This function erases flash sectors @dev: flash device file descriptor @start: start address @@ -185,6 +193,7 @@ class FirmwareUpdate(object): raise UpgradeError("Flash erasing failed") def get_mtd_info(self, mtd_num): + """The uboot ops can get all mtd infos of uboot""" ospi_dev_path = "/sys/bus/platform/devices/47040000.spi" if os.path.exists(ospi_dev_path + "/spi_master"): # kernel 5.9 and later @@ -202,52 +211,77 @@ class FirmwareUpdate(object): mtd_erasesize_path = "{}/erasesize".format(mtd_sys_path) mtd_dev_path = "/dev/mtd{}".format(mtd_num) - mtd_size = int(self.get_path_type_value(mtd_size_path)) - mtd_erasesize = int(self.get_path_type_value(mtd_erasesize_path)) - mtd_name = self.get_path_type_value(mtd_name_path).strip() + try: + mtd_size = int(self.__get_path_type_value(mtd_size_path)) + mtd_erasesize = int(self.__get_path_type_value(mtd_erasesize_path)) + mtd_name = self.__get_path_type_value(mtd_name_path).strip() + except UpgradeError as e: + raise UpgradeError(e.err) return mtd_dev_path, mtd_size, mtd_erasesize, mtd_name - def flash_operate(self, mtd_dev_path, mtd_size, mtd_erasesize, + def write(self, mtd_dev_path, mtd_size, mtd_erasesize, file_obj, file_size): mtd_pos = 0 try: mtd_dev = os.open(mtd_dev_path, os.O_SYNC | os.O_RDWR) + + while mtd_pos < mtd_size and file_size > 0: + mtd_content = os.read(mtd_dev, mtd_erasesize) + firmware_content = file_obj.read(mtd_erasesize) + padsize = mtd_erasesize - len(firmware_content) + firmware_content += bytearray([0xff] * padsize) + + if not mtd_content == firmware_content: + #sys.stdout.flush() + self.__erase(mtd_dev, mtd_pos, mtd_erasesize) + os.lseek(mtd_dev, mtd_pos, os.SEEK_SET) + os.write(mtd_dev, firmware_content) + #else: + # print(".", end="") + # sys.stdout.flush() + mtd_pos += mtd_erasesize + file_size -= mtd_erasesize + os.close(mtd_dev) except IOError as e: raise UpgradeError("Opening {} failed: {}".format(mtd_dev_path, e.strerror)) + except UpgradeError as e: + raise UpgradeError(e.err) + + return file_size + + def read(self, mtd_dev_path, mtd_size, mtd_erasesize, + file_size): + mtd_in_memory = b'' + try: + mtd_dev = os.open(mtd_dev_path, os.O_SYNC | os.O_RDONLY) + except IOError as e: + raise UpgradeError("Opening {} failed: {}" + "".format(mtd_dev_path, e.strerror)) + + mtd_pos = 0 while mtd_pos < mtd_size and file_size > 0: mtd_content = os.read(mtd_dev, mtd_erasesize) - firmware_content = file_obj.read(mtd_erasesize) - padsize = mtd_erasesize - len(firmware_content) - firmware_content += bytearray([0xff] * padsize) - - if not mtd_content == firmware_content: - print("U", end="") - sys.stdout.flush() - self.flash_erase(mtd_dev, mtd_pos, mtd_erasesize) - os.lseek(mtd_dev, mtd_pos, os.SEEK_SET) - os.write(mtd_dev, firmware_content) - else: - print(".", end="") - sys.stdout.flush() mtd_pos += mtd_erasesize file_size -= mtd_erasesize - print() + mtd_in_memory += mtd_content + os.close(mtd_dev) - return file_size - def check_firmware(self): - return True + return mtd_in_memory - def update_firmware(self): - """Update Firmware""" - mtd_num = 0 - print("===================================================") - print("IOT2050 firmware update started - DO NOT INTERRUPT!") - print("===================================================") +class BootloaderFirmware(Firmware): + """The Bootloader Firmware class represents uboot flash operations""" + def __init__(self, firmware): + super().__init__(firmware) + self.mtd_device = MtdDevice() + + def write(self): + """The uboot ops can write contents to uboot flash""" + mtd_num = 0 self.firmware.seek(0) self.firmware.seek(0, os.SEEK_END) @@ -259,178 +293,344 @@ class FirmwareUpdate(object): if firmware_size <= 0: break - mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ - self.get_mtd_info(mtd_num) - print("Updating %-20s" % mtd_name, end="") - firmware_size = self.flash_operate( - mtd_dev_path, mtd_size, mtd_erasesize, - self.firmware, firmware_size - ) + try: + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(mtd_num) + firmware_size = self.mtd_device.write( + mtd_dev_path, mtd_size, mtd_erasesize, + self.firmware, firmware_size + ) + except UpgradeError as e: + raise UpgradeError("BootloaderFirmware: {}".format(e.err)) mtd_num += 1 - self.firmware_integrity_check() - - def flash_md5_digest(self, firmware_len, mtd_num=0): - flash_md5 = hashlib.md5() + def read(self, firmware_len=0x8c0000): + mtd_in_memory = b'' + mtd_num = 0 while True: if firmware_len <= 0: break - mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ - self.get_mtd_info(mtd_num) + try: + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(mtd_num) + + mtd_in_memory += self.mtd_device.read(mtd_dev_path, + mtd_size, + mtd_erasesize, + firmware_len) + except UpgradeError as e: + raise UpgradeError("BootloaderFirmware: {}".format(e.err)) + + firmware_len -= mtd_size + mtd_num += 1 - mtd_pos = 0 + return mtd_in_memory + + +class EnvFirmware(Firmware): + """The EnvFirmware class represents env partition operations""" + def __init__(self, firmware): + super().__init__(firmware) + self.firmware_path = firmware + self.mtd_device = MtdDevice() + + mtd_num = 0 + self.env_mtd_num = 0 + self.env_bk_mtd_num = 0 + while True: try: - mtd_dev = os.open(mtd_dev_path, os.O_SYNC | os.O_RDWR) - except IOError as e: - raise UpgradeError("Opening {} failed: {}".format(mtd_dev_path, - e.strerror)) + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(mtd_num) + except UpgradeError as e: + raise UpgradeError("EnvFirmware: {}".format(e.err)) - while mtd_pos < mtd_size and firmware_len > 0: - mtd_content = os.read(mtd_dev, mtd_erasesize) - flash_md5.update(mtd_content) + if "env" == mtd_name: + self.env_mtd_num = mtd_num + if "env.backup" == mtd_name: + self.env_bk_mtd_num = mtd_num + mtd_num +=1 - mtd_pos += mtd_erasesize - firmware_len -= mtd_erasesize + if self.env_mtd_num and self.env_bk_mtd_num: + break - os.close(mtd_dev) + def write(self): + """A env firmware can write contents to the env partition""" + with tempfile.NamedTemporaryFile() as env_default_binary: + for mtd_num in self.env_mtd_num, self.env_bk_mtd_num: + try: + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(mtd_num) + subprocess.run('mkenvimage -s {} -r -o {} {}'.format( + mtd_size, + env_default_binary.name, + self.firmware_path), + check=True, shell=True) + firmware_size = os.path.getsize(env_default_binary.name) + self.firmware = open(env_default_binary.name, "rb") + + while True: + if firmware_size <= 0: + break + firmware_size = self.mtd_device.write( + mtd_dev_path, mtd_size, mtd_erasesize, + env_default_binary, firmware_size + ) + except subprocess.CalledProcessError as error: + print(error.stdout) + raise UpgradeError("EnvFirmware: Run mkenvimage failed") + except UpgradeError as e: + raise UpgradeError("EnvFirmware: {}".format(e.err)) - mtd_num += 1 + def read(self): + """A env firmware can read contents from the env partition""" + mtd_in_memory = b'' + try: + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(self.env_mtd_num) - return flash_md5.hexdigest() + if "env" == mtd_name: + mtd_in_memory = self.mtd_device.read(mtd_dev_path, + mtd_size, + mtd_erasesize, + mtd_size) + except UpgradeError as e: + raise UpgradeError("EnvFirmware: {}".format(e.err)) - def firmware_md5_digest(self): - self.firmware.seek(0) - firmware_md5 = hashlib.md5() + return mtd_in_memory - for chunk in iter(lambda: self.firmware.read(4096), b""): - firmware_md5.update(chunk) - return firmware_md5.hexdigest() +class ForceUpdate(): + def __init__(self, interactor, firmware, firmware_type="uboot"): + self.firmware = firmware + self.firmware_type = firmware_type + self.interactor = interactor - def firmware_integrity_check(self): - firmware_md5_digest = self.firmware_md5_digest() - firmware_flash_md5_digest = self.flash_md5_digest(self.firmware_len) + def update(self): + if self.firmware_type == "uboot": + firmware_obj = BootloaderFirmware(self.firmware) + else: + raise UpgradeError("Unsupported firmware type!") + + print("===================================================") + print("IOT2050 firmware update started - DO NOT INTERRUPT!") + print("===================================================") - if firmware_md5_digest != firmware_flash_md5_digest: - raise UpgradeError("Firmware digest verification failed") + self.interactor.progress_bar(info="Updating {}".format(self.firmware_type)) + firmware_obj.write() - def backup_firmware(self, firmware_length=0x8c0000): + firmware_md5 = hashlib.md5() + self.firmware.seek(0) + firmware_md5.update(self.firmware.read()) + + read_out_md5 = hashlib.md5() + read_out_md5.update(firmware_obj.read()) + self.interactor.progress_bar(start=False) + + if firmware_md5.hexdigest() != read_out_md5.hexdigest(): + raise UpgradeError("Firmware digest verification failed", + ErrorCode.FLASHING_FAILED.value) + + def backup(self): + pass + +class FirmwareUpdate(): + """ + The FirmwareUpdate models the firmware updating behavior for all IOT2050 + firmware update. + """ + def __init__(self, tarball, backup_path, interactor, + rollback=False, reset=False): + self.back_fw_path = os.path.join("".join(backup_path), ".rollback_fw") + self.rollback_fw_tar = os.path.join(self.back_fw_path, + 'rollback_backup_fw.tar') + self.interactor = interactor + try: + if rollback: + if not os.path.exists(self.rollback_fw_tar) or \ + not tarfile.is_tarfile(self.rollback_fw_tar): + raise UpgradeError("No rollback firmware exists", + ErrorCode.ROLLBACK_FAILED.value) + + tarball = open(self.rollback_fw_tar, "rb") + self.tarball = FirmwareTarball(tarball, interactor, None) + else: + self.tarball = tarball + + self.firmwares = {} + for firmware_type in self.tarball.FIRMWARE_TYPES: + if firmware_type == self.tarball.FIRMWARE_TYPES[0]: + name = self.tarball.get_file_name(firmware_type) + self.firmwares[firmware_type] = BootloaderFirmware( + self.tarball.get_file(name) + ) + elif firmware_type == self.tarball.FIRMWARE_TYPES[1]: + if not reset: + env_list = self.tarball.get_preserved_uboot_env() + print("\nPreserved env list: ") + for env in env_list: + print(env) + self.firmwares[firmware_type] = EnvFirmware( + self.tarball.generate_env_firmware(env_list) + ) + else: + self.firmwares[firmware_type] = EnvFirmware( + self.tarball.get_file(self.tarball.UBOOT_ENV_FILE) + ) + elif firmware_type == self.tarball.FIRMWARE_TYPES[2]: + self.firmwares[firmware_type] = \ + Firmware(self.tarball.get_file(self.tarball.CONF_JSON)) + except UpgradeError as e: + raise UpgradeError(e.err, e.code) + + def backup(self): + """Backup the original firmware from flash""" + print("\nFirmware backup started") try: if not os.path.exists(self.back_fw_path): os.mkdir(self.back_fw_path) - except OSError: - raise UpgradeError("Failed to create path %s for firmware rollback" - % self.back_fw_path) - - print("\nFirmware backup started") - - flash_md5_digest = [] - # The reason for reading twice. The uboot binary lacks a check code, - # and reading the key flash data cannot be guaranteed to be 100% - # correct.Therefore, the method of reading twice and comparing the - # check code to reduce the chance error. - i = 0 - while i < 2: - mtd_num = 0 - mtd_in_memory = b'' - firmware_len = firmware_length - - flash_md5 = hashlib.md5() - while True: - if firmware_len <= 0: - break - mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ - self.get_mtd_info(mtd_num) - - mtd_pos = 0 - try: - mtd_dev = os.open(mtd_dev_path, os.O_SYNC | os.O_RDONLY) - except IOError as e: - raise UpgradeError("Opening {} failed: {}" - "".format(mtd_dev_path, e.strerror)) + self.interactor.progress_bar(info="Backing up") + tmpl_json = json.load(open("/usr/lib/iot2050/fwu/update.conf.json", "r")) + for firmware_type in self.firmwares: + md5_digest = [] + fw_name = self.tarball.get_file_name(firmware_type) + + if self.tarball.FIRMWARE_TYPES[0] == firmware_type: + i = 0 + while i < 2: + file_content = self.firmwares[firmware_type].read() + md5_digest.append(self.__get_md5_digest(file_content)) + i += 1 + fw_name = tmpl_json['firmware'][0]['name'] + elif self.tarball.FIRMWARE_TYPES[1] == firmware_type: + file = self.firmwares[firmware_type].firmware + file.seek(0) + file_content = file.read() + elif self.tarball.FIRMWARE_TYPES[2] == firmware_type: + with open("/sys/firmware/devicetree/base/model", "r") as model_f, \ + open("/etc/os-release", "r") as release_f: + for line in release_f.readlines(): + key, value = line.rstrip("\n").split("=") + if "BUILD_ID" in key: + break + target_board = model_f.read().replace("\u0000", "") + tmpl_json['firmware'][0]['target_boards'] = target_board + tmpl_json['target_os'][0]['min_version'] = \ + value.replace('"', '')[:1] + \ + tmpl_json['target_os'][0]['min_version'][1:] + file_content = bytes(json.dumps(tmpl_json, indent=4), "utf8") + else: + raise UpgradeError("Wrong Firmware Type!") + + if len(md5_digest) > 0 and md5_digest[0] != md5_digest[1]: + raise UpgradeError("Firmware backup failed") + + info = tarfile.TarInfo(fw_name) + info.size = len(file_content) + + if firmware_type == self.tarball.FIRMWARE_TYPES[0]: + with tarfile.TarFile(self.rollback_fw_tar, 'w') as tar: + tar.addfile(info, io.BytesIO(file_content)) + else: + with tarfile.TarFile(self.rollback_fw_tar, 'a') as tar: + tar.addfile(info, io.BytesIO(file_content)) + self.interactor.progress_bar(start=False) + except (OSError, UpgradeError) as e: + self.interactor.progress_bar(start=False) + raise UpgradeError(e.err, ErrorCode.BACKUP_FAILED.value) + print("Firmware backup ended\n") + + def update(self): + """Update the firmware to the specified flash""" + print("===================================================") + print("IOT2050 firmware update started - DO NOT INTERRUPT!") + print("===================================================") - while mtd_pos < mtd_size and firmware_len > 0: - mtd_content = os.read(mtd_dev, mtd_erasesize) - mtd_pos += mtd_erasesize - firmware_len -= mtd_erasesize - flash_md5.update(mtd_content) - mtd_in_memory += mtd_content + try: + for firmware_type in self.firmwares: + if firmware_type == self.tarball.FIRMWARE_TYPES[2]: + continue + self.interactor.progress_bar(info="Updating {}".format(firmware_type)) - os.close(mtd_dev) + self.firmwares[firmware_type].write() - mtd_num += 1 - flash_md5_digest.append(flash_md5.hexdigest()) - i += 1 + self.firmwares[firmware_type].firmware.seek(0) + content = self.firmwares[firmware_type].firmware.read() + firmware_md5 = self.__get_md5_digest(content) - if flash_md5_digest[0] != flash_md5_digest[1]: - print("Firmware backup failed") - return ErrorCode.BACKUP_FAILED.value + content = self.firmwares[firmware_type].read() + read_out_md5 = self.__get_md5_digest(content) + self.interactor.progress_bar(start=False) - info = tarfile.TarInfo(name=self.backup_fw_name) - info.size = len(mtd_in_memory) - with tarfile.TarFile(self.rollback_fw_tar, 'w') as tar: - tar.addfile(info, io.BytesIO(mtd_in_memory)) - print('Firmware backup ended\n') + if firmware_md5 != read_out_md5: + raise UpgradeError("Firmware digest verification failed") + except UpgradeError as e: + self.interactor.progress_bar(start=False) + raise UpgradeError(e.err, ErrorCode.FLASHING_FAILED.value) -class BoardInformation(object): - def __init__(self): - self.board_name = self._get_board_name() + def __get_md5_digest(self, content): + """Verify the update integrity""" + md5 = hashlib.md5() - print("Current board: {}".format(self.board_name)) + md5.update(content) - self.os_info = self._get_os_info() + return md5.hexdigest() - @staticmethod - def _get_board_name() -> str: - """ - Get the board name by checking the device tree node - /proc/device-tree/model - """ - with open('/proc/device-tree/model') as f_model: - board_name = f_model.read().strip('\0') - return board_name +class FirmwareTarball(object): + """A FirmwareTarball models a upgrade package in specified format""" - @staticmethod - def _get_os_info() -> dict: - ''' - Get the OS information by parsing the /etc/os-release + CONF_JSON = 'update.conf.json' + UBOOT_ENV_FILE = 'u-boot-initial-env' + # "env" must be after "uboot" because uboot update will overwrite env + # partition + FIRMWARE_TYPES = [ + "uboot", + "env", + "conf" + ] - Returned is a dict that converted from /etc/os-release, for example: - NAME="debian" - VERSION_ID="3.1.1" + def __init__(self, firmware_tarball, interactor, env_list): + self.interactor = interactor + self.firmware_tarball = firmware_tarball + self.env_list = env_list - => - { - "NAME": "debian" - "VERSION_ID": "3.1.1" - } - ''' - with open('/etc/os-release') as f: - return { - l.split('=')[0]: - l.strip().split('=')[-1].strip('"') - for l in f.readlines() - } + # extract file path + self.extract_path = "/tmp" + self.firmware_tarball.seek(0) + with tarfile.open(fileobj=self.firmware_tarball) as f: + for member in f: + file_tarfileinfo = f.getmember(name=member.name) + file_tarfileinfo.uid = os.getuid() + file_tarfileinfo.gid = os.getgid() + f.extract(file_tarfileinfo, path=self.extract_path) + self._board_info = BoardInfo() + print("Current board: {}".format(self._board_info.board_name)) -class UpdateConfiguration(object): - def __init__(self, json_fileobj): + # Parse the update configuration from the json file within the tarball # Deserialize the json file to an object so that we can use dot operator # to access the fields. try: self._jsonobj = json.load( - json_fileobj, object_hook=lambda d: Namespace(**d)) + open(self.get_file(self.CONF_JSON), "rb"), + object_hook=lambda d: Namespace(**d) + ) except ValueError: raise UpgradeError("Decoding JSON has failed") - @staticmethod - def _check_os(target_os, os_info) -> bool: + self.firmware_names = dict.fromkeys(self.FIRMWARE_TYPES) + + def __del__(self): + with tarfile.open(fileobj=self.firmware_tarball) as f: + for member in f: + os.remove(self.extract_path + "/" + member.name) + + def __check_os(self, target_os, os_info) -> bool: for tos in target_os: if tos.key in os_info and os_info[tos.key] >= (tos.min_version): return True @@ -440,114 +640,107 @@ class UpdateConfiguration(object): return False - def firmware_filter(self, board_name: str, os_info: dict) -> list: - ''' - Filter out the firmwares that could not be updated on the current board - and current OS. + def check_firmware(self): + """Check if the tarball is a valid upgrade package""" + if len(self.get_file_name(self.FIRMWARE_TYPES[0])) <= 0: + return False + return True - Return the list of the firmware names that could be updated. - ''' + def get_file_name(self, firmware_type): + """Get the file names of working firmware""" res = [] - for firmware in self._jsonobj.firmware: - if board_name in firmware.target_boards: - target_os = [] - try: - target_os = self._jsonobj.target_os - except AttributeError: - pass - - # local target_os configuration prior to global - try: - target_os = firmware.target_os - except AttributeError: - pass - - # Only choose the firmware that have the target_os configuration - # identifying with current OS info, or the firmware w/o - # target_os node which means it doesn't care the OS info. - if len(target_os) == 0 or self._check_os(target_os, os_info): - res.append(firmware.name) - - return res - - def preserved_uboot_env(self): + if self.FIRMWARE_TYPES[1] == firmware_type: + res = self.UBOOT_ENV_FILE + return res + if self.FIRMWARE_TYPES[2] == firmware_type: + res = self.CONF_JSON + return res + if not self.firmware_names[firmware_type]: + for firmware in self._jsonobj.firmware: + if self._board_info.board_name in firmware.target_boards: + # Be forward compatible, the previous uboot firmware + # tarballs don't have the type node + if not hasattr(firmware, "type"): + firmware.type = self.FIRMWARE_TYPES[0] + + if firmware.type == firmware_type: + target_os = [] + try: + target_os = self._jsonobj.target_os + except AttributeError: + pass + + # local target_os configuration prior to global + try: + target_os = firmware.target_os + except AttributeError: + pass + + # Get available firmware names by checking the board name + # and the os information, or the firmware w/o target_os node + # which means it doesn't care the OS info. + if len(target_os) == 0 or \ + self.__check_os(target_os, self._board_info.os_info): + res.append(firmware.name) + + if len(res) > 1: + # Ask user to pick one firmware image to update + print("Please select which firmware image to update:") + for n in res: + print("{}\t{}".format(res.index(n) + 1, n)) + + choice = int(self.interactor.interact("-> ")) + while choice > len(res) or choice < 1: + print("Out of range, please reinput your choice:") + choice = int(self.interactor.interact("-> ")) + + res = res[choice - 1] + + self.firmware_names[firmware_type] = "".join(res) + + return self.firmware_names[firmware_type] + + def get_file(self, name): + """Get the file object of specified name""" + file = os.path.join(self.extract_path, name) + + return file + + def __get_suggest_preserved_uboot_env(self): + """Get the default uboot env list from tarball""" try: return self._jsonobj.suggest_preserved_uboot_env except Exception: raise UpgradeError("Get suggested preserved uboot env failed") + def get_preserved_uboot_env(self): + try: + if self.env_list: + preserved_uboot_env_name = [ + item for item in self.env_list.split(',')] + else: + preserved_uboot_env_name = \ + self.__get_suggest_preserved_uboot_env() -class ManagedFirmwareUpdate(FirmwareUpdate): - ''' - ManagedFirmwareUpdate will perform various checking upon both the update - package and the targeted board, before calling the underlying - FirmwareUpdate() to flash the binaries to the SPI flash. - ''' - _update_conf_json = 'update.conf.json' - uboot_default_env_file_name = 'u-boot-initial-env' - - def __init__(self, *args): - super().__init__(*args) - - # Build the board information - self._board_info = BoardInformation() - - # Parse the update configuration from the json file within the tarball - self._update_conf = UpdateConfiguration( - tarfile.open( - fileobj=self.firmware - ).extractfile(self._update_conf_json) - ) - - # extract file path - self.extract_path = "/tmp" - - # Get available firmwares by checking the board name and the os - # information - self.firmware_names = self._update_conf.firmware_filter( - self._board_info.board_name, self._board_info.os_info - ) - - def check_firmware(self): - if len(self.firmware_names) <= 0: - return False - return True - - def update_firmware(self): - - if len(self.firmware_names) == 1: - firmware_name = self.firmware_names[0] - else: - # Ask user to pick one firmware image to update - print("Please select which firmware image to update:") - for n in self.firmware_names: - print("{}\t{}".format(self.firmware_names.index(n) + 1, n)) - - choice = int(the_input(False, "-> ")) - while choice > len(self.firmware_names) or choice < 1: - print("Out of range, please reinput your choice:") - choice = int(the_input(False, "-> ")) + preserved_uboot_env_value = [] - firmware_name = self.firmware_names[choice - 1] + if not preserved_uboot_env_name: + return None - print("Will write {} to the SPI flash".format(firmware_name)) + for env_name in preserved_uboot_env_name: + env_value = subprocess.run( + 'fw_printenv %s' % env_name, shell=True, + stdout=subprocess.PIPE, check=True) \ + .stdout.decode('utf-8').lstrip().rstrip() + preserved_uboot_env_value.append(env_value) - self.firmware.seek(0) - with tarfile.open(fileobj=self.firmware) as f: - self.firmware = f.extractfile(firmware_name) - file_tarfileinfo = f.getmember( - name=self.uboot_default_env_file_name) - file_tarfileinfo.uid = os.getuid() - file_tarfileinfo.gid = os.getgid() - f.extract(file_tarfileinfo, path=self.extract_path) - super().update_firmware() - - def get_default_env_list(self, fname): - with open(fname, 'r', encoding='utf-8') as f: - default_env_list = [i.split('=')[0] for i in f.readlines()] - return default_env_list + return preserved_uboot_env_value + except subprocess.CalledProcessError: + pass + except UpgradeError as e: + raise UpgradeError(e.err) - def remove_line_by_index(self, file, index): + def __remove_line_by_index(self, file, index): with open(file, 'r+') as fp: lines = fp.readlines() fp.seek(0) @@ -556,137 +749,136 @@ class ManagedFirmwareUpdate(FirmwareUpdate): if number != index: fp.write(line) - def remove_duplicate_default_env(self, uboot_env_file, env_list): - default_env_list = self.get_default_env_list(uboot_env_file) + def __remove_duplicate_default_env(self, uboot_env_file, env_list): + with open(uboot_env_file, 'r', encoding='utf-8') as f: + default_env_list = [i.split('=')[0] for i in f.readlines()] for value in env_list: if value.split('=')[0] in default_env_list: value_index = default_env_list.index(value.split('=')[0]) - self.remove_line_by_index(uboot_env_file, value_index) + self.__remove_line_by_index(uboot_env_file, value_index) - def update_uboot_env(self, env_list): - ''' - restore uboot env - retrive the uboot default env file belong to the upgrade firmware. - makeup the restore env binary and flash to the env mtd partition - ''' - # /dev/mtd3 /dev/mtd4 - env_partition_list = [3, 4] + def generate_env_firmware(self, env_list): + """Generate the update env file based on env_list""" uboot_default_env_file = os.path.join( - self.extract_path, self.uboot_default_env_file_name) + self.extract_path, self.UBOOT_ENV_FILE) uboot_env_assemble_file = os.path.join( self.extract_path, "env_assemble_file") assert os.path.isfile(uboot_default_env_file) # assemble the env shutil.copy(uboot_default_env_file, uboot_env_assemble_file) - self.remove_duplicate_default_env(uboot_env_assemble_file, env_list) + self.__remove_duplicate_default_env(uboot_env_assemble_file, env_list) with open(uboot_env_assemble_file, encoding="utf-8", mode="a") as file: for value in env_list: file.write(value) file.write("\n") - with tempfile.NamedTemporaryFile() as env_default_binary: - for mtd_num in env_partition_list: - mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ - self.get_mtd_info(mtd_num) - try: - subprocess.run('mkenvimage -s {} -r -o {} {}'.format( - mtd_size, - env_default_binary.name, - uboot_env_assemble_file), - check=True, shell=True) - except subprocess.CalledProcessError as error: - print(error.stdout) - raise UpgradeError("Run mkenvimage failed") + return uboot_env_assemble_file - firmware_size = os.path.getsize(env_default_binary.name) - while True: - if firmware_size <= 0: - break - print("Updating %-20s" % mtd_name, end="") - firmware_size = self.flash_operate( - mtd_dev_path, mtd_size, mtd_erasesize, - env_default_binary, firmware_size - ) - - self.uboot_env_integrify_check(env_default_binary) +class BoardInfo(object): + """The BoardInfo represents the updating IOT2050 board information""" + def __init__(self): + self.board_name = self._get_board_name() + self.os_info = self._get_os_info() - def uboot_env_integrify_check(self, file_obj): - file_obj.seek(0) + def _get_board_name(self) -> str: + """ + Get the board name by checking the device tree node + /proc/device-tree/model + """ + with open('/proc/device-tree/model') as f_model: + board_name = f_model.read().strip('\0') - env_md5 = hashlib.md5() - env_md5.update(file_obj.read()) - env_file_md5_digest = env_md5.hexdigest() + return board_name - env_file_size = os.path.getsize(file_obj.name) - env_flash_md5_digest = self.flash_md5_digest(env_file_size, mtd_num=3) + def _get_os_info(self) -> dict: + ''' + Get the OS information by parsing the /etc/os-release - if env_file_md5_digest != env_flash_md5_digest: - raise UpgradeError("Env digest verification failed") + Returned is a dict that converted from /etc/os-release, for example: + NAME="debian" + VERSION_ID="3.1.1" - def get_preserved_uboot_env(self, preserve_list): - if preserve_list: - preserved_uboot_env_name = [ - item for item in preserve_list.split(',')] - else: - preserved_uboot_env_name = self._update_conf.preserved_uboot_env() + => + { + "NAME": "debian" + "VERSION_ID": "3.1.1" + } + ''' + with open('/etc/os-release') as f: + return { + l.split('=')[0]: + l.strip().split('=')[-1].strip('"') + for l in f.readlines() + } - preserved_uboot_env_value = [] - if not preserved_uboot_env_name: - return None +class UserInterface(object): + """The UserInterface models the interaction for user""" + BOLD = "\033[1m" + ENDC = "\033[0m" + def __init__(self, quiet): + self.quiet = quiet + self.__progress_bar_occupied = None + + def interact(self, *args): + """Prompt the user and wait for user input""" + # No quiet mode when no input context as argument + if self.quiet and len(args) > 1: + the_stdin = sys.stdin + sys.stdin = StringIO(args[0]) + if len(args) > 1: + info = args[1] + else: + info = args[0] - for env_name in preserved_uboot_env_name: - try: - env_value = subprocess.run( - 'fw_printenv %s' % env_name, shell=True, - stdout=subprocess.PIPE, check=True) \ - .stdout.decode('utf-8').lstrip().rstrip() - preserved_uboot_env_value.append(env_value) - except subprocess.CalledProcessError: - pass + ret = input(self.BOLD + info + self.ENDC ) - return preserved_uboot_env_value + if self.quiet and len(args) > 1: + print(args[0] + '\n', end="") + sys.stdin = the_stdin + return ret -class RollbackFirmware(FirmwareUpdate): - def rollback_firmware(self): - if not os.path.exists(self.rollback_fw_tar) or \ - not tarfile.is_tarfile(self.rollback_fw_tar): - print('No rollback firmware exists') - print('It could be that you have not done any upgrades') - return ErrorCode.ROLLBACK_FAILED.value + def print_info(self, *args): + print(args[0]) - try: - with tarfile.open(self.rollback_fw_tar) as f: - self.firmware = f.extractfile(self.backup_fw_name) - self.update_firmware() - return ErrorCode.ROLLBACK_SUCCESS.value - except tarfile.ExtractError: - raise UpgradeRollbackError("Failed to rollback firmware due to" - " extract firmware error!") + def __show_progress_bar(self, info, interval, event: Event) -> None: + with Bar(info.ljust(15), fill='.', bar_prefix='', + bar_suffix='', suffix='') as bar: + while True: + bar.next() + if event.is_set(): + break + time.sleep(interval) + + def progress_bar(self, info="", interval=0.2, start=True): + if start and self.__progress_bar_occupied: + print("Progress bar is occupied!") + return + self.__progress_bar_occupied = start + + if start: + self.event = Event() + self.t = Thread( + target = self.__show_progress_bar, + args = (info, interval, self.event) + ) + self.t.start() + else: + try: + self.event.set() + self.t.join() + except AttributeError as e: + print("Progress bar is not started yet!") + pass __version__ = "${PV}" - -def the_input(is_quiet, *args): - if is_quiet: - the_stdin = sys.stdin - sys.stdin = StringIO(args[0]) - info = args[1] - else: - if len(args) > 1: - info = args[1] - else: - info = args[0] - ret = input(info) - if is_quiet: - print(args[0] + '\n', end="") - sys.stdin = the_stdin - return ret - def main(argv): + """The main function""" + description=textwrap.dedent('''\ Update OSPI firmware. Examples: @@ -713,6 +905,7 @@ def main(argv): | 5 | Flashing error | | 6 | User canceled | | 7 | Invalid firmware | + | 8 | Failed to update | ''') parser = argparse.ArgumentParser( description=description, @@ -722,7 +915,7 @@ def main(argv): group = parser.add_mutually_exclusive_group() parser.add_argument('firmware', nargs='?', metavar='FIRMWARE', type=argparse.FileType('rb'), - help='firmware image or package') + help='firmware tarball') group.add_argument('-f', '--force', help='Force update, ignore all the checking', action='store_true') @@ -751,6 +944,7 @@ def main(argv): firmware auto backed up and rollback in case of \ failure, and without automatic rebooting', action='store_true') + try: args = parser.parse_args() except IOError as e: @@ -761,113 +955,97 @@ def main(argv): print("No firmware specified") return ErrorCode.INVALID_ARG.value - try: - if args.force: - # When forced, ignore all the checking and call the underlying - # updater directly - updater = FirmwareUpdate(args.firmware, args.backup_dir) - elif args.rollback: - updater = RollbackFirmware(args.firmware, args.backup_dir) - else: - updater = ManagedFirmwareUpdate(args.firmware, args.backup_dir) + interactor = UserInterface(args.quiet); - if not updater.check_firmware(): - print("OS image version must be newer than the minimal version, " - "no firmware image could be updated on this device!") - return ErrorCode.INVALID_FIRMWARE.value - - update_input = the_input( - args.quiet, 'y', - "\n" + BOLD + "Warning: Update may render device unbootable." - + ENDC + " Continue (y/N)? " + try: + if args.rollback: + tarball = None + elif not args.force: + tarball = FirmwareTarball(args.firmware, interactor, + args.preserve_list) + # FirmwareTarball to check firmware + if not tarball.check_firmware(): + print("OS image version must be newer than the minimal version, " + "no firmware image could be updated on this device!") + return ErrorCode.INVALID_FIRMWARE.value + + update_input = interactor.interact( + 'y', + "\nWarning: Update may render device unbootable. Continue (y/N)?" ) if not update_input == "y": return ErrorCode.CANCELED.value - if args.force or args.reset: - erase_env_input = the_input( - args.quiet, 'y', "\n" + BOLD + - "Warning: All U-Boot environment variables will be reset to" - " factory settings." - + ENDC + " Continue (y/N)? " + if args.reset: + erase_env_input = interactor.interact( + 'y', + "\nWarning: All U-Boot environment variables will be reset to" + " factory settings. Continue (y/N)? " ) if not erase_env_input == "y": return ErrorCode.CANCELED.value + if args.force: + updater = ForceUpdate(interactor, args.firmware) + else: + updater = FirmwareUpdate( + tarball, + args.backup_dir, + interactor, + args.rollback, + args.reset + ) + + # FirmwareUpdate to rollback if args.rollback: - rollback_confirm = the_input( - args.quiet, 'y', + rollback_confirm = interactor.interact( + 'y', "\nRollback the firmware to the version before the upgrade (y/N)?" ) if not rollback_confirm == "y": return ErrorCode.CANCELED.value - return updater.rollback_firmware() - - envlist = None - if not args.force and not args.reset: - envlist = updater.get_preserved_uboot_env(args.preserve_list) + if not updater.update(): + return ErrorCode.ROLLBACK_SUCCESS.value if not args.no_backup: - if ErrorCode.BACKUP_FAILED.value == updater.backup_firmware(): - return ErrorCode.BACKUP_FAILED.value - updater.update_firmware() - - if envlist: - print("\n") - for env in envlist: - print(env) - preserved_env_input = the_input( - args.quiet, 'y', - "\n" + BOLD + "Warning: Preserve the above env variables." - + ENDC + " Continue (y/N)? " - ) - if not preserved_env_input == "n": - updater.update_uboot_env(envlist) + updater.backup() - reboot_input = the_input( - args.quiet, 'n', - "\n" + BOLD + "Warning: Completed. Please reboot the device." - + ENDC + " Reboot now (y/N)? " - ) - if not reboot_input == "y": - return ErrorCode.SUCCESS.value - os.system('reboot') + index = 0 + while True: + try: + updater.update() + break + except UpgradeError as e: + if index > 2: + raise UpgradeError(e.err, e.code) + index += 1 + print("{}, try again!".format(e.err)) except UpgradeError as e: - print(e) - intput_reminder = ''' - ==================================== - Upgrade Failed! Please do not reboot!! - The Device may become brick. - Please try to upgrade firmware again! - ==================================== + print(e.err) + input_reminder = ''' + ==================================== + Upgrade Failed! Please do not reboot!! + The Device may become brick. + Please try to rollback! + ==================================== Hit the Enter Key to Exit: - ''' - # Rollback the firmware when the upgrade fails - try: - print(BOLD + 'Warning: Upgrade failed, rollback started' + - ENDC + '\n') - updater = RollbackFirmware(args.firmware, args.backup_dir) - if ErrorCode.ROLLBACK_FAILED.value == updater.rollback_firmware(): - return ErrorCode.ROLLBACK_FAILED.value - except (UpgradeError, UpgradeRollbackError) as e: - print(e) - - the_input(args.quiet, " ", intput_reminder) - return ErrorCode.FLASHING_FAILED.value - except UpgradeRollbackError as e: - print(e) - intput_reminder = ''' - ==================================== - Rollback Failed! Please do not reboot!! - The Device may become brick. - Please try to rollback firmware again! - ==================================== -Hit the Enter Key to Exit: - ''' +''' + if e.code == ErrorCode.FLASHING_FAILED.value: + interactor.interact(" ", input_reminder) + if args.rollback: + e.code = ErrorCode.ROLLBACK_FAILED.value + + return e.code + + reboot_input = interactor.interact( + 'n', + "\nWarning: Completed. Please reboot the device. Reboot now (y/N)?" + ) + if not reboot_input == "y": + return ErrorCode.SUCCESS.value + os.system('reboot') - the_input(args.quiet, " ", intput_reminder) - return ErrorCode.ROLLBACK_FAILED.value if __name__ == '__main__': - code = main(sys.argv) - sys.exit(code) + CODE = main(sys.argv) + sys.exit(CODE) diff --git a/recipes-app/iot2050-firmware-update/files/update.conf.json.tmpl b/recipes-app/iot2050-firmware-update/files/update.conf.json.tmpl new file mode 100644 index 000000000..720b3962f --- /dev/null +++ b/recipes-app/iot2050-firmware-update/files/update.conf.json.tmpl @@ -0,0 +1,20 @@ +{ + "firmware": [ + { + "description": "IOT2050 Bootloader Backup Firmware", + "name": "iot2050-pg-image-boot.bin", + "type": "uboot", + "target_boards": "" + } + ], + "target_os": [ + { + "type": "Example Image", + "key": "BUILD_ID", + "min_version": "V01.01.01" + } + ], + "suggest_preserved_uboot_env": [ + "boot_targets" + ] +} diff --git a/recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.4.bb b/recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.5.bb similarity index 55% rename from recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.4.bb rename to recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.5.bb index 7bf817623..dbb0c63d4 100644 --- a/recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.4.bb +++ b/recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.5.bb @@ -3,6 +3,7 @@ # # Authors: # Chao Zeng +# Li Hua Qian # # This file is subject to the terms and conditions of the MIT License. See # COPYING.MIT file in the top-level directory. @@ -11,15 +12,23 @@ DESCRIPTION = "OSPI Firmware Update Scripts" MAINTAINER = "chao.zeng@siemens.com" -SRC_URI = "file://iot2050-firmware-update.tmpl" +SRC_URI = " \ + file://update.conf.json.tmpl \ + file://iot2050-firmware-update.tmpl" -TEMPLATE_FILES = "iot2050-firmware-update.tmpl" +TEMPLATE_FILES = "update.conf.json.tmpl iot2050-firmware-update.tmpl" inherit dpkg-raw +DEBIAN_DEPENDS = "python3-progress" + do_install() { + install -v -d ${D}/usr/lib/iot2050/fwu + install -v -m 644 ${WORKDIR}/update.conf.json ${D}/usr/lib/iot2050/fwu/ + install -v -m 755 ${WORKDIR}/iot2050-firmware-update ${D}/usr/lib/iot2050/fwu/ + install -v -d ${D}/usr/sbin/ - install -v -m 755 ${WORKDIR}/iot2050-firmware-update ${D}/usr/sbin/ + ln -sf ../lib/iot2050/fwu/iot2050-firmware-update ${D}/usr/sbin/iot2050-firmware-update } do_deploy_deb:append() {