diff --git a/rebench/denoise.py b/rebench/denoise.py index b2576a28..674405cf 100644 --- a/rebench/denoise.py +++ b/rebench/denoise.py @@ -6,7 +6,7 @@ from math import log, floor from multiprocessing import Pool from subprocess import check_output, CalledProcessError, DEVNULL, STDOUT -from typing import Optional, Union +from typing import Optional, Union, Mapping, Literal from .output import output_as_str, UIError from .subprocess_kill import kill_process @@ -111,12 +111,14 @@ def _can_set_niceness() -> Union[bool, str]: our benchmarks. """ try: - output = check_output(["nice", "-n-20", "echo", "test"], stderr=STDOUT) - output = output_as_str(output) + out = check_output(["nice", "-n-20", "echo", "test"], stderr=STDOUT) + output = output_as_str(out) except OSError: return "failed: OSError" - if "cannot set niceness" in output or "Permission denied" in output: + if output is not None and ( + "cannot set niceness" in output or "Permission denied" in output + ): return "failed: permission denied" else: return True @@ -137,6 +139,7 @@ def _get_core_spec(num_cores) -> str: return core_spec +# pylint: disable-next=too-many-return-statements def _activate_shielding(num_cores) -> str: if not num_cores: return "failed: num-cores not set" @@ -147,13 +150,16 @@ def _activate_shielding(num_cores) -> str: return "failed: cset-path not set" try: - output = check_output( + out = check_output( [paths.get_cset(), "shield", "-c", core_spec, "-k", "on"], stderr=STDOUT ) - output = output_as_str(output) + output = output_as_str(out) except OSError as e: return "failed: " + str(e) + if output is None: + return "failed: no output" + if "Permission denied" in output: return "failed: Permission denied" @@ -168,9 +174,9 @@ def _reset_shielding() -> Union[str, bool]: return "failed: cset-path not set" try: - output = check_output([paths.get_cset(), "shield", "-r"], stderr=STDOUT) - output = output_as_str(output) - return "cset: done" in output + out = check_output([paths.get_cset(), "shield", "-r"], stderr=STDOUT) + output = output_as_str(out) + return output is not None and "cset: done" in output except OSError: return "failed: OSError" except CalledProcessError: @@ -221,7 +227,7 @@ def _read_no_turbo() -> Optional[bool]: return None -def _set_no_turbo(with_no_turbo: bool) -> Union[bool, str]: +def _set_no_turbo(with_no_turbo: bool) -> Union[Literal[True], str]: if with_no_turbo: value = "1" else: @@ -235,10 +241,10 @@ def _set_no_turbo(with_no_turbo: bool) -> Union[bool, str]: except IOError: return "failed: IOError" - return with_no_turbo + return True -def _configure_perf_sampling(for_profiling: bool) -> Union[int, str]: +def _configure_perf_sampling(for_profiling: bool) -> Union[Literal[True], str]: try: with open( "/proc/sys/kernel/perf_cpu_time_max_percent", "w", encoding="utf-8" @@ -263,10 +269,7 @@ def _configure_perf_sampling(for_profiling: bool) -> Union[int, str]: except IOError: return "failed: IOError" - if for_profiling: - return 0 - else: - return 1 + return True def _restore_perf_sampling() -> str: @@ -290,20 +293,24 @@ def _restore_perf_sampling() -> str: return "restored" -def _initial_settings_and_capabilities(args) -> dict: +def _initial_settings_and_capabilities( + args, +) -> Mapping[str, Union[str, bool, None]]: result = {} if args.use_nice: - result["can_set_nice"] = _can_set_niceness() + r = _can_set_niceness() + result["can_set_nice"] = r is True if args.use_shielding: num_cores = int(args.num_cores) if args.num_cores else None if paths.has_cset() and num_cores: - can_use_shielding = _activate_shielding(num_cores) - # TODO: check that it didn't return "failed" - # TODO: think about whether can_use_shield below being set to False is a problem - if can_use_shielding: + output = _activate_shielding(num_cores) + if "failed" not in output: _reset_shielding() + can_use_shielding = True + else: + can_use_shielding = False else: can_use_shielding = False result["can_set_shield"] = can_use_shielding @@ -313,13 +320,13 @@ def _initial_settings_and_capabilities(args) -> dict: can_set_no_turbo = False if initial_no_turbo is not None and not initial_no_turbo: - result = _set_no_turbo(True) - can_set_no_turbo = result is True + r = _set_no_turbo(True) + can_set_no_turbo = r is True if can_set_no_turbo: _set_no_turbo(False) result["can_set_no_turbo"] = can_set_no_turbo - result["initial_no_turbo"] = initial_no_turbo + result["initial_no_turbo"] = initial_no_turbo # type: ignore if args.use_scaling_governor: initial_governor = _read_scaling_governor() @@ -329,11 +336,11 @@ def _initial_settings_and_capabilities(args) -> dict: initial_governor is not None and initial_governor != SCALING_GOVERNOR_PERFORMANCE ): - result = _set_scaling_governor(SCALING_GOVERNOR_PERFORMANCE, num_cores) - can_set_governor = result == SCALING_GOVERNOR_PERFORMANCE + r = _set_scaling_governor(SCALING_GOVERNOR_PERFORMANCE, num_cores) + can_set_governor = r == SCALING_GOVERNOR_PERFORMANCE result["can_set_scaling_governor"] = can_set_governor - result["initial_scaling_governor"] = initial_governor + result["initial_scaling_governor"] = initial_governor # type: ignore if args.use_mini_perf_sampling: can_minimize_perf_sampling = ( @@ -354,7 +361,8 @@ def _minimize_noise(args) -> dict: result["shielding"] = _activate_shielding(num_cores) if args.use_no_turbo: - result["no_turbo"] = _set_no_turbo(True) + r = _set_no_turbo(True) + result["no_turbo"] = "succeeded" if r is True else r if args.use_scaling_governor: result["scaling_governor"] = _set_scaling_governor( @@ -362,9 +370,8 @@ def _minimize_noise(args) -> dict: ) if args.use_mini_perf_sampling: - result["perf_event_max_sample_rate"] = _configure_perf_sampling( - args.for_profiling - ) + r = _configure_perf_sampling(args.for_profiling) + result["perf_event_max_sample_rate"] = "succeeded" if r is True else r return result diff --git a/rebench/output.py b/rebench/output.py index 01f0eee7..22104537 100644 --- a/rebench/output.py +++ b/rebench/output.py @@ -1,6 +1,9 @@ -def output_as_str(string_like): +from typing import Union, Optional + + +def output_as_str(string_like: Optional[Union[str, bytes]]) -> Optional[str]: if string_like is not None and type(string_like) != str: # pylint: disable=unidiomatic-typecheck - return string_like.decode("utf-8", errors="replace") + return string_like.decode("utf-8", errors="replace") # type: ignore else: return string_like