From 1b666b28eb37b51d9d568e4ff8ded9d416e20e16 Mon Sep 17 00:00:00 2001 From: hirokuni-kitahara Date: Mon, 6 May 2024 19:28:06 +0900 Subject: [PATCH 1/5] remove ansible-core dependency Signed-off-by: hirokuni-kitahara --- .../annotators/ansible.builtin/unarchive.py | 6 +- ansible_risk_insight/dependency_finder.py | 4 +- ansible_risk_insight/model_loader.py | 4 +- ansible_risk_insight/models.py | 4 +- ansible_risk_insight/utils.py | 66 +++++++++++++++++++ pyproject.toml | 1 - 6 files changed, 75 insertions(+), 10 deletions(-) diff --git a/ansible_risk_insight/annotators/ansible.builtin/unarchive.py b/ansible_risk_insight/annotators/ansible.builtin/unarchive.py index 941c2b68..7995cca8 100644 --- a/ansible_risk_insight/annotators/ansible.builtin/unarchive.py +++ b/ansible_risk_insight/annotators/ansible.builtin/unarchive.py @@ -16,8 +16,8 @@ from typing import List from ansible_risk_insight.models import Annotation, RiskAnnotation, TaskCall, DefaultRiskType, InboundTransferDetail +from ansible_risk_insight.utils import parse_bool from ansible_risk_insight.annotators.module_annotator_base import ModuleAnnotator, ModuleAnnotatorResult -from ansible.module_utils.parsing.convert_bool import boolean class UnarchiveAnnotator(ModuleAnnotator): @@ -34,12 +34,12 @@ def run(self, task: TaskCall) -> List[Annotation]: if isinstance(remote_src.raw, str) or isinstance(remote_src.raw, bool): try: - is_remote_src = boolean(remote_src.raw) + is_remote_src = parse_bool(remote_src.raw) except Exception: pass if not is_remote_src and (isinstance(remote_src.templated, str) or isinstance(remote_src.templated, bool)): try: - is_remote_src = boolean(remote_src.templated) + is_remote_src = parse_bool(remote_src.templated) except Exception: pass diff --git a/ansible_risk_insight/dependency_finder.py b/ansible_risk_insight/dependency_finder.py index 0f92b035..1ffb3d0c 100644 --- a/ansible_risk_insight/dependency_finder.py +++ b/ansible_risk_insight/dependency_finder.py @@ -19,7 +19,6 @@ import json import subprocess from pathlib import Path -from ansible import constants as C import ansible_risk_insight.logger as logger from .safe_glob import safe_glob @@ -35,6 +34,7 @@ galaxy_yml = "galaxy.yml" GALAXY_yml = "GALAXY.yml" github_workflows_dir = ".github/workflows" +ansible_home = os.getenv("ANSIBLE_HOME", "~/.ansible") def find_dependency(type, target, dependency_dir, use_ansible_path=False): @@ -68,7 +68,7 @@ def find_dependency(type, target, dependency_dir, use_ansible_path=False): dependencies["file"] = manifestjson if use_ansible_path and dependencies["dependencies"]: - ansible_dir = Path(C.ANSIBLE_HOME).expanduser() + ansible_dir = Path(ansible_home).expanduser() paths, metadata = search_ansible_dir(dependencies["dependencies"], str(ansible_dir)) if paths: dependencies["paths"] = paths diff --git a/ansible_risk_insight/model_loader.py b/ansible_risk_insight/model_loader.py index 82d9cb78..629553c2 100644 --- a/ansible_risk_insight/model_loader.py +++ b/ansible_risk_insight/model_loader.py @@ -28,9 +28,9 @@ except Exception: # otherwise, use Python based loader from yaml import SafeLoader as Loader -from ansible.module_utils.parsing.convert_bool import boolean import ansible_risk_insight.logger as logger +from ansible_risk_insight.utils import parse_bool from .safe_glob import safe_glob from .models import ( ExecutableType, @@ -1360,7 +1360,7 @@ def load_module(module_file_path, collection_name="", role_name="", basedir="", arg_elements_type_str = arg_elements_type.__name__ required = None try: - required = boolean(arg_spec.get("required", "false")) + required = parse_bool(arg_spec.get("required", "false")) except Exception: pass arg = ModuleArgument( diff --git a/ansible_risk_insight/models.py b/ansible_risk_insight/models.py index 8d6c8c13..f2886766 100644 --- a/ansible_risk_insight/models.py +++ b/ansible_risk_insight/models.py @@ -26,7 +26,7 @@ import jsonpickle from rapidfuzz.distance import Levenshtein import ansible_risk_insight.yaml as ariyaml -from ansible.module_utils.parsing.convert_bool import boolean +from ansible_risk_insight.utils import parse_bool from .keyutil import ( set_collection_key, set_module_key, @@ -1064,7 +1064,7 @@ def from_options(options: dict): become = options.get("become", "") enabled = False try: - enabled = boolean(become) + enabled = parse_bool(become) except Exception: pass user = options.get("become_user", "") diff --git a/ansible_risk_insight/utils.py b/ansible_risk_insight/utils.py index 842718f4..2c048632 100644 --- a/ansible_risk_insight/utils.py +++ b/ansible_risk_insight/utils.py @@ -21,6 +21,7 @@ import hashlib import yaml import json +import codecs from filelock import FileLock from copy import deepcopy from tabulate import tabulate @@ -30,6 +31,19 @@ import ansible_risk_insight.logger as logger +BOOLEANS_TRUE = frozenset(("y", "yes", "on", "1", "true", "t", 1, 1.0, True)) +BOOLEANS_FALSE = frozenset(("n", "no", "off", "0", "false", "f", 0, 0.0, False)) +BOOLEANS = BOOLEANS_TRUE.union(BOOLEANS_FALSE) + +try: + codecs.lookup_error("surrogateescape") + HAS_SURROGATEESCAPE = True +except LookupError: + HAS_SURROGATEESCAPE = False + +_COMPOSED_ERROR_HANDLERS = frozenset((None, "surrogate_or_replace", "surrogate_or_strict", "surrogate_then_replace")) + + def lock_file(fpath, timeout=10): if not fpath: return @@ -739,3 +753,55 @@ def recursive_copy_dict(src, dst): def is_test_object(path: str): return path.startswith("tests/integration/") or path.startswith("molecule/") + + +def to_str(obj, encoding="utf-8", errors=None, nonstring="simplerepr"): + if isinstance(obj, str): + return obj + + if errors in _COMPOSED_ERROR_HANDLERS: + if HAS_SURROGATEESCAPE: + errors = "surrogateescape" + elif errors == "surrogate_or_strict": + errors = "strict" + else: + errors = "replace" + + if isinstance(obj, bytes): + return obj.decode(encoding, errors) + + if nonstring == "simplerepr": + try: + value = str(obj) + except UnicodeError: + try: + value = repr(obj) + except UnicodeError: + # Giving up + return "" + elif nonstring == "passthru": + return obj + elif nonstring == "empty": + return "" + elif nonstring == "strict": + raise TypeError("obj must be a string type") + else: + raise TypeError("Invalid value %s for to_text's nonstring parameter" % nonstring) + + return to_str(value, encoding, errors) + + +def parse_bool(value, strict=True): + if isinstance(value, bool): + return value + + normalized_value = value + if isinstance(value, (str, bytes)): + normalized_value = to_str(value, errors="surrogate_or_strict").lower().strip() + + if normalized_value in BOOLEANS_TRUE: + return True + elif normalized_value in BOOLEANS_FALSE or not strict: + return False + + raise TypeError("The value '%s' is not a valid boolean. Valid booleans include: %s" % (to_str(value), ", ".join(repr(i) for i in BOOLEANS))) diff --git a/pyproject.toml b/pyproject.toml index f29d4c8e..cb0a9fe2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,6 @@ dependencies = [ "smmap", "tabulate", "requests", - "ansible-core", "ruamel.yaml", "filelock", "rapidfuzz", From a95a453a9a06806d93d14f313ebc752de69f15ee Mon Sep 17 00:00:00 2001 From: hirokuni-kitahara Date: Tue, 7 May 2024 10:49:39 +0900 Subject: [PATCH 2/5] update util function for parse-bool Signed-off-by: hirokuni-kitahara --- ansible_risk_insight/utils.py | 45 ++++++++++++++--------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/ansible_risk_insight/utils.py b/ansible_risk_insight/utils.py index 2c048632..cdae9224 100644 --- a/ansible_risk_insight/utils.py +++ b/ansible_risk_insight/utils.py @@ -31,17 +31,17 @@ import ansible_risk_insight.logger as logger -BOOLEANS_TRUE = frozenset(("y", "yes", "on", "1", "true", "t", 1, 1.0, True)) -BOOLEANS_FALSE = frozenset(("n", "no", "off", "0", "false", "f", 0, 0.0, False)) -BOOLEANS = BOOLEANS_TRUE.union(BOOLEANS_FALSE) +bool_values_true = frozenset(("y", "yes", "on", "1", "true", "t", 1, 1.0, True)) +bool_values_false = frozenset(("n", "no", "off", "0", "false", "f", 0, 0.0, False)) +bool_values = bool_values_true.union(bool_values_false) try: codecs.lookup_error("surrogateescape") - HAS_SURROGATEESCAPE = True + _has_surrogateescape = True except LookupError: - HAS_SURROGATEESCAPE = False + _has_surrogateescape = False -_COMPOSED_ERROR_HANDLERS = frozenset((None, "surrogate_or_replace", "surrogate_or_strict", "surrogate_then_replace")) +_surrogate_error_handlers = frozenset((None, "surrogate_or_replace", "surrogate_or_strict", "surrogate_then_replace")) def lock_file(fpath, timeout=10): @@ -755,12 +755,12 @@ def is_test_object(path: str): return path.startswith("tests/integration/") or path.startswith("molecule/") -def to_str(obj, encoding="utf-8", errors=None, nonstring="simplerepr"): +def to_str(obj, encoding="utf-8", errors=None): if isinstance(obj, str): return obj - if errors in _COMPOSED_ERROR_HANDLERS: - if HAS_SURROGATEESCAPE: + if errors in _surrogate_error_handlers: + if _has_surrogateescape: errors = "surrogateescape" elif errors == "surrogate_or_strict": errors = "strict" @@ -770,23 +770,14 @@ def to_str(obj, encoding="utf-8", errors=None, nonstring="simplerepr"): if isinstance(obj, bytes): return obj.decode(encoding, errors) - if nonstring == "simplerepr": + try: + value = str(obj) + except UnicodeError: try: - value = str(obj) + value = repr(obj) except UnicodeError: - try: - value = repr(obj) - except UnicodeError: - # Giving up - return "" - elif nonstring == "passthru": - return obj - elif nonstring == "empty": - return "" - elif nonstring == "strict": - raise TypeError("obj must be a string type") - else: - raise TypeError("Invalid value %s for to_text's nonstring parameter" % nonstring) + # Giving up + return "" return to_str(value, encoding, errors) @@ -799,9 +790,9 @@ def parse_bool(value, strict=True): if isinstance(value, (str, bytes)): normalized_value = to_str(value, errors="surrogate_or_strict").lower().strip() - if normalized_value in BOOLEANS_TRUE: + if normalized_value in bool_values_true: return True - elif normalized_value in BOOLEANS_FALSE or not strict: + elif normalized_value in bool_values_false or not strict: return False - raise TypeError("The value '%s' is not a valid boolean. Valid booleans include: %s" % (to_str(value), ", ".join(repr(i) for i in BOOLEANS))) + raise TypeError("The value '%s' is not a valid boolean. Valid booleans include: %s" % (to_str(value), ", ".join(repr(i) for i in bool_values))) From 9cb72a89a1893dd881328b920a285bfe46578552 Mon Sep 17 00:00:00 2001 From: hirokuni-kitahara Date: Tue, 7 May 2024 10:56:55 +0900 Subject: [PATCH 3/5] update util function for parse-bool Signed-off-by: hirokuni-kitahara --- ansible_risk_insight/utils.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ansible_risk_insight/utils.py b/ansible_risk_insight/utils.py index cdae9224..c95a7245 100644 --- a/ansible_risk_insight/utils.py +++ b/ansible_risk_insight/utils.py @@ -41,7 +41,7 @@ except LookupError: _has_surrogateescape = False -_surrogate_error_handlers = frozenset((None, "surrogate_or_replace", "surrogate_or_strict", "surrogate_then_replace")) +_surrogate_errors = frozenset((None, "surrogate_or_replace", "surrogate_or_strict", "surrogate_then_replace")) def lock_file(fpath, timeout=10): @@ -759,7 +759,7 @@ def to_str(obj, encoding="utf-8", errors=None): if isinstance(obj, str): return obj - if errors in _surrogate_error_handlers: + if errors in _surrogate_errors: if _has_surrogateescape: errors = "surrogateescape" elif errors == "surrogate_or_strict": @@ -776,7 +776,6 @@ def to_str(obj, encoding="utf-8", errors=None): try: value = repr(obj) except UnicodeError: - # Giving up return "" return to_str(value, encoding, errors) From 32a00e49e0d4a17b23a92ffba86a2f4f74b2d0e5 Mon Sep 17 00:00:00 2001 From: hirokuni-kitahara Date: Tue, 7 May 2024 14:30:08 +0900 Subject: [PATCH 4/5] update util function for parse-bool Signed-off-by: hirokuni-kitahara --- ansible_risk_insight/utils.py | 66 +++++++++++++---------------------- 1 file changed, 24 insertions(+), 42 deletions(-) diff --git a/ansible_risk_insight/utils.py b/ansible_risk_insight/utils.py index c95a7245..60eb7032 100644 --- a/ansible_risk_insight/utils.py +++ b/ansible_risk_insight/utils.py @@ -35,14 +35,6 @@ bool_values_false = frozenset(("n", "no", "off", "0", "false", "f", 0, 0.0, False)) bool_values = bool_values_true.union(bool_values_false) -try: - codecs.lookup_error("surrogateescape") - _has_surrogateescape = True -except LookupError: - _has_surrogateescape = False - -_surrogate_errors = frozenset((None, "surrogate_or_replace", "surrogate_or_strict", "surrogate_then_replace")) - def lock_file(fpath, timeout=10): if not fpath: @@ -755,43 +747,33 @@ def is_test_object(path: str): return path.startswith("tests/integration/") or path.startswith("molecule/") -def to_str(obj, encoding="utf-8", errors=None): - if isinstance(obj, str): - return obj - - if errors in _surrogate_errors: - if _has_surrogateescape: - errors = "surrogateescape" - elif errors == "surrogate_or_strict": - errors = "strict" - else: - errors = "replace" - - if isinstance(obj, bytes): - return obj.decode(encoding, errors) - - try: - value = str(obj) - except UnicodeError: - try: - value = repr(obj) - except UnicodeError: - return "" - - return to_str(value, encoding, errors) - - -def parse_bool(value, strict=True): +def parse_bool(value: any): + value_str = None + use_value_str = False if isinstance(value, bool): return value + elif isinstance(value, str): + value_str = value + use_value_str = True + elif isinstance(value, bytes): + _has_surrogateescape = False + try: + codecs.lookup_error("surrogateescape") + _has_surrogateescape = True + except Exception: + pass + errors = "surrogateescape" if _has_surrogateescape else "strict" + value_str = value.decode("utf-8", errors) + use_value_str = True - normalized_value = value - if isinstance(value, (str, bytes)): - normalized_value = to_str(value, errors="surrogate_or_strict").lower().strip() + if use_value_str and isinstance(value_str, str): + value_str = value_str.lower().strip() - if normalized_value in bool_values_true: + target_value = value_str if use_value_str else value + + if target_value in bool_values_true: return True - elif normalized_value in bool_values_false or not strict: + elif target_value in bool_values_false: return False - - raise TypeError("The value '%s' is not a valid boolean. Valid booleans include: %s" % (to_str(value), ", ".join(repr(i) for i in bool_values))) + else: + raise TypeError(f'failed to parse the value "{value}" as a boolean.') From e6265295513b4a114ae7dc09db0e736d0e00d12b Mon Sep 17 00:00:00 2001 From: hirokuni-kitahara Date: Tue, 7 May 2024 14:30:59 +0900 Subject: [PATCH 5/5] update util function for parse-bool Signed-off-by: hirokuni-kitahara --- ansible_risk_insight/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ansible_risk_insight/utils.py b/ansible_risk_insight/utils.py index 60eb7032..561d3222 100644 --- a/ansible_risk_insight/utils.py +++ b/ansible_risk_insight/utils.py @@ -756,13 +756,13 @@ def parse_bool(value: any): value_str = value use_value_str = True elif isinstance(value, bytes): - _has_surrogateescape = False + surrogateescape_enabled = False try: codecs.lookup_error("surrogateescape") - _has_surrogateescape = True + surrogateescape_enabled = True except Exception: pass - errors = "surrogateescape" if _has_surrogateescape else "strict" + errors = "surrogateescape" if surrogateescape_enabled else "strict" value_str = value.decode("utf-8", errors) use_value_str = True