diff --git a/apio/commands/graph.py b/apio/commands/graph.py index 0b4d0ced..c53154b6 100644 --- a/apio/commands/graph.py +++ b/apio/commands/graph.py @@ -26,7 +26,8 @@ \b Examples: - apio graph + apio graph # Graph the top module + apio graph -t my_module # Graph the selected module The graph command generates the graph in .dot format and then invokes the dot command from the path to convert it to a .svg format. The dot @@ -46,7 +47,7 @@ ) @click.pass_context @options.project_dir_option -@options.top_module_option_gen() +@options.top_module_option_gen(help="Set the name of the top module to graph.") @options.verbose_option def cli( ctx: Context, diff --git a/apio/commands/time.py b/apio/commands/time.py index bcb70ae6..fa6b772e 100644 --- a/apio/commands/time.py +++ b/apio/commands/time.py @@ -30,6 +30,9 @@ \b Examples: apio time + +The time command supportw ICE40 devcies. ECP5 and Gowin devices are not +supported yet. """ diff --git a/apio/managers/drivers.py b/apio/managers/drivers.py index 81d800b5..7bcac1e5 100644 --- a/apio/managers/drivers.py +++ b/apio/managers/drivers.py @@ -461,7 +461,7 @@ def _ftdi_enable_windows(self): if zadig_ini_dst.exists(): zadig_ini_dst.unlink() - return result.get("returncode") + return result.exit_code @staticmethod def _ftdi_disable_windows(): @@ -469,7 +469,7 @@ def _ftdi_disable_windows(): click.secho(FTDI_UNINSTALL_DRIVER_INSTRUCTIONS, fg="yellow") result = util.exec_command("mmc devmgmt.msc") - return result.get("returncode") + return result.exit_code # W0703: Catching too general exception Exception (broad-except) # pylint: disable=W0703 @@ -490,14 +490,12 @@ def _serial_enable_windows(self): "Serial drivers configuration finished", fg="green" ) else: - result = 1 + result = util.CommandResult(exit_code=1) except Exception as exc: click.secho("Error: " + str(exc), fg="red") - result = 1 + result = util.CommandResult(exit_code=1) - if not isinstance(result, int): - result = result.get("returncode") - return result + return result.exit_code @staticmethod def _serial_disable_windows(): @@ -505,4 +503,4 @@ def _serial_disable_windows(): click.secho(SERIAL_UNINSTALL_DRIVER_INSTRUCTIONS, fg="yellow") result = util.exec_command("mmc devmgmt.msc") - return result.get("returncode") + return result.exit_code diff --git a/apio/managers/scons.py b/apio/managers/scons.py index b5cd9586..8c665e31 100644 --- a/apio/managers/scons.py +++ b/apio/managers/scons.py @@ -15,6 +15,7 @@ import datetime import shutil from pathlib import Path +from functools import wraps import importlib.metadata import click @@ -27,6 +28,7 @@ from apio.profile import Profile from apio.resources import Resources from apio.managers.project import Project +from apio.managers.scons_filter import SconsFilter # -- Constant for the dictionary PROG, which contains # -- the programming configuration @@ -40,6 +42,33 @@ ERASE_LINE = "\033[K" +# W0703: Catching too general exception Exception (broad-except) +# pylint: disable=W0703 +# pylint: disable=W0150 +# +# -- Based on +# -- https://stackoverflow.com/questions/5929107/decorators-with-parameters +def on_exception(*, exit_code: int): + """Decoractor for functions that return int exit code. If the function + throws an exception, the error message is printed, and the caller see the + returned value exit_code instead of the exception. + """ + + def decorator(function): + @wraps(function) + def wrapper(*args, **kwargs): + try: + return function(*args, **kwargs) + except Exception as exc: + if str(exc): + click.secho("Error: " + str(exc), fg="red") + return exit_code + + return wrapper + + return decorator + + class SCons: """Class for managing the scons tools""" @@ -70,19 +99,23 @@ def __init__(self, project_dir: Path): # Change to that folder os.chdir(project_dir) - @util.command - def clean(self, args): - """Execute apio clean""" + @on_exception(exit_code=1) + def clean(self, args) -> int: + """Runs a scons subprocess with the 'clean' target. Returns process + exit code, 0 if ok.""" # -- Split the arguments - __, __, arch = process_arguments(args, self.resources, self.project) + variables, __, arch = process_arguments( + args, self.resources, self.project + ) # --Clean the project: run scons -c (with aditional arguments) - return self.run("-c", arch=arch, variables=[], packages=[]) + return self._run("-c", arch=arch, variables=variables, packages=[]) - @util.command - def verify(self, args): - """Executes scons for verifying""" + @on_exception(exit_code=1) + def verify(self, args) -> int: + """Runs a scons subprocess with the 'verify' target. Returns process + exit code, 0 if ok.""" # -- Split the arguments variables, __, arch = process_arguments( @@ -91,16 +124,17 @@ def verify(self, args): # -- Execute scons!!! # -- The packages to check are passed - return self.run( + return self._run( "verify", variables=variables, arch=arch, packages=["oss-cad-suite"], ) - @util.command - def graph(self, args): - """Executes scons for visual graph generation""" + @on_exception(exit_code=1) + def graph(self, args) -> int: + """Runs a scons subprocess with the 'verify' target. Returns process + exit code, 0 if ok.""" # -- Split the arguments variables, _, arch = process_arguments( @@ -109,16 +143,17 @@ def graph(self, args): # -- Execute scons!!! # -- The packages to check are passed - return self.run( + return self._run( "graph", variables=variables, arch=arch, packages=["oss-cad-suite"], ) - @util.command - def lint(self, args): - """DOC: TODO""" + @on_exception(exit_code=1) + def lint(self, args) -> int: + """Runs a scons subprocess with the 'lint' target. Returns process + exit code, 0 if ok.""" config = {} __, __, arch = process_arguments(config, self.resources, self.project) @@ -131,48 +166,51 @@ def lint(self, args): "nostyle": args.get("nostyle"), } ) - return self.run( + return self._run( "lint", variables=variables, arch=arch, packages=["oss-cad-suite"], ) - @util.command - def sim(self, args): - """Simulates a testbench and shows the result in a gtkwave window.""" + @on_exception(exit_code=1) + def sim(self, args) -> int: + """Runs a scons subprocess with the 'sim' target. Returns process + exit code, 0 if ok.""" # -- Split the arguments variables, _, arch = process_arguments( args, self.resources, self.project ) - return self.run( + return self._run( "sim", variables=variables, arch=arch, packages=["oss-cad-suite", "gtkwave"], ) - @util.command - def test(self, args): - """Tests all or a single testbench by simulating.""" + @on_exception(exit_code=1) + def test(self, args) -> int: + """Runs a scons subprocess with the 'test' target. Returns process + exit code, 0 if ok.""" # -- Split the arguments variables, _, arch = process_arguments( args, self.resources, self.project ) - return self.run( + return self._run( "test", variables=variables, arch=arch, packages=["oss-cad-suite"], ) - @util.command - def build(self, args): - """Build the circuit""" + @on_exception(exit_code=1) + def build(self, args) -> int: + """Runs a scons subprocess with the 'build' target. Returns process + exit code, 0 if ok.""" # -- Split the arguments variables, board, arch = process_arguments( @@ -181,7 +219,7 @@ def build(self, args): # -- Execute scons!!! # -- The packages to check are passed - return self.run( + return self._run( "build", variables=variables, board=board, @@ -191,14 +229,24 @@ def build(self, args): # run(self, command, variables, packages, board=None, arch=None): - @util.command - def time(self, args): - """DOC: TODO""" + @on_exception(exit_code=1) + def time(self, args) -> int: + """Runs a scons subprocess with the 'time' target. Returns process + exit code, 0 if ok.""" variables, board, arch = process_arguments( args, self.resources, self.project ) - return self.run( + + if arch not in ["ice40"]: + click.secho( + "Error: Time analysis for " + f"{arch.upper()} is not supported yet.", + fg="red", + ) + return 99 + + return self._run( "time", variables=variables, board=board, @@ -206,9 +254,11 @@ def time(self, args): packages=["oss-cad-suite"], ) - @util.command - def upload(self, config: dict, prog: dict): - """Upload the circuit to the board + @on_exception(exit_code=1) + def upload(self, config: dict, prog: dict) -> int: + """Runs a scons subprocess with the 'time' target. Returns process + exit code, 0 if ok. + INPUTS: * config: Dictionary with the initial configuration * board @@ -219,7 +269,6 @@ def upload(self, config: dict, prog: dict): * ftdi_id: ftdi identificator * sram: Perform SRAM programming * flash: Perform Flash programming - OUTPUT: Exit code after executing scons """ # -- Get important information from the configuration @@ -234,13 +283,13 @@ def upload(self, config: dict, prog: dict): # -- the FPGA (programmer executable + arguments) # -- Ex: 'tinyprog --pyserial -c /dev/ttyACM0 --program' # -- Ex: 'iceprog -d i:0x0403:0x6010:0' - programmer = self.get_programmer(board, prog) + programmer = self._get_programmer(board, prog) # -- Add as a flag to pass it to scons flags += [f"prog={programmer}"] # -- Execute Scons for uploading! - exit_code = self.run( + exit_code = self._run( "upload", variables=flags, packages=["oss-cad-suite"], @@ -250,7 +299,7 @@ def upload(self, config: dict, prog: dict): return exit_code - def get_programmer(self, board: str, prog: dict) -> str: + def _get_programmer(self, board: str, prog: dict) -> str: """Get the command line (string) to execute for programming the FPGA (programmer executable + arguments) @@ -286,11 +335,11 @@ def get_programmer(self, board: str, prog: dict) -> str: # -- Check platform. If the platform is not compatible # -- with the board an exception is raised - self.check_platform(board_data) + self._check_platform(board_data) # -- Check pip packages. If the corresponding pip_packages # -- is not installed, an exception is raised - self.check_pip_packages(board_data) + self._check_pip_packages(board_data) # -- Special case for the TinyFPGA on MACOS platforms # -- TinyFPGA BX board is not detected in MacOS HighSierra @@ -315,7 +364,7 @@ def get_programmer(self, board: str, prog: dict) -> str: # -- * "${PID}" (optional): USB Product id # -- * "${FTDI_ID}" (optional): FTDI id # -- * "${SERIAL_PORT}" (optional): Serial port name - programmer = self.serialize_programmer( + programmer = self._serialize_programmer( board_data, prog[SRAM], prog[FLASH] ) # -- The placeholder for the bitstream file name should always exist. @@ -347,10 +396,10 @@ def get_programmer(self, board: str, prog: dict) -> str: # -- Check that the board is connected # -- If not, an exception is raised - self.check_usb(board, board_data) + self._check_usb(board, board_data) # -- Get the FTDI index of the connected board - ftdi_id = self.get_ftdi_id(board, board_data, prog[FTDI_ID]) + ftdi_id = self._get_ftdi_id(board, board_data, prog[FTDI_ID]) # -- Place the value in the command string programmer = programmer.replace("${FTDI_ID}", ftdi_id) @@ -360,10 +409,12 @@ def get_programmer(self, board: str, prog: dict) -> str: if "${SERIAL_PORT}" in programmer: # -- Check that the board is connected - self.check_usb(board, board_data) + self._check_usb(board, board_data) # -- Get the serial port - device = self.get_serial_port(board, board_data, prog[SERIAL_PORT]) + device = self._get_serial_port( + board, board_data, prog[SERIAL_PORT] + ) # -- Place the value in the command string programmer = programmer.replace("${SERIAL_PORT}", device) @@ -375,7 +426,7 @@ def get_programmer(self, board: str, prog: dict) -> str: return programmer @staticmethod - def check_platform(board_data: dict) -> None: + def _check_platform(board_data: dict) -> None: """Check if the current board is compatible with the current platform. There are some boards, like icoboard, that only runs in the platform linux/arm7 @@ -411,7 +462,7 @@ def check_platform(board_data: dict) -> None: raise ValueError(f"incorrect platform {platform}") - def check_pip_packages(self, board_data): + def _check_pip_packages(self, board_data): """Check if the corresponding pip package with the programmer has already been installed. In the case of an apio package it is just ignored @@ -499,7 +550,7 @@ def check_pip_packages(self, board_data): # -- Raise an exception raise ValueError(message) from exc - def serialize_programmer( + def _serialize_programmer( self, board_data: dict, sram: bool, flash: bool ) -> str: """ @@ -575,7 +626,7 @@ def serialize_programmer( return programmer - def check_usb(self, board: str, board_data: dict) -> None: + def _check_usb(self, board: str, board_data: dict) -> None: """Check if the given board is connected or not to the computer If it is not connected, an exception is raised @@ -634,7 +685,7 @@ def check_usb(self, board: str, board_data: dict) -> None: # -- Raise an exception raise ConnectionError("board " + board + " not connected") - def get_serial_port( + def _get_serial_port( self, board: str, board_data: dict, ext_serial_port: str ) -> str: """Get the serial port of the connected board @@ -790,7 +841,7 @@ def _check_tinyprog(board_data: dict, port: str) -> bool: # -- TinyFPGA board not detected! return False - def get_ftdi_id(self, board, board_data, ext_ftdi_id) -> str: + def _get_ftdi_id(self, board, board_data, ext_ftdi_id) -> str: """Get the FTDI index of the detected board * INPUT: @@ -893,7 +944,7 @@ def _check_ftdi( # pylint: disable=too-many-arguments # pylint: disable=too-many-positional-arguments - def run(self, command, variables, packages, board=None, arch=None): + def _run(self, command, variables, packages, board=None, arch=None): """Executes scons""" # -- Construct the path to the SConstruct file. @@ -981,21 +1032,24 @@ def _execute_scons(self, command: str, variables: list, board: str) -> int: ["scons"] + ["-Q", command] + variables + ["force_colors=True"] ) - # -- For debugging. - # print(f"scons_command = {' '.join(scons_command)}") + # For debugging. Print the scons command line in a forumat that is + # useful for the .vscode/launch.json scons debug target. + # import json + # print(json.dumps(scons_command)) + + # -- An output filter that manupulates the scons stdout/err lines as + # -- needed and write them to stdout. + scons_filter = SconsFilter() # -- Execute the scons builder! result = util.exec_command( scons_command, - stdout=util.AsyncPipe(self._on_stdout), - stderr=util.AsyncPipe(self._on_stderr), + stdout=util.AsyncPipe(scons_filter.on_stdout_line), + stderr=util.AsyncPipe(scons_filter.on_stderr_line), ) - # -- Get the exit code - exit_code = result["returncode"] - # -- Is there an error? True/False - is_error = exit_code != 0 + is_error = result.exit_code != 0 # -- Calculate the time it took to execute the command duration = time.time() - start_time @@ -1020,74 +1074,4 @@ def _execute_scons(self, command: str, variables: list, board: str) -> int: ) # -- Return the exit code - return exit_code - - @staticmethod - def _on_stdout(line): - - # ---- Fomu output processing BEGIN - # pattern_fomu = r"^Download\s*\[=*\]\s\d{1,3}%" - pattern_fomu = r"^Download\s*\[=*" - match = re.search(pattern_fomu, line) - if match: - # -- Delete the previous line - print(CURSOR_UP + ERASE_LINE, end="", flush=True) - # ---- Fomu output processing END - - fgcol = "green" if "is up to date" in line else None - fgcol = "green" if match else fgcol - click.secho(line, fg=fgcol) - - @staticmethod - def _on_stderr(line: str): - """Callback function. It is called when the running command - has printed something on the console - """ - - # -- Ignore blank lines ('') - if not line: - return - - # ------- tinyprog output processing BEGIN - # -- Check if the line correspond to an output of - # -- the tinyprog programmer (TinyFPGA board) - # -- Match outputs like these " 97%|█████████▋| " - # -- Regular expression remainder: - # -- \s --> Match one blank space - # -- \d{1,3} one, two or three decimal digits - pattern_tinyprog = r"\s\d{1,3}%\|█*" - - # -- Calculate if there is a match - match_tinyprog = re.search(pattern_tinyprog, line) - - # -- Math all the progress bar lines except the - # -- initial one (when it is 0%) - if match_tinyprog and " 0%|" not in line: - # -- Delete the previous line - print(CURSOR_UP + ERASE_LINE, end="", flush=True) - # ------- tinyprog output processing END - - # ------- iceprog output processing BEGIN - # -- Match outputs like these "addr 0x001400 3%" - # -- Regular expression remainder: - # -- ^ --> Match the begining of the line - # -- \s --> Match one blank space - # -- [0-9A-F]+ one or more hexadecimal digit - # -- \d{1,2} one or two decimal digits - pattern = r"^addr\s0x[0-9A-F]+\s+\d{1,2}%" - - # -- Calculate if there is a match! - match = re.search(pattern, line) - - # -- It is a match! (iceprog is running!) - # -- (or if it is the end of the writing!) - # -- (or if it is the end of verifying!) - if match or "done." in line or "VERIFY OK" in line: - # -- Delete the previous line - print(CURSOR_UP + ERASE_LINE, end="", flush=True) - # ------- Iceprog output processing END - - # -- Print the line (In YELLOW) - # -- In case of error print it in RED - fgcol = "red" if "error" in line.lower() else "yellow" - click.secho(line, fg=fgcol) + return result.exit_code diff --git a/apio/managers/scons_filter.py b/apio/managers/scons_filter.py new file mode 100644 index 00000000..8b484879 --- /dev/null +++ b/apio/managers/scons_filter.py @@ -0,0 +1,251 @@ +"""DOC: TODO""" + +# -*- coding: utf-8 -*- +# -- This file is part of the Apio project +# -- (C) 2016-2019 FPGAwars +# -- Author Jesús Arroyo +# -- Licence GPLv2 + +# pylint: disable=fixme +# TODO: Implement range detectors for Fumo, Tinyprog, and Iceprog, similar to +# the pnr detector. This will avoid matching of output from other programs. + +# TODO: Use util.get_terminal_config() to determine if the output goes to a +# terminal or a pipe and have an alternative handling for the cursor commands +# when writing to a pipe. + +import re +from enum import Enum +from typing import List, Optional, Tuple +import click + + +# -- Terminal cursor commands. +CURSOR_UP = "\033[F" +ERASE_LINE = "\033[K" + + +class PipeId(Enum): + """Represent the two output streams from the scons subprocess.""" + + STDOUT = 1 + STDERR = 2 + + +class RangeEvents(Enum): + """An stdout/err line can trigger one of these events, when detecting a + range of lines.""" + + START_BEFORE = 1 # Range starts before the current line. + START_AFTER = 2 # Range starts after the current line. + END_BEFORE = 3 # Range ends before the current line. + END_AFTER = 4 # Range ends, after the current line. + + +class SectionDetector: + """Base classifier of a range of lines within the sequence of stdout/err + lines recieves from the scons subprocess.""" + + def __init__(self): + self._in_range = False + + def update(self, pipe_id: PipeId, line: str) -> bool: + """Updates the section classifier with the next stdout/err line. + return True iff detector classified this line to be within a range.""" + + prev_state = self._in_range + event = self.classify_line(pipe_id, line) + + if event == RangeEvents.START_BEFORE: + self._in_range = True + return self._in_range + + if event == RangeEvents.START_AFTER: + self._in_range = True + return prev_state + + if event == RangeEvents.END_BEFORE: + self._in_range = False + return self._in_range + + if event == RangeEvents.END_AFTER: + self._in_range = False + return prev_state + + assert event is None, event + return self._in_range + + def classify_line( + self, pipe_id: PipeId, line: str + ) -> Optional[RangeEvents]: + """Tests if the next stdout/err line affects the range begin/end. + Subclasses should implement this with the necessary logic for the + range that is being detected. + Returns the event of None if no event.""" + raise NotImplementedError("Should be implemented by a subclass") + + +class PnrSectionDetector(SectionDetector): + """Implements a RangeDetector for the nextpnr command verbose log lines.""" + + def classify_line(self, pipe_id: PipeId, line: str) -> RangeEvents: + # -- Brek line into words. + tokens = line.split() + + # -- Range start: A nextpnr command on stdout without + # -- the -q (quiet) flag. + if ( + pipe_id == PipeId.STDOUT + and line.startswith("nextpnr") + and "-q" not in tokens + ): + return RangeEvents.START_AFTER + + # Range end: The end message of nextnpr. + if pipe_id == PipeId.STDERR and "Program finished normally." in line: + return RangeEvents.END_AFTER + + return None + + +class SconsFilter: + """Implements the filtering and printing of the stdout/err streams of the + scons subprocess. Accepts a line one at a time, detects lines ranges of + intereset, mutates and colors the lines where applicable, and print to + stdout.""" + + def __init__(self): + self._pnr_detector = PnrSectionDetector() + + def on_stdout_line(self, line: str) -> None: + """Stdout pipe calls this on each line.""" + self.on_line(PipeId.STDOUT, line) + + def on_stderr_line(self, line: str) -> None: + """Stderr pipe calls this on each line.""" + self.on_line(PipeId.STDERR, line) + + @staticmethod + def _assign_line_color( + line: str, patterns: List[Tuple[str, str]], default_color: str = None + ) -> Optional[str]: + """Assigns a color for a given line using a list of (regex, color) + pairs. Returns the color of the first matching regex or default_color + if none match. + """ + for regex, color in patterns: + if re.search(regex, line): + return color + return default_color + + def on_line(self, pipe_id: PipeId, line: str) -> None: + """A shared handler for stdout/err lines from the scons sub process. + The handler writes both stdout and stderr lines to stdout, possibly + with modifications such as text deletion, coloring, and cursor + directives. + + NOTE: Ideally, the program specific patterns such as for Fumo and + Iceprog should should be condition by a range detector for lines that + came from that program. That is to minimize the risk of matching lines + from other programs. See the PNR detector for an example. + """ + + # -- Update the classifiers + in_pnr_verbose_range = self._pnr_detector.update(pipe_id, line) + + # -- Handle the line while in the nextpnr verbose log range. + if pipe_id == PipeId.STDERR and in_pnr_verbose_range: + + # -- Remove the 'Info: ' prefix. Nextpnr write a long log where + # -- each line starts with "Info: " + if line.startswith("Info: "): + line = line[6:] + + # -- Assign line color. + line_color = self._assign_line_color( + line.lower(), + { + (r"^warning:", "yellow"), + (r"^error:", "red"), + }, + ) + click.secho(f"{line}", fg=line_color) + return + + # -- Special handling for Fumo lines. + if pipe_id == PipeId.STDOUT: + pattern_fomu = r"^Download\s*\[=*" + match = re.search(pattern_fomu, line) + if match: + # -- Delete the previous line + print(CURSOR_UP + ERASE_LINE, end="", flush=True) + click.secho(f"{line}", fg="green") + return + + # -- Special handling for tinyprog lines. + if pipe_id == PipeId.STDERR: + # -- Check if the line correspond to an output of + # -- the tinyprog programmer (TinyFPGA board) + # -- Match outputs like these " 97%|█████████▋| " + # -- Regular expression remainder: + # -- \s --> Match one blank space + # -- \d{1,3} one, two or three decimal digits + pattern_tinyprog = r"\s\d{1,3}%\|█*" + + # -- Calculate if there is a match + match_tinyprog = re.search(pattern_tinyprog, line) + + # -- Match all the progress bar lines except the + # -- initial one (when it is 0%) + if match_tinyprog and " 0%|" not in line: + # -- Delete the previous line + print(CURSOR_UP + ERASE_LINE, end="", flush=True) + click.secho(f"{line}") + return + + # -- Special handling for iceprog lines. + if pipe_id == PipeId.STDERR: + # -- Match outputs like these "addr 0x001400 3%" + # -- Regular expression remainder: + # -- ^ --> Match the begining of the line + # -- \s --> Match one blank space + # -- [0-9A-F]+ one or more hexadecimal digit + # -- \d{1,2} one or two decimal digits + pattern = r"^addr\s0x[0-9A-F]+\s+\d{1,2}%" + + # -- Calculate if there is a match! + match = re.search(pattern, line) + + # -- It is a match! (iceprog is running!) + # -- (or if it is the end of the writing!) + # -- (or if it is the end of verifying!) + if match or "done." in line or "VERIFY OK" in line: + # -- Delete the previous line + print(CURSOR_UP + ERASE_LINE, end="", flush=True) + click.secho(line) + return + + # Handling the rest of the stdout lines. + if pipe_id == PipeId.STDOUT: + # Default stdout line coloring. + line_color = self._assign_line_color( + line.lower(), + [ + (r"is up to date", "green"), + (r"^warning:", "yellow"), + (r"^error:", "red"), + ], + ) + click.secho(f"{line}", fg=line_color) + return + + # Handling the rest of stderr the lines. + line_color = self._assign_line_color( + line.lower(), + [ + (r"^info:", "yellow"), + (r"^warning:", "yellow"), + (r"^error:", "red"), + ], + ) + click.secho(f"{line}", fg=line_color) diff --git a/apio/managers/system.py b/apio/managers/system.py index 91cfb847..8fa10328 100644 --- a/apio/managers/system.py +++ b/apio/managers/system.py @@ -8,6 +8,7 @@ import re import platform +from typing import Optional import click from apio import util @@ -44,32 +45,23 @@ def __init__(self, resources: dict): def lsusb(self): """Run the lsusb system command""" - returncode = 1 result = self._run_command("lsusb") - if result: - returncode = result.get("returncode") - - return returncode + return result.exit_code if result else 1 def lsftdi(self): """DOC: TODO""" - returncode = 1 result = self._run_command("lsftdi") - if result: - returncode = result.get("returncode") - - return returncode + return result.exit_code if result else 1 @staticmethod def lsserial(): """DOC: TODO""" - returncode = 0 serial_ports = util.get_serial_ports() - click.secho(f"Number of Serial devices found: {serial_ports}\n") + click.secho(f"Number of Serial devices found: {len(serial_ports)}\n") for serial_port in serial_ports: port = serial_port.get("port") @@ -79,7 +71,7 @@ def lsserial(): click.secho(f"Description: {description}") click.secho(f"Hardware info: {hwid}\n") - return returncode + return 0 def get_usb_devices(self) -> list: """Return a list of the connected USB devices @@ -99,7 +91,7 @@ def get_usb_devices(self) -> list: result = self._run_command("lsusb", silent=True) # -- Sucess in executing the command - if result and result["returncode"] == 0: + if result and result.exit_code == 0: # -- Get the list of the usb devices. It is read # -- from the command stdout @@ -132,7 +124,7 @@ def get_ftdi_devices(self) -> list: result = self._run_command("lsftdi", silent=True) # -- Sucess in executing the command - if result and result["returncode"] == 0: + if result and result.exit_code == 0: # -- Get the list of the ftdi devices. It is read # -- from the command stdout @@ -147,21 +139,18 @@ def get_ftdi_devices(self) -> list: # -- for reading the ftdi devices raise RuntimeError("Error executing lsftdi") - def _run_command(self, command: str, silent=False) -> dict: + def _run_command( + self, command: str, silent=False + ) -> Optional[util.CommandResult]: """Execute the given system command * INPUT: * command: Command to execute (Ex. "lsusb") * silent: What to do with the command output * False --> Do not print on the console * True --> Print on the console - * OUTPUT: A dictionary with the following properties: - * returncode: - * 0: OK! Success in executing the command - * x: An error has ocurred - * out: (string). Command output - * err: (string). Command error output - - In case of not executing the command it returns none! + + * OUTPUT: An ExecResult with the command's outcome. + In case of not executing the command it returns none! """ # The system tools are locate in the @@ -200,6 +189,7 @@ def _run_command(self, command: str, silent=False) -> dict: util.show_package_path_error(self.package_name) util.show_package_install_instructions(self.package_name) + # -- Command not executed. return None # -- The command exist! Let's execute it! diff --git a/apio/scons/ecp5/SConstruct b/apio/scons/ecp5/SConstruct index f3c6919d..02e0eef4 100644 --- a/apio/scons/ecp5/SConstruct +++ b/apio/scons/ecp5/SConstruct @@ -152,20 +152,6 @@ bitstream_builder = Builder( env.Append(BUILDERS={"Bin": bitstream_builder}) -# -- Apio time. -# -- Builder | not implemented. -# -- hardware.config -> hardware.rpt -time_rpt_builder = Builder( - action=( - 'echo "Time analysis report is not impelemnted for the ECP5 family." ' - "> $TARGET" - ), - suffix=".rpt", - src_suffix=".config", -) -env.Append(BUILDERS={"Time": time_rpt_builder}) - - # -- Apio build/upload. # -- Targets. # -- (module).v -> hardware.json -> hardware.config -> hardware.report. @@ -190,14 +176,6 @@ upload_target = env.Alias("upload", bin_target, programmer_cmd) AlwaysBuild(upload_target) -# -- Apio time. -# -- Targets. -# -- hardware.config -> hardware.rpt -time_rpt_target = env.Time(pnr_target) -AlwaysBuild(time_rpt_target) -time_target = env.Alias("time", time_rpt_target) - - # -- Apio verify. # -- Builder (iverilog, verilog compiler). # -- (modules + testbenches).v -> hardware.out. @@ -378,7 +356,7 @@ verilator_builder = Builder( no_warns=NOWARNS, warns=WARNS, top_module=TOP_MODULE, - lib_files=[YOSYS_LIB_DIR], + lib_dirs=[YOSYS_LIB_DIR], ), src_suffix=".v", source_scanner=verilog_src_scanner, @@ -404,11 +382,10 @@ if GetOption("clean"): # target are dynamic and changes with the selected testbench. for glob_pattern in ["*.out", "*.vcd"]: for node in Glob(glob_pattern): - env.Clean(time_target, str(node)) + env.Clean(build_target, str(node)) env.Default( [ - time_target, build_target, synth_target, pnr_target, diff --git a/apio/scons/gowin/SConstruct b/apio/scons/gowin/SConstruct index 54350e3d..30899f0b 100644 --- a/apio/scons/gowin/SConstruct +++ b/apio/scons/gowin/SConstruct @@ -146,20 +146,6 @@ bitstream_builder = Builder( env.Append(BUILDERS={"Bin": bitstream_builder}) -# -- Apio time. -# -- Builder | not implemented. -# -- hardware.pnr.json -> hardware.rpt -time_rpt_builder = Builder( - action=( - 'echo "Time analysis report is not impelemnted for the Gowin family." ' - "> $TARGET" - ), - suffix=".rpt", - src_suffix=".pnr.json", -) -env.Append(BUILDERS={"Time": time_rpt_builder}) - - # -- Apio build/upload. # -- Targets. # -- (module).v -> hardware.json -> hardware.pnr.json -> hardware.bin. @@ -184,14 +170,6 @@ upload_target = env.Alias("upload", bin_target, programmer_cmd) AlwaysBuild(upload_target) -# -- Apio time. -# -- Targets. -# -- hardware.asc -> hardware.rpt -time_rpt_target = env.Time(pnr_target) -AlwaysBuild(time_rpt_target) -time_target = env.Alias("time", time_rpt_target) - - # -- Apio verify. # -- Builder (iverilog, verilog compiler). # -- (modules + testbenches).v -> hardware.out. @@ -395,11 +373,10 @@ if GetOption("clean"): # target are dynamic and changes with the selected testbench. for glob_pattern in ["*.out", "*.vcd"]: for node in Glob(glob_pattern): - env.Clean(time_target, str(node)) + env.Clean(build_target, str(node)) env.Default( [ - time_target, build_target, verify_out_target, graph_target, diff --git a/apio/scons/ice40/SConstruct b/apio/scons/ice40/SConstruct index 5e5599a9..1d365eeb 100644 --- a/apio/scons/ice40/SConstruct +++ b/apio/scons/ice40/SConstruct @@ -399,7 +399,7 @@ if GetOption("clean"): # target are dynamic and changes with the selected testbench. for glob_pattern in ["*.out", "*.vcd"]: for node in Glob(glob_pattern): - env.Clean(time_target, str(node)) + env.Clean(build_target, str(node)) env.Default( [ diff --git a/apio/util.py b/apio/util.py index 56f7a1ec..2009f69d 100644 --- a/apio/util.py +++ b/apio/util.py @@ -615,7 +615,16 @@ def get_package_spec_version(name: str, resources: dict) -> str: return spec_version -def exec_command(*args, **kwargs) -> dict: +@dataclass(frozen=True) +class CommandResult: + """Contains the results of a command (subprocess) execution.""" + + out_text: Optional[str] = None # stdout multi-line text. + err_text: Optional[str] = None # stderr multi-line text. + exit_code: Optional[int] = None # Exit code, 0 = OK. + + +def exec_command(*args, **kwargs) -> CommandResult: """Execute the given command: INPUTS: @@ -635,11 +644,6 @@ def exec_command(*args, **kwargs) -> dict: Example: exec_command(['scons', '-Q', '-c', '-f', 'SConstruct']) """ - # -- Default value to return after the command execution - # -- out: string with the command output - # -- err: string with the command error output - result = {"out": None, "err": None, "returncode": None} - # -- Set the default arguments to pass to subprocess.Popen() # -- for executing the command flags = { @@ -658,9 +662,14 @@ def exec_command(*args, **kwargs) -> dict: try: with subprocess.Popen(*args, **flags) as proc: - # -- Collect the results - result["out"], result["err"] = proc.communicate() - result["returncode"] = proc.returncode + # -- Run the command. + out_text, err_text = proc.communicate() + exit_code = proc.returncode + + # -- Close the pipes + for std in ("stdout", "stderr"): + if isinstance(flags[std], AsyncPipe): + flags[std].close() # -- User has pressed the Ctrl-C for aborting the command except KeyboardInterrupt: @@ -672,31 +681,22 @@ def exec_command(*args, **kwargs) -> dict: click.secho(f"Command not found:\n{args}", fg="red") sys.exit(1) - # -- Close the stdout and stderr pipes - finally: - for std in ("stdout", "stderr"): - if isinstance(flags[std], AsyncPipe): - flags[std].close() - - # -- Process the output from the stdout and stderr - # -- if they exist - for inout in ("out", "err"): - - # -- Construct the Name "stdout" or "stderr" - std = f"std{inout}" - - # -- Do it only if they have been assigned - if isinstance(flags[std], AsyncPipe): - - # -- Get the text - buffer = flags[std].get_buffer() - - # -- Create the full text message (for stdout or stderr) - # -- result["out"] contains stdout - # -- result["err"] contains stderr - result[inout] = "\n".join(buffer) - result[inout].strip() - + # -- If stdout pipe is an AsyncPipe, extract its text. + pipe = flags["stdout"] + if isinstance(pipe, AsyncPipe): + lines = pipe.get_buffer() + text = "\n".join(lines) + out_text = text.strip() + + # -- If stderr pipe is an AsyncPipe, extract its text. + pipe = flags["stderr"] + if isinstance(pipe, AsyncPipe): + lines = pipe.get_buffer() + text = "\n".join(lines) + err_text = text.strip() + + # -- All done. + result = CommandResult(out_text, err_text, exit_code) return result @@ -797,26 +797,6 @@ def get_project_dir(_dir: Path, create_if_missing: bool = False) -> Path: return _dir -# W0703: Catching too general exception Exception (broad-except) -# pylint: disable=W0703 -# pylint: disable=W0150 -def command(function): - """Command decorator""" - - def decorate(*args, **kwargs): - exit_code = 1 - try: - exit_code = function(*args, **kwargs) - - except Exception as exc: - if str(exc): - click.secho("Error: " + str(exc), fg="red") - - return exit_code - - return decorate - - def get_serial_ports() -> list: """Get a list of the serial port devices connected * OUTPUT: A list with the devides @@ -900,12 +880,13 @@ def get_tinyprog_meta() -> list: # -- It will return the meta information as a json string result = exec_command([_command, "--pyserial", "--meta"]) - # -- Get the output - out = result["out"] + # pylint: disable=fixme + # TODO: Exit with an error if result.exit_code is not zero. + + # -- Convert the json string to an object (list) try: - # -- Convert the json string to an object (list) - meta = json.loads(out) + meta = json.loads(result.out_text) except json.decoder.JSONDecodeError as exc: click.secho(f"Invalid data provided by {_command}", fg="red") diff --git a/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/apio.ini b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/apio.ini new file mode 100644 index 00000000..638824c6 --- /dev/null +++ b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/apio.ini @@ -0,0 +1,3 @@ +[env] +board = ColorLight-5A-75B-V8 +top-module = main diff --git a/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/button.ice b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/button.ice new file mode 100644 index 00000000..2ab6195c --- /dev/null +++ b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/button.ice @@ -0,0 +1,69 @@ +{ + "version": "1.2", + "package": { + "name": "", + "version": "", + "description": "", + "author": "", + "image": "" + }, + "design": { + "board": "ColorLight-5A-75B-V8", + "graph": { + "blocks": [ + { + "id": "0712d438-b46d-411e-87bf-e4379d7fae23", + "type": "basic.output", + "data": { + "name": "LED", + "virtual": false, + "pins": [ + { + "index": "0", + "name": "LED", + "value": "T6" + } + ] + }, + "position": { + "x": 512, + "y": 216 + } + }, + { + "id": "650576bb-aac4-487c-abad-ad2249663e7e", + "type": "basic.input", + "data": { + "name": "MY_INPUT", + "virtual": false, + "pins": [ + { + "index": "0", + "name": "Button", + "value": "R7" + } + ], + "clock": false + }, + "position": { + "x": 304, + "y": 216 + } + } + ], + "wires": [ + { + "source": { + "block": "650576bb-aac4-487c-abad-ad2249663e7e", + "port": "out" + }, + "target": { + "block": "0712d438-b46d-411e-87bf-e4379d7fae23", + "port": "in" + } + } + ] + } + }, + "dependencies": {} +} \ No newline at end of file diff --git a/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main.lpf b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main.lpf new file mode 100644 index 00000000..9f128821 --- /dev/null +++ b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main.lpf @@ -0,0 +1,10 @@ +# Code generated by Icestudio 0.12 + +# -- Board: ColorLight-5A-75B-V8 + +LOCATE COMP "v60f428" SITE "T6"; +IOBUF PORT "v60f428" PULLMODE=UP ; + +LOCATE COMP "v2a5c04" SITE "R7"; +IOBUF PORT "v2a5c04" ; + diff --git a/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main.v b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main.v new file mode 100644 index 00000000..feb99aee --- /dev/null +++ b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main.v @@ -0,0 +1,14 @@ +// Code generated by Icestudio 0.12 + +`default_nettype none + +//---- Top entity +module main ( + input v2a5c04, + output v60f428 +); + wire w0; + assign v60f428 = w0; + assign w0 = v2a5c04; +endmodule + diff --git a/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main_tb.gtkw b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main_tb.gtkw new file mode 100644 index 00000000..f10744da --- /dev/null +++ b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main_tb.gtkw @@ -0,0 +1,29 @@ +[*] +[*] GTKWave Analyzer v3.4.0 (w)1999-2022 BSI +[*] Sat Oct 12 18:38:03 2024 +[*] +[dumpfile] "main_tb.vcd" +[dumpfile_mtime] "Sat Oct 12 18:37:05 2024" +[dumpfile_size] 492 +[savefile] "main_tb.gtkw" +[timestart] 0 +[size] 1288 600 +[pos] -1 -1 +*-6.420950 23 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 +[treeopen] main_tb. +[sst_width] 253 +[signals_width] 79 +[sst_expanded] 1 +[sst_vpaned_height] 159 +@200 +-Input +@28 +main_tb.BUTTON +@200 +- +@201 +-Output +@28 +main_tb.LED +[pattern_trace] 1 +[pattern_trace] 0 diff --git a/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main_tb.v b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main_tb.v new file mode 100644 index 00000000..581dc845 --- /dev/null +++ b/test-examples/TB/ColorLight-5A-75B-V8/icestudio/button/main_tb.v @@ -0,0 +1,42 @@ +// Code generated by Icestudio 0.9.2w202204260904 +// Thu, 28 Apr 2022 08:35:07 GMT + +// Testbench template + +`default_nettype none +`define DUMPSTR(x) `"x.vcd`" +`timescale 10 ns / 1 ns + + +module main_tb +; + + // Simulation time: 100ns (10 * 10ns) + parameter DURATION = 10; + + // Input/Output + reg BUTTON = 0; + wire LED; + + // Module instance + main MAIN ( + .v2a5c04(BUTTON), + .v60f428(LED) + ); + + initial begin + // File were to store the simulation results + $dumpfile(`DUMPSTR(`VCD_OUTPUT)); + $dumpvars(0, main_tb); + + + #10 BUTTON=1; + #10 BUTTON=0; + #10 BUTTON=1; + #10 BUTTON=0; + + $display("End of simulation"); + $finish; + end + +endmodule diff --git a/test-examples/TB/GreatScottGadgets-Cynthion/icestudio/ledon/apio.ini b/test-examples/TB/GreatScottGadgets-Cynthion/icestudio/ledon/apio.ini index a0157d2f..719bda62 100644 --- a/test-examples/TB/GreatScottGadgets-Cynthion/icestudio/ledon/apio.ini +++ b/test-examples/TB/GreatScottGadgets-Cynthion/icestudio/ledon/apio.ini @@ -1,2 +1,3 @@ [env] +top-module = main board = Cynthion-r1.4