Skip to content

Commit

Permalink
feat: extend exception capture to connection, not just opening
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jul 19, 2024
1 parent 2f9a384 commit 3bd3d61
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 81 deletions.
57 changes: 53 additions & 4 deletions esptool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import os
import shlex
import sys
import termios
import time
import traceback

Expand Down Expand Up @@ -68,9 +69,9 @@
from esptool.config import load_config_file
from esptool.loader import (
DEFAULT_CONNECT_ATTEMPTS,
DEFAULT_RETRY_OPEN_SERIAL,
ESPLoader,
list_ports,
cfg,
)
from esptool.targets import CHIP_DEFS, CHIP_LIST, ESP32ROM
from esptool.util import (
Expand All @@ -82,6 +83,9 @@

import serial

# Retry opening the serial port indefinitely
DEFAULT_RETRY_OPEN_SERIAL = cfg.getboolean("retry_open_serial", False)


def main(argv=None, esp=None):
"""
Expand Down Expand Up @@ -1055,6 +1059,44 @@ def expand_file_arguments(argv):
return argv


def get_default_specific_connected_device(
chip, each_port, initial_baud, trace, before, connect_attempts, retry_open_serial
):
retry_attempts = 0
_esp = None
while True:
try:
chip_class = CHIP_DEFS[chip]
_esp = chip_class(each_port, initial_baud, trace)
_esp.connect(before, connect_attempts)
if retry_attempts > 0:
# break the retrying line
print("")
return _esp
except (
FatalError,
serial.serialutil.SerialException,
IOError,
OSError,
termios.error,
) as e:
if not retry_open_serial:
raise
if _esp and _esp._port:
_esp._port.close()
_esp = None
if retry_attempts == 0:
print(e)
print("Retrying failed connection ", end="", flush=True)
else:
if retry_attempts % 9 == 0:
# print a dot every second
print(".", end="", flush=True)
time.sleep(0.1)
retry_attempts += 1
continue


def get_default_connected_device(
serial_list,
port,
Expand All @@ -1074,9 +1116,15 @@ def get_default_connected_device(
each_port, initial_baud, before, trace, connect_attempts
)
else:
chip_class = CHIP_DEFS[chip]
_esp = chip_class(each_port, initial_baud, trace, retry_open_serial)
_esp.connect(before, connect_attempts)
_esp = get_default_specific_connected_device(
chip,
each_port,
initial_baud,
trace,
before,
connect_attempts,
retry_open_serial,
)
break
except (FatalError, OSError) as err:
if port is not None:
Expand Down Expand Up @@ -1193,6 +1241,7 @@ def _main():
try:
main()
except FatalError as e:
print(traceback.format_exc())
print(f"\nA fatal error occurred: {e}")
sys.exit(2)
except serial.serialutil.SerialException as e:
Expand Down
120 changes: 43 additions & 77 deletions esptool/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import string
import struct
import sys
import termios
import time
from typing import Optional

Expand Down Expand Up @@ -97,8 +96,6 @@
DEFAULT_SERIAL_WRITE_TIMEOUT = cfg.getfloat("serial_write_timeout", 10)
# Default number of times to try connection
DEFAULT_CONNECT_ATTEMPTS = cfg.getint("connect_attempts", 7)
# Default number of times to try connection
DEFAULT_RETRY_OPEN_SERIAL = cfg.getboolean("retry_open_serial", False)
# Number of times to try writing a data block
WRITE_BLOCK_ATTEMPTS = cfg.getint("write_block_attempts", 3)

Expand Down Expand Up @@ -195,7 +192,6 @@ class ESPLoader(object):
BOOTLOADER_IMAGE: Optional[object] = None

DEFAULT_PORT = "/dev/ttyUSB0"
DEFAULT_RETRY_OPEN_SERIAL = False

USES_RFC2217 = False

Expand Down Expand Up @@ -285,13 +281,7 @@ class ESPLoader(object):
# Number of attempts to write flash data
WRITE_FLASH_ATTEMPTS = 2

def __init__(
self,
port=DEFAULT_PORT,
baud=ESP_ROM_BAUD,
trace_enabled=False,
retry_open_serial=DEFAULT_RETRY_OPEN_SERIAL,
):
def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD, trace_enabled=False):
"""Base constructor for ESPLoader bootloader interaction
Don't call this constructor, either instantiate a specific
Expand All @@ -318,75 +308,51 @@ def __init__(
}

if isinstance(port, str):
printed_failure = False
retry_attempts = 0
while True:
try:
self._port = serial.serial_for_url(
port, exclusive=True, do_not_open=True
)
if sys.platform == "win32":
# When opening a port on Windows,
# the RTS/DTR (active low) lines
# need to be set to False (pulled high)
# to avoid unwanted chip reset
self._port.rts = False
self._port.dtr = False
self._port.open()
if retry_attempts > 0:
# break the retrying line
print("")
break
except (
serial.serialutil.SerialException,
IOError,
OSError,
termios.error,
) as e:
if retry_open_serial:
if not printed_failure:
print(e)
print("Retrying to open port ", end="", flush=True)
printed_failure = True
else:
if retry_attempts % 9 == 0:
# print a dot every second
print(".", end="", flush=True)
time.sleep(0.1)
retry_attempts += 1
continue
port_issues = [
[ # does not exist error
re.compile(r"Errno 2|FileNotFoundError", re.IGNORECASE),
"Check if the port is correct and ESP connected",
],
[ # busy port error
re.compile(r"Access is denied", re.IGNORECASE),
"Check if the port is not used by another task",
try:
self._port = serial.serial_for_url(
port, exclusive=True, do_not_open=True
)
if sys.platform == "win32":
# When opening a port on Windows,
# the RTS/DTR (active low) lines
# need to be set to False (pulled high)
# to avoid unwanted chip reset
self._port.rts = False
self._port.dtr = False
self._port.open()
except serial.serialutil.SerialException as e:
port_issues = [
[ # does not exist error
re.compile(r"Errno 2|FileNotFoundError", re.IGNORECASE),
"Check if the port is correct and ESP connected",
],
[ # busy port error
re.compile(r"Access is denied", re.IGNORECASE),
"Check if the port is not used by another task",
],
]
if sys.platform.startswith("linux"):
port_issues.append(
[ # permission denied error
re.compile(r"Permission denied", re.IGNORECASE),
(
"Try to add user into dialout group: "
"sudo usermod -a -G dialout $USER"
),
],
]
if sys.platform.startswith("linux"):
port_issues.append(
[ # permission denied error
re.compile(r"Permission denied", re.IGNORECASE),
(
"Try to add user into dialout group: "
"sudo usermod -a -G dialout $USER"
),
],
)
)

hint_msg = ""
for port_issue in port_issues:
if port_issue[0].search(str(e)):
hint_msg = f"\nHint: {port_issue[1]}\n"
break
hint_msg = ""
for port_issue in port_issues:
if port_issue[0].search(str(e)):
hint_msg = f"\nHint: {port_issue[1]}\n"
break

raise FatalError(
f"Could not open {port}, the port is busy or doesn't exist."
f"\n({e})\n"
f"{hint_msg}"
)
raise FatalError(
f"Could not open {port}, the port is busy or doesn't exist."
f"\n({e})\n"
f"{hint_msg}"
)
else:
self._port = port
self._slip_reader = slip_reader(self._port, self.trace)
Expand Down

0 comments on commit 3bd3d61

Please sign in to comment.