From dabdcf46a2888499db8299d5682294dea3e95697 Mon Sep 17 00:00:00 2001 From: Li Hua Qian Date: Fri, 18 Aug 2023 12:00:05 +0800 Subject: [PATCH] iot2050-firmware-update: Refactor the firmware update tool At the beginning, a additional firmware was intended to be integrated into the upgrade tarball package. And the original code was kind of process-oriented code, many common features are not extracted, many duplicate codes exist, and the code coupling is very high, which is very difficult to expand and maintain. It was necessary to refactor the code for better maintainability and extension. Anyway, in the end the additional firmware is planed to integrated into its associated subsystem for decoupling. Refactored code is preserved for possible future maintainability and extension. For example, if new firmware needs to be added in the future, whether it is the existing firmware upgrade method or other upgrade methods, the previous upgrade logic or upgrade method can be more easily introduced and reused. Signed-off-by: Li Hua Qian --- .../files/iot2050-firmware-update.tmpl | 1166 ++++++++++------- .../files/update.conf.json.tmpl | 20 + ..._0.4.bb => iot2050-firmware-update_0.5.bb} | 15 +- 3 files changed, 704 insertions(+), 497 deletions(-) create mode 100644 recipes-app/iot2050-firmware-update/files/update.conf.json.tmpl rename recipes-app/iot2050-firmware-update/{iot2050-firmware-update_0.4.bb => iot2050-firmware-update_0.5.bb} (55%) diff --git a/recipes-app/iot2050-firmware-update/files/iot2050-firmware-update.tmpl b/recipes-app/iot2050-firmware-update/files/iot2050-firmware-update.tmpl index 6249be580..5595eb1e6 100755 --- a/recipes-app/iot2050-firmware-update/files/iot2050-firmware-update.tmpl +++ b/recipes-app/iot2050-firmware-update/files/iot2050-firmware-update.tmpl @@ -24,32 +24,28 @@ Example of update.conf.json: { "firmware": [ { - "description": "[optional] bla bla bla", - "name": "pg1-basic-vx.y.z.bin", - "version": "[optional] whatever", + "description": "IOT2050 PG1 Bootloader Release V01.01.01", + "name": "iot2050-pg1-image-boot.bin", + "version": "V01.01.01", + "type": "uboot", "target_boards": [ + "SIMATIC IOT2050-BASIC", "SIMATIC IOT2050 Basic", - "SIMATIC IOT2050-ADVANCED" + "SIMATIC IOT2050-ADVANCED", + "SIMATIC IOT2050 Advanced" ] }, { - "description": "[optional] bla bla bla", - "name": "pg2-advanced-vx.y.z.bin", - "version": "[optional] whatever", - "target_boards": "SIMATIC IOT2050 Advanced PG2", - "target_os": [ - { - "type": "[optional] Example Image", - "key": "BUILD_ID", - "min_version": "V01.02.02" - }, - { - "type": "[optional] Industrial OS", - "key": "VERSION_ID", - "min_version": "3.1.1" - } + "description": "IOT2050 PG2 Bootloader Release V01.01.01", + "name": "iot2050-pg2-image-boot.bin", + "version": "V01.01.01", + "type": "uboot", + "target_boards": [ + "SIMATIC IOT2050 Basic PG2", + "SIMATIC IOT2050 Advanced PG2", + "SIMATIC IOT2050 Advanced SM" ] - } + }, ], "target_os": [ { @@ -104,63 +100,76 @@ preserved list from cli, this would not use the preserved env variable in the `suggest_preserved_uboot_env` node. """ -import sys -import os +import argparse import fcntl +import hashlib +import io +import json +import os +import sys +import shutil import struct -import argparse +import subprocess +import time import tarfile -import json import textwrap -from types import SimpleNamespace as Namespace -import subprocess import tempfile -import hashlib -import io +from abc import ABC, abstractmethod +from ctypes import * from enum import Enum from io import StringIO -import shutil +from progress.bar import Bar +from threading import Thread, Event +from types import SimpleNamespace as Namespace + +class ErrorCode(Enum): + """The ErrorCode class describes the return codes""" + SUCCESS = 0 + ROLLBACK_SUCCESS = 1 + INVALID_ARG = 2 + BACKUP_FAILED = 3 + ROLLBACK_FAILED = 4 + FLASHING_FAILED = 5 + CANCELED = 6 + INVALID_FIRMWARE = 7 + FAILED = 8 -BOLD = "\033[1m" -ENDC = "\033[0m" class UpgradeError(Exception): - def __init__(self, ErrorInfo): + def __init__(self, ErrorInfo, code=ErrorCode.FAILED.value): super().__init__(self) - self.erroinfo = ErrorInfo + self.err = ErrorInfo + self.code = code def __str__(self): - return self.erroinfo + return repr(self) -class UpgradeRollbackError(Exception): - def __init__(self, ErrorInfo): - super().__init__(self) - self.erroinfo = ErrorInfo - def __str__(self): - return self.erroinfo +class Firmware(): + """The Firmware class represents flash base operations for all flashes""" + def __init__(self, firmware): + try: + self.firmware = open(firmware, "rb") + except (IOError, TypeError): + self.firmware = firmware -class ErrorCode(Enum): - SUCCESS = 0 - ROLLBACK_SUCCESS = 1 - INVALID_ARG = 2 - BACKUP_FAILED = 3 - ROLLBACK_FAILED = 4 - FLASHING_FAILED = 5 - CANCELED = 6 - INVALID_FIRMWARE = 7 + def __del__(self): + try: + self.firmware.close() + except Exception: + pass -class FirmwareUpdate(object): - def __init__(self, *args): - self.firmware = args[0] - self.back_fw_path = os.path.join("".join(args[1]), ".rollback_fw") - self.rollback_fw_tar = os.path.join(self.back_fw_path, - 'rollback_backup_fw.tar') - self.backup_fw_name = 'rollback_fw.bin' - self.firmware_len = 0 + @abstractmethod + def write(self): + """An Firmware can be written to flash""" + + @abstractmethod + def read(self): + """An Firmware can be read out from flash""" - @staticmethod - def get_path_type_value(path): + +class MtdDevice(): + def __get_path_type_value(self, path): """get the path value""" try: with open(path, "r") as f: @@ -168,8 +177,7 @@ class FirmwareUpdate(object): except IOError as e: raise UpgradeError("Reading {} failed: {}".format(path, e.strerror)) - @staticmethod - def flash_erase(dev, start, nbytes): + def __erase(self, dev, start, nbytes): """This function erases flash sectors @dev: flash device file descriptor @start: start address @@ -185,6 +193,7 @@ class FirmwareUpdate(object): raise UpgradeError("Flash erasing failed") def get_mtd_info(self, mtd_num): + """The uboot ops can get all mtd infos of uboot""" ospi_dev_path = "/sys/bus/platform/devices/47040000.spi" if os.path.exists(ospi_dev_path + "/spi_master"): # kernel 5.9 and later @@ -202,52 +211,77 @@ class FirmwareUpdate(object): mtd_erasesize_path = "{}/erasesize".format(mtd_sys_path) mtd_dev_path = "/dev/mtd{}".format(mtd_num) - mtd_size = int(self.get_path_type_value(mtd_size_path)) - mtd_erasesize = int(self.get_path_type_value(mtd_erasesize_path)) - mtd_name = self.get_path_type_value(mtd_name_path).strip() + try: + mtd_size = int(self.__get_path_type_value(mtd_size_path)) + mtd_erasesize = int(self.__get_path_type_value(mtd_erasesize_path)) + mtd_name = self.__get_path_type_value(mtd_name_path).strip() + except UpgradeError as e: + raise UpgradeError(e.err) return mtd_dev_path, mtd_size, mtd_erasesize, mtd_name - def flash_operate(self, mtd_dev_path, mtd_size, mtd_erasesize, + def write(self, mtd_dev_path, mtd_size, mtd_erasesize, file_obj, file_size): mtd_pos = 0 try: mtd_dev = os.open(mtd_dev_path, os.O_SYNC | os.O_RDWR) + + while mtd_pos < mtd_size and file_size > 0: + mtd_content = os.read(mtd_dev, mtd_erasesize) + firmware_content = file_obj.read(mtd_erasesize) + padsize = mtd_erasesize - len(firmware_content) + firmware_content += bytearray([0xff] * padsize) + + if not mtd_content == firmware_content: + #sys.stdout.flush() + self.__erase(mtd_dev, mtd_pos, mtd_erasesize) + os.lseek(mtd_dev, mtd_pos, os.SEEK_SET) + os.write(mtd_dev, firmware_content) + #else: + # print(".", end="") + # sys.stdout.flush() + mtd_pos += mtd_erasesize + file_size -= mtd_erasesize + os.close(mtd_dev) except IOError as e: raise UpgradeError("Opening {} failed: {}".format(mtd_dev_path, e.strerror)) + except UpgradeError as e: + raise UpgradeError(e.err) + + return file_size + + def read(self, mtd_dev_path, mtd_size, mtd_erasesize, + file_size): + mtd_in_memory = b'' + try: + mtd_dev = os.open(mtd_dev_path, os.O_SYNC | os.O_RDONLY) + except IOError as e: + raise UpgradeError("Opening {} failed: {}" + "".format(mtd_dev_path, e.strerror)) + + mtd_pos = 0 while mtd_pos < mtd_size and file_size > 0: mtd_content = os.read(mtd_dev, mtd_erasesize) - firmware_content = file_obj.read(mtd_erasesize) - padsize = mtd_erasesize - len(firmware_content) - firmware_content += bytearray([0xff] * padsize) - - if not mtd_content == firmware_content: - print("U", end="") - sys.stdout.flush() - self.flash_erase(mtd_dev, mtd_pos, mtd_erasesize) - os.lseek(mtd_dev, mtd_pos, os.SEEK_SET) - os.write(mtd_dev, firmware_content) - else: - print(".", end="") - sys.stdout.flush() mtd_pos += mtd_erasesize file_size -= mtd_erasesize - print() + mtd_in_memory += mtd_content + os.close(mtd_dev) - return file_size - def check_firmware(self): - return True + return mtd_in_memory - def update_firmware(self): - """Update Firmware""" - mtd_num = 0 - print("===================================================") - print("IOT2050 firmware update started - DO NOT INTERRUPT!") - print("===================================================") +class BootloaderFirmware(Firmware): + """The Bootloader Firmware class represents uboot flash operations""" + def __init__(self, firmware): + super().__init__(firmware) + self.mtd_device = MtdDevice() + + def write(self): + """The uboot ops can write contents to uboot flash""" + mtd_num = 0 self.firmware.seek(0) self.firmware.seek(0, os.SEEK_END) @@ -259,178 +293,344 @@ class FirmwareUpdate(object): if firmware_size <= 0: break - mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ - self.get_mtd_info(mtd_num) - print("Updating %-20s" % mtd_name, end="") - firmware_size = self.flash_operate( - mtd_dev_path, mtd_size, mtd_erasesize, - self.firmware, firmware_size - ) + try: + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(mtd_num) + firmware_size = self.mtd_device.write( + mtd_dev_path, mtd_size, mtd_erasesize, + self.firmware, firmware_size + ) + except UpgradeError as e: + raise UpgradeError("BootloaderFirmware: {}".format(e.err)) mtd_num += 1 - self.firmware_integrity_check() - - def flash_md5_digest(self, firmware_len, mtd_num=0): - flash_md5 = hashlib.md5() + def read(self, firmware_len=0x8c0000): + mtd_in_memory = b'' + mtd_num = 0 while True: if firmware_len <= 0: break - mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ - self.get_mtd_info(mtd_num) + try: + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(mtd_num) + + mtd_in_memory += self.mtd_device.read(mtd_dev_path, + mtd_size, + mtd_erasesize, + firmware_len) + except UpgradeError as e: + raise UpgradeError("BootloaderFirmware: {}".format(e.err)) + + firmware_len -= mtd_size + mtd_num += 1 - mtd_pos = 0 + return mtd_in_memory + + +class EnvFirmware(Firmware): + """The EnvFirmware class represents env partition operations""" + def __init__(self, firmware): + super().__init__(firmware) + self.firmware_path = firmware + self.mtd_device = MtdDevice() + + mtd_num = 0 + self.env_mtd_num = 0 + self.env_bk_mtd_num = 0 + while True: try: - mtd_dev = os.open(mtd_dev_path, os.O_SYNC | os.O_RDWR) - except IOError as e: - raise UpgradeError("Opening {} failed: {}".format(mtd_dev_path, - e.strerror)) + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(mtd_num) + except UpgradeError as e: + raise UpgradeError("EnvFirmware: {}".format(e.err)) - while mtd_pos < mtd_size and firmware_len > 0: - mtd_content = os.read(mtd_dev, mtd_erasesize) - flash_md5.update(mtd_content) + if "env" == mtd_name: + self.env_mtd_num = mtd_num + if "env.backup" == mtd_name: + self.env_bk_mtd_num = mtd_num + mtd_num +=1 - mtd_pos += mtd_erasesize - firmware_len -= mtd_erasesize + if self.env_mtd_num and self.env_bk_mtd_num: + break - os.close(mtd_dev) + def write(self): + """A env firmware can write contents to the env partition""" + with tempfile.NamedTemporaryFile() as env_default_binary: + for mtd_num in self.env_mtd_num, self.env_bk_mtd_num: + try: + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(mtd_num) + subprocess.run('mkenvimage -s {} -r -o {} {}'.format( + mtd_size, + env_default_binary.name, + self.firmware_path), + check=True, shell=True) + firmware_size = os.path.getsize(env_default_binary.name) + self.firmware = open(env_default_binary.name, "rb") + + while True: + if firmware_size <= 0: + break + firmware_size = self.mtd_device.write( + mtd_dev_path, mtd_size, mtd_erasesize, + env_default_binary, firmware_size + ) + except subprocess.CalledProcessError as error: + print(error.stdout) + raise UpgradeError("EnvFirmware: Run mkenvimage failed") + except UpgradeError as e: + raise UpgradeError("EnvFirmware: {}".format(e.err)) - mtd_num += 1 + def read(self): + """A env firmware can read contents from the env partition""" + mtd_in_memory = b'' + try: + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ + self.mtd_device.get_mtd_info(self.env_mtd_num) - return flash_md5.hexdigest() + if "env" == mtd_name: + mtd_in_memory = self.mtd_device.read(mtd_dev_path, + mtd_size, + mtd_erasesize, + mtd_size) + except UpgradeError as e: + raise UpgradeError("EnvFirmware: {}".format(e.err)) - def firmware_md5_digest(self): - self.firmware.seek(0) - firmware_md5 = hashlib.md5() + return mtd_in_memory - for chunk in iter(lambda: self.firmware.read(4096), b""): - firmware_md5.update(chunk) - return firmware_md5.hexdigest() +class ForceUpdate(): + def __init__(self, interactor, firmware, firmware_type="uboot"): + self.firmware = firmware + self.firmware_type = firmware_type + self.interactor = interactor - def firmware_integrity_check(self): - firmware_md5_digest = self.firmware_md5_digest() - firmware_flash_md5_digest = self.flash_md5_digest(self.firmware_len) + def update(self): + if self.firmware_type == "uboot": + firmware_obj = BootloaderFirmware(self.firmware) + else: + raise UpgradeError("Unsupported firmware type!") + + print("===================================================") + print("IOT2050 firmware update started - DO NOT INTERRUPT!") + print("===================================================") - if firmware_md5_digest != firmware_flash_md5_digest: - raise UpgradeError("Firmware digest verification failed") + self.interactor.progress_bar(info="Updating {}".format(self.firmware_type)) + firmware_obj.write() - def backup_firmware(self, firmware_length=0x8c0000): + firmware_md5 = hashlib.md5() + self.firmware.seek(0) + firmware_md5.update(self.firmware.read()) + + read_out_md5 = hashlib.md5() + read_out_md5.update(firmware_obj.read()) + self.interactor.progress_bar(start=False) + + if firmware_md5.hexdigest() != read_out_md5.hexdigest(): + raise UpgradeError("Firmware digest verification failed", + ErrorCode.FLASHING_FAILED.value) + + def backup(self): + pass + +class FirmwareUpdate(): + """ + The FirmwareUpdate models the firmware updating behavior for all IOT2050 + firmware update. + """ + def __init__(self, tarball, backup_path, interactor, + rollback=False, reset=False): + self.back_fw_path = os.path.join("".join(backup_path), ".rollback_fw") + self.rollback_fw_tar = os.path.join(self.back_fw_path, + 'rollback_backup_fw.tar') + self.interactor = interactor + try: + if rollback: + if not os.path.exists(self.rollback_fw_tar) or \ + not tarfile.is_tarfile(self.rollback_fw_tar): + raise UpgradeError("No rollback firmware exists", + ErrorCode.ROLLBACK_FAILED.value) + + tarball = open(self.rollback_fw_tar, "rb") + self.tarball = FirmwareTarball(tarball, interactor, None) + else: + self.tarball = tarball + + self.firmwares = {} + for firmware_type in self.tarball.FIRMWARE_TYPES: + if firmware_type == self.tarball.FIRMWARE_TYPES[0]: + name = self.tarball.get_file_name(firmware_type) + self.firmwares[firmware_type] = BootloaderFirmware( + self.tarball.get_file(name) + ) + elif firmware_type == self.tarball.FIRMWARE_TYPES[1]: + if not reset: + env_list = self.tarball.get_preserved_uboot_env() + print("\nPreserved env list: ") + for env in env_list: + print(env) + self.firmwares[firmware_type] = EnvFirmware( + self.tarball.generate_env_firmware(env_list) + ) + else: + self.firmwares[firmware_type] = EnvFirmware( + self.tarball.get_file(self.tarball.UBOOT_ENV_FILE) + ) + elif firmware_type == self.tarball.FIRMWARE_TYPES[2]: + self.firmwares[firmware_type] = \ + Firmware(self.tarball.get_file(self.tarball.CONF_JSON)) + except UpgradeError as e: + raise UpgradeError(e.err, e.code) + + def backup(self): + """Backup the original firmware from flash""" + print("\nFirmware backup started") try: if not os.path.exists(self.back_fw_path): os.mkdir(self.back_fw_path) - except OSError: - raise UpgradeError("Failed to create path %s for firmware rollback" - % self.back_fw_path) - - print("\nFirmware backup started") - - flash_md5_digest = [] - # The reason for reading twice. The uboot binary lacks a check code, - # and reading the key flash data cannot be guaranteed to be 100% - # correct.Therefore, the method of reading twice and comparing the - # check code to reduce the chance error. - i = 0 - while i < 2: - mtd_num = 0 - mtd_in_memory = b'' - firmware_len = firmware_length - - flash_md5 = hashlib.md5() - while True: - if firmware_len <= 0: - break - mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ - self.get_mtd_info(mtd_num) - - mtd_pos = 0 - try: - mtd_dev = os.open(mtd_dev_path, os.O_SYNC | os.O_RDONLY) - except IOError as e: - raise UpgradeError("Opening {} failed: {}" - "".format(mtd_dev_path, e.strerror)) + self.interactor.progress_bar(info="Backing up") + tmpl_json = json.load(open("/usr/lib/iot2050/fwu/update.conf.json", "r")) + for firmware_type in self.firmwares: + md5_digest = [] + fw_name = self.tarball.get_file_name(firmware_type) + + if self.tarball.FIRMWARE_TYPES[0] == firmware_type: + i = 0 + while i < 2: + file_content = self.firmwares[firmware_type].read() + md5_digest.append(self.__get_md5_digest(file_content)) + i += 1 + fw_name = tmpl_json['firmware'][0]['name'] + elif self.tarball.FIRMWARE_TYPES[1] == firmware_type: + file = self.firmwares[firmware_type].firmware + file.seek(0) + file_content = file.read() + elif self.tarball.FIRMWARE_TYPES[2] == firmware_type: + with open("/sys/firmware/devicetree/base/model", "r") as model_f, \ + open("/etc/os-release", "r") as release_f: + for line in release_f.readlines(): + key, value = line.rstrip("\n").split("=") + if "BUILD_ID" in key: + break + target_board = model_f.read().replace("\u0000", "") + tmpl_json['firmware'][0]['target_boards'] = target_board + tmpl_json['target_os'][0]['min_version'] = \ + value.replace('"', '')[:1] + \ + tmpl_json['target_os'][0]['min_version'][1:] + file_content = bytes(json.dumps(tmpl_json, indent=4), "utf8") + else: + raise UpgradeError("Wrong Firmware Type!") + + if len(md5_digest) > 0 and md5_digest[0] != md5_digest[1]: + raise UpgradeError("Firmware backup failed") + + info = tarfile.TarInfo(fw_name) + info.size = len(file_content) + + if firmware_type == self.tarball.FIRMWARE_TYPES[0]: + with tarfile.TarFile(self.rollback_fw_tar, 'w') as tar: + tar.addfile(info, io.BytesIO(file_content)) + else: + with tarfile.TarFile(self.rollback_fw_tar, 'a') as tar: + tar.addfile(info, io.BytesIO(file_content)) + self.interactor.progress_bar(start=False) + except (OSError, UpgradeError) as e: + self.interactor.progress_bar(start=False) + raise UpgradeError(e.err, ErrorCode.BACKUP_FAILED.value) + print("Firmware backup ended\n") + + def update(self): + """Update the firmware to the specified flash""" + print("===================================================") + print("IOT2050 firmware update started - DO NOT INTERRUPT!") + print("===================================================") - while mtd_pos < mtd_size and firmware_len > 0: - mtd_content = os.read(mtd_dev, mtd_erasesize) - mtd_pos += mtd_erasesize - firmware_len -= mtd_erasesize - flash_md5.update(mtd_content) - mtd_in_memory += mtd_content + try: + for firmware_type in self.firmwares: + if firmware_type == self.tarball.FIRMWARE_TYPES[2]: + continue + self.interactor.progress_bar(info="Updating {}".format(firmware_type)) - os.close(mtd_dev) + self.firmwares[firmware_type].write() - mtd_num += 1 - flash_md5_digest.append(flash_md5.hexdigest()) - i += 1 + self.firmwares[firmware_type].firmware.seek(0) + content = self.firmwares[firmware_type].firmware.read() + firmware_md5 = self.__get_md5_digest(content) - if flash_md5_digest[0] != flash_md5_digest[1]: - print("Firmware backup failed") - return ErrorCode.BACKUP_FAILED.value + content = self.firmwares[firmware_type].read() + read_out_md5 = self.__get_md5_digest(content) + self.interactor.progress_bar(start=False) - info = tarfile.TarInfo(name=self.backup_fw_name) - info.size = len(mtd_in_memory) - with tarfile.TarFile(self.rollback_fw_tar, 'w') as tar: - tar.addfile(info, io.BytesIO(mtd_in_memory)) - print('Firmware backup ended\n') + if firmware_md5 != read_out_md5: + raise UpgradeError("Firmware digest verification failed") + except UpgradeError as e: + self.interactor.progress_bar(start=False) + raise UpgradeError(e.err, ErrorCode.FLASHING_FAILED.value) -class BoardInformation(object): - def __init__(self): - self.board_name = self._get_board_name() + def __get_md5_digest(self, content): + """Verify the update integrity""" + md5 = hashlib.md5() - print("Current board: {}".format(self.board_name)) + md5.update(content) - self.os_info = self._get_os_info() + return md5.hexdigest() - @staticmethod - def _get_board_name() -> str: - """ - Get the board name by checking the device tree node - /proc/device-tree/model - """ - with open('/proc/device-tree/model') as f_model: - board_name = f_model.read().strip('\0') - return board_name +class FirmwareTarball(object): + """A FirmwareTarball models a upgrade package in specified format""" - @staticmethod - def _get_os_info() -> dict: - ''' - Get the OS information by parsing the /etc/os-release + CONF_JSON = 'update.conf.json' + UBOOT_ENV_FILE = 'u-boot-initial-env' + # "env" must be after "uboot" because uboot update will overwrite env + # partition + FIRMWARE_TYPES = [ + "uboot", + "env", + "conf" + ] - Returned is a dict that converted from /etc/os-release, for example: - NAME="debian" - VERSION_ID="3.1.1" + def __init__(self, firmware_tarball, interactor, env_list): + self.interactor = interactor + self.firmware_tarball = firmware_tarball + self.env_list = env_list - => - { - "NAME": "debian" - "VERSION_ID": "3.1.1" - } - ''' - with open('/etc/os-release') as f: - return { - l.split('=')[0]: - l.strip().split('=')[-1].strip('"') - for l in f.readlines() - } + # extract file path + self.extract_path = "/tmp" + self.firmware_tarball.seek(0) + with tarfile.open(fileobj=self.firmware_tarball) as f: + for member in f: + file_tarfileinfo = f.getmember(name=member.name) + file_tarfileinfo.uid = os.getuid() + file_tarfileinfo.gid = os.getgid() + f.extract(file_tarfileinfo, path=self.extract_path) + self._board_info = BoardInfo() + print("Current board: {}".format(self._board_info.board_name)) -class UpdateConfiguration(object): - def __init__(self, json_fileobj): + # Parse the update configuration from the json file within the tarball # Deserialize the json file to an object so that we can use dot operator # to access the fields. try: self._jsonobj = json.load( - json_fileobj, object_hook=lambda d: Namespace(**d)) + open(self.get_file(self.CONF_JSON), "rb"), + object_hook=lambda d: Namespace(**d) + ) except ValueError: raise UpgradeError("Decoding JSON has failed") - @staticmethod - def _check_os(target_os, os_info) -> bool: + self.firmware_names = dict.fromkeys(self.FIRMWARE_TYPES) + + def __del__(self): + with tarfile.open(fileobj=self.firmware_tarball) as f: + for member in f: + os.remove(self.extract_path + "/" + member.name) + + def __check_os(self, target_os, os_info) -> bool: for tos in target_os: if tos.key in os_info and os_info[tos.key] >= (tos.min_version): return True @@ -440,114 +640,107 @@ class UpdateConfiguration(object): return False - def firmware_filter(self, board_name: str, os_info: dict) -> list: - ''' - Filter out the firmwares that could not be updated on the current board - and current OS. + def check_firmware(self): + """Check if the tarball is a valid upgrade package""" + if len(self.get_file_name(self.FIRMWARE_TYPES[0])) <= 0: + return False + return True - Return the list of the firmware names that could be updated. - ''' + def get_file_name(self, firmware_type): + """Get the file names of working firmware""" res = [] - for firmware in self._jsonobj.firmware: - if board_name in firmware.target_boards: - target_os = [] - try: - target_os = self._jsonobj.target_os - except AttributeError: - pass - - # local target_os configuration prior to global - try: - target_os = firmware.target_os - except AttributeError: - pass - - # Only choose the firmware that have the target_os configuration - # identifying with current OS info, or the firmware w/o - # target_os node which means it doesn't care the OS info. - if len(target_os) == 0 or self._check_os(target_os, os_info): - res.append(firmware.name) - - return res - - def preserved_uboot_env(self): + if self.FIRMWARE_TYPES[1] == firmware_type: + res = self.UBOOT_ENV_FILE + return res + if self.FIRMWARE_TYPES[2] == firmware_type: + res = self.CONF_JSON + return res + if not self.firmware_names[firmware_type]: + for firmware in self._jsonobj.firmware: + if self._board_info.board_name in firmware.target_boards: + # Be forward compatible, the previous uboot firmware + # tarballs don't have the type node + if not hasattr(firmware, "type"): + firmware.type = self.FIRMWARE_TYPES[0] + + if firmware.type == firmware_type: + target_os = [] + try: + target_os = self._jsonobj.target_os + except AttributeError: + pass + + # local target_os configuration prior to global + try: + target_os = firmware.target_os + except AttributeError: + pass + + # Get available firmware names by checking the board name + # and the os information, or the firmware w/o target_os node + # which means it doesn't care the OS info. + if len(target_os) == 0 or \ + self.__check_os(target_os, self._board_info.os_info): + res.append(firmware.name) + + if len(res) > 1: + # Ask user to pick one firmware image to update + print("Please select which firmware image to update:") + for n in res: + print("{}\t{}".format(res.index(n) + 1, n)) + + choice = int(self.interactor.interact("-> ")) + while choice > len(res) or choice < 1: + print("Out of range, please reinput your choice:") + choice = int(self.interactor.interact("-> ")) + + res = res[choice - 1] + + self.firmware_names[firmware_type] = "".join(res) + + return self.firmware_names[firmware_type] + + def get_file(self, name): + """Get the file object of specified name""" + file = os.path.join(self.extract_path, name) + + return file + + def __get_suggest_preserved_uboot_env(self): + """Get the default uboot env list from tarball""" try: return self._jsonobj.suggest_preserved_uboot_env except Exception: raise UpgradeError("Get suggested preserved uboot env failed") + def get_preserved_uboot_env(self): + try: + if self.env_list: + preserved_uboot_env_name = [ + item for item in self.env_list.split(',')] + else: + preserved_uboot_env_name = \ + self.__get_suggest_preserved_uboot_env() -class ManagedFirmwareUpdate(FirmwareUpdate): - ''' - ManagedFirmwareUpdate will perform various checking upon both the update - package and the targeted board, before calling the underlying - FirmwareUpdate() to flash the binaries to the SPI flash. - ''' - _update_conf_json = 'update.conf.json' - uboot_default_env_file_name = 'u-boot-initial-env' - - def __init__(self, *args): - super().__init__(*args) - - # Build the board information - self._board_info = BoardInformation() - - # Parse the update configuration from the json file within the tarball - self._update_conf = UpdateConfiguration( - tarfile.open( - fileobj=self.firmware - ).extractfile(self._update_conf_json) - ) - - # extract file path - self.extract_path = "/tmp" - - # Get available firmwares by checking the board name and the os - # information - self.firmware_names = self._update_conf.firmware_filter( - self._board_info.board_name, self._board_info.os_info - ) - - def check_firmware(self): - if len(self.firmware_names) <= 0: - return False - return True - - def update_firmware(self): - - if len(self.firmware_names) == 1: - firmware_name = self.firmware_names[0] - else: - # Ask user to pick one firmware image to update - print("Please select which firmware image to update:") - for n in self.firmware_names: - print("{}\t{}".format(self.firmware_names.index(n) + 1, n)) - - choice = int(the_input(False, "-> ")) - while choice > len(self.firmware_names) or choice < 1: - print("Out of range, please reinput your choice:") - choice = int(the_input(False, "-> ")) + preserved_uboot_env_value = [] - firmware_name = self.firmware_names[choice - 1] + if not preserved_uboot_env_name: + return None - print("Will write {} to the SPI flash".format(firmware_name)) + for env_name in preserved_uboot_env_name: + env_value = subprocess.run( + 'fw_printenv %s' % env_name, shell=True, + stdout=subprocess.PIPE, check=True) \ + .stdout.decode('utf-8').lstrip().rstrip() + preserved_uboot_env_value.append(env_value) - self.firmware.seek(0) - with tarfile.open(fileobj=self.firmware) as f: - self.firmware = f.extractfile(firmware_name) - file_tarfileinfo = f.getmember( - name=self.uboot_default_env_file_name) - file_tarfileinfo.uid = os.getuid() - file_tarfileinfo.gid = os.getgid() - f.extract(file_tarfileinfo, path=self.extract_path) - super().update_firmware() - - def get_default_env_list(self, fname): - with open(fname, 'r', encoding='utf-8') as f: - default_env_list = [i.split('=')[0] for i in f.readlines()] - return default_env_list + return preserved_uboot_env_value + except subprocess.CalledProcessError: + pass + except UpgradeError as e: + raise UpgradeError(e.err) - def remove_line_by_index(self, file, index): + def __remove_line_by_index(self, file, index): with open(file, 'r+') as fp: lines = fp.readlines() fp.seek(0) @@ -556,137 +749,136 @@ class ManagedFirmwareUpdate(FirmwareUpdate): if number != index: fp.write(line) - def remove_duplicate_default_env(self, uboot_env_file, env_list): - default_env_list = self.get_default_env_list(uboot_env_file) + def __remove_duplicate_default_env(self, uboot_env_file, env_list): + with open(uboot_env_file, 'r', encoding='utf-8') as f: + default_env_list = [i.split('=')[0] for i in f.readlines()] for value in env_list: if value.split('=')[0] in default_env_list: value_index = default_env_list.index(value.split('=')[0]) - self.remove_line_by_index(uboot_env_file, value_index) + self.__remove_line_by_index(uboot_env_file, value_index) - def update_uboot_env(self, env_list): - ''' - restore uboot env - retrive the uboot default env file belong to the upgrade firmware. - makeup the restore env binary and flash to the env mtd partition - ''' - # /dev/mtd3 /dev/mtd4 - env_partition_list = [3, 4] + def generate_env_firmware(self, env_list): + """Generate the update env file based on env_list""" uboot_default_env_file = os.path.join( - self.extract_path, self.uboot_default_env_file_name) + self.extract_path, self.UBOOT_ENV_FILE) uboot_env_assemble_file = os.path.join( self.extract_path, "env_assemble_file") assert os.path.isfile(uboot_default_env_file) # assemble the env shutil.copy(uboot_default_env_file, uboot_env_assemble_file) - self.remove_duplicate_default_env(uboot_env_assemble_file, env_list) + self.__remove_duplicate_default_env(uboot_env_assemble_file, env_list) with open(uboot_env_assemble_file, encoding="utf-8", mode="a") as file: for value in env_list: file.write(value) file.write("\n") - with tempfile.NamedTemporaryFile() as env_default_binary: - for mtd_num in env_partition_list: - mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = \ - self.get_mtd_info(mtd_num) - try: - subprocess.run('mkenvimage -s {} -r -o {} {}'.format( - mtd_size, - env_default_binary.name, - uboot_env_assemble_file), - check=True, shell=True) - except subprocess.CalledProcessError as error: - print(error.stdout) - raise UpgradeError("Run mkenvimage failed") + return uboot_env_assemble_file - firmware_size = os.path.getsize(env_default_binary.name) - while True: - if firmware_size <= 0: - break - print("Updating %-20s" % mtd_name, end="") - firmware_size = self.flash_operate( - mtd_dev_path, mtd_size, mtd_erasesize, - env_default_binary, firmware_size - ) - - self.uboot_env_integrify_check(env_default_binary) +class BoardInfo(object): + """The BoardInfo represents the updating IOT2050 board information""" + def __init__(self): + self.board_name = self._get_board_name() + self.os_info = self._get_os_info() - def uboot_env_integrify_check(self, file_obj): - file_obj.seek(0) + def _get_board_name(self) -> str: + """ + Get the board name by checking the device tree node + /proc/device-tree/model + """ + with open('/proc/device-tree/model') as f_model: + board_name = f_model.read().strip('\0') - env_md5 = hashlib.md5() - env_md5.update(file_obj.read()) - env_file_md5_digest = env_md5.hexdigest() + return board_name - env_file_size = os.path.getsize(file_obj.name) - env_flash_md5_digest = self.flash_md5_digest(env_file_size, mtd_num=3) + def _get_os_info(self) -> dict: + ''' + Get the OS information by parsing the /etc/os-release - if env_file_md5_digest != env_flash_md5_digest: - raise UpgradeError("Env digest verification failed") + Returned is a dict that converted from /etc/os-release, for example: + NAME="debian" + VERSION_ID="3.1.1" - def get_preserved_uboot_env(self, preserve_list): - if preserve_list: - preserved_uboot_env_name = [ - item for item in preserve_list.split(',')] - else: - preserved_uboot_env_name = self._update_conf.preserved_uboot_env() + => + { + "NAME": "debian" + "VERSION_ID": "3.1.1" + } + ''' + with open('/etc/os-release') as f: + return { + l.split('=')[0]: + l.strip().split('=')[-1].strip('"') + for l in f.readlines() + } - preserved_uboot_env_value = [] - if not preserved_uboot_env_name: - return None +class UserInterface(object): + """The UserInterface models the interaction for user""" + BOLD = "\033[1m" + ENDC = "\033[0m" + def __init__(self, quiet): + self.quiet = quiet + self.__progress_bar_occupied = None + + def interact(self, *args): + """Prompt the user and wait for user input""" + # No quiet mode when no input context as argument + if self.quiet and len(args) > 1: + the_stdin = sys.stdin + sys.stdin = StringIO(args[0]) + if len(args) > 1: + info = args[1] + else: + info = args[0] - for env_name in preserved_uboot_env_name: - try: - env_value = subprocess.run( - 'fw_printenv %s' % env_name, shell=True, - stdout=subprocess.PIPE, check=True) \ - .stdout.decode('utf-8').lstrip().rstrip() - preserved_uboot_env_value.append(env_value) - except subprocess.CalledProcessError: - pass + ret = input(self.BOLD + info + self.ENDC ) - return preserved_uboot_env_value + if self.quiet and len(args) > 1: + print(args[0] + '\n', end="") + sys.stdin = the_stdin + return ret -class RollbackFirmware(FirmwareUpdate): - def rollback_firmware(self): - if not os.path.exists(self.rollback_fw_tar) or \ - not tarfile.is_tarfile(self.rollback_fw_tar): - print('No rollback firmware exists') - print('It could be that you have not done any upgrades') - return ErrorCode.ROLLBACK_FAILED.value + def print_info(self, *args): + print(args[0]) - try: - with tarfile.open(self.rollback_fw_tar) as f: - self.firmware = f.extractfile(self.backup_fw_name) - self.update_firmware() - return ErrorCode.ROLLBACK_SUCCESS.value - except tarfile.ExtractError: - raise UpgradeRollbackError("Failed to rollback firmware due to" - " extract firmware error!") + def __show_progress_bar(self, info, interval, event: Event) -> None: + with Bar(info.ljust(15), fill='.', bar_prefix='', + bar_suffix='', suffix='') as bar: + while True: + bar.next() + if event.is_set(): + break + time.sleep(interval) + + def progress_bar(self, info="", interval=0.2, start=True): + if start and self.__progress_bar_occupied: + print("Progress bar is occupied!") + return + self.__progress_bar_occupied = start + + if start: + self.event = Event() + self.t = Thread( + target = self.__show_progress_bar, + args = (info, interval, self.event) + ) + self.t.start() + else: + try: + self.event.set() + self.t.join() + except AttributeError as e: + print("Progress bar is not started yet!") + pass __version__ = "${PV}" - -def the_input(is_quiet, *args): - if is_quiet: - the_stdin = sys.stdin - sys.stdin = StringIO(args[0]) - info = args[1] - else: - if len(args) > 1: - info = args[1] - else: - info = args[0] - ret = input(info) - if is_quiet: - print(args[0] + '\n', end="") - sys.stdin = the_stdin - return ret - def main(argv): + """The main function""" + description=textwrap.dedent('''\ Update OSPI firmware. Examples: @@ -713,6 +905,7 @@ def main(argv): | 5 | Flashing error | | 6 | User canceled | | 7 | Invalid firmware | + | 8 | Failed to update | ''') parser = argparse.ArgumentParser( description=description, @@ -722,7 +915,7 @@ def main(argv): group = parser.add_mutually_exclusive_group() parser.add_argument('firmware', nargs='?', metavar='FIRMWARE', type=argparse.FileType('rb'), - help='firmware image or package') + help='firmware tarball') group.add_argument('-f', '--force', help='Force update, ignore all the checking', action='store_true') @@ -751,6 +944,7 @@ def main(argv): firmware auto backed up and rollback in case of \ failure, and without automatic rebooting', action='store_true') + try: args = parser.parse_args() except IOError as e: @@ -761,113 +955,97 @@ def main(argv): print("No firmware specified") return ErrorCode.INVALID_ARG.value - try: - if args.force: - # When forced, ignore all the checking and call the underlying - # updater directly - updater = FirmwareUpdate(args.firmware, args.backup_dir) - elif args.rollback: - updater = RollbackFirmware(args.firmware, args.backup_dir) - else: - updater = ManagedFirmwareUpdate(args.firmware, args.backup_dir) + interactor = UserInterface(args.quiet); - if not updater.check_firmware(): - print("OS image version must be newer than the minimal version, " - "no firmware image could be updated on this device!") - return ErrorCode.INVALID_FIRMWARE.value - - update_input = the_input( - args.quiet, 'y', - "\n" + BOLD + "Warning: Update may render device unbootable." - + ENDC + " Continue (y/N)? " + try: + if args.rollback: + tarball = None + elif not args.force: + tarball = FirmwareTarball(args.firmware, interactor, + args.preserve_list) + # FirmwareTarball to check firmware + if not tarball.check_firmware(): + print("OS image version must be newer than the minimal version, " + "no firmware image could be updated on this device!") + return ErrorCode.INVALID_FIRMWARE.value + + update_input = interactor.interact( + 'y', + "\nWarning: Update may render device unbootable. Continue (y/N)?" ) if not update_input == "y": return ErrorCode.CANCELED.value - if args.force or args.reset: - erase_env_input = the_input( - args.quiet, 'y', "\n" + BOLD + - "Warning: All U-Boot environment variables will be reset to" - " factory settings." - + ENDC + " Continue (y/N)? " + if args.reset: + erase_env_input = interactor.interact( + 'y', + "\nWarning: All U-Boot environment variables will be reset to" + " factory settings. Continue (y/N)? " ) if not erase_env_input == "y": return ErrorCode.CANCELED.value + if args.force: + updater = ForceUpdate(interactor, args.firmware) + else: + updater = FirmwareUpdate( + tarball, + args.backup_dir, + interactor, + args.rollback, + args.reset + ) + + # FirmwareUpdate to rollback if args.rollback: - rollback_confirm = the_input( - args.quiet, 'y', + rollback_confirm = interactor.interact( + 'y', "\nRollback the firmware to the version before the upgrade (y/N)?" ) if not rollback_confirm == "y": return ErrorCode.CANCELED.value - return updater.rollback_firmware() - - envlist = None - if not args.force and not args.reset: - envlist = updater.get_preserved_uboot_env(args.preserve_list) + if not updater.update(): + return ErrorCode.ROLLBACK_SUCCESS.value if not args.no_backup: - if ErrorCode.BACKUP_FAILED.value == updater.backup_firmware(): - return ErrorCode.BACKUP_FAILED.value - updater.update_firmware() - - if envlist: - print("\n") - for env in envlist: - print(env) - preserved_env_input = the_input( - args.quiet, 'y', - "\n" + BOLD + "Warning: Preserve the above env variables." - + ENDC + " Continue (y/N)? " - ) - if not preserved_env_input == "n": - updater.update_uboot_env(envlist) + updater.backup() - reboot_input = the_input( - args.quiet, 'n', - "\n" + BOLD + "Warning: Completed. Please reboot the device." - + ENDC + " Reboot now (y/N)? " - ) - if not reboot_input == "y": - return ErrorCode.SUCCESS.value - os.system('reboot') + index = 0 + while True: + try: + updater.update() + break + except UpgradeError as e: + if index > 2: + raise UpgradeError(e.err, e.code) + index += 1 + print("{}, try again!".format(e.err)) except UpgradeError as e: - print(e) - intput_reminder = ''' - ==================================== - Upgrade Failed! Please do not reboot!! - The Device may become brick. - Please try to upgrade firmware again! - ==================================== + print(e.err) + input_reminder = ''' + ==================================== + Upgrade Failed! Please do not reboot!! + The Device may become brick. + Please try to rollback! + ==================================== Hit the Enter Key to Exit: - ''' - # Rollback the firmware when the upgrade fails - try: - print(BOLD + 'Warning: Upgrade failed, rollback started' + - ENDC + '\n') - updater = RollbackFirmware(args.firmware, args.backup_dir) - if ErrorCode.ROLLBACK_FAILED.value == updater.rollback_firmware(): - return ErrorCode.ROLLBACK_FAILED.value - except (UpgradeError, UpgradeRollbackError) as e: - print(e) - - the_input(args.quiet, " ", intput_reminder) - return ErrorCode.FLASHING_FAILED.value - except UpgradeRollbackError as e: - print(e) - intput_reminder = ''' - ==================================== - Rollback Failed! Please do not reboot!! - The Device may become brick. - Please try to rollback firmware again! - ==================================== -Hit the Enter Key to Exit: - ''' +''' + if e.code == ErrorCode.FLASHING_FAILED.value: + interactor.interact(" ", input_reminder) + if args.rollback: + e.code = ErrorCode.ROLLBACK_FAILED.value + + return e.code + + reboot_input = interactor.interact( + 'n', + "\nWarning: Completed. Please reboot the device. Reboot now (y/N)?" + ) + if not reboot_input == "y": + return ErrorCode.SUCCESS.value + os.system('reboot') - the_input(args.quiet, " ", intput_reminder) - return ErrorCode.ROLLBACK_FAILED.value if __name__ == '__main__': - code = main(sys.argv) - sys.exit(code) + CODE = main(sys.argv) + sys.exit(CODE) diff --git a/recipes-app/iot2050-firmware-update/files/update.conf.json.tmpl b/recipes-app/iot2050-firmware-update/files/update.conf.json.tmpl new file mode 100644 index 000000000..720b3962f --- /dev/null +++ b/recipes-app/iot2050-firmware-update/files/update.conf.json.tmpl @@ -0,0 +1,20 @@ +{ + "firmware": [ + { + "description": "IOT2050 Bootloader Backup Firmware", + "name": "iot2050-pg-image-boot.bin", + "type": "uboot", + "target_boards": "" + } + ], + "target_os": [ + { + "type": "Example Image", + "key": "BUILD_ID", + "min_version": "V01.01.01" + } + ], + "suggest_preserved_uboot_env": [ + "boot_targets" + ] +} diff --git a/recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.4.bb b/recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.5.bb similarity index 55% rename from recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.4.bb rename to recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.5.bb index 7bf817623..dbb0c63d4 100644 --- a/recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.4.bb +++ b/recipes-app/iot2050-firmware-update/iot2050-firmware-update_0.5.bb @@ -3,6 +3,7 @@ # # Authors: # Chao Zeng +# Li Hua Qian # # This file is subject to the terms and conditions of the MIT License. See # COPYING.MIT file in the top-level directory. @@ -11,15 +12,23 @@ DESCRIPTION = "OSPI Firmware Update Scripts" MAINTAINER = "chao.zeng@siemens.com" -SRC_URI = "file://iot2050-firmware-update.tmpl" +SRC_URI = " \ + file://update.conf.json.tmpl \ + file://iot2050-firmware-update.tmpl" -TEMPLATE_FILES = "iot2050-firmware-update.tmpl" +TEMPLATE_FILES = "update.conf.json.tmpl iot2050-firmware-update.tmpl" inherit dpkg-raw +DEBIAN_DEPENDS = "python3-progress" + do_install() { + install -v -d ${D}/usr/lib/iot2050/fwu + install -v -m 644 ${WORKDIR}/update.conf.json ${D}/usr/lib/iot2050/fwu/ + install -v -m 755 ${WORKDIR}/iot2050-firmware-update ${D}/usr/lib/iot2050/fwu/ + install -v -d ${D}/usr/sbin/ - install -v -m 755 ${WORKDIR}/iot2050-firmware-update ${D}/usr/sbin/ + ln -sf ../lib/iot2050/fwu/iot2050-firmware-update ${D}/usr/sbin/iot2050-firmware-update } do_deploy_deb:append() {