diff --git a/bbot/core/core.py b/bbot/core/core.py index e7eacf18d..47831af25 100644 --- a/bbot/core/core.py +++ b/bbot/core/core.py @@ -34,8 +34,6 @@ def __init__(self): self._logger = None self._files_config = None - self.bbot_sudo_pass = None - self._config = None self._custom_config = None diff --git a/bbot/core/helpers/command.py b/bbot/core/helpers/command.py index 6f43a401d..16f9c9131 100644 --- a/bbot/core/helpers/command.py +++ b/bbot/core/helpers/command.py @@ -295,7 +295,7 @@ def _prepare_command_kwargs(self, command, kwargs): if sudo and os.geteuid() != 0: self.depsinstaller.ensure_root() env["SUDO_ASKPASS"] = str((self.tools_dir / self.depsinstaller.askpass_filename).resolve()) - env["BBOT_SUDO_PASS"] = self.depsinstaller._sudo_password + env["BBOT_SUDO_PASS"] = self.depsinstaller.encrypted_sudo_pw kwargs["env"] = env PATH = os.environ.get("PATH", "") diff --git a/bbot/core/helpers/depsinstaller/installer.py b/bbot/core/helpers/depsinstaller/installer.py index f17c96499..8df5f05cf 100644 --- a/bbot/core/helpers/depsinstaller/installer.py +++ b/bbot/core/helpers/depsinstaller/installer.py @@ -10,10 +10,11 @@ from threading import Lock from itertools import chain from contextlib import suppress +from secrets import token_bytes from ansible_runner.interface import run from subprocess import CalledProcessError -from ..misc import can_sudo_without_password, os_platform +from ..misc import can_sudo_without_password, os_platform, rm_at_exit log = logging.getLogger("bbot.core.helpers.depsinstaller") @@ -29,14 +30,13 @@ def __init__(self, parent_helper): http_timeout = self.web_config.get("http_timeout", 30) os.environ["ANSIBLE_TIMEOUT"] = str(http_timeout) + # cache encrypted sudo pass self.askpass_filename = "sudo_askpass.py" + self._sudo_password = None + self._sudo_cache_setup = False + self._setup_sudo_cache() self._installed_sudo_askpass = False - self._sudo_password = os.environ.get("BBOT_SUDO_PASS", None) - if self._sudo_password is None: - if self.core.bbot_sudo_pass is not None: - self._sudo_password = self.core.bbot_sudo_pass - elif can_sudo_without_password(): - self._sudo_password = "" + self.data_dir = self.parent_helper.cache_dir / "depsinstaller" self.parent_helper.mkdir(self.data_dir) self.setup_status_cache = self.data_dir / "setup_status.json" @@ -314,20 +314,27 @@ def write_setup_status(self): def ensure_root(self, message=""): self._install_sudo_askpass() + # skip if we've already done this + if self._sudo_password is not None: + return with self.ensure_root_lock: - if os.geteuid() != 0 and self._sudo_password is None: - if message: - log.warning(message) - while not self._sudo_password: - # sleep for a split second to flush previous log messages - sleep(0.1) - password = getpass.getpass(prompt="[USER] Please enter sudo password: ") - if self.parent_helper.verify_sudo_password(password): - log.success("Authentication successful") - self._sudo_password = password - self.core.bbot_sudo_pass = password - else: - log.warning("Incorrect password") + # first check if the environment variable is set + _sudo_password = os.environ.get("BBOT_SUDO_PASS", None) + if _sudo_password is not None or os.geteuid() == 0 or can_sudo_without_password(): + # if we're already root or we can sudo without a password, there's no need to prompt + return + + if message: + log.warning(message) + while not self._sudo_password: + # sleep for a split second to flush previous log messages + sleep(0.1) + _sudo_password = getpass.getpass(prompt="[USER] Please enter sudo password: ") + if self.parent_helper.verify_sudo_password(_sudo_password): + log.success("Authentication successful") + self._sudo_password = _sudo_password + else: + log.warning("Incorrect password") def install_core_deps(self): to_install = set() @@ -343,6 +350,38 @@ def install_core_deps(self): self.ensure_root() self.apt_install(list(to_install)) + def _setup_sudo_cache(self): + if not self._sudo_cache_setup: + self._sudo_cache_setup = True + # write temporary encryption key, to be deleted upon scan completion + self._sudo_temp_keyfile = self.parent_helper.temp_filename() + # remove it at exit + rm_at_exit(self._sudo_temp_keyfile) + # generate random 32-byte key + random_key = token_bytes(32) + # write key to file and set secure permissions + self._sudo_temp_keyfile.write_bytes(random_key) + self._sudo_temp_keyfile.chmod(0o600) + # export path to environment variable, for use in askpass script + os.environ["BBOT_SUDO_KEYFILE"] = str(self._sudo_temp_keyfile.resolve()) + + @property + def encrypted_sudo_pw(self): + if self._sudo_password is None: + return "" + return self._encrypt_sudo_pw(self._sudo_password) + + def _encrypt_sudo_pw(self, pw): + from Crypto.Cipher import AES + from Crypto.Util.Padding import pad + + key = self._sudo_temp_keyfile.read_bytes() + cipher = AES.new(key, AES.MODE_CBC) + ct_bytes = cipher.encrypt(pad(pw.encode(), AES.block_size)) + iv = cipher.iv.hex() + ct = ct_bytes.hex() + return f"{iv}:{ct}" + def _install_sudo_askpass(self): if not self._installed_sudo_askpass: self._installed_sudo_askpass = True diff --git a/bbot/core/helpers/depsinstaller/sudo_askpass.py b/bbot/core/helpers/depsinstaller/sudo_askpass.py index 42eccc167..ccd8cd01b 100644 --- a/bbot/core/helpers/depsinstaller/sudo_askpass.py +++ b/bbot/core/helpers/depsinstaller/sudo_askpass.py @@ -1,5 +1,41 @@ #!/usr/bin/env python3 - import os +import sys +from pathlib import Path +from Crypto.Cipher import AES +from Crypto.Util.Padding import unpad + +ENV_VAR_NAME = "BBOT_SUDO_PASS" +KEY_ENV_VAR_PATH = "BBOT_SUDO_KEYFILE" + + +def decrypt_password(encrypted_data, key): + iv, ciphertext = encrypted_data.split(":") + iv = bytes.fromhex(iv) + ct = bytes.fromhex(ciphertext) + cipher = AES.new(key, AES.MODE_CBC, iv) + pt = unpad(cipher.decrypt(ct), AES.block_size) + return pt.decode("utf-8") + + +def main(): + encrypted_password = os.environ.get(ENV_VAR_NAME, "") + # remove variable from environment once we've got it + os.environ.pop(ENV_VAR_NAME, None) + encryption_keypath = Path(os.environ.get(KEY_ENV_VAR_PATH, "")) + + if not encrypted_password or not encryption_keypath.is_file(): + print("Error: Encrypted password or encryption key not found in environment variables.", file=sys.stderr) + sys.exit(1) + + try: + key = encryption_keypath.read_bytes() + decrypted_password = decrypt_password(encrypted_password, key) + print(decrypted_password, end="") + except Exception as e: + print(f'Error decrypting password "{encrypted_password}": {str(e)}', file=sys.stderr) + sys.exit(1) + -print(os.environ.get("BBOT_SUDO_PASS", ""), end="") +if __name__ == "__main__": + main()