diff --git a/ansible_risk_insight/annotators/ansible.builtin/unarchive.py b/ansible_risk_insight/annotators/ansible.builtin/unarchive.py index 941c2b68..7995cca8 100644 --- a/ansible_risk_insight/annotators/ansible.builtin/unarchive.py +++ b/ansible_risk_insight/annotators/ansible.builtin/unarchive.py @@ -16,8 +16,8 @@ from typing import List from ansible_risk_insight.models import Annotation, RiskAnnotation, TaskCall, DefaultRiskType, InboundTransferDetail +from ansible_risk_insight.utils import parse_bool from ansible_risk_insight.annotators.module_annotator_base import ModuleAnnotator, ModuleAnnotatorResult -from ansible.module_utils.parsing.convert_bool import boolean class UnarchiveAnnotator(ModuleAnnotator): @@ -34,12 +34,12 @@ def run(self, task: TaskCall) -> List[Annotation]: if isinstance(remote_src.raw, str) or isinstance(remote_src.raw, bool): try: - is_remote_src = boolean(remote_src.raw) + is_remote_src = parse_bool(remote_src.raw) except Exception: pass if not is_remote_src and (isinstance(remote_src.templated, str) or isinstance(remote_src.templated, bool)): try: - is_remote_src = boolean(remote_src.templated) + is_remote_src = parse_bool(remote_src.templated) except Exception: pass diff --git a/ansible_risk_insight/dependency_finder.py b/ansible_risk_insight/dependency_finder.py index 0f92b035..1ffb3d0c 100644 --- a/ansible_risk_insight/dependency_finder.py +++ b/ansible_risk_insight/dependency_finder.py @@ -19,7 +19,6 @@ import json import subprocess from pathlib import Path -from ansible import constants as C import ansible_risk_insight.logger as logger from .safe_glob import safe_glob @@ -35,6 +34,7 @@ galaxy_yml = "galaxy.yml" GALAXY_yml = "GALAXY.yml" github_workflows_dir = ".github/workflows" +ansible_home = os.getenv("ANSIBLE_HOME", "~/.ansible") def find_dependency(type, target, dependency_dir, use_ansible_path=False): @@ -68,7 +68,7 @@ def find_dependency(type, target, dependency_dir, use_ansible_path=False): dependencies["file"] = manifestjson if use_ansible_path and dependencies["dependencies"]: - ansible_dir = Path(C.ANSIBLE_HOME).expanduser() + ansible_dir = Path(ansible_home).expanduser() paths, metadata = search_ansible_dir(dependencies["dependencies"], str(ansible_dir)) if paths: dependencies["paths"] = paths diff --git a/ansible_risk_insight/model_loader.py b/ansible_risk_insight/model_loader.py index 82d9cb78..629553c2 100644 --- a/ansible_risk_insight/model_loader.py +++ b/ansible_risk_insight/model_loader.py @@ -28,9 +28,9 @@ except Exception: # otherwise, use Python based loader from yaml import SafeLoader as Loader -from ansible.module_utils.parsing.convert_bool import boolean import ansible_risk_insight.logger as logger +from ansible_risk_insight.utils import parse_bool from .safe_glob import safe_glob from .models import ( ExecutableType, @@ -1360,7 +1360,7 @@ def load_module(module_file_path, collection_name="", role_name="", basedir="", arg_elements_type_str = arg_elements_type.__name__ required = None try: - required = boolean(arg_spec.get("required", "false")) + required = parse_bool(arg_spec.get("required", "false")) except Exception: pass arg = ModuleArgument( diff --git a/ansible_risk_insight/models.py b/ansible_risk_insight/models.py index 8d6c8c13..f2886766 100644 --- a/ansible_risk_insight/models.py +++ b/ansible_risk_insight/models.py @@ -26,7 +26,7 @@ import jsonpickle from rapidfuzz.distance import Levenshtein import ansible_risk_insight.yaml as ariyaml -from ansible.module_utils.parsing.convert_bool import boolean +from ansible_risk_insight.utils import parse_bool from .keyutil import ( set_collection_key, set_module_key, @@ -1064,7 +1064,7 @@ def from_options(options: dict): become = options.get("become", "") enabled = False try: - enabled = boolean(become) + enabled = parse_bool(become) except Exception: pass user = options.get("become_user", "") diff --git a/ansible_risk_insight/utils.py b/ansible_risk_insight/utils.py index 842718f4..561d3222 100644 --- a/ansible_risk_insight/utils.py +++ b/ansible_risk_insight/utils.py @@ -21,6 +21,7 @@ import hashlib import yaml import json +import codecs from filelock import FileLock from copy import deepcopy from tabulate import tabulate @@ -30,6 +31,11 @@ import ansible_risk_insight.logger as logger +bool_values_true = frozenset(("y", "yes", "on", "1", "true", "t", 1, 1.0, True)) +bool_values_false = frozenset(("n", "no", "off", "0", "false", "f", 0, 0.0, False)) +bool_values = bool_values_true.union(bool_values_false) + + def lock_file(fpath, timeout=10): if not fpath: return @@ -739,3 +745,35 @@ def recursive_copy_dict(src, dst): def is_test_object(path: str): return path.startswith("tests/integration/") or path.startswith("molecule/") + + +def parse_bool(value: any): + value_str = None + use_value_str = False + if isinstance(value, bool): + return value + elif isinstance(value, str): + value_str = value + use_value_str = True + elif isinstance(value, bytes): + surrogateescape_enabled = False + try: + codecs.lookup_error("surrogateescape") + surrogateescape_enabled = True + except Exception: + pass + errors = "surrogateescape" if surrogateescape_enabled else "strict" + value_str = value.decode("utf-8", errors) + use_value_str = True + + if use_value_str and isinstance(value_str, str): + value_str = value_str.lower().strip() + + target_value = value_str if use_value_str else value + + if target_value in bool_values_true: + return True + elif target_value in bool_values_false: + return False + else: + raise TypeError(f'failed to parse the value "{value}" as a boolean.') diff --git a/pyproject.toml b/pyproject.toml index f29d4c8e..cb0a9fe2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,6 @@ dependencies = [ "smmap", "tabulate", "requests", - "ansible-core", "ruamel.yaml", "filelock", "rapidfuzz",