diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml new file mode 100644 index 000000000..03cd5a501 --- /dev/null +++ b/.github/workflows/pre-commit.yaml @@ -0,0 +1,27 @@ +name: pre-commit + +on: [ pull_request ] + +jobs: + 'pre-commit': + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + # note: we use depth:0 so we can use + + - name: Set up Python 3.8 + uses: actions/setup-python@v1 + with: + python-version: 3.8 + + - name: install dependencies + run: | + python -m pip install --upgrade pip + pip install pre-commit + pre-commit run --files .pre-commit-config.yaml + + - name: analyze code with pre-commit + run: 'pre-commit run --from-ref "origin/${{ github.base_ref }}" --to-ref HEAD' diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml new file mode 100644 index 000000000..c737c4a22 --- /dev/null +++ b/.github/workflows/pytest.yaml @@ -0,0 +1,32 @@ +name: pytest + +on: [ pull_request ] + +defaults: + run: + shell: bash + +jobs: + pytest: + runs-on: ubuntu-latest + container: jettero/kersplat:1.0.21 + steps: + + - uses: actions/checkout@v2 + + - name: install dependencies + run: | + source /etc/profile.d/kersplat.sh + which python + which pip + python --version + pip --version + python -m pip install --upgrade pip + python -m pip install --upgrade pytest + bash mk-requires.sh + python -m pip install -r requirements.txt + + - name: run pytest + run: | + source /etc/profile.d/kersplat.sh + python -m pytest tests diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 33ec36929..1f122b28d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,6 +8,7 @@ repos: - id: trailing-whitespace - id: mixed-line-ending - id: check-merge-conflict + - id: check-ast - repo: https://github.com/psf/black rev: 20.8b1 hooks: @@ -17,3 +18,6 @@ repos: rev: 'v2.6.0' hooks: - id: pylint + - id: pylint + name: no-bad-escapes + entry: python -m pylint --confidence='' -d all -e anomalous-backslash-in-string,anomalous-unicode-escape-in-string diff --git a/hubblestack/audit/misc.py b/hubblestack/audit/misc.py index 59b5bb205..adc9a8ddd 100644 --- a/hubblestack/audit/misc.py +++ b/hubblestack/audit/misc.py @@ -1,4 +1,7 @@ # -*- encoding: utf-8 -*- +# pylint: disable=unused-argument +# ^^^ rather not pragma this disable, but I'm not sure what effect it'd have on audit's chaining +# to remove these unused arguments """ Hubble Audit plugin for running miscellaneous one-off python functions to run more complex Audit audits without allowing arbitrary command execution @@ -286,45 +289,45 @@ def validate_params(block_id, block_dict, extra_args=None): Raises: HubbleCheckValidationError: For any validation error """ - log.debug('Module: misc Start validating params for check-id: {0}'.format(block_id)) + log.debug("Module: misc Start validating params for check-id: {0}".format(block_id)) error = {} # This module is callable from Audit only - if extra_args.get('caller') == Caller.FDG: - error['misc'] = 'Module: misc called from FDG !!!!' + if extra_args.get("caller") == Caller.FDG: + error["misc"] = "Module: misc called from FDG !!!!" # fetch required param - function_name = runner_utils.get_param_for_module(block_id, block_dict, 'function') + function_name = runner_utils.get_param_for_module(block_id, block_dict, "function") if not function_name: - error['function'] = 'function not provided for block_id: {0}'.format(block_id) + error["function"] = "function not provided for block_id: {0}".format(block_id) elif function_name not in FUNCTION_MAP: - error['function'] = 'Unsupported function name: {0} for block_id: {1}'.format(function_name, block_id) + error["function"] = "Unsupported function name: {0} for block_id: {1}".format(function_name, block_id) else: - if function_name == 'check_directory_files_permission': - _validation_helper(block_id, block_dict, ['path', 'permission'], error) - elif function_name == 'check_service_status': - _validation_helper(block_id, block_dict, ['service_name', 'state'], error) - elif function_name == 'check_all_users_home_directory': - _validation_helper(block_id, block_dict, ['max_system_uid'], error) - elif function_name == 'check_users_own_their_home': - _validation_helper(block_id, block_dict, ['max_system_uid'], error) - elif function_name == 'check_list_values': - _validation_helper(block_id, - block_dict, - ['file_path', 'match_pattern', 'value_pattern', 'value_delimter'], - error) - elif function_name == 'ensure_max_password_expiration': - _validation_helper(block_id, block_dict, ['allow_max_days', 'except_for_users'], error) - elif function_name == 'check_sshd_parameters': - _validation_helper(block_id, block_dict, ['pattern'], error) - elif function_name == 'test_mount_attrs': - _validation_helper(block_id, block_dict, ['mount_name', 'attribute'], error) + if function_name == "check_directory_files_permission": + _validation_helper(block_id, block_dict, ["path", "permission"], error) + elif function_name == "check_service_status": + _validation_helper(block_id, block_dict, ["service_name", "state"], error) + elif function_name == "check_all_users_home_directory": + _validation_helper(block_id, block_dict, ["max_system_uid"], error) + elif function_name == "check_users_own_their_home": + _validation_helper(block_id, block_dict, ["max_system_uid"], error) + elif function_name == "check_list_values": + _validation_helper( + block_id, block_dict, ["file_path", "match_pattern", "value_pattern", "value_delimter"], error + ) + elif function_name == "ensure_max_password_expiration": + _validation_helper(block_id, block_dict, ["allow_max_days", "except_for_users"], error) + elif function_name == "check_sshd_parameters": + _validation_helper(block_id, block_dict, ["pattern"], error) + elif function_name == "test_mount_attrs": + _validation_helper(block_id, block_dict, ["mount_name", "attribute"], error) if error: raise HubbleCheckValidationError(error) - log.debug('Validation success for check-id: {0}'.format(block_id)) + log.debug("Validation success for check-id: {0}".format(block_id)) + def _validation_helper(block_id, block_dict, expected_args_list, error_dict): """ @@ -333,7 +336,7 @@ def _validation_helper(block_id, block_dict, expected_args_list, error_dict): for expected_arg in expected_args_list: expected_arg_value = runner_utils.get_param_for_module(block_id, block_dict, expected_arg) if not isinstance(expected_arg_value, int) and not expected_arg_value: - error_dict[expected_arg] = 'No {0} provided for block_id: {1}'.format(expected_arg, block_id) + error_dict[expected_arg] = "No {0} provided for block_id: {1}".format(expected_arg, block_id) def get_filtered_params_to_log(block_id, block_dict, extra_args=None): @@ -349,11 +352,11 @@ def get_filtered_params_to_log(block_id, block_dict, extra_args=None): Example: {'chaining_args': {'result': "/some/path/file.txt", 'status': True}, 'caller': 'Audit'} """ - log.debug('get_filtered_params_to_log for id: {0}'.format(block_id)) + log.debug("get_filtered_params_to_log for id: {0}".format(block_id)) # fetch required param - function_name = runner_utils.get_param_for_module(block_id, block_dict, 'function') + function_name = runner_utils.get_param_for_module(block_id, block_dict, "function") - return {'function_name': function_name} + return {"function_name": function_name} def execute(block_id, block_dict, extra_args=None): @@ -372,9 +375,9 @@ def execute(block_id, block_dict, extra_args=None): returns: tuple of result(value) and status(boolean) """ - log.debug('Executing misc module for id: {0}'.format(block_id)) + log.debug("Executing misc module for id: {0}".format(block_id)) - function_name = runner_utils.get_param_for_module(block_id, block_dict, 'function') + function_name = runner_utils.get_param_for_module(block_id, block_dict, "function") result = FUNCTION_MAP[function_name](block_id, block_dict, extra_args) @@ -396,7 +399,7 @@ def get_failure_reason(block_id, block_dict, extra_args=None): 'caller': 'Audit'} :return: """ - function_name = runner_utils.get_param_for_module(block_id, block_dict, 'function') + function_name = runner_utils.get_param_for_module(block_id, block_dict, "function") return "Executing function {0}".format(function_name) @@ -404,12 +407,34 @@ def _check_all_ports_firewall_rules(block_id, block_dict, extra_args): """ Ensure firewall rule for all open ports """ - start_open_ports = (_execute_shell_command('netstat -ln | grep "Active Internet connections (only servers)" -n | cut -d ":" -f1', python_shell=True)).strip() - end_open_ports = (_execute_shell_command('netstat -ln | grep "Active UNIX domain sockets (only servers)" -n | cut -d ":" -f1', python_shell=True)).strip() - open_ports = (_execute_shell_command('netstat -ln | awk \'FNR > ' + start_open_ports + ' && FNR < ' + end_open_ports + ' && $6 == "LISTEN" && $4 !~ /127.0.0.1/ {print $4}\' | sed -e "s/.*://"', python_shell=True)).strip() - open_ports = open_ports.split('\n') if open_ports != "" else [] - firewall_ports = (_execute_shell_command('iptables -L INPUT -v -n | awk \'FNR > 2 && $11 != "" && $11 ~ /^dpt:/ {print $11}\' | sed -e "s/.*://"', python_shell=True)).strip() - firewall_ports = firewall_ports.split('\n') if firewall_ports != "" else [] + start_open_ports = ( + _execute_shell_command( + 'netstat -ln | grep "Active Internet connections (only servers)" -n | cut -d ":" -f1', python_shell=True + ) + ).strip() + end_open_ports = ( + _execute_shell_command( + 'netstat -ln | grep "Active UNIX domain sockets (only servers)" -n | cut -d ":" -f1', python_shell=True + ) + ).strip() + open_ports = ( + _execute_shell_command( + "netstat -ln | awk 'FNR > " + + start_open_ports + + " && FNR < " + + end_open_ports + + ' && $6 == "LISTEN" && $4 !~ /127.0.0.1/ {print $4}\' | sed -e "s/.*://"', + python_shell=True, + ) + ).strip() + open_ports = open_ports.split("\n") if open_ports != "" else [] + firewall_ports = ( + _execute_shell_command( + 'iptables -L INPUT -v -n | awk \'FNR > 2 && $11 != "" && $11 ~ /^dpt:/ {print $11}\' | sed -e "s/.*://"', + python_shell=True, + ) + ).strip() + firewall_ports = firewall_ports.split("\n") if firewall_ports != "" else [] no_firewall_ports = [] for open_port in open_ports: @@ -418,48 +443,58 @@ def _check_all_ports_firewall_rules(block_id, block_dict, extra_args): return True if len(no_firewall_ports) == 0 else str(no_firewall_ports) + def _check_password_fields_not_empty(block_id, block_dict, extra_args): """ Ensure password fields are not empty """ - result = _execute_shell_command('cat /etc/shadow | awk -F: \'($2 == "" ) { print $1 " does not have a password "}\'', python_shell=True) - return True if result == '' else result + result = _execute_shell_command( + 'cat /etc/shadow | awk -F: \'($2 == "" ) { print $1 " does not have a password "}\'', python_shell=True + ) + return True if result == "" else result + def _system_account_non_login(block_id, block_dict, extra_args=None): """ Ensure system accounts are non-login """ - non_login_shell = runner_utils.get_param_for_module(block_id, block_dict, 'non_login_shell', '/sbin/nologin') - max_system_uid = runner_utils.get_param_for_module(block_id, block_dict, 'max_system_uid', '500') - except_for_users = runner_utils.get_param_for_module(block_id, block_dict, 'except_for_users', '') + non_login_shell = runner_utils.get_param_for_module(block_id, block_dict, "non_login_shell", "/sbin/nologin") + max_system_uid = runner_utils.get_param_for_module(block_id, block_dict, "max_system_uid", "500") + except_for_users = runner_utils.get_param_for_module(block_id, block_dict, "except_for_users", "") - users_list = ['root','halt','sync','shutdown'] + users_list = ["root", "halt", "sync", "shutdown"] for user in except_for_users.split(","): if user.strip() != "": users_list.append(user.strip()) result = [] - cmd = __mods__["cmd.run_all"]('egrep -v "^\+" /etc/passwd ') - for line in cmd['stdout'].split('\n'): - tokens = line.split(':') - if tokens[0] not in users_list and int(tokens[2]) < int(max_system_uid) and tokens[6] not in ( non_login_shell , "/bin/false" ): - result.append(line) + cmd = __mods__["cmd.run_all"]('egrep -v "^\\+" /etc/passwd ') + for line in cmd["stdout"].split("\n"): + tokens = line.split(":") + if ( + tokens[0] not in users_list + and int(tokens[2]) < int(max_system_uid) + and tokens[6] not in (non_login_shell, "/bin/false") + ): + result.append(line) return True if result == [] else str(result) + def _default_group_for_root(block_id, block_dict, extra_args): """ Ensure default group for the root account is GID 0 """ result = _execute_shell_command('grep "^root:" /etc/passwd | cut -f4 -d:', python_shell=True) result = result.strip() - return True if result == '0' else False + return True if result == "0" else False def _root_is_only_uid_0_account(block_id, block_dict, extra_args): """ Ensure root is the only UID 0 account """ - result = _execute_shell_command('cat /etc/passwd | awk -F: \'($3 == 0) { print $1 }\'', python_shell=True) - return True if result.strip() == 'root' else result + result = _execute_shell_command("cat /etc/passwd | awk -F: '($3 == 0) { print $1 }'", python_shell=True) + return True if result.strip() == "root" else result + def _check_time_synchronization(block_id, block_dict, extra_args): """ @@ -467,11 +502,12 @@ def _check_time_synchronization(block_id, block_dict, extra_args): """ command = 'systemctl status systemd-timesyncd ntpd | grep "Active: active (running)"' output = _execute_shell_command(command, python_shell=True) - if output.strip() == '': + if output.strip() == "": return "neither ntpd nor timesyncd is running" else: return True + def _check_path_integrity(block_id, block_dict, extra_args): """ Ensure that system PATH variable is not malformed. @@ -515,64 +551,69 @@ def _check_path_integrity(block_id, block_dict, extra_args): """ output = _execute_shell_command(script, python_shell=True) - return True if output.strip() == '' else output + return True if output.strip() == "" else output + def _check_duplicate_uids(block_id, block_dict, extra_args): """ Return False if any duplicate user id exist in /etc/group file, else return True """ - uids = _execute_shell_command("cat /etc/passwd | cut -f3 -d\":\"", python_shell=True).strip() - uids = uids.split('\n') if uids != "" else [] + uids = _execute_shell_command('cat /etc/passwd | cut -f3 -d":"', python_shell=True).strip() + uids = uids.split("\n") if uids != "" else [] duplicate_uids = [k for k, v in Counter(uids).items() if v > 1] if duplicate_uids is None or duplicate_uids == []: return True return str(duplicate_uids) + def _check_duplicate_gids(block_id, block_dict, extra_args): """ Return False if any duplicate group id exist in /etc/group file, else return True """ - gids = _execute_shell_command("cat /etc/group | cut -f3 -d\":\"", python_shell=True).strip() - gids = gids.split('\n') if gids != "" else [] + gids = _execute_shell_command('cat /etc/group | cut -f3 -d":"', python_shell=True).strip() + gids = gids.split("\n") if gids != "" else [] duplicate_gids = [k for k, v in Counter(gids).items() if v > 1] if duplicate_gids is None or duplicate_gids == []: return True return str(duplicate_gids) + def _check_duplicate_unames(block_id, block_dict, extra_args): """ Return False if any duplicate user names exist in /etc/group file, else return True """ - unames = _execute_shell_command("cat /etc/passwd | cut -f1 -d\":\"", python_shell=True).strip() - unames = unames.split('\n') if unames != "" else [] + unames = _execute_shell_command('cat /etc/passwd | cut -f1 -d":"', python_shell=True).strip() + unames = unames.split("\n") if unames != "" else [] duplicate_unames = [k for k, v in Counter(unames).items() if v > 1] if duplicate_unames is None or duplicate_unames == []: return True return str(duplicate_unames) + def _check_duplicate_gnames(block_id, block_dict, extra_args): """ Return False if any duplicate group names exist in /etc/group file, else return True """ - gnames = _execute_shell_command("cat /etc/group | cut -f1 -d\":\"", python_shell=True).strip() - gnames = gnames.split('\n') if gnames != "" else [] + gnames = _execute_shell_command('cat /etc/group | cut -f1 -d":"', python_shell=True).strip() + gnames = gnames.split("\n") if gnames != "" else [] duplicate_gnames = [k for k, v in Counter(gnames).items() if v > 1] if duplicate_gnames is None or duplicate_gnames == []: return True return str(duplicate_gnames) + def _check_directory_files_permission(block_id, block_dict, extra_args=None): """ Check all files permission inside a directory """ - path = runner_utils.get_param_for_module(block_id, block_dict, 'path') - permission = runner_utils.get_param_for_module(block_id, block_dict, 'permission') + path = runner_utils.get_param_for_module(block_id, block_dict, "path") + permission = runner_utils.get_param_for_module(block_id, block_dict, "permission") - blacklisted_characters = '[^a-zA-Z0-9-_/]' + blacklisted_characters = "[^a-zA-Z0-9-_/]" if "-exec" in path or re.findall(blacklisted_characters, path): - raise CommandExecutionError("Profile parameter '{0}' not a safe pattern".format(path)) + raise CommandExecutionError("Profile parameter '{0}' not a safe pattern".format(path)) files_list = _execute_shell_command("find {0} -type f".format(path)).strip() - files_list = files_list.split('\n') if files_list != "" else [] + files_list = files_list.split("\n") if files_list != "" else [] bad_permission_files = [] for file_in_directory in files_list: per = _compare_file_stats(block_id, file_in_directory, permission, True) @@ -580,28 +621,29 @@ def _check_directory_files_permission(block_id, block_dict, extra_args=None): bad_permission_files += [file_in_directory + ": Bad Permission - " + per + ":"] return True if bad_permission_files == [] else str(bad_permission_files) + def _compare_file_stats(block_id, path, permission, allow_more_strict=False): - path_details = __mods__['file.stats'](path) + path_details = __mods__["file.stats"](path) comparator_args = { "type": "file_permission", - "match": { - "required_value": permission, - "allow_more_strict": allow_more_strict - } + "match": {"required_value": permission, "allow_more_strict": allow_more_strict}, } - ret_status, ret_val = hubblestack.module_runner.comparator.run( - block_id, comparator_args, path_details.get('mode')) - return True if ret_status else path_details.get('mode') + ret_status, ret_val = hubblestack.module_runner.comparator.run(block_id, comparator_args, path_details.get("mode")) + return True if ret_status else path_details.get("mode") + def _check_core_dumps(block_id, block_dict, extra_args): """ Ensure core dumps are restricted """ - hard_core_dump_value = _execute_shell_command("grep -R -E \"hard +core\" /etc/security/limits.conf /etc/security/limits.d/ | awk '{print $4}'", python_shell=True).strip() - hard_core_dump_value = hard_core_dump_value.split('\n') if hard_core_dump_value != "" else [] - if '0' in hard_core_dump_value: + hard_core_dump_value = _execute_shell_command( + "grep -R -E \"hard +core\" /etc/security/limits.conf /etc/security/limits.d/ | awk '{print $4}'", + python_shell=True, + ).strip() + hard_core_dump_value = hard_core_dump_value.split("\n") if hard_core_dump_value != "" else [] + if "0" in hard_core_dump_value: return True if hard_core_dump_value is None or hard_core_dump_value == [] or hard_core_dump_value == "": @@ -609,94 +651,118 @@ def _check_core_dumps(block_id, block_dict, extra_args): return str(hard_core_dump_value) + def _check_service_status(block_id, block_dict, extra_args=None): """ Ensure that the given service is in the required state. Return False if it is not in desired state Return True otherwise state can be enabled or disabled. """ - service_name = runner_utils.get_param_for_module(block_id, block_dict, 'service_name') - state = runner_utils.get_param_for_module(block_id, block_dict, 'state') + service_name = runner_utils.get_param_for_module(block_id, block_dict, "service_name") + state = runner_utils.get_param_for_module(block_id, block_dict, "state") - all_services = __mods__['cmd.run']('systemctl list-unit-files') + all_services = __mods__["cmd.run"]("systemctl list-unit-files") if re.search(service_name, all_services, re.M): - output = __mods__['cmd.retcode']('systemctl is-enabled ' + service_name, ignore_retcode=True) + output = __mods__["cmd.retcode"]("systemctl is-enabled " + service_name, ignore_retcode=True) if (state == "disabled" and str(output) == "1") or (state == "enabled" and str(output) == "0"): return True else: - return __mods__['cmd.run_stdout']('systemctl is-enabled ' + service_name, ignore_retcode=True) + return __mods__["cmd.run_stdout"]("systemctl is-enabled " + service_name, ignore_retcode=True) else: if state == "disabled": return True else: - return 'Looks like ' + service_name + ' does not exists. Please check.' + return "Looks like " + service_name + " does not exists. Please check." + def _check_ssh_timeout_config(block_id, block_dict, extra_args): """ Ensure SSH Idle Timeout Interval is configured """ - client_alive_interval = _execute_shell_command("grep \"^ClientAliveInterval\" /etc/ssh/sshd_config | awk '{print $NF}'", python_shell=True).strip() - if client_alive_interval != '' and int(client_alive_interval) <= 300: - client_alive_count_max = _execute_shell_command("grep \"^ClientAliveCountMax\" /etc/ssh/sshd_config | awk '{print $NF}'", python_shell=True).strip() - if client_alive_count_max != '' and int(client_alive_count_max) <= 3: + client_alive_interval = _execute_shell_command( + "grep \"^ClientAliveInterval\" /etc/ssh/sshd_config | awk '{print $NF}'", python_shell=True + ).strip() + if client_alive_interval != "" and int(client_alive_interval) <= 300: + client_alive_count_max = _execute_shell_command( + "grep \"^ClientAliveCountMax\" /etc/ssh/sshd_config | awk '{print $NF}'", python_shell=True + ).strip() + if client_alive_count_max != "" and int(client_alive_count_max) <= 3: return True else: return "ClientAliveCountMax value should be less than equal to 3" else: return "ClientAliveInterval value should be less than equal to 300" + def _check_all_users_home_directory(block_id, block_dict, extra_args=None): """ Ensure all users' home directories exist """ - max_system_uid = runner_utils.get_param_for_module(block_id, block_dict, 'max_system_uid') + max_system_uid = runner_utils.get_param_for_module(block_id, block_dict, "max_system_uid") max_system_uid = int(max_system_uid) - users_uids_dirs = _execute_shell_command("cat /etc/passwd | awk -F: '{ print $1 \" \" $3 \" \" $6 \" \" $7}'", python_shell=True).strip() - users_uids_dirs = users_uids_dirs.split('\n') if users_uids_dirs else [] + users_uids_dirs = _execute_shell_command( + 'cat /etc/passwd | awk -F: \'{ print $1 " " $3 " " $6 " " $7}\'', python_shell=True + ).strip() + users_uids_dirs = users_uids_dirs.split("\n") if users_uids_dirs else [] error = [] for user_data in users_uids_dirs: user_uid_dir = user_data.strip().split(" ") if len(user_uid_dir) < 4: - user_uid_dir = user_uid_dir + [''] * (4 - len(user_uid_dir)) + user_uid_dir = user_uid_dir + [""] * (4 - len(user_uid_dir)) if user_uid_dir[1].isdigit(): - if not _is_valid_home_directory(user_uid_dir[2], True) and int(user_uid_dir[1]) >= max_system_uid and user_uid_dir[0] != "nfsnobody" \ - and 'nologin' not in user_uid_dir[3] and 'false' not in user_uid_dir[3]: - error += ["Either home directory " + user_uid_dir[2] + " of user " + user_uid_dir[0] + " is invalid or does not exist."] + if ( + not _is_valid_home_directory(user_uid_dir[2], True) + and int(user_uid_dir[1]) >= max_system_uid + and user_uid_dir[0] != "nfsnobody" + and "nologin" not in user_uid_dir[3] + and "false" not in user_uid_dir[3] + ): + error += [ + "Either home directory " + + user_uid_dir[2] + + " of user " + + user_uid_dir[0] + + " is invalid or does not exist." + ] else: error += ["User " + user_uid_dir[0] + " has invalid uid " + user_uid_dir[1]] return True if not error else str(error) + def _check_users_home_directory_permissions(block_id, block_dict, extra_args=None): """ Ensure users' home directories permissions are 750 or more restrictive """ - max_allowed_permission = runner_utils.get_param_for_module(block_id, block_dict, 'max_allowed_permission', 750) - except_for_users = runner_utils.get_param_for_module(block_id, block_dict, 'except_for_users', '') + max_allowed_permission = runner_utils.get_param_for_module(block_id, block_dict, "max_allowed_permission", 750) + except_for_users = runner_utils.get_param_for_module(block_id, block_dict, "except_for_users", "") - users_list = ['root','halt','sync','shutdown'] + users_list = ["root", "halt", "sync", "shutdown"] for user in except_for_users.split(","): if user.strip() != "": users_list.append(user.strip()) users_dirs = [] - cmd = __mods__["cmd.run_all"]('egrep -v "^\+" /etc/passwd ') - for line in cmd['stdout'].split('\n'): - tokens = line.split(':') - if tokens[0] not in users_list and 'nologin' not in tokens[6] and 'false' not in tokens[6]: + cmd = __mods__["cmd.run_all"]('egrep -v "^\\+" /etc/passwd ') + for line in cmd["stdout"].split("\n"): + tokens = line.split(":") + if tokens[0] not in users_list and "nologin" not in tokens[6] and "false" not in tokens[6]: users_dirs.append(tokens[0] + " " + tokens[5]) error = [] for user_dir in users_dirs: user_dir = user_dir.split(" ") if len(user_dir) < 2: - user_dir = user_dir + [''] * (2 - len(user_dir)) + user_dir = user_dir + [""] * (2 - len(user_dir)) if _is_valid_home_directory(user_dir[1]): result = _compare_file_stats(block_id, user_dir[1], max_allowed_permission, True) if result is not True: - error += ["permission on home directory " + user_dir[1] + " of user " + user_dir[0] + " is wrong: " + result] + error += [ + "permission on home directory " + user_dir[1] + " of user " + user_dir[0] + " is wrong: " + result + ] return True if error == [] else str(error) + def _is_valid_home_directory(directory_path, check_slash_home=False): directory_path = None if directory_path is None else directory_path.strip() if directory_path is not None and directory_path != "" and os.path.isdir(directory_path): @@ -707,53 +773,81 @@ def _is_valid_home_directory(directory_path, check_slash_home=False): return False + def _check_users_own_their_home(block_id, block_dict, extra_args=None): """ Ensure users own their home directories """ - max_system_uid = runner_utils.get_param_for_module(block_id, block_dict, 'max_system_uid') + max_system_uid = runner_utils.get_param_for_module(block_id, block_dict, "max_system_uid") max_system_uid = int(max_system_uid) - users_uids_dirs = _execute_shell_command("cat /etc/passwd | awk -F: '{ print $1 \" \" $3 \" \" $6 \" \" $7}'", python_shell=True).strip() - users_uids_dirs = users_uids_dirs.split('\n') if users_uids_dirs != "" else [] + users_uids_dirs = _execute_shell_command( + 'cat /etc/passwd | awk -F: \'{ print $1 " " $3 " " $6 " " $7}\'', python_shell=True + ).strip() + users_uids_dirs = users_uids_dirs.split("\n") if users_uids_dirs != "" else [] error = [] for user_data in users_uids_dirs: user_uid_dir = user_data.strip().split(" ") if len(user_uid_dir) < 4: - user_uid_dir = user_uid_dir + [''] * (4 - len(user_uid_dir)) + user_uid_dir = user_uid_dir + [""] * (4 - len(user_uid_dir)) if user_uid_dir[1].isdigit(): if not _is_valid_home_directory(user_uid_dir[2]): - if int(user_uid_dir[1]) >= max_system_uid and 'nologin' not in user_uid_dir[3] and 'false' not in user_uid_dir[3]: - error += ["Either home directory " + user_uid_dir[2] + " of user " + user_uid_dir[0] + " is invalid or does not exist."] - elif int(user_uid_dir[1]) >= max_system_uid and user_uid_dir[0] != "nfsnobody" and 'nologin' not in user_uid_dir[3] \ - and 'false' not in user_uid_dir[3]: - owner = __mods__['cmd.run']("stat -L -c \"%U\" \"" + user_uid_dir[2] + "\"") + if ( + int(user_uid_dir[1]) >= max_system_uid + and "nologin" not in user_uid_dir[3] + and "false" not in user_uid_dir[3] + ): + error += [ + "Either home directory " + + user_uid_dir[2] + + " of user " + + user_uid_dir[0] + + " is invalid or does not exist." + ] + elif ( + int(user_uid_dir[1]) >= max_system_uid + and user_uid_dir[0] != "nfsnobody" + and "nologin" not in user_uid_dir[3] + and "false" not in user_uid_dir[3] + ): + owner = __mods__["cmd.run"]('stat -L -c "%U" "' + user_uid_dir[2] + '"') if owner != user_uid_dir[0]: - error += ["The home directory " + user_uid_dir[2] + " of user " + user_uid_dir[0] + " is owned by " + owner] + error += [ + "The home directory " + + user_uid_dir[2] + + " of user " + + user_uid_dir[0] + + " is owned by " + + owner + ] else: error += ["User " + user_uid_dir[0] + " has invalid uid " + user_uid_dir[1]] return True if not error else str(error) + def _check_users_dot_files(block_id, block_dict, extra_args): """ Ensure users' dot files are not group or world writable """ - users_dirs = _execute_shell_command("cat /etc/passwd | egrep -v '(root|halt|sync|shutdown)' | awk -F: '($7 != \"/sbin/nologin\") {print $1\" \"$6}'", python_shell=True).strip() - users_dirs = users_dirs.split('\n') if users_dirs != "" else [] + users_dirs = _execute_shell_command( + "cat /etc/passwd | egrep -v '(root|halt|sync|shutdown)' | awk -F: '($7 != \"/sbin/nologin\") {print $1\" \"$6}'", + python_shell=True, + ).strip() + users_dirs = users_dirs.split("\n") if users_dirs != "" else [] error = [] for user_dir in users_dirs: user_dir = user_dir.split() if len(user_dir) < 2: - user_dir = user_dir + [''] * (2 - len(user_dir)) + user_dir = user_dir + [""] * (2 - len(user_dir)) if _is_valid_home_directory(user_dir[1]): - dot_files = _execute_shell_command("find " + user_dir[1] + " -name \".*\"").strip() - dot_files = dot_files.split('\n') if dot_files != "" else [] + dot_files = _execute_shell_command("find " + user_dir[1] + ' -name ".*"').strip() + dot_files = dot_files.split("\n") if dot_files != "" else [] for dot_file in dot_files: if os.path.isfile(dot_file): - path_details = __mods__['file.stats'](dot_file) - given_permission = path_details.get('mode') + path_details = __mods__["file.stats"](dot_file) + given_permission = path_details.get("mode") file_permission = given_permission[-3:] if file_permission[1] in ["2", "3", "6", "7"]: error += ["Group Write permission set on file " + dot_file + " for user " + user_dir[0]] @@ -762,60 +856,68 @@ def _check_users_dot_files(block_id, block_dict, extra_args): return True if error == [] else str(error) + def _check_users_forward_files(block_id, block_dict, extra_args): """ Ensure no users have .forward files """ users_dirs = _execute_shell_command("cat /etc/passwd | awk -F: '{ print $1\" \"$6 }'", python_shell=True).strip() - users_dirs = users_dirs.split('\n') if users_dirs != "" else [] + users_dirs = users_dirs.split("\n") if users_dirs != "" else [] error = [] for user_dir in users_dirs: user_dir = user_dir.split() if len(user_dir) < 2: - user_dir = user_dir + [''] * (2 - len(user_dir)) + user_dir = user_dir + [""] * (2 - len(user_dir)) if _is_valid_home_directory(user_dir[1]): - forward_file = _execute_shell_command("find " + user_dir[1] + " -maxdepth 1 -name \".forward\"").strip() + forward_file = _execute_shell_command("find " + user_dir[1] + ' -maxdepth 1 -name ".forward"').strip() if forward_file is not None and os.path.isfile(forward_file): - error += ["Home directory: " + user_dir[1] + ", for user: " + user_dir[0] + " has " + forward_file + " file"] + error += [ + "Home directory: " + user_dir[1] + ", for user: " + user_dir[0] + " has " + forward_file + " file" + ] return True if error == [] else str(error) + def _check_users_netrc_files(block_id, block_dict, extra_args): """ Ensure no users have .netrc files """ users_dirs = _execute_shell_command("cat /etc/passwd | awk -F: '{ print $1\" \"$6 }'", python_shell=True).strip() - users_dirs = users_dirs.split('\n') if users_dirs != "" else [] + users_dirs = users_dirs.split("\n") if users_dirs != "" else [] error = [] for user_dir in users_dirs: user_dir = user_dir.split() if len(user_dir) < 2: - user_dir = user_dir + [''] * (2 - len(user_dir)) + user_dir = user_dir + [""] * (2 - len(user_dir)) if _is_valid_home_directory(user_dir[1]): - netrc_file = _execute_shell_command("find " + user_dir[1] + " -maxdepth 1 -name \".netrc\"").strip() + netrc_file = _execute_shell_command("find " + user_dir[1] + ' -maxdepth 1 -name ".netrc"').strip() if netrc_file is not None and os.path.isfile(netrc_file): error += ["Home directory: " + user_dir[1] + ", for user: " + user_dir[0] + " has .netrc file"] return True if error == [] else str(error) + def _check_groups_validity(block_id, block_dict, extra_args): """ Ensure all groups in /etc/passwd exist in /etc/group """ group_ids_in_passwd = _execute_shell_command("cut -s -d: -f4 /etc/passwd 2>/dev/null", python_shell=True).strip() - group_ids_in_passwd = group_ids_in_passwd.split('\n') if group_ids_in_passwd != "" else [] + group_ids_in_passwd = group_ids_in_passwd.split("\n") if group_ids_in_passwd != "" else [] group_ids_in_passwd = list(set(group_ids_in_passwd)) invalid_groups = [] for group_id in group_ids_in_passwd: - group_presence_validity = _execute_shell_command("getent group " + group_id + " 2>/dev/null 1>/dev/null; echo $?", python_shell=True).strip() + group_presence_validity = _execute_shell_command( + "getent group " + group_id + " 2>/dev/null 1>/dev/null; echo $?", python_shell=True + ).strip() if str(group_presence_validity) != "0": invalid_groups += ["Invalid groupid: " + group_id + " in /etc/passwd file"] return True if invalid_groups == [] else str(invalid_groups) + def _ensure_reverse_path_filtering(block_id, block_dict, extra_args): """ Ensure Reverse Path Filtering is enabled @@ -823,17 +925,17 @@ def _ensure_reverse_path_filtering(block_id, block_dict, extra_args): error_list = [] command = "sysctl net.ipv4.conf.all.rp_filter 2> /dev/null" output = _execute_shell_command(command, python_shell=True) - if output.strip() == '': + if output.strip() == "": error_list.append("net.ipv4.conf.all.rp_filter not found") - search_results = re.findall("rp_filter = (\d+)", output) + search_results = re.findall("rp_filter = (\\d+)", output) result = int(search_results[0]) if result < 1: error_list.append("net.ipv4.conf.all.rp_filter value set to " + str(result)) command = "sysctl net.ipv4.conf.default.rp_filter 2> /dev/null" output = _execute_shell_command(command, python_shell=True) - if output.strip() == '': + if output.strip() == "": error_list.append("net.ipv4.conf.default.rp_filter not found") - search_results = re.findall("rp_filter = (\d+)", output) + search_results = re.findall("rp_filter = (\\d+)", output) result = int(search_results[0]) if result < 1: error_list.append("net.ipv4.conf.default.rp_filter value set to " + str(result)) @@ -842,24 +944,29 @@ def _ensure_reverse_path_filtering(block_id, block_dict, extra_args): else: return True + def _check_users_rhosts_files(block_id, block_dict, extra_args): """ Ensure no users have .rhosts files """ - users_dirs = _execute_shell_command("cat /etc/passwd | egrep -v '(root|halt|sync|shutdown)' | awk -F: '($7 != \"/sbin/nologin\") {print $1\" \"$6}'", python_shell=True).strip() - users_dirs = users_dirs.split('\n') if users_dirs != "" else [] + users_dirs = _execute_shell_command( + "cat /etc/passwd | egrep -v '(root|halt|sync|shutdown)' | awk -F: '($7 != \"/sbin/nologin\") {print $1\" \"$6}'", + python_shell=True, + ).strip() + users_dirs = users_dirs.split("\n") if users_dirs != "" else [] error = [] for user_dir in users_dirs: user_dir = user_dir.split() if len(user_dir) < 2: - user_dir = user_dir + [''] * (2 - len(user_dir)) + user_dir = user_dir + [""] * (2 - len(user_dir)) if _is_valid_home_directory(user_dir[1]): - rhosts_file = _execute_shell_command("find " + user_dir[1] + " -maxdepth 1 -name \".rhosts\"").strip() + rhosts_file = _execute_shell_command("find " + user_dir[1] + ' -maxdepth 1 -name ".rhosts"').strip() if rhosts_file is not None and os.path.isfile(rhosts_file): error += ["Home directory: " + user_dir[1] + ", for user: " + user_dir[0] + " has .rhosts file"] return True if error == [] else str(error) + def _check_netrc_files_accessibility(block_id, block_dict, extra_args): """ Ensure users' .netrc Files are not group or world accessible @@ -894,12 +1001,10 @@ def _check_netrc_files_accessibility(block_id, block_dict, extra_args): """ output = _execute_shell_command(script, python_shell=True) - return True if output.strip() == '' else output + return True if output.strip() == "" else output -def _grep(path, - pattern, - *args): +def _grep(path, pattern, *args): """ Grep for a string in the specified file @@ -939,21 +1044,18 @@ def _grep(path, path = os.path.expanduser(path) if args: - options = ' '.join(args) + options = " ".join(args) else: - options = '' - cmd = ( - r'''grep {options} {pattern} {path}''' - .format( - options=options, - pattern=pattern, - path=path, - ) + options = "" + cmd = r"""grep {options} {pattern} {path}""".format( + options=options, + pattern=pattern, + path=path, ) try: log.info(cmd) - ret = __mods__['cmd.run_all'](cmd, python_shell=False, ignore_retcode=True) + ret = __mods__["cmd.run_all"](cmd, python_shell=False, ignore_retcode=True) except (IOError, OSError) as exc: raise CommandExecutionError(exc.strerror) @@ -962,55 +1064,55 @@ def _grep(path, def _check_list_values(block_id, block_dict, extra_args=None): """ - This function will first get the line matching given match_pattern. - After this value pattern will be extracted from the above line. - value pattern will be splitted by value_delimiter to get the list of values. - match_pattern will be regex patter for grep command. - value_pattern will be regex for re module of python to get matched values. - Only one of white_list and blacklist is allowed. - white_list and black_list should have comma(,) seperated values. - - audit profile example: - -sshd_hostbased_auth_coreos: - description: 'Ensure SSH HostbasedAuthentication is disabled' - tag: 'test1' - implementations: - - filter: - grains: 'G@osfinger:CentOS*Linux-7' - module: misc - items: - - args: - function: check_list_values - file_path: /srv/hubble/hubblestack_audit_profiles/ssh_config - match_pattern: '^restrict.*default' - value_pattern: '^restrict.*default(.*)$' - grep_arg: null - white_list: kod,nomodify,notrap,nopeer,noquery - black_list: null - value_delimter: ' ' - comparator: - type: "boolean" - match: true - """ - file_path = runner_utils.get_param_for_module(block_id, block_dict, 'file_path') - match_pattern = runner_utils.get_param_for_module(block_id, block_dict, 'match_pattern') - value_pattern = runner_utils.get_param_for_module(block_id, block_dict, 'value_pattern') - grep_arg = runner_utils.get_param_for_module(block_id, block_dict, 'grep_arg') - white_list = runner_utils.get_param_for_module(block_id, block_dict, 'white_list') - black_list = runner_utils.get_param_for_module(block_id, block_dict, 'black_list') - value_delimter = runner_utils.get_param_for_module(block_id, block_dict, 'value_delimter') + This function will first get the line matching given match_pattern. + After this value pattern will be extracted from the above line. + value pattern will be splitted by value_delimiter to get the list of values. + match_pattern will be regex patter for grep command. + value_pattern will be regex for re module of python to get matched values. + Only one of white_list and blacklist is allowed. + white_list and black_list should have comma(,) seperated values. + + audit profile example: + + sshd_hostbased_auth_coreos: + description: 'Ensure SSH HostbasedAuthentication is disabled' + tag: 'test1' + implementations: + - filter: + grains: 'G@osfinger:CentOS*Linux-7' + module: misc + items: + - args: + function: check_list_values + file_path: /srv/hubble/hubblestack_audit_profiles/ssh_config + match_pattern: '^restrict.*default' + value_pattern: '^restrict.*default(.*)$' + grep_arg: null + white_list: kod,nomodify,notrap,nopeer,noquery + black_list: null + value_delimter: ' ' + comparator: + type: "boolean" + match: true + """ + file_path = runner_utils.get_param_for_module(block_id, block_dict, "file_path") + match_pattern = runner_utils.get_param_for_module(block_id, block_dict, "match_pattern") + value_pattern = runner_utils.get_param_for_module(block_id, block_dict, "value_pattern") + grep_arg = runner_utils.get_param_for_module(block_id, block_dict, "grep_arg") + white_list = runner_utils.get_param_for_module(block_id, block_dict, "white_list") + black_list = runner_utils.get_param_for_module(block_id, block_dict, "black_list") + value_delimter = runner_utils.get_param_for_module(block_id, block_dict, "value_delimter") list_delimter = "," if black_list is not None and white_list is not None: return "Both black_list and white_list values are not allowed." grep_args = [] if grep_arg is None else [grep_arg] - matched_lines = _grep(file_path, match_pattern, *grep_args).get('stdout') + matched_lines = _grep(file_path, match_pattern, *grep_args).get("stdout") if not matched_lines: return "No match found for the given pattern: " + str(match_pattern) - matched_lines = matched_lines.split('\n') if matched_lines is not None else [] + matched_lines = matched_lines.split("\n") if matched_lines is not None else [] error = [] for matched_line in matched_lines: regexp = re.compile(value_pattern) @@ -1027,28 +1129,32 @@ def _check_list_values(block_id, block_dict, extra_args=None): return True if error == [] else str(error) + def _mail_conf_check(block_id, block_dict, extra_args): """ Ensure mail transfer agent is configured for local-only mode """ valid_addresses = ["localhost", "127.0.0.1", "::1"] - mail_addresses = _execute_shell_command("grep '^[[:blank:]]*inet_interfaces' /etc/postfix/main.cf | awk -F'=' '{print $2}'", python_shell=True).strip() + mail_addresses = _execute_shell_command( + "grep '^[[:blank:]]*inet_interfaces' /etc/postfix/main.cf | awk -F'=' '{print $2}'", python_shell=True + ).strip() mail_addresses = str(mail_addresses) - mail_addresses = mail_addresses.split(',') if mail_addresses != "" else [] + mail_addresses = mail_addresses.split(",") if mail_addresses != "" else [] mail_addresses = list(map(str.strip, mail_addresses)) invalid_addresses = list(set(mail_addresses) - set(valid_addresses)) return str(invalid_addresses) if invalid_addresses != [] else True + def _ensure_max_password_expiration(block_id, block_dict, extra_args=None): """ Ensure max password expiration days is set to the value less than or equal to that given in args """ - allow_max_days = runner_utils.get_param_for_module(block_id, block_dict, 'allow_max_days') - except_for_users = runner_utils.get_param_for_module(block_id, block_dict, 'except_for_users', '') + allow_max_days = runner_utils.get_param_for_module(block_id, block_dict, "allow_max_days") + except_for_users = runner_utils.get_param_for_module(block_id, block_dict, "except_for_users", "") grep_args = [] - pass_max_days_output = _grep('/etc/login.defs', '^PASS_MAX_DAYS', *grep_args).get('stdout') + pass_max_days_output = _grep("/etc/login.defs", "^PASS_MAX_DAYS", *grep_args).get("stdout") if not pass_max_days_output: return "PASS_MAX_DAYS must be set" system_pass_max_days = pass_max_days_output.split()[1] @@ -1058,24 +1164,36 @@ def _ensure_max_password_expiration(block_id, block_dict, extra_args=None): if int(system_pass_max_days) > allow_max_days: return "PASS_MAX_DAYS must be less than or equal to " + str(allow_max_days) - #fetch all users with passwords - grep_args.append('-E') - all_users = _grep('/etc/shadow', '^[^:]+:[^\!*]', *grep_args).get('stdout') + # fetch all users with passwords + grep_args.append("-E") + all_users = _grep("/etc/shadow", "^[^:]+:[^\\!*]", *grep_args).get("stdout") - except_for_users_list=[] + except_for_users_list = [] for user in except_for_users.split(","): if user.strip() != "": except_for_users_list.append(user.strip()) result = [] - for line in all_users.split('\n'): - user = line.split(':')[0] - #As per CIS doc, 5th field is the password max expiry days - user_passwd_expiry = line.split(':')[4] - if not user in except_for_users_list and _is_int(user_passwd_expiry) and int(user_passwd_expiry) > allow_max_days: - result.append('User ' + user + ' has max password expiry days ' + user_passwd_expiry + ', which is more than ' + str(allow_max_days)) + for line in all_users.split("\n"): + user = line.split(":")[0] + # As per CIS doc, 5th field is the password max expiry days + user_passwd_expiry = line.split(":")[4] + if ( + not user in except_for_users_list + and _is_int(user_passwd_expiry) + and int(user_passwd_expiry) > allow_max_days + ): + result.append( + "User " + + user + + " has max password expiry days " + + user_passwd_expiry + + ", which is more than " + + str(allow_max_days) + ) return True if result == [] else str(result) + def _is_int(input): try: num = int(input) @@ -1083,6 +1201,7 @@ def _is_int(input): return False return True + def _check_sshd_parameters(block_id, block_dict, extra_args=None): """ This function will check if any pattern passed is present in ssh service @@ -1096,7 +1215,7 @@ def _check_sshd_parameters(block_id, block_dict, extra_args=None): tag: CIS-1.1.1 function: check_sshd_paramters args: - - '^LogLevel\s+INFO' + - '^LogLevel\\s+INFO' description: Ensure SSH LogLevel is set to INFO 2) To check for only approved ciphers in any order sshd_approved_cipher: @@ -1111,19 +1230,19 @@ def _check_sshd_parameters(block_id, block_dict, extra_args=None): comparetype: only description: Ensure only approved ciphers are used """ - pattern = runner_utils.get_param_for_module(block_id, block_dict, 'pattern') - values = runner_utils.get_param_for_module(block_id, block_dict, 'values', None) - comparetype = runner_utils.get_param_for_module(block_id, block_dict, 'comparetype', 'regex') + pattern = runner_utils.get_param_for_module(block_id, block_dict, "pattern") + values = runner_utils.get_param_for_module(block_id, block_dict, "values", None) + comparetype = runner_utils.get_param_for_module(block_id, block_dict, "comparetype", "regex") - output = __mods__['cmd.run']('sshd -T') - if comparetype == 'only': + output = __mods__["cmd.run"]("sshd -T") + if comparetype == "only": if not values: return "You need to provide values for comparetype 'only'." else: for line in output.splitlines(): if re.match(pattern, line, re.I): - expected_values = values.split(',') - found_values = line[len(pattern):].strip().split(',') + expected_values = values.split(",") + found_values = line[len(pattern) :].strip().split(",") for found_value in found_values: if found_value in expected_values: continue @@ -1131,7 +1250,7 @@ def _check_sshd_parameters(block_id, block_dict, extra_args=None): return "Allowed values for pattern: " + pattern + " are " + values return True return "Looks like pattern i.e. " + pattern + " not found in sshd -T. Please check." - elif comparetype == 'regex': + elif comparetype == "regex": if re.search(pattern, output, re.M | re.I): return True else: @@ -1139,25 +1258,26 @@ def _check_sshd_parameters(block_id, block_dict, extra_args=None): else: return "The comparetype: " + comparetype + " not found. It can be 'regex' or 'only'. Please check." + def _test_mount_attrs(block_id, block_dict, extra_args=None): """ Ensure that a given directory is mounted with appropriate attributes If check_type is soft, then in absence of volume, True will be returned If check_type is hard, then in absence of volume, False will be returned """ - mount_name = runner_utils.get_param_for_module(block_id, block_dict, 'mount_name') - attribute = runner_utils.get_param_for_module(block_id, block_dict, 'attribute') - check_type = runner_utils.get_param_for_module(block_id, block_dict, 'check_type', 'hard') + mount_name = runner_utils.get_param_for_module(block_id, block_dict, "mount_name") + attribute = runner_utils.get_param_for_module(block_id, block_dict, "attribute") + check_type = runner_utils.get_param_for_module(block_id, block_dict, "check_type", "hard") # check that the path exists on system - command = 'test -e ' + mount_name - results = __mods__['cmd.run_all'](command, ignore_retcode=True) - retcode = results['retcode'] - if str(retcode) == '1': + command = "test -e " + mount_name + results = __mods__["cmd.run_all"](command, ignore_retcode=True) + retcode = results["retcode"] + if str(retcode) == "1": return True if check_type == "soft" else (mount_name + " folder does not exist") # if the path exits, proceed with following code - output = __mods__['cmd.run']('cat /proc/mounts') + output = __mods__["cmd.run"]("cat /proc/mounts") if not re.search(mount_name, output, re.M): return True if check_type == "soft" else (mount_name + " is not mounted") else: @@ -1166,6 +1286,7 @@ def _test_mount_attrs(block_id, block_dict, extra_args=None): return str(line) return True + def _test_success(block_id, block_dict, extra_args): """ Automatically returns success @@ -1184,47 +1305,48 @@ def _test_failure_reason(block_id, block_dict, extra_args): """ Automatically returns failure, with a reason """ - return runner_utils.get_param_for_module(block_id, block_dict, 'reason') + return runner_utils.get_param_for_module(block_id, block_dict, "reason") + def _execute_shell_command(cmd, python_shell=False): """ This function will execute passed command in /bin/shell """ - return __mods__['cmd.run'](cmd, python_shell=python_shell, shell='/bin/bash', ignore_retcode=True) + return __mods__["cmd.run"](cmd, python_shell=python_shell, shell="/bin/bash", ignore_retcode=True) FUNCTION_MAP = { - 'check_all_ports_firewall_rules': _check_all_ports_firewall_rules, - 'check_password_fields_not_empty': _check_password_fields_not_empty, - 'system_account_non_login': _system_account_non_login, - 'default_group_for_root': _default_group_for_root, - 'root_is_only_uid_0_account': _root_is_only_uid_0_account, - 'test_success': _test_success, - 'test_failure': _test_failure, - 'test_failure_reason': _test_failure_reason, - 'check_path_integrity': _check_path_integrity, - 'check_time_synchronization': _check_time_synchronization, - 'check_core_dumps': _check_core_dumps, - 'check_directory_files_permission': _check_directory_files_permission, - 'check_duplicate_gnames': _check_duplicate_gnames, - 'check_duplicate_unames': _check_duplicate_unames, - 'check_duplicate_gids': _check_duplicate_gids, - 'check_duplicate_uids': _check_duplicate_uids, - 'check_service_status': _check_service_status, - 'check_ssh_timeout_config': _check_ssh_timeout_config, - 'check_all_users_home_directory': _check_all_users_home_directory, - 'check_users_home_directory_permissions': _check_users_home_directory_permissions, - 'check_users_own_their_home': _check_users_own_their_home, - 'check_users_dot_files': _check_users_dot_files, - 'check_users_forward_files': _check_users_forward_files, - 'check_users_netrc_files': _check_users_netrc_files, - 'check_groups_validity': _check_groups_validity, - 'ensure_reverse_path_filtering': _ensure_reverse_path_filtering, - 'check_users_rhosts_files': _check_users_rhosts_files, - 'check_netrc_files_accessibility': _check_netrc_files_accessibility, - 'check_list_values': _check_list_values, - 'mail_conf_check': _mail_conf_check, - 'ensure_max_password_expiration': _ensure_max_password_expiration, - 'check_sshd_parameters': _check_sshd_parameters, - 'test_mount_attrs': _test_mount_attrs + "check_all_ports_firewall_rules": _check_all_ports_firewall_rules, + "check_password_fields_not_empty": _check_password_fields_not_empty, + "system_account_non_login": _system_account_non_login, + "default_group_for_root": _default_group_for_root, + "root_is_only_uid_0_account": _root_is_only_uid_0_account, + "test_success": _test_success, + "test_failure": _test_failure, + "test_failure_reason": _test_failure_reason, + "check_path_integrity": _check_path_integrity, + "check_time_synchronization": _check_time_synchronization, + "check_core_dumps": _check_core_dumps, + "check_directory_files_permission": _check_directory_files_permission, + "check_duplicate_gnames": _check_duplicate_gnames, + "check_duplicate_unames": _check_duplicate_unames, + "check_duplicate_gids": _check_duplicate_gids, + "check_duplicate_uids": _check_duplicate_uids, + "check_service_status": _check_service_status, + "check_ssh_timeout_config": _check_ssh_timeout_config, + "check_all_users_home_directory": _check_all_users_home_directory, + "check_users_home_directory_permissions": _check_users_home_directory_permissions, + "check_users_own_their_home": _check_users_own_their_home, + "check_users_dot_files": _check_users_dot_files, + "check_users_forward_files": _check_users_forward_files, + "check_users_netrc_files": _check_users_netrc_files, + "check_groups_validity": _check_groups_validity, + "ensure_reverse_path_filtering": _ensure_reverse_path_filtering, + "check_users_rhosts_files": _check_users_rhosts_files, + "check_netrc_files_accessibility": _check_netrc_files_accessibility, + "check_list_values": _check_list_values, + "mail_conf_check": _mail_conf_check, + "ensure_max_password_expiration": _ensure_max_password_expiration, + "check_sshd_parameters": _check_sshd_parameters, + "test_mount_attrs": _test_mount_attrs, } diff --git a/hubblestack/audit/win_reg.py b/hubblestack/audit/win_reg.py index 805cbae84..8c22942a0 100644 --- a/hubblestack/audit/win_reg.py +++ b/hubblestack/audit/win_reg.py @@ -1,9 +1,12 @@ # -*- encoding: utf-8 -*- +# pylint: disable=unused-argument +# ^^^ rather not pragma this disable, but I'm not sure what effect it'd have on audit's chaining +# to remove these unused arguments r""" Module for fetching registry values from windows registry Note: Now each module just returns its output (As Data gathering) - For Audit checks, comparison logic is now moved to comparators. + For Audit checks, comparison logic is now moved to comparators. See below sections for more understanding Usable in Modules @@ -17,24 +20,24 @@ Its a unique string within a yaml file. It is present on top of a yaml block -- description +- description Description of the check -- tag +- tag (Applicable only for Audit) Check tag value -- sub_check (Optional, default: false) +- sub_check (Optional, default: false) (Applicable only for Audit) If true, its individual result will not be counted in compliance It might be referred in some boolean expression -- failure_reason (Optional) +- failure_reason (Optional) (Applicable only for Audit) By default, module will generate failure reason string at runtime If this is passed, this will override module's actual failure reason -- invert_result (Optional, default: false) +- invert_result (Optional, default: false) (Applicable only for Audit) This is used to flip the boolean output from a check @@ -46,7 +49,7 @@ - grains (under filter) (Applicable only for Audit) - Any grains with and/or/not supported. This is used to filter whether + Any grains with and/or/not supported. This is used to filter whether this check can run on the current OS or not. To run this check on all OS, put a '*' @@ -68,7 +71,7 @@ (Applicable only for Audit) It takes a boolean (true/false) value. If its true, the implementation will not be executed. And true is returned - + This can be useful in cases where you don't have any implementation for some OS, and you want a result from the block. Else, your meta-check(bexpr) will be failed. @@ -76,7 +79,7 @@ (Applicable only for Audit) An array of multiple module implementations. At least one block is necessary. Each item in array will result into a boolean value. - If multiple module implementations exists, final result will be evaluated as + If multiple module implementations exists, final result will be evaluated as boolean AND (default, see parameter: check_eval_logic) - check_eval_logic (Optional, default: and) @@ -91,7 +94,7 @@ - comparator For the purpose of comparing output of module with expected values. Parameters depends upon the comparator used. - For detailed documentation on comparators, + For detailed documentation on comparators, read comparator's implementations at (/hubblestack/extmods/comparators/) FDG Schema @@ -203,9 +206,10 @@ log = logging.getLogger(__name__) + def __virtual__(): if not hubblestack.utils.platform.is_windows(): - return False, 'This audit module only runs on windows' + return False, "This audit module only runs on windows" return True @@ -225,43 +229,43 @@ def execute(block_id, block_dict, extra_args=None): returns: tuple of result(value) and status(boolean) """ - log.debug('Executing win_reg module for id: {0}'.format(block_id)) + log.debug("Executing win_reg module for id: {0}".format(block_id)) chained_result = runner_utils.get_chained_param(extra_args) if chained_result: reg_name = chained_result.get("name") else: - reg_name = runner_utils.get_param_for_module(block_id, block_dict, 'name') + reg_name = runner_utils.get_param_for_module(block_id, block_dict, "name") reg_dict = _reg_path_splitter(reg_name) - secret = _find_option_value_in_reg(reg_dict.get('hive'), reg_dict.get('key'), reg_dict.get('value')) + secret = _find_option_value_in_reg(reg_dict.get("hive"), reg_dict.get("key"), reg_dict.get("value")) result = {reg_name: secret} log.debug("win_reg module output for block_id %s, is %s", block_id, result) if secret is False: - return runner_utils.prepare_negative_result_for_module(block_id, - "registry value couldn't " - "be fetched for reg_name {0}".format(reg_name)) + return runner_utils.prepare_negative_result_for_module( + block_id, "registry value couldn't " "be fetched for reg_name {0}".format(reg_name) + ) return runner_utils.prepare_positive_result_for_module(block_id, result) def validate_params(block_id, block_dict, extra_args=None): r""" - Validate all mandatory params required for this module - - :param block_id: - id of the block - :param block_dict: - parameter for this module - :param extra_args: - Chained argument dictionary, (If any) - Example: {'chaining_args': {'result': "HKEY_LOCAL_MACHINE\Software\Policies\Microsoft\Windows\EventLog\Application\MaxSize", 'status': True}, - 'caller': 'Audit'} - - Raises: - HubbleCheckValidationError: For any validation error + Validate all mandatory params required for this module + + :param block_id: + id of the block + :param block_dict: + parameter for this module + :param extra_args: + Chained argument dictionary, (If any) + Example: {'chaining_args': {'result': "HKEY_LOCAL_MACHINE\Software\Policies\Microsoft\Windows\EventLog\Application\MaxSize", 'status': True}, + 'caller': 'Audit'} + + Raises: + HubbleCheckValidationError: For any validation error """ - log.debug('Module: win_reg. Start validating params for check-id: {0}'.format(block_id)) + log.debug("Module: win_reg. Start validating params for check-id: {0}".format(block_id)) error = {} @@ -272,13 +276,13 @@ def validate_params(block_id, block_dict, extra_args=None): if chained_result: chained_pkg_name = chained_result.get("name") else: - reg_name = runner_utils.get_param_for_module(block_id, block_dict, 'name') + reg_name = runner_utils.get_param_for_module(block_id, block_dict, "name") if not chained_pkg_name and not reg_name: - error['name'] = 'Mandatory parameter: name not found for id: %s' % block_id + error["name"] = "Mandatory parameter: name not found for id: %s" % block_id if error: raise HubbleCheckValidationError(error) - log.debug('Validation success for check-id: {0}'.format(block_id)) + log.debug("Validation success for check-id: {0}".format(block_id)) def get_filtered_params_to_log(block_id, block_dict, extra_args=None): @@ -294,26 +298,26 @@ def get_filtered_params_to_log(block_id, block_dict, extra_args=None): Example: {'chaining_args': {'result': "HKEY_LOCAL_MACHINE\Software\Policies\Microsoft\Windows\EventLog\Application\MaxSize", 'status': True}, 'caller': 'Audit'} """ - log.debug('get_filtered_params_to_log for win_reg and id: {0}'.format(block_id)) + log.debug("get_filtered_params_to_log for win_reg and id: {0}".format(block_id)) # fetch required param chained_result = runner_utils.get_chained_param(extra_args) if chained_result: reg_name = chained_result else: - reg_name = runner_utils.get_param_for_module(block_id, block_dict, 'name') + reg_name = runner_utils.get_param_for_module(block_id, block_dict, "name") - return {'name': reg_name} + return {"name": reg_name} def _reg_path_splitter(reg_path): dict_return = {} - dict_return['hive'], temp = reg_path.split('\\', 1) - if '\\\\*\\' in temp: - dict_return['key'], dict_return['value'] = temp.rsplit('\\\\', 1) - dict_return['value'] = '\\\\{}'.format(dict_return['value']) + dict_return["hive"], temp = reg_path.split("\\", 1) + if "\\\\*\\" in temp: + dict_return["key"], dict_return["value"] = temp.rsplit("\\\\", 1) + dict_return["value"] = "\\\\{}".format(dict_return["value"]) else: - dict_return['key'], dict_return['value'] = temp.rsplit('\\', 1) + dict_return["key"], dict_return["value"] = temp.rsplit("\\", 1) return dict_return @@ -323,17 +327,17 @@ def _find_option_value_in_reg(reg_hive, reg_key, reg_value): helper function to retrieve Windows registry settings for a particular option """ - if reg_hive.lower() in ('hku', 'hkey_users'): + if reg_hive.lower() in ("hku", "hkey_users"): key_list = [] ret_dict = {} - sid_return = __mods__['cmd.run']('reg query hku').split('\n') + sid_return = __mods__["cmd.run"]("reg query hku").split("\n") for line in sid_return: - if '\\' in line: - key_list.append(line.split('\\')[1].strip()) + if "\\" in line: + key_list.append(line.split("\\")[1].strip()) for sid in key_list: - if len(sid) <= 15 or '_Classes' in sid: + if len(sid) <= 15 or "_Classes" in sid: continue - temp_reg_key = reg_key.replace('', sid) + temp_reg_key = reg_key.replace("", sid) ret_dict[sid] = _read_reg_value(reg_hive, temp_reg_key, reg_value) return ret_dict else: @@ -341,18 +345,18 @@ def _find_option_value_in_reg(reg_hive, reg_key, reg_value): def _read_reg_value(reg_hive, reg_key, reg_value): - reg_result = __mods__['reg.read_value'](reg_hive, reg_key, reg_value) - if reg_result.get('success'): - if reg_result.get('vdata') == '(value not set)': + reg_result = __mods__["reg.read_value"](reg_hive, reg_key, reg_value) + if reg_result.get("success"): + if reg_result.get("vdata") == "(value not set)": return False else: - return reg_result.get('vdata') + return reg_result.get("vdata") else: return False def get_failure_reason(block_id, block_dict, extra_args=None): - """ + r""" The function is used to find the action that was performed during the audit check :param block_id: id of the block @@ -360,9 +364,9 @@ def get_failure_reason(block_id, block_dict, extra_args=None): parameter for this module :param extra_args: Extra argument dictionary, (If any) - Example: {'chaining_args': {'result': "HKEY_LOCAL_MACHINE\Software\Policies\Microsoft\Windows\EventLog\Application\MaxSize", 'status': True}, + Example: {'chaining_args': {'result': "HKEY_LOCAL_MACHINE\Software\Policies\Microsoft\Windows\EventLog\Applica 'caller': 'Audit'} :return: """ - reg_name = runner_utils.get_param_for_module(block_id, block_dict, 'name') + reg_name = runner_utils.get_param_for_module(block_id, block_dict, "name") return "Fetching registry information for {0}".format(reg_name) diff --git a/tests/unittests/grains/test_core.py b/tests/unittests/grains/test_core.py index bf856cdd0..84e53f67c 100644 --- a/tests/unittests/grains/test_core.py +++ b/tests/unittests/grains/test_core.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- -''' +""" :codeauthor: Erik Johnson -''' +""" import logging import os @@ -15,14 +15,8 @@ from tests.support.mixins import LoaderModuleMockMixin from tests.support.unit import TestCase, skipIf -from tests.support.mock import ( - Mock, - MagicMock, - patch, - mock_open, - NO_MOCK, - NO_MOCK_REASON -) +from tests.support.mock import Mock, MagicMock, patch, mock_open, NO_MOCK, NO_MOCK_REASON + # from unittest import skipIf, TestCase, mock import hubblestack.utils.dns @@ -38,22 +32,23 @@ # Globals IPv4Address = ipaddress.IPv4Address IPv6Address = ipaddress.IPv6Address -IP4_LOCAL = '127.0.0.1' -IP4_ADD1 = '10.0.0.1' -IP4_ADD2 = '10.0.0.2' -IP6_LOCAL = '::1' -IP6_ADD1 = '2001:4860:4860::8844' -IP6_ADD2 = '2001:4860:4860::8888' -IP6_ADD_SCOPE = 'fe80::6238:e0ff:fe06:3f6b%enp2s0' +IP4_LOCAL = "127.0.0.1" +IP4_ADD1 = "10.0.0.1" +IP4_ADD2 = "10.0.0.2" +IP6_LOCAL = "::1" +IP6_ADD1 = "2001:4860:4860::8844" +IP6_ADD2 = "2001:4860:4860::8888" +IP6_ADD_SCOPE = "fe80::6238:e0ff:fe06:3f6b%enp2s0" OS_RELEASE_DIR = os.path.join(os.path.dirname(__file__), "os-releases") @skipIf(NO_MOCK, NO_MOCK_REASON) @skipIf(not pytest, False) class CoreGrainsTestCase(TestCase, LoaderModuleMockMixin): - ''' + """ Test cases for core grains - ''' + """ + def setup_loader_modules(self): return {core: {}} @@ -63,122 +58,156 @@ def test_parse_etc_os_release(self, path_isfile_mock): with hubblestack.utils.files.fopen(os.path.join(OS_RELEASE_DIR, "ubuntu-17.10")) as os_release_file: os_release_content = os_release_file.read() with patch("hubblestack.utils.files.fopen", mock_open(read_data=os_release_content)): - os_release = core._parse_os_release( - '/etc/os-release', - '/usr/lib/os-release') - self.assertEqual(os_release, { - "NAME": "Ubuntu", - "VERSION": "17.10 (Artful Aardvark)", - "ID": "ubuntu", - "ID_LIKE": "debian", - "PRETTY_NAME": "Ubuntu 17.10", - "VERSION_ID": "17.10", - "HOME_URL": "https://www.ubuntu.com/", - "SUPPORT_URL": "https://help.ubuntu.com/", - "BUG_REPORT_URL": "https://bugs.launchpad.net/ubuntu/", - "PRIVACY_POLICY_URL": "https://www.ubuntu.com/legal/terms-and-policies/privacy-policy", - "VERSION_CODENAME": "artful", - "UBUNTU_CODENAME": "artful", - }) + os_release = core._parse_os_release("/etc/os-release", "/usr/lib/os-release") + self.assertEqual( + os_release, + { + "NAME": "Ubuntu", + "VERSION": "17.10 (Artful Aardvark)", + "ID": "ubuntu", + "ID_LIKE": "debian", + "PRETTY_NAME": "Ubuntu 17.10", + "VERSION_ID": "17.10", + "HOME_URL": "https://www.ubuntu.com/", + "SUPPORT_URL": "https://help.ubuntu.com/", + "BUG_REPORT_URL": "https://bugs.launchpad.net/ubuntu/", + "PRIVACY_POLICY_URL": "https://www.ubuntu.com/legal/terms-and-policies/privacy-policy", + "VERSION_CODENAME": "artful", + "UBUNTU_CODENAME": "artful", + }, + ) def test_parse_cpe_name_wfn(self): - ''' + """ Parse correct CPE_NAME data WFN formatted :return: - ''' - for cpe, cpe_ret in [('cpe:/o:opensuse:leap:15.0', - {'phase': None, 'version': '15.0', 'product': 'leap', - 'vendor': 'opensuse', 'part': 'operating system'}), - ('cpe:/o:vendor:product:42:beta', - {'phase': 'beta', 'version': '42', 'product': 'product', - 'vendor': 'vendor', 'part': 'operating system'})]: + """ + for cpe, cpe_ret in [ + ( + "cpe:/o:opensuse:leap:15.0", + { + "phase": None, + "version": "15.0", + "product": "leap", + "vendor": "opensuse", + "part": "operating system", + }, + ), + ( + "cpe:/o:vendor:product:42:beta", + { + "phase": "beta", + "version": "42", + "product": "product", + "vendor": "vendor", + "part": "operating system", + }, + ), + ]: ret = core._parse_cpe_name(cpe) for key in cpe_ret: assert key in ret assert cpe_ret[key] == ret[key] def test_parse_cpe_name_v23(self): - ''' + """ Parse correct CPE_NAME data v2.3 formatted :return: - ''' - for cpe, cpe_ret in [('cpe:2.3:o:microsoft:windows_xp:5.1.601:beta:*:*:*:*:*:*', - {'phase': 'beta', 'version': '5.1.601', 'product': 'windows_xp', - 'vendor': 'microsoft', 'part': 'operating system'}), - ('cpe:2.3:h:corellian:millenium_falcon:1.0:*:*:*:*:*:*:*', - {'phase': None, 'version': '1.0', 'product': 'millenium_falcon', - 'vendor': 'corellian', 'part': 'hardware'}), - ('cpe:2.3:*:dark_empire:light_saber:3.0:beta:*:*:*:*:*:*', - {'phase': 'beta', 'version': '3.0', 'product': 'light_saber', - 'vendor': 'dark_empire', 'part': None})]: + """ + for cpe, cpe_ret in [ + ( + "cpe:2.3:o:microsoft:windows_xp:5.1.601:beta:*:*:*:*:*:*", + { + "phase": "beta", + "version": "5.1.601", + "product": "windows_xp", + "vendor": "microsoft", + "part": "operating system", + }, + ), + ( + "cpe:2.3:h:corellian:millenium_falcon:1.0:*:*:*:*:*:*:*", + { + "phase": None, + "version": "1.0", + "product": "millenium_falcon", + "vendor": "corellian", + "part": "hardware", + }, + ), + ( + "cpe:2.3:*:dark_empire:light_saber:3.0:beta:*:*:*:*:*:*", + {"phase": "beta", "version": "3.0", "product": "light_saber", "vendor": "dark_empire", "part": None}, + ), + ]: ret = core._parse_cpe_name(cpe) for key in cpe_ret: assert key in ret assert cpe_ret[key] == ret[key] def test_parse_cpe_name_broken(self): - ''' + """ Parse broken CPE_NAME data :return: - ''' - for cpe in ['cpe:broken', 'cpe:broken:in:all:ways:*:*:*:*', - 'cpe:x:still:broken:123', 'who:/knows:what:is:here']: + """ + for cpe in [ + "cpe:broken", + "cpe:broken:in:all:ways:*:*:*:*", + "cpe:x:still:broken:123", + "who:/knows:what:is:here", + ]: assert core._parse_cpe_name(cpe) == {} def test_missing_os_release(self): - with patch('hubblestack.utils.files.fopen', mock_open(read_data={})): - os_release = core._parse_os_release('/etc/os-release', '/usr/lib/os-release') + with patch("hubblestack.utils.files.fopen", mock_open(read_data={})): + os_release = core._parse_os_release("/etc/os-release", "/usr/lib/os-release") self.assertEqual(os_release, {}) - @skipIf(not hubblestack.utils.platform.is_windows(), 'System is not Windows') + @skipIf(not hubblestack.utils.platform.is_windows(), "System is not Windows") def test__windows_platform_data(self): grains = core._windows_platform_data() - keys = ['biosversion', - 'osrelease', - 'kernelrelease', - 'motherboard', - 'serialnumber', - 'timezone', - 'manufacturer', - 'kernelversion', - 'osservicepack', - 'virtual', - 'productname', - 'osfullname', - 'osmanufacturer', - 'osversion', - 'windowsdomain'] + keys = [ + "biosversion", + "osrelease", + "kernelrelease", + "motherboard", + "serialnumber", + "timezone", + "manufacturer", + "kernelversion", + "osservicepack", + "virtual", + "productname", + "osfullname", + "osmanufacturer", + "osversion", + "windowsdomain", + ] for key in keys: self.assertIn(key, grains) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_gnu_slash_linux_in_os_name(self): - ''' + """ Test to return a list of all enabled services - ''' - _path_exists_map = { - '/proc/1/cmdline': False - } + """ + _path_exists_map = {"/proc/1/cmdline": False} _path_isfile_map = {} _cmd_run_map = { - 'dpkg --print-architecture': 'amd64', + "dpkg --print-architecture": "amd64", } path_exists_mock = MagicMock(side_effect=lambda x: _path_exists_map[x]) - path_isfile_mock = MagicMock( - side_effect=lambda x: _path_isfile_map.get(x, False) - ) - cmd_run_mock = MagicMock( - side_effect=lambda x: _cmd_run_map[x] - ) + path_isfile_mock = MagicMock(side_effect=lambda x: _path_isfile_map.get(x, False)) + cmd_run_mock = MagicMock(side_effect=lambda x: _cmd_run_map[x]) empty_mock = MagicMock(return_value={}) orig_import = __import__ - built_in = 'builtins' + built_in = "builtins" def _import_mock(name, *args): - if name == 'lsb_release': - raise ImportError('No module named lsb_release') + if name == "lsb_release": + raise ImportError("No module named lsb_release") return orig_import(name, *args) # - Skip the first if statement @@ -190,45 +219,54 @@ def _import_mock(name, *args): # - Make a bunch of functions return empty dicts, we don't care about # these grains for the purposes of this test. # - Mock the osarch - distro_mock = MagicMock(return_value=('Debian GNU/Linux', '8.3', '')) - with patch.object(hubblestack.utils.platform, 'is_proxy', - MagicMock(return_value=False)), \ - patch.object(core, '_linux_bin_exists', - MagicMock(return_value=False)), \ - patch.object(os.path, 'exists', path_exists_mock), \ - patch('{0}.__import__'.format(built_in), side_effect=_import_mock), \ - patch.object(os.path, 'isfile', path_isfile_mock), \ - patch.object(core, '_parse_lsb_release', empty_mock), \ - patch.object(core, '_parse_os_release', empty_mock), \ - patch.object(core, '_parse_lsb_release', empty_mock), \ - patch.object(core, 'linux_distribution', distro_mock), \ - patch.object(core, '_linux_cpudata', empty_mock), \ - patch.object(core, '_linux_gpu_data', empty_mock), \ - patch.object(core, '_memdata', empty_mock), \ - patch.object(core, '_hw_data', empty_mock), \ - patch.object(core, '_virtual', empty_mock), \ - patch.object(core, '_ps', empty_mock), \ - patch.dict(core.__mods__, {'cmd.run': cmd_run_mock}): + distro_mock = MagicMock(return_value=("Debian GNU/Linux", "8.3", "")) + with patch.object(hubblestack.utils.platform, "is_proxy", MagicMock(return_value=False)), patch.object( + core, "_linux_bin_exists", MagicMock(return_value=False) + ), patch.object(os.path, "exists", path_exists_mock), patch( + "{0}.__import__".format(built_in), side_effect=_import_mock + ), patch.object( + os.path, "isfile", path_isfile_mock + ), patch.object( + core, "_parse_lsb_release", empty_mock + ), patch.object( + core, "_parse_os_release", empty_mock + ), patch.object( + core, "_parse_lsb_release", empty_mock + ), patch.object( + core, "linux_distribution", distro_mock + ), patch.object( + core, "_linux_cpudata", empty_mock + ), patch.object( + core, "_linux_gpu_data", empty_mock + ), patch.object( + core, "_memdata", empty_mock + ), patch.object( + core, "_hw_data", empty_mock + ), patch.object( + core, "_virtual", empty_mock + ), patch.object( + core, "_ps", empty_mock + ), patch.dict( + core.__mods__, {"cmd.run": cmd_run_mock} + ): os_grains = core.os_data() - self.assertEqual(os_grains.get('os_family'), 'Debian') + self.assertEqual(os_grains.get("os_family"), "Debian") - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_suse_os_from_cpe_data(self): - ''' + """ Test if 'os' grain is parsed from CPE_NAME of /etc/os-release - ''' - _path_exists_map = { - '/proc/1/cmdline': False - } + """ + _path_exists_map = {"/proc/1/cmdline": False} _os_release_map = { - 'NAME': 'SLES', - 'VERSION': '12-SP1', - 'VERSION_ID': '12.1', - 'PRETTY_NAME': 'SUSE Linux Enterprise Server 12 SP1', - 'ID': 'sles', - 'ANSI_COLOR': '0;32', - 'CPE_NAME': 'cpe:/o:suse:sles:12:sp1' + "NAME": "SLES", + "VERSION": "12-SP1", + "VERSION_ID": "12.1", + "PRETTY_NAME": "SUSE Linux Enterprise Server 12 SP1", + "ID": "sles", + "ANSI_COLOR": "0;32", + "CPE_NAME": "cpe:/o:suse:sles:12:sp1", } path_exists_mock = MagicMock(side_effect=lambda x: _path_exists_map[x]) @@ -237,16 +275,14 @@ def test_suse_os_from_cpe_data(self): os_release_mock = MagicMock(return_value=_os_release_map) orig_import = __import__ - built_in = 'builtins' + built_in = "builtins" def _import_mock(name, *args): - if name == 'lsb_release': - raise ImportError('No module named lsb_release') + if name == "lsb_release": + raise ImportError("No module named lsb_release") return orig_import(name, *args) - distro_mock = MagicMock( - return_value=('SUSE Linux Enterprise Server ', '12', 'x86_64') - ) + distro_mock = MagicMock(return_value=("SUSE Linux Enterprise Server ", "12", "x86_64")) # - Skip the first if statement # - Skip the selinux/systemd stuff (not pertinent) @@ -255,52 +291,57 @@ def _import_mock(name, *args): # - Skip all the /etc/*-release stuff (not pertinent) # - Mock linux_distribution to give us the OS name that we want # - Mock the osarch - with patch.object(hubblestack.utils.platform, 'is_proxy', - MagicMock(return_value=False)), \ - patch.object(core, '_linux_bin_exists', - MagicMock(return_value=False)), \ - patch.object(os.path, 'exists', path_exists_mock), \ - patch('{0}.__import__'.format(built_in), - side_effect=_import_mock), \ - patch.object(os.path, 'isfile', MagicMock(return_value=False)), \ - patch.object(core, '_parse_os_release', os_release_mock), \ - patch.object(core, '_parse_lsb_release', empty_mock), \ - patch.object(core, 'linux_distribution', distro_mock), \ - patch.object(core, '_linux_gpu_data', empty_mock), \ - patch.object(core, '_hw_data', empty_mock), \ - patch.object(core, '_linux_cpudata', empty_mock), \ - patch.object(core, '_virtual', empty_mock), \ - patch.dict(core.__mods__, {'cmd.run': osarch_mock}): + with patch.object(hubblestack.utils.platform, "is_proxy", MagicMock(return_value=False)), patch.object( + core, "_linux_bin_exists", MagicMock(return_value=False) + ), patch.object(os.path, "exists", path_exists_mock), patch( + "{0}.__import__".format(built_in), side_effect=_import_mock + ), patch.object( + os.path, "isfile", MagicMock(return_value=False) + ), patch.object( + core, "_parse_os_release", os_release_mock + ), patch.object( + core, "_parse_lsb_release", empty_mock + ), patch.object( + core, "linux_distribution", distro_mock + ), patch.object( + core, "_linux_gpu_data", empty_mock + ), patch.object( + core, "_hw_data", empty_mock + ), patch.object( + core, "_linux_cpudata", empty_mock + ), patch.object( + core, "_virtual", empty_mock + ), patch.dict( + core.__mods__, {"cmd.run": osarch_mock} + ): os_grains = core.os_data() - self.assertEqual(os_grains.get('os_family'), 'Suse') - self.assertEqual(os_grains.get('os'), 'SUSE') + self.assertEqual(os_grains.get("os_family"), "Suse") + self.assertEqual(os_grains.get("os"), "SUSE") def _run_os_grains_tests(self, os_release_filename, os_release_map, expectation): - path_isfile_mock = MagicMock(side_effect=lambda x: x in os_release_map.get('files', [])) + path_isfile_mock = MagicMock(side_effect=lambda x: x in os_release_map.get("files", [])) empty_mock = MagicMock(return_value={}) osarch_mock = MagicMock(return_value="amd64") if os_release_filename: - os_release_data = core._parse_os_release( - os.path.join(OS_RELEASE_DIR, os_release_filename) - ) + os_release_data = core._parse_os_release(os.path.join(OS_RELEASE_DIR, os_release_filename)) else: - os_release_data = os_release_map.get('os_release_file', {}) + os_release_data = os_release_map.get("os_release_file", {}) os_release_mock = MagicMock(return_value=os_release_data) orig_import = __import__ - built_in = 'builtins' + built_in = "builtins" def _import_mock(name, *args): - if name == 'lsb_release': - raise ImportError('No module named lsb_release') + if name == "lsb_release": + raise ImportError("No module named lsb_release") return orig_import(name, *args) - suse_release_file = os_release_map.get('suse_release_file') + suse_release_file = os_release_map.get("suse_release_file") - file_contents = {'/proc/1/cmdline': ''} + file_contents = {"/proc/1/cmdline": ""} if suse_release_file: - file_contents['/etc/SuSE-release'] = suse_release_file + file_contents["/etc/SuSE-release"] = suse_release_file # - Skip the first if statement # - Skip the selinux/systemd stuff (not pertinent) @@ -309,769 +350,799 @@ def _import_mock(name, *args): # - Skip all the /etc/*-release stuff (not pertinent) # - Mock linux_distribution to give us the OS name that we want # - Mock the osarch - distro_mock = MagicMock(return_value=os_release_map['linux_distribution']) - with patch.object(hubblestack.utils.platform, 'is_proxy', MagicMock(return_value=False)), \ - patch.object(core, '_linux_bin_exists', MagicMock(return_value=False)), \ - patch.object(os.path, 'exists', path_isfile_mock), \ - patch('{0}.__import__'.format(built_in), side_effect=_import_mock), \ - patch.object(os.path, 'isfile', path_isfile_mock), \ - patch.object(core, '_parse_os_release', os_release_mock), \ - patch.object(core, '_parse_lsb_release', empty_mock), \ - patch('hubblestack.utils.files.fopen', mock_open(read_data=file_contents)), \ - patch.object(core, 'linux_distribution', distro_mock), \ - patch.object(core, '_linux_gpu_data', empty_mock), \ - patch.object(core, '_linux_cpudata', empty_mock), \ - patch.object(core, '_virtual', empty_mock), \ - patch.dict(core.__mods__, {'cmd.run': osarch_mock}): + distro_mock = MagicMock(return_value=os_release_map["linux_distribution"]) + with patch.object(hubblestack.utils.platform, "is_proxy", MagicMock(return_value=False)), patch.object( + core, "_linux_bin_exists", MagicMock(return_value=False) + ), patch.object(os.path, "exists", path_isfile_mock), patch( + "{0}.__import__".format(built_in), side_effect=_import_mock + ), patch.object( + os.path, "isfile", path_isfile_mock + ), patch.object( + core, "_parse_os_release", os_release_mock + ), patch.object( + core, "_parse_lsb_release", empty_mock + ), patch( + "hubblestack.utils.files.fopen", mock_open(read_data=file_contents) + ), patch.object( + core, "linux_distribution", distro_mock + ), patch.object( + core, "_linux_gpu_data", empty_mock + ), patch.object( + core, "_linux_cpudata", empty_mock + ), patch.object( + core, "_virtual", empty_mock + ), patch.dict( + core.__mods__, {"cmd.run": osarch_mock} + ): os_grains = core.os_data() - grains = {k: v for k, v in os_grains.items() - if k in set(["os", "os_family", "osfullname", "oscodename", "osfinger", - "osrelease", "osrelease_info", "osmajorrelease"])} + grains = { + k: v + for k, v in os_grains.items() + if k + in set( + [ + "os", + "os_family", + "osfullname", + "oscodename", + "osfinger", + "osrelease", + "osrelease_info", + "osmajorrelease", + ] + ) + } self.assertEqual(grains, expectation) def _run_suse_os_grains_tests(self, os_release_map, expectation): - os_release_map['linux_distribution'] = ('SUSE test', 'version', 'arch') - expectation['os'] = 'SUSE' - expectation['os_family'] = 'Suse' + os_release_map["linux_distribution"] = ("SUSE test", "version", "arch") + expectation["os"] = "SUSE" + expectation["os_family"] = "Suse" self._run_os_grains_tests(None, os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_suse_os_grains_sles11sp3(self): - ''' + """ Test if OS grains are parsed correctly in SLES 11 SP3 - ''' + """ _os_release_map = { - 'suse_release_file': textwrap.dedent(''' + "suse_release_file": textwrap.dedent( + """ SUSE Linux Enterprise Server 11 (x86_64) VERSION = 11 PATCHLEVEL = 3 - '''), - 'files': ["/etc/SuSE-release"], + """ + ), + "files": ["/etc/SuSE-release"], } expectation = { - 'oscodename': 'SUSE Linux Enterprise Server 11 SP3', - 'osfullname': "SLES", - 'osrelease': '11.3', - 'osrelease_info': (11, 3), - 'osmajorrelease': 11, - 'osfinger': 'SLES-11', + "oscodename": "SUSE Linux Enterprise Server 11 SP3", + "osfullname": "SLES", + "osrelease": "11.3", + "osrelease_info": (11, 3), + "osmajorrelease": 11, + "osfinger": "SLES-11", } self._run_suse_os_grains_tests(_os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_suse_os_grains_sles11sp4(self): - ''' + """ Test if OS grains are parsed correctly in SLES 11 SP4 - ''' + """ _os_release_map = { - 'os_release_file': { - 'NAME': 'SLES', - 'VERSION': '11.4', - 'VERSION_ID': '11.4', - 'PRETTY_NAME': 'SUSE Linux Enterprise Server 11 SP4', - 'ID': 'sles', - 'ANSI_COLOR': '0;32', - 'CPE_NAME': 'cpe:/o:suse:sles:11:4' + "os_release_file": { + "NAME": "SLES", + "VERSION": "11.4", + "VERSION_ID": "11.4", + "PRETTY_NAME": "SUSE Linux Enterprise Server 11 SP4", + "ID": "sles", + "ANSI_COLOR": "0;32", + "CPE_NAME": "cpe:/o:suse:sles:11:4", }, } expectation = { - 'oscodename': 'SUSE Linux Enterprise Server 11 SP4', - 'osfullname': "SLES", - 'osrelease': '11.4', - 'osrelease_info': (11, 4), - 'osmajorrelease': 11, - 'osfinger': 'SLES-11', + "oscodename": "SUSE Linux Enterprise Server 11 SP4", + "osfullname": "SLES", + "osrelease": "11.4", + "osrelease_info": (11, 4), + "osmajorrelease": 11, + "osfinger": "SLES-11", } self._run_suse_os_grains_tests(_os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_suse_os_grains_sles12(self): - ''' + """ Test if OS grains are parsed correctly in SLES 12 - ''' + """ _os_release_map = { - 'os_release_file': { - 'NAME': 'SLES', - 'VERSION': '12', - 'VERSION_ID': '12', - 'PRETTY_NAME': 'SUSE Linux Enterprise Server 12', - 'ID': 'sles', - 'ANSI_COLOR': '0;32', - 'CPE_NAME': 'cpe:/o:suse:sles:12' + "os_release_file": { + "NAME": "SLES", + "VERSION": "12", + "VERSION_ID": "12", + "PRETTY_NAME": "SUSE Linux Enterprise Server 12", + "ID": "sles", + "ANSI_COLOR": "0;32", + "CPE_NAME": "cpe:/o:suse:sles:12", }, } expectation = { - 'oscodename': 'SUSE Linux Enterprise Server 12', - 'osfullname': "SLES", - 'osrelease': '12', - 'osrelease_info': (12,), - 'osmajorrelease': 12, - 'osfinger': 'SLES-12', + "oscodename": "SUSE Linux Enterprise Server 12", + "osfullname": "SLES", + "osrelease": "12", + "osrelease_info": (12,), + "osmajorrelease": 12, + "osfinger": "SLES-12", } self._run_suse_os_grains_tests(_os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_suse_os_grains_sles12sp1(self): - ''' + """ Test if OS grains are parsed correctly in SLES 12 SP1 - ''' + """ _os_release_map = { - 'os_release_file': { - 'NAME': 'SLES', - 'VERSION': '12-SP1', - 'VERSION_ID': '12.1', - 'PRETTY_NAME': 'SUSE Linux Enterprise Server 12 SP1', - 'ID': 'sles', - 'ANSI_COLOR': '0;32', - 'CPE_NAME': 'cpe:/o:suse:sles:12:sp1' + "os_release_file": { + "NAME": "SLES", + "VERSION": "12-SP1", + "VERSION_ID": "12.1", + "PRETTY_NAME": "SUSE Linux Enterprise Server 12 SP1", + "ID": "sles", + "ANSI_COLOR": "0;32", + "CPE_NAME": "cpe:/o:suse:sles:12:sp1", }, } expectation = { - 'oscodename': 'SUSE Linux Enterprise Server 12 SP1', - 'osfullname': "SLES", - 'osrelease': '12.1', - 'osrelease_info': (12, 1), - 'osmajorrelease': 12, - 'osfinger': 'SLES-12', + "oscodename": "SUSE Linux Enterprise Server 12 SP1", + "osfullname": "SLES", + "osrelease": "12.1", + "osrelease_info": (12, 1), + "osmajorrelease": 12, + "osfinger": "SLES-12", } self._run_suse_os_grains_tests(_os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_suse_os_grains_opensuse_leap_42_1(self): - ''' + """ Test if OS grains are parsed correctly in openSUSE Leap 42.1 - ''' + """ _os_release_map = { - 'os_release_file': { - 'NAME': 'openSUSE Leap', - 'VERSION': '42.1', - 'VERSION_ID': '42.1', - 'PRETTY_NAME': 'openSUSE Leap 42.1 (x86_64)', - 'ID': 'opensuse', - 'ANSI_COLOR': '0;32', - 'CPE_NAME': 'cpe:/o:opensuse:opensuse:42.1' + "os_release_file": { + "NAME": "openSUSE Leap", + "VERSION": "42.1", + "VERSION_ID": "42.1", + "PRETTY_NAME": "openSUSE Leap 42.1 (x86_64)", + "ID": "opensuse", + "ANSI_COLOR": "0;32", + "CPE_NAME": "cpe:/o:opensuse:opensuse:42.1", }, } expectation = { - 'oscodename': 'openSUSE Leap 42.1 (x86_64)', - 'osfullname': "Leap", - 'osrelease': '42.1', - 'osrelease_info': (42, 1), - 'osmajorrelease': 42, - 'osfinger': 'Leap-42', + "oscodename": "openSUSE Leap 42.1 (x86_64)", + "osfullname": "Leap", + "osrelease": "42.1", + "osrelease_info": (42, 1), + "osmajorrelease": 42, + "osfinger": "Leap-42", } self._run_suse_os_grains_tests(_os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_suse_os_grains_tumbleweed(self): - ''' + """ Test if OS grains are parsed correctly in openSUSE Tumbleweed - ''' + """ _os_release_map = { - 'os_release_file': { - 'NAME': 'openSUSE', - 'VERSION': 'Tumbleweed', - 'VERSION_ID': '20160504', - 'PRETTY_NAME': 'openSUSE Tumbleweed (20160504) (x86_64)', - 'ID': 'opensuse', - 'ANSI_COLOR': '0;32', - 'CPE_NAME': 'cpe:/o:opensuse:opensuse:20160504' + "os_release_file": { + "NAME": "openSUSE", + "VERSION": "Tumbleweed", + "VERSION_ID": "20160504", + "PRETTY_NAME": "openSUSE Tumbleweed (20160504) (x86_64)", + "ID": "opensuse", + "ANSI_COLOR": "0;32", + "CPE_NAME": "cpe:/o:opensuse:opensuse:20160504", }, } expectation = { - 'oscodename': 'openSUSE Tumbleweed (20160504) (x86_64)', - 'osfullname': "Tumbleweed", - 'osrelease': '20160504', - 'osrelease_info': (20160504,), - 'osmajorrelease': 20160504, - 'osfinger': 'Tumbleweed-20160504', + "oscodename": "openSUSE Tumbleweed (20160504) (x86_64)", + "osfullname": "Tumbleweed", + "osrelease": "20160504", + "osrelease_info": (20160504,), + "osmajorrelease": 20160504, + "osfinger": "Tumbleweed-20160504", } self._run_suse_os_grains_tests(_os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_debian_7_os_grains(self): - ''' + """ Test if OS grains are parsed correctly in Debian 7 "wheezy" - ''' + """ _os_release_map = { - 'linux_distribution': ('debian', '7.11', ''), + "linux_distribution": ("debian", "7.11", ""), } expectation = { - 'os': 'Debian', - 'os_family': 'Debian', - 'oscodename': 'wheezy', - 'osfullname': 'Debian GNU/Linux', - 'osrelease': '7', - 'osrelease_info': (7,), - 'osmajorrelease': 7, - 'osfinger': 'Debian-7', + "os": "Debian", + "os_family": "Debian", + "oscodename": "wheezy", + "osfullname": "Debian GNU/Linux", + "osrelease": "7", + "osrelease_info": (7,), + "osmajorrelease": 7, + "osfinger": "Debian-7", } self._run_os_grains_tests("debian-7", _os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_debian_8_os_grains(self): - ''' + """ Test if OS grains are parsed correctly in Debian 8 "jessie" - ''' + """ _os_release_map = { - 'linux_distribution': ('debian', '8.10', ''), + "linux_distribution": ("debian", "8.10", ""), } expectation = { - 'os': 'Debian', - 'os_family': 'Debian', - 'oscodename': 'jessie', - 'osfullname': 'Debian GNU/Linux', - 'osrelease': '8', - 'osrelease_info': (8,), - 'osmajorrelease': 8, - 'osfinger': 'Debian-8', + "os": "Debian", + "os_family": "Debian", + "oscodename": "jessie", + "osfullname": "Debian GNU/Linux", + "osrelease": "8", + "osrelease_info": (8,), + "osmajorrelease": 8, + "osfinger": "Debian-8", } self._run_os_grains_tests("debian-8", _os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_debian_9_os_grains(self): - ''' + """ Test if OS grains are parsed correctly in Debian 9 "stretch" - ''' + """ _os_release_map = { - 'linux_distribution': ('debian', '9.3', ''), + "linux_distribution": ("debian", "9.3", ""), } expectation = { - 'os': 'Debian', - 'os_family': 'Debian', - 'oscodename': 'stretch', - 'osfullname': 'Debian GNU/Linux', - 'osrelease': '9', - 'osrelease_info': (9,), - 'osmajorrelease': 9, - 'osfinger': 'Debian-9', + "os": "Debian", + "os_family": "Debian", + "oscodename": "stretch", + "osfullname": "Debian GNU/Linux", + "osrelease": "9", + "osrelease_info": (9,), + "osmajorrelease": 9, + "osfinger": "Debian-9", } self._run_os_grains_tests("debian-9", _os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_ubuntu_xenial_os_grains(self): - ''' + """ Test if OS grains are parsed correctly in Ubuntu 16.04 "Xenial Xerus" - ''' + """ _os_release_map = { - 'linux_distribution': ('Ubuntu', '16.04', 'xenial'), + "linux_distribution": ("Ubuntu", "16.04", "xenial"), } expectation = { - 'os': 'Ubuntu', - 'os_family': 'Debian', - 'oscodename': 'xenial', - 'osfullname': 'Ubuntu', - 'osrelease': '16.04', - 'osrelease_info': (16, 4), - 'osmajorrelease': 16, - 'osfinger': 'Ubuntu-16.04', + "os": "Ubuntu", + "os_family": "Debian", + "oscodename": "xenial", + "osfullname": "Ubuntu", + "osrelease": "16.04", + "osrelease_info": (16, 4), + "osmajorrelease": 16, + "osfinger": "Ubuntu-16.04", } self._run_os_grains_tests("ubuntu-16.04", _os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_ubuntu_artful_os_grains(self): - ''' + """ Test if OS grains are parsed correctly in Ubuntu 17.10 "Artful Aardvark" - ''' + """ _os_release_map = { - 'linux_distribution': ('Ubuntu', '17.10', 'artful'), + "linux_distribution": ("Ubuntu", "17.10", "artful"), } expectation = { - 'os': 'Ubuntu', - 'os_family': 'Debian', - 'oscodename': 'artful', - 'osfullname': 'Ubuntu', - 'osrelease': '17.10', - 'osrelease_info': (17, 10), - 'osmajorrelease': 17, - 'osfinger': 'Ubuntu-17.10', + "os": "Ubuntu", + "os_family": "Debian", + "oscodename": "artful", + "osfullname": "Ubuntu", + "osrelease": "17.10", + "osrelease_info": (17, 10), + "osmajorrelease": 17, + "osfinger": "Ubuntu-17.10", } self._run_os_grains_tests("ubuntu-17.10", _os_release_map, expectation) - @skipIf(not hubblestack.utils.platform.is_windows(), 'System is not Windows') + @skipIf(not hubblestack.utils.platform.is_windows(), "System is not Windows") def test_windows_platform_data(self): - ''' + """ Test the _windows_platform_data function - ''' - grains = ['biosversion', 'kernelrelease', 'kernelversion', - 'manufacturer', 'motherboard', 'osfullname', 'osmanufacturer', - 'osrelease', 'osservicepack', 'osversion', 'productname', - 'serialnumber', 'timezone', 'virtual', 'windowsdomain', - 'windowsdomaintype'] + """ + grains = [ + "biosversion", + "kernelrelease", + "kernelversion", + "manufacturer", + "motherboard", + "osfullname", + "osmanufacturer", + "osrelease", + "osservicepack", + "osversion", + "productname", + "serialnumber", + "timezone", + "virtual", + "windowsdomain", + "windowsdomaintype", + ] returned_grains = core._windows_platform_data() for grain in grains: self.assertIn(grain, returned_grains) - valid_types = ['Unknown', 'Unjoined', 'Workgroup', 'Domain'] - self.assertIn(returned_grains['windowsdomaintype'], valid_types) - valid_releases = ['Vista', '7', '8', '8.1', '10', '2008Server', - '2008ServerR2', '2012Server', '2012ServerR2', - '2016Server', '2019Server'] - self.assertIn(returned_grains['osrelease'], valid_releases) + valid_types = ["Unknown", "Unjoined", "Workgroup", "Domain"] + self.assertIn(returned_grains["windowsdomaintype"], valid_types) + valid_releases = [ + "Vista", + "7", + "8", + "8.1", + "10", + "2008Server", + "2008ServerR2", + "2012Server", + "2012ServerR2", + "2016Server", + "2019Server", + ] + self.assertIn(returned_grains["osrelease"], valid_releases) def test__windows_os_release_grain(self): versions = { - 'Windows 10 Home': '10', - 'Windows 10 Pro': '10', - 'Windows 10 Pro for Workstations': '10', - 'Windows 10 Pro Education': '10', - 'Windows 10 Enterprise': '10', - 'Windows 10 Enterprise LTSB': '10', - 'Windows 10 Education': '10', - 'Windows 10 IoT Core': '10', - 'Windows 10 IoT Enterprise': '10', - 'Windows 10 S': '10', - 'Windows 8.1': '8.1', - 'Windows 8.1 Pro': '8.1', - 'Windows 8.1 Enterprise': '8.1', - 'Windows 8.1 OEM': '8.1', - 'Windows 8.1 with Bing': '8.1', - 'Windows 8': '8', - 'Windows 8 Pro': '8', - 'Windows 8 Enterprise': '8', - 'Windows 8 OEM': '8', - 'Windows 7 Starter': '7', - 'Windows 7 Home Basic': '7', - 'Windows 7 Home Premium': '7', - 'Windows 7 Professional': '7', - 'Windows 7 Enterprise': '7', - 'Windows 7 Ultimate': '7', - 'Windows Thin PC': 'Thin', - 'Windows Vista Starter': 'Vista', - 'Windows Vista Home Basic': 'Vista', - 'Windows Vista Home Premium': 'Vista', - 'Windows Vista Business': 'Vista', - 'Windows Vista Enterprise': 'Vista', - 'Windows Vista Ultimate': 'Vista', - 'Windows Server 2019 Essentials': '2019Server', - 'Windows Server 2019 Standard': '2019Server', - 'Windows Server 2019 Datacenter': '2019Server', - 'Windows Server 2016 Essentials': '2016Server', - 'Windows Server 2016 Standard': '2016Server', - 'Windows Server 2016 Datacenter': '2016Server', - 'Windows Server 2012 R2 Foundation': '2012ServerR2', - 'Windows Server 2012 R2 Essentials': '2012ServerR2', - 'Windows Server 2012 R2 Standard': '2012ServerR2', - 'Windows Server 2012 R2 Datacenter': '2012ServerR2', - 'Windows Server 2012 Foundation': '2012Server', - 'Windows Server 2012 Essentials': '2012Server', - 'Windows Server 2012 Standard': '2012Server', - 'Windows Server 2012 Datacenter': '2012Server', - 'Windows MultiPoint Server 2012': '2012Server', - 'Windows Small Business Server 2011': '2011Server', - 'Windows MultiPoint Server 2011': '2011Server', - 'Windows Home Server 2011': '2011Server', - 'Windows MultiPoint Server 2010': '2010Server', - 'Windows Server 2008 R2 Foundation': '2008ServerR2', - 'Windows Server 2008 R2 Standard': '2008ServerR2', - 'Windows Server 2008 R2 Enterprise': '2008ServerR2', - 'Windows Server 2008 R2 Datacenter': '2008ServerR2', - 'Windows Server 2008 R2 for Itanium-based Systems': '2008ServerR2', - 'Windows Web Server 2008 R2': '2008ServerR2', - 'Windows Storage Server 2008 R2': '2008ServerR2', - 'Windows HPC Server 2008 R2': '2008ServerR2', - 'Windows Server 2008 Standard': '2008Server', - 'Windows Server 2008 Enterprise': '2008Server', - 'Windows Server 2008 Datacenter': '2008Server', - 'Windows Server 2008 for Itanium-based Systems': '2008Server', - 'Windows Server Foundation 2008': '2008Server', - 'Windows Essential Business Server 2008': '2008Server', - 'Windows HPC Server 2008': '2008Server', - 'Windows Small Business Server 2008': '2008Server', - 'Windows Storage Server 2008': '2008Server', - 'Windows Web Server 2008': '2008Server' + "Windows 10 Home": "10", + "Windows 10 Pro": "10", + "Windows 10 Pro for Workstations": "10", + "Windows 10 Pro Education": "10", + "Windows 10 Enterprise": "10", + "Windows 10 Enterprise LTSB": "10", + "Windows 10 Education": "10", + "Windows 10 IoT Core": "10", + "Windows 10 IoT Enterprise": "10", + "Windows 10 S": "10", + "Windows 8.1": "8.1", + "Windows 8.1 Pro": "8.1", + "Windows 8.1 Enterprise": "8.1", + "Windows 8.1 OEM": "8.1", + "Windows 8.1 with Bing": "8.1", + "Windows 8": "8", + "Windows 8 Pro": "8", + "Windows 8 Enterprise": "8", + "Windows 8 OEM": "8", + "Windows 7 Starter": "7", + "Windows 7 Home Basic": "7", + "Windows 7 Home Premium": "7", + "Windows 7 Professional": "7", + "Windows 7 Enterprise": "7", + "Windows 7 Ultimate": "7", + "Windows Thin PC": "Thin", + "Windows Vista Starter": "Vista", + "Windows Vista Home Basic": "Vista", + "Windows Vista Home Premium": "Vista", + "Windows Vista Business": "Vista", + "Windows Vista Enterprise": "Vista", + "Windows Vista Ultimate": "Vista", + "Windows Server 2019 Essentials": "2019Server", + "Windows Server 2019 Standard": "2019Server", + "Windows Server 2019 Datacenter": "2019Server", + "Windows Server 2016 Essentials": "2016Server", + "Windows Server 2016 Standard": "2016Server", + "Windows Server 2016 Datacenter": "2016Server", + "Windows Server 2012 R2 Foundation": "2012ServerR2", + "Windows Server 2012 R2 Essentials": "2012ServerR2", + "Windows Server 2012 R2 Standard": "2012ServerR2", + "Windows Server 2012 R2 Datacenter": "2012ServerR2", + "Windows Server 2012 Foundation": "2012Server", + "Windows Server 2012 Essentials": "2012Server", + "Windows Server 2012 Standard": "2012Server", + "Windows Server 2012 Datacenter": "2012Server", + "Windows MultiPoint Server 2012": "2012Server", + "Windows Small Business Server 2011": "2011Server", + "Windows MultiPoint Server 2011": "2011Server", + "Windows Home Server 2011": "2011Server", + "Windows MultiPoint Server 2010": "2010Server", + "Windows Server 2008 R2 Foundation": "2008ServerR2", + "Windows Server 2008 R2 Standard": "2008ServerR2", + "Windows Server 2008 R2 Enterprise": "2008ServerR2", + "Windows Server 2008 R2 Datacenter": "2008ServerR2", + "Windows Server 2008 R2 for Itanium-based Systems": "2008ServerR2", + "Windows Web Server 2008 R2": "2008ServerR2", + "Windows Storage Server 2008 R2": "2008ServerR2", + "Windows HPC Server 2008 R2": "2008ServerR2", + "Windows Server 2008 Standard": "2008Server", + "Windows Server 2008 Enterprise": "2008Server", + "Windows Server 2008 Datacenter": "2008Server", + "Windows Server 2008 for Itanium-based Systems": "2008Server", + "Windows Server Foundation 2008": "2008Server", + "Windows Essential Business Server 2008": "2008Server", + "Windows HPC Server 2008": "2008Server", + "Windows Small Business Server 2008": "2008Server", + "Windows Storage Server 2008": "2008Server", + "Windows Web Server 2008": "2008Server", } for caption in versions: version = core._windows_os_release_grain(caption, 1) self.assertEqual( version, versions[caption], - 'version: {0}\n' - 'found: {1}\n' - 'caption: {2}'.format(version, versions[caption], caption) + "version: {0}\n" "found: {1}\n" "caption: {2}".format(version, versions[caption], caption), ) embedded_versions = { - 'Windows Embedded 8.1 Industry Pro': '8.1', - 'Windows Embedded 8 Industry Pro': '8', - 'Windows POSReady 7': '7', - 'Windows Embedded Standard 7': '7', - 'Windows Embedded POSReady 2009': '2009', - 'Windows Embedded Standard 2009': '2009', - 'Windows XP Embedded': 'XP', + "Windows Embedded 8.1 Industry Pro": "8.1", + "Windows Embedded 8 Industry Pro": "8", + "Windows POSReady 7": "7", + "Windows Embedded Standard 7": "7", + "Windows Embedded POSReady 2009": "2009", + "Windows Embedded Standard 2009": "2009", + "Windows XP Embedded": "XP", } for caption in embedded_versions: version = core._windows_os_release_grain(caption, 1) self.assertEqual( version, embedded_versions[caption], - '{0} != {1}\n' - 'version: {0}\n' - 'found: {1}\n' - 'caption: {2}'.format(version, embedded_versions[caption], caption) + "{0} != {1}\n" + "version: {0}\n" + "found: {1}\n" + "caption: {2}".format(version, embedded_versions[caption], caption), ) # Special Cases # Windows Embedded Standard is Windows 7 - caption = 'Windows Embedded Standard' - with patch('platform.release', MagicMock(return_value='7')): + caption = "Windows Embedded Standard" + with patch("platform.release", MagicMock(return_value="7")): version = core._windows_os_release_grain(caption, 1) - self.assertEqual(version, '7') + self.assertEqual(version, "7") - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_linux_memdata(self): - ''' + """ Test memdata on Linux systems - ''' - _proc_meminfo = textwrap.dedent('''\ + """ + _proc_meminfo = textwrap.dedent( + """\ MemTotal: 16277028 kB - SwapTotal: 4789244 kB''') - with patch('hubblestack.utils.files.fopen', mock_open(read_data=_proc_meminfo)): + SwapTotal: 4789244 kB""" + ) + with patch("hubblestack.utils.files.fopen", mock_open(read_data=_proc_meminfo)): memdata = core._linux_memdata() - self.assertEqual(memdata.get('mem_total'), 15895) - self.assertEqual(memdata.get('swap_total'), 4676) + self.assertEqual(memdata.get("mem_total"), 15895) + self.assertEqual(memdata.get("swap_total"), 4676) - @skipIf(hubblestack.utils.platform.is_windows(), 'System is Windows') + @skipIf(hubblestack.utils.platform.is_windows(), "System is Windows") def test_bsd_memdata(self): - ''' + """ Test to memdata on *BSD systems - ''' + """ _path_exists_map = {} _path_isfile_map = {} _cmd_run_map = { - 'freebsd-version -u': '10.3-RELEASE', - '/sbin/sysctl -n hw.physmem': '2121781248', - '/sbin/sysctl -n vm.swap_total': '419430400' + "freebsd-version -u": "10.3-RELEASE", + "/sbin/sysctl -n hw.physmem": "2121781248", + "/sbin/sysctl -n vm.swap_total": "419430400", } path_exists_mock = MagicMock(side_effect=lambda x: _path_exists_map[x]) - path_isfile_mock = MagicMock( - side_effect=lambda x: _path_isfile_map.get(x, False) - ) - cmd_run_mock = MagicMock( - side_effect=lambda x: _cmd_run_map[x] - ) + path_isfile_mock = MagicMock(side_effect=lambda x: _path_isfile_map.get(x, False)) + cmd_run_mock = MagicMock(side_effect=lambda x: _cmd_run_map[x]) empty_mock = MagicMock(return_value={}) - mock_freebsd_uname = ('FreeBSD', - 'freebsd10.3-hostname-8148', - '10.3-RELEASE', - 'FreeBSD 10.3-RELEASE #0 r297264: Fri Mar 25 02:10:02 UTC 2016 root@releng1.nyi.freebsd.org:/usr/obj/usr/src/sys/GENERIC', - 'amd64', - 'amd64') - - with patch('platform.uname', - MagicMock(return_value=mock_freebsd_uname)): - with patch.object(hubblestack.utils.platform, 'is_linux', - MagicMock(return_value=False)): - with patch.object(hubblestack.utils.platform, 'is_freebsd', - MagicMock(return_value=True)): + mock_freebsd_uname = ( + "FreeBSD", + "freebsd10.3-hostname-8148", + "10.3-RELEASE", + "FreeBSD 10.3-RELEASE #0 r297264: Fri Mar 25 02:10:02 UTC 2016 root@releng1.nyi.freebsd.org:/usr/obj/usr/src/sys/GENERIC", + "amd64", + "amd64", + ) + + with patch("platform.uname", MagicMock(return_value=mock_freebsd_uname)): + with patch.object(hubblestack.utils.platform, "is_linux", MagicMock(return_value=False)): + with patch.object(hubblestack.utils.platform, "is_freebsd", MagicMock(return_value=True)): # Skip the first if statement - with patch.object(hubblestack.utils.platform, 'is_proxy', - MagicMock(return_value=False)): + with patch.object(hubblestack.utils.platform, "is_proxy", MagicMock(return_value=False)): # Skip the init grain compilation (not pertinent) - with patch.object(os.path, 'exists', path_exists_mock): - with patch('hubblestack.utils.path.which') as mock: - mock.return_value = '/sbin/sysctl' + with patch.object(os.path, "exists", path_exists_mock): + with patch("hubblestack.utils.path.which") as mock: + mock.return_value = "/sbin/sysctl" # Make a bunch of functions return empty dicts, # we don't care about these grains for the # purposes of this test. - with patch.object( - core, - '_bsd_cpudata', - empty_mock): - with patch.object( - core, - '_hw_data', - empty_mock): - with patch.object( - core, - '_virtual', - empty_mock): - with patch.object( - core, - '_ps', - empty_mock): + with patch.object(core, "_bsd_cpudata", empty_mock): + with patch.object(core, "_hw_data", empty_mock): + with patch.object(core, "_virtual", empty_mock): + with patch.object(core, "_ps", empty_mock): # Mock the osarch - with patch.dict( - core.__mods__, - {'cmd.run': cmd_run_mock}): + with patch.dict(core.__mods__, {"cmd.run": cmd_run_mock}): os_grains = core.os_data() - self.assertEqual(os_grains.get('mem_total'), 2023) - self.assertEqual(os_grains.get('swap_total'), 400) + self.assertEqual(os_grains.get("mem_total"), 2023) + self.assertEqual(os_grains.get("swap_total"), 400) - @skipIf(hubblestack.utils.platform.is_windows(), 'System is Windows') + @skipIf(hubblestack.utils.platform.is_windows(), "System is Windows") def test_docker_virtual(self): - ''' + """ Test if OS grains are parsed correctly in Ubuntu Xenial Xerus - ''' - with patch.object(os.path, 'isdir', MagicMock(return_value=False)): - with patch.object(os.path, - 'isfile', - MagicMock(side_effect=lambda x: True if x == '/proc/1/cgroup' else False)): - for cgroup_substr in (':/system.slice/docker', ':/docker/', - ':/docker-ce/'): - cgroup_data = \ - '10:memory{0}a_long_sha256sum'.format(cgroup_substr) - log.debug( - 'Testing Docker cgroup substring \'%s\'', cgroup_substr) - with patch('hubblestack.utils.files.fopen', mock_open(read_data=cgroup_data)): - with patch.dict(core.__mods__, {'cmd.run_all': MagicMock()}): - self.assertEqual( - core._virtual({'kernel': 'Linux'}).get('virtual_subtype'), - 'Docker' - ) - - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') - def test_xen_virtual(self): - ''' - Test if OS grains are parsed correctly in Ubuntu Xenial Xerus - ''' - with patch.object(os.path, 'isfile', MagicMock(return_value=False)): - with patch.dict(core.__mods__, {'cmd.run': MagicMock(return_value='')}), \ - patch.object(os.path, - 'isfile', - MagicMock(side_effect=lambda x: True if x == '/sys/bus/xen/drivers/xenconsole' else False)): - log.debug('Testing Xen') - self.assertEqual( - core._virtual({'kernel': 'Linux'}).get('virtual_subtype'), - 'Xen PV DomU' - ) + """ + with patch.object(os.path, "isdir", MagicMock(return_value=False)): + with patch.object( + os.path, "isfile", MagicMock(side_effect=lambda x: True if x == "/proc/1/cgroup" else False) + ): + for cgroup_substr in (":/system.slice/docker", ":/docker/", ":/docker-ce/"): + cgroup_data = "10:memory{0}a_long_sha256sum".format(cgroup_substr) + log.debug("Testing Docker cgroup substring '%s'", cgroup_substr) + with patch("hubblestack.utils.files.fopen", mock_open(read_data=cgroup_data)): + with patch.dict(core.__mods__, {"cmd.run_all": MagicMock()}): + self.assertEqual(core._virtual({"kernel": "Linux"}).get("virtual_subtype"), "Docker") + + # @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + # def test_xen_virtual(self): + # ''' + # Test if OS grains are parsed correctly in Ubuntu Xenial Xerus + # ''' + # with patch.object(os.path, 'isfile', MagicMock(return_value=False)): + # with patch.dict(core.__mods__, {'cmd.run': MagicMock(return_value='')}), \ + # patch.object(os.path, + # 'isfile', + # MagicMock(side_effect=lambda x: True if x == '/sys/bus/xen/drivers/xenconsole' else False)): + # log.debug('Testing Xen') + # self.assertEqual( + # core._virtual({'kernel': 'Linux'}).get('virtual_subtype'), + # 'Xen PV DomU' + # ) def _check_ipaddress(self, value, ip_v): - ''' + """ check if ip address in a list is valid - ''' + """ for val in value: assert isinstance(val, str) - ip_method = 'is_ipv{0}'.format(ip_v) + ip_method = "is_ipv{0}".format(ip_v) self.assertTrue(getattr(hubblestack.utils.network, ip_method)(val)) def _check_empty(self, key, value, empty): - ''' + """ if empty is False and value does not exist assert error if empty is True and value exists assert error - ''' + """ if not empty and not value: raise Exception("{0} is empty, expecting a value".format(key)) elif empty and value: - raise Exception("{0} is suppose to be empty. value: {1} \ - exists".format(key, value)) + raise Exception( + "{0} is suppose to be empty. value: {1} \ + exists".format( + key, value + ) + ) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_fqdn_return(self): - ''' + """ test ip4 and ip6 return values - ''' + """ net_ip4_mock = [IP4_LOCAL, IP4_ADD1, IP4_ADD2] net_ip6_mock = [IP6_LOCAL, IP6_ADD1, IP6_ADD2] - self._run_fqdn_tests(net_ip4_mock, net_ip6_mock, - ip4_empty=False, ip6_empty=False) + self._run_fqdn_tests(net_ip4_mock, net_ip6_mock, ip4_empty=False, ip6_empty=False) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_fqdn6_empty(self): - ''' + """ test when ip6 is empty - ''' + """ net_ip4_mock = [IP4_LOCAL, IP4_ADD1, IP4_ADD2] net_ip6_mock = [] - self._run_fqdn_tests(net_ip4_mock, net_ip6_mock, - ip4_empty=False) + self._run_fqdn_tests(net_ip4_mock, net_ip6_mock, ip4_empty=False) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_fqdn4_empty(self): - ''' + """ test when ip4 is empty - ''' + """ net_ip4_mock = [] net_ip6_mock = [IP6_LOCAL, IP6_ADD1, IP6_ADD2] - self._run_fqdn_tests(net_ip4_mock, net_ip6_mock, - ip6_empty=False) + self._run_fqdn_tests(net_ip4_mock, net_ip6_mock, ip6_empty=False) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") def test_fqdn_all_empty(self): - ''' + """ test when both ip4 and ip6 are empty - ''' + """ net_ip4_mock = [] net_ip6_mock = [] self._run_fqdn_tests(net_ip4_mock, net_ip6_mock) - def _run_fqdn_tests(self, net_ip4_mock, net_ip6_mock, - ip6_empty=True, ip4_empty=True): - + def _run_fqdn_tests(self, net_ip4_mock, net_ip6_mock, ip6_empty=True, ip4_empty=True): def _check_type(key, value, ip4_empty, ip6_empty): - ''' + """ check type and other checks - ''' + """ assert isinstance(value, list) - if '4' in key: + if "4" in key: self._check_empty(key, value, ip4_empty) - self._check_ipaddress(value, ip_v='4') - elif '6' in key: + self._check_ipaddress(value, ip_v="4") + elif "6" in key: self._check_empty(key, value, ip6_empty) - self._check_ipaddress(value, ip_v='6') - - ip4_mock = [(2, 1, 6, '', (IP4_ADD1, 0)), - (2, 3, 0, '', (IP4_ADD2, 0))] - ip6_mock = [(10, 1, 6, '', (IP6_ADD1, 0, 0, 0)), - (10, 3, 0, '', (IP6_ADD2, 0, 0, 0))] - - with patch.dict(core.__opts__, {'ipv6': False}): - with patch.object(hubblestack.utils.network, 'ip_addrs', - MagicMock(return_value=net_ip4_mock)): - with patch.object(hubblestack.utils.network, 'ip_addrs6', - MagicMock(return_value=net_ip6_mock)): - with patch.object(core.socket, 'getaddrinfo', side_effect=[ip4_mock, ip6_mock]): + self._check_ipaddress(value, ip_v="6") + + ip4_mock = [(2, 1, 6, "", (IP4_ADD1, 0)), (2, 3, 0, "", (IP4_ADD2, 0))] + ip6_mock = [(10, 1, 6, "", (IP6_ADD1, 0, 0, 0)), (10, 3, 0, "", (IP6_ADD2, 0, 0, 0))] + + with patch.dict(core.__opts__, {"ipv6": False}): + with patch.object(hubblestack.utils.network, "ip_addrs", MagicMock(return_value=net_ip4_mock)): + with patch.object(hubblestack.utils.network, "ip_addrs6", MagicMock(return_value=net_ip6_mock)): + with patch.object(core.socket, "getaddrinfo", side_effect=[ip4_mock, ip6_mock]): get_fqdn = core.ip_fqdn() - ret_keys = ['fqdn_ip4', 'fqdn_ip6', 'ipv4', 'ipv6'] + ret_keys = ["fqdn_ip4", "fqdn_ip6", "ipv4", "ipv6"] for key in ret_keys: value = get_fqdn[key] _check_type(key, value, ip4_empty, ip6_empty) - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') - @patch.object(hubblestack.utils.platform, 'is_windows', MagicMock(return_value=False)) - @patch('hubblestack.grains.core.__opts__', {'ipv6': False}) + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") + @patch.object(hubblestack.utils.platform, "is_windows", MagicMock(return_value=False)) + @patch("hubblestack.grains.core.__opts__", {"ipv6": False}) def test_dns_return(self): - ''' + """ test the return for a dns grain. test for issue: https://github.com/saltstack/salt/issues/41230 - ''' - resolv_mock = {'domain': '', 'sortlist': [], 'nameservers': - [ipaddress.IPv4Address(IP4_ADD1), - ipaddress.IPv6Address(IP6_ADD1), - IP6_ADD_SCOPE], 'ip4_nameservers': - [ipaddress.IPv4Address(IP4_ADD1)], - 'search': ['test.saltstack.com'], 'ip6_nameservers': - [ipaddress.IPv6Address(IP6_ADD1), - IP6_ADD_SCOPE], 'options': []} - ret = {'dns': {'domain': '', 'sortlist': [], 'nameservers': - [IP4_ADD1, IP6_ADD1, - IP6_ADD_SCOPE], 'ip4_nameservers': - [IP4_ADD1], 'search': ['test.saltstack.com'], - 'ip6_nameservers': [IP6_ADD1, IP6_ADD_SCOPE], - 'options': []}} - with patch.object(hubblestack.utils.dns, 'parse_resolv', MagicMock(return_value=resolv_mock)): + """ + resolv_mock = { + "domain": "", + "sortlist": [], + "nameservers": [ipaddress.IPv4Address(IP4_ADD1), ipaddress.IPv6Address(IP6_ADD1), IP6_ADD_SCOPE], + "ip4_nameservers": [ipaddress.IPv4Address(IP4_ADD1)], + "search": ["test.saltstack.com"], + "ip6_nameservers": [ipaddress.IPv6Address(IP6_ADD1), IP6_ADD_SCOPE], + "options": [], + } + ret = { + "dns": { + "domain": "", + "sortlist": [], + "nameservers": [IP4_ADD1, IP6_ADD1, IP6_ADD_SCOPE], + "ip4_nameservers": [IP4_ADD1], + "search": ["test.saltstack.com"], + "ip6_nameservers": [IP6_ADD1, IP6_ADD_SCOPE], + "options": [], + } + } + with patch.object(hubblestack.utils.dns, "parse_resolv", MagicMock(return_value=resolv_mock)): assert core.dns() == ret - @skipIf(not hubblestack.utils.platform.is_linux(), 'System is not Linux') - @patch.object(hubblestack.utils.platform, 'is_windows', MagicMock(return_value=False)) - @patch('hubblestack.utils.network.ip_addrs', MagicMock(return_value=['1.2.3.4', '5.6.7.8'])) - @patch('hubblestack.utils.network.ip_addrs6', - MagicMock(return_value=['fe80::a8b2:93ff:fe00:0', 'fe80::a8b2:93ff:dead:beef'])) - @patch('hubblestack.utils.network.socket.getfqdn', MagicMock(side_effect=lambda v: v)) # Just pass-through + @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux") + @patch.object(hubblestack.utils.platform, "is_windows", MagicMock(return_value=False)) + @patch("hubblestack.utils.network.ip_addrs", MagicMock(return_value=["1.2.3.4", "5.6.7.8"])) + @patch( + "hubblestack.utils.network.ip_addrs6", + MagicMock(return_value=["fe80::a8b2:93ff:fe00:0", "fe80::a8b2:93ff:dead:beef"]), + ) + @patch("hubblestack.utils.network.socket.getfqdn", MagicMock(side_effect=lambda v: v)) # Just pass-through def test_fqdns_return(self): - ''' + """ test the return for a dns grain. test for issue: https://github.com/saltstack/salt/issues/41230 - ''' - reverse_resolv_mock = [('foo.bar.baz', [], ['1.2.3.4']), - ('rinzler.evil-corp.com', [], ['5.6.7.8']), - ('foo.bar.baz', [], ['fe80::a8b2:93ff:fe00:0']), - ('bluesniff.foo.bar', [], ['fe80::a8b2:93ff:dead:beef'])] - ret = {'fqdns': ['bluesniff.foo.bar', 'foo.bar.baz', 'rinzler.evil-corp.com']} - with patch.object(socket, 'gethostbyaddr', side_effect=reverse_resolv_mock): + """ + reverse_resolv_mock = [ + ("foo.bar.baz", [], ["1.2.3.4"]), + ("rinzler.evil-corp.com", [], ["5.6.7.8"]), + ("foo.bar.baz", [], ["fe80::a8b2:93ff:fe00:0"]), + ("bluesniff.foo.bar", [], ["fe80::a8b2:93ff:dead:beef"]), + ] + ret = {"fqdns": ["bluesniff.foo.bar", "foo.bar.baz", "rinzler.evil-corp.com"]} + with patch.object(socket, "gethostbyaddr", side_effect=reverse_resolv_mock): fqdns = core.fqdns() - self.assertIn('fqdns', fqdns) - self.assertEqual(len(fqdns['fqdns']), len(ret['fqdns'])) - self.assertEqual(set(fqdns['fqdns']), set(ret['fqdns'])) + self.assertIn("fqdns", fqdns) + self.assertEqual(len(fqdns["fqdns"]), len(ret["fqdns"])) + self.assertEqual(set(fqdns["fqdns"]), set(ret["fqdns"])) def test_core_virtual(self): - ''' + """ test virtual grain with cmd virt-what - ''' - virt = 'kvm' - with patch.object(hubblestack.utils.platform, 'is_windows', - MagicMock(return_value=False)): - with patch.object(hubblestack.utils.path, 'which', - MagicMock(return_value=True)): - with patch.dict(core.__mods__, {'cmd.run_all': - MagicMock(return_value={'pid': 78, - 'retcode': 0, - 'stderr': '', - 'stdout': virt})}): - osdata = {'kernel': 'test', } + """ + virt = "kvm" + with patch.object(hubblestack.utils.platform, "is_windows", MagicMock(return_value=False)): + with patch.object(hubblestack.utils.path, "which", MagicMock(return_value=True)): + with patch.dict( + core.__mods__, + {"cmd.run_all": MagicMock(return_value={"pid": 78, "retcode": 0, "stderr": "", "stdout": virt})}, + ): + osdata = { + "kernel": "test", + } ret = core._virtual(osdata) - self.assertEqual(ret['virtual'], virt) + self.assertEqual(ret["virtual"], virt) - @patch('os.path.isfile') - @patch('os.path.isdir') + @patch("os.path.isfile") + @patch("os.path.isdir") def test_core_virtual_unicode(self, mock_file, mock_dir): - ''' + """ test virtual grain with unicode character in product_name file - ''' + """ + def path_side_effect(path): - if path == '/sys/devices/virtual/dmi/id/product_name': + if path == "/sys/devices/virtual/dmi/id/product_name": return True return False - virt = 'kvm' + virt = "kvm" mock_file.side_effect = path_side_effect mock_dir.side_effect = path_side_effect - with patch.object(hubblestack.utils.platform, 'is_windows', - MagicMock(return_value=False)): - with patch.object(hubblestack.utils.path, 'which', - MagicMock(return_value=True)): - with patch.dict(core.__mods__, {'cmd.run_all': - MagicMock(return_value={'pid': 78, - 'retcode': 0, - 'stderr': '', - 'stdout': virt})}): - with patch('hubblestack.utils.files.fopen', - mock_open(read_data='嗨')): - osdata = {'kernel': 'Linux', } + with patch.object(hubblestack.utils.platform, "is_windows", MagicMock(return_value=False)): + with patch.object(hubblestack.utils.path, "which", MagicMock(return_value=True)): + with patch.dict( + core.__mods__, + {"cmd.run_all": MagicMock(return_value={"pid": 78, "retcode": 0, "stderr": "", "stdout": virt})}, + ): + with patch("hubblestack.utils.files.fopen", mock_open(read_data="嗨")): + osdata = { + "kernel": "Linux", + } ret = core._virtual(osdata) - self.assertEqual(ret['virtual'], virt) + self.assertEqual(ret["virtual"], virt) - @patch('hubblestack.utils.path.which', MagicMock(return_value='/usr/sbin/sysctl')) + @patch("hubblestack.utils.path.which", MagicMock(return_value="/usr/sbin/sysctl")) def test_osx_memdata_with_comma(self): - ''' + """ test osx memdata method when comma returns - ''' + """ + def _cmd_side_effect(cmd): - if 'hw.memsize' in cmd: - return '4294967296' - elif 'vm.swapusage' in cmd: - return 'total = 1024,00M used = 160,75M free = 863,25M (encrypted)' - with patch.dict(core.__mods__, {'cmd.run': MagicMock(side_effect=_cmd_side_effect)}): + if "hw.memsize" in cmd: + return "4294967296" + elif "vm.swapusage" in cmd: + return "total = 1024,00M used = 160,75M free = 863,25M (encrypted)" + + with patch.dict(core.__mods__, {"cmd.run": MagicMock(side_effect=_cmd_side_effect)}): ret = core._osx_memdata() - assert ret['swap_total'] == 1024 - assert ret['mem_total'] == 4096 + assert ret["swap_total"] == 1024 + assert ret["mem_total"] == 4096 - @patch('hubblestack.utils.path.which', MagicMock(return_value='/usr/sbin/sysctl')) + @patch("hubblestack.utils.path.which", MagicMock(return_value="/usr/sbin/sysctl")) def test_osx_memdata(self): - ''' + """ test osx memdata - ''' + """ + def _cmd_side_effect(cmd): - if 'hw.memsize' in cmd: - return '4294967296' - elif 'vm.swapusage' in cmd: - return 'total = 0.00M used = 0.00M free = 0.00M (encrypted)' - with patch.dict(core.__mods__, {'cmd.run': MagicMock(side_effect=_cmd_side_effect)}): + if "hw.memsize" in cmd: + return "4294967296" + elif "vm.swapusage" in cmd: + return "total = 0.00M used = 0.00M free = 0.00M (encrypted)" + + with patch.dict(core.__mods__, {"cmd.run": MagicMock(side_effect=_cmd_side_effect)}): ret = core._osx_memdata() - assert ret['swap_total'] == 0 - assert ret['mem_total'] == 4096 + assert ret["swap_total"] == 0 + assert ret["mem_total"] == 4096 - @skipIf(not core._DATEUTIL_TZ, 'Missing dateutil.tz') + @skipIf(not core._DATEUTIL_TZ, "Missing dateutil.tz") def test_locale_info_tzname(self): # mock datetime.now().tzname() # cant just mock now because it is read only - tzname = Mock(return_value='MDT_FAKE') + tzname = Mock(return_value="MDT_FAKE") now_ret_object = Mock(tzname=tzname) now = Mock(return_value=now_ret_object) datetime = Mock(now=now) - with patch.object(core, 'datetime', datetime=datetime) as datetime_module: - with patch.object(core.dateutil.tz, 'tzlocal', return_value=object) as tzlocal: - with patch.object(hubblestack.utils.platform, 'is_proxy', return_value=False) as is_proxy: + with patch.object(core, "datetime", datetime=datetime) as datetime_module: + with patch.object(core.dateutil.tz, "tzlocal", return_value=object) as tzlocal: + with patch.object(hubblestack.utils.platform, "is_proxy", return_value=False) as is_proxy: ret = core.locale_info() tzname.assert_called_once_with() @@ -1082,49 +1153,53 @@ def test_locale_info_tzname(self): tzlocal.assert_called_once_with() is_proxy.assert_called_once_with() - self.assertEqual(ret['locale_info']['timezone'], 'MDT_FAKE') + self.assertEqual(ret["locale_info"]["timezone"], "MDT_FAKE") - @skipIf(not core._DATEUTIL_TZ, 'Missing dateutil.tz') + @skipIf(not core._DATEUTIL_TZ, "Missing dateutil.tz") def test_locale_info_unicode_error_tzname(self): # UnicodeDecodeError most have the default string encoding - unicode_error = UnicodeDecodeError(str('fake'), b'\x00\x00', 1, 2, str('fake')) + unicode_error = UnicodeDecodeError(str("fake"), b"\x00\x00", 1, 2, str("fake")) # mock datetime.now().tzname() # cant just mock now because it is read only - tzname = Mock(return_value='MDT_FAKE') + tzname = Mock(return_value="MDT_FAKE") now_ret_object = Mock(tzname=tzname) now = Mock(return_value=now_ret_object) datetime = Mock(now=now) # mock tzname[0].decode() - decode = Mock(return_value='CST_FAKE') - tzname2 = (Mock(decode=decode,),) - - with patch.object(core, 'datetime', datetime=datetime) as datetime_module: - with patch.object(core.dateutil.tz, 'tzlocal', side_effect=unicode_error) as tzlocal: - with patch.object(hubblestack.utils.platform, 'is_proxy', return_value=False) as is_proxy: - with patch.object(core.hubblestack.utils.platform, 'is_windows', return_value=True) as is_windows: - with patch.object(core, 'time', tzname=tzname2): + decode = Mock(return_value="CST_FAKE") + tzname2 = ( + Mock( + decode=decode, + ), + ) + + with patch.object(core, "datetime", datetime=datetime) as datetime_module: + with patch.object(core.dateutil.tz, "tzlocal", side_effect=unicode_error) as tzlocal: + with patch.object(hubblestack.utils.platform, "is_proxy", return_value=False) as is_proxy: + with patch.object(core.hubblestack.utils.platform, "is_windows", return_value=True) as is_windows: + with patch.object(core, "time", tzname=tzname2): ret = core.locale_info() tzname.assert_not_called() self.assertEqual(len(now_ret_object.method_calls), 0) now.assert_not_called() self.assertEqual(len(datetime.method_calls), 0) - decode.assert_called_once_with('mbcs') + decode.assert_called_once_with("mbcs") self.assertEqual(len(tzname2[0].method_calls), 1) self.assertEqual(len(datetime_module.method_calls), 0) tzlocal.assert_called_once_with() is_proxy.assert_called_once_with() is_windows.assert_called_once_with() - self.assertEqual(ret['locale_info']['timezone'], 'CST_FAKE') + self.assertEqual(ret["locale_info"]["timezone"], "CST_FAKE") - @skipIf(core._DATEUTIL_TZ, 'Not Missing dateutil.tz') + @skipIf(core._DATEUTIL_TZ, "Not Missing dateutil.tz") def test_locale_info_no_tz_tzname(self): - with patch.object(hubblestack.utils.platform, 'is_proxy', return_value=False) as is_proxy: - with patch.object(core.hubblestack.utils.platform, 'is_windows', return_value=True) as is_windows: + with patch.object(hubblestack.utils.platform, "is_proxy", return_value=False) as is_proxy: + with patch.object(core.hubblestack.utils.platform, "is_windows", return_value=True) as is_windows: ret = core.locale_info() is_proxy.assert_called_once_with() is_windows.assert_not_called() - self.assertEqual(ret['locale_info']['timezone'], 'unknown') + self.assertEqual(ret["locale_info"]["timezone"], "unknown")