diff --git a/pyproject.toml b/pyproject.toml index 7988aa2a58d..61982c48c08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ target-version = "py311" fixable = ["ALL"] select = [ + "B", # bugbear # "C90", # mccabe "E", # pycodestyle "F", # flake8 diff --git a/robottelo/cli/base.py b/robottelo/cli/base.py index 2d3d7974696..2d3e6dbf850 100644 --- a/robottelo/cli/base.py +++ b/robottelo/cli/base.py @@ -169,15 +169,9 @@ def _get_username_password(cls, username=None, password=None): """ if username is None: - try: - username = getattr(cls, 'foreman_admin_username') - except AttributeError: - username = settings.server.admin_username + username = getattr(cls, 'foreman_admin_username', settings.server.admin_username) if password is None: - try: - password = getattr(cls, 'foreman_admin_password') - except AttributeError: - password = settings.server.admin_password + password = getattr(cls, 'foreman_admin_password', settings.server.admin_password) return (username, password) diff --git a/robottelo/cli/hammer.py b/robottelo/cli/hammer.py index f6ddb11b4f3..661042050e6 100644 --- a/robottelo/cli/hammer.py +++ b/robottelo/cli/hammer.py @@ -42,7 +42,7 @@ def parse_csv(output): # Generate the key names, spaces will be converted to dashes "-" keys = [_normalize(header) for header in next(reader)] # For each entry, create a dict mapping each key with each value - return [dict(zip(keys, values)) for values in reader if len(values) > 0] + return [dict(zip(keys, values, strict=True)) for values in reader if len(values) > 0] def parse_help(output): diff --git a/robottelo/host_helpers/api_factory.py b/robottelo/host_helpers/api_factory.py index 92f40652d34..0fff2579352 100644 --- a/robottelo/host_helpers/api_factory.py +++ b/robottelo/host_helpers/api_factory.py @@ -468,14 +468,14 @@ def create_role_permissions( if entity_permission.name != name: raise self._satellite.api.APIResponseError( 'the returned permission is different from the' - ' requested one "{} != {}"'.format(entity_permission.name, name) + f' requested one "{entity_permission.name} != {name}"' ) permissions_entities.append(entity_permission) else: if not permissions_name: raise ValueError( - 'resource type "{}" empty. You must select at' - ' least one permission'.format(resource_type) + f'resource type "{resource_type}" empty. You must select at' + ' least one permission' ) resource_type_permissions_entities = self._satellite.api.Permission().search( @@ -575,8 +575,8 @@ def satellite_setting(self, key_val: str): setting = self._satellite.api.Setting().search( query={'search': f'name={name.strip()}'} )[0] - except IndexError: - raise KeyError(f'The setting {name} in not available in satellite.') + except IndexError as err: + raise KeyError(f'The setting {name} in not available in satellite.') from err old_value = setting.value setting.value = value.strip() setting.update({'value'}) diff --git a/robottelo/host_helpers/capsule_mixins.py b/robottelo/host_helpers/capsule_mixins.py index 712bf98f533..0589b5f15a3 100644 --- a/robottelo/host_helpers/capsule_mixins.py +++ b/robottelo/host_helpers/capsule_mixins.py @@ -61,10 +61,12 @@ def wait_for_tasks( raise AssertionError(f"No task was found using query '{search_query}'") return tasks - def wait_for_sync(self, timeout=600, start_time=datetime.utcnow()): + def wait_for_sync(self, timeout=600, start_time=None): """Wait for capsule sync to finish and assert the sync task succeeded""" # Assert that a task to sync lifecycle environment to the capsule # is started (or finished already) + if start_time is None: + start_time = datetime.utcnow() logger.info(f"Waiting for capsule {self.hostname} sync to finish ...") sync_status = self.nailgun_capsule.content_get_sync() logger.info(f"Active tasks {sync_status['active_sync_tasks']}") diff --git a/robottelo/host_helpers/cli_factory.py b/robottelo/host_helpers/cli_factory.py index 1742f3f8d02..b676fe27c02 100644 --- a/robottelo/host_helpers/cli_factory.py +++ b/robottelo/host_helpers/cli_factory.py @@ -4,7 +4,7 @@ example: my_satellite.cli_factory.make_org() """ import datetime -from functools import lru_cache, partial +from functools import partial import inspect import os from os import chmod @@ -14,6 +14,7 @@ from time import sleep from box import Box +from cachetools import cachedmethod from fauxfactory import ( gen_alpha, gen_alphanumeric, @@ -59,9 +60,9 @@ def create_object(cli_object, options, values=None, credentials=None): 'Failed to create {} with data:\n{}\n{}'.format( cli_object.__name__, pprint.pformat(options, indent=2), err.msg ) - ) + ) from err # Sometimes we get a list with a dictionary and not a dictionary. - if type(result) is list and len(result) > 0: + if isinstance(result, list) and len(result) > 0: result = result[0] return Box(result) @@ -294,7 +295,7 @@ def _evaluate_functions(self, iterable): if not key.startswith('_') } - @lru_cache + @cachedmethod def _find_entity_class(self, entity_name): entity_name = entity_name.replace('_', '').lower() for name, class_obj in self._satellite.cli.__dict__.items(): @@ -394,8 +395,8 @@ def make_product_wait(self, options=None, wait_for=5): product = self._satellite.cli.Product.info( {'name': options.get('name'), 'organization-id': options.get('organization-id')} ) - except CLIReturnCodeError: - raise err + except CLIReturnCodeError as nested_err: + raise nested_err from err if not product: raise err return product @@ -503,7 +504,7 @@ def make_proxy(self, options=None): args['url'] = url return create_object(self._satellite.cli.Proxy, args, options) except CapsuleTunnelError as err: - raise CLIFactoryError(f'Failed to create ssh tunnel: {err}') + raise CLIFactoryError(f'Failed to create ssh tunnel: {err}') from None args['url'] = options['url'] return create_object(self._satellite.cli.Proxy, args, options) @@ -569,7 +570,7 @@ def activationkey_add_subscription_to_repo(self, options=None): except CLIReturnCodeError as err: raise CLIFactoryError( f'Failed to add subscription to activation key\n{err.msg}' - ) + ) from err def setup_org_for_a_custom_repo(self, options=None): """Sets up Org for the given custom repo by: @@ -607,7 +608,7 @@ def setup_org_for_a_custom_repo(self, options=None): try: self._satellite.cli.Repository.synchronize({'id': custom_repo['id']}) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to synchronize repository\n{err.msg}') + raise CLIFactoryError(f'Failed to synchronize repository\n{err.msg}') from err # Create CV if needed and associate repo with it if options.get('content-view-id') is None: cv_id = self.make_content_view({'organization-id': org_id})['id'] @@ -618,12 +619,14 @@ def setup_org_for_a_custom_repo(self, options=None): {'id': cv_id, 'organization-id': org_id, 'repository-id': custom_repo['id']} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to add repository to content view\n{err.msg}') + raise CLIFactoryError(f'Failed to add repository to content view\n{err.msg}') from err # Publish a new version of CV try: self._satellite.cli.ContentView.publish({'id': cv_id}) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to publish new version of content view\n{err.msg}') + raise CLIFactoryError( + f'Failed to publish new version of content view\n{err.msg}' + ) from err # Get the version id cv_info = self._satellite.cli.ContentView.info({'id': cv_id}) lce_promoted = cv_info['lifecycle-environments'] @@ -639,7 +642,9 @@ def setup_org_for_a_custom_repo(self, options=None): } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to promote version to next environment\n{err.msg}') + raise CLIFactoryError( + f'Failed to promote version to next environment\n{err.msg}' + ) from err # Create activation key if needed and associate content view with it if options.get('activationkey-id') is None: activationkey_id = self.make_activation_key( @@ -658,7 +663,9 @@ def setup_org_for_a_custom_repo(self, options=None): {'content-view-id': cv_id, 'id': activationkey_id, 'organization-id': org_id} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to associate activation-key with CV\n{err.msg}') + raise CLIFactoryError( + f'Failed to associate activation-key with CV\n{err.msg}' + ) from err # Add custom_product subscription to activation-key, if SCA mode is disabled if self._satellite.is_sca_mode_enabled(org_id) is False: @@ -709,15 +716,16 @@ def _setup_org_for_a_rh_repo(self, options=None): env_id = self.make_lifecycle_environment({'organization-id': org_id})['id'] else: env_id = options['lifecycle-environment-id'] - # Clone manifest and upload it - with clone() as manifest: - self._satellite.put(manifest.path, manifest.name) - try: - self._satellite.cli.Subscription.upload( - {'file': manifest.name, 'organization-id': org_id} - ) - except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') + # If manifest does not exist, clone and upload it + if len(self._satellite.cli.Subscription.exists({'organization-id': org_id})) == 0: + with clone() as manifest: + self._satellite.put(manifest.path, manifest.name) + try: + self._satellite.cli.Subscription.upload( + {'file': manifest.name, 'organization-id': org_id} + ) + except CLIReturnCodeError as err: + raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') from err # Enable repo from Repository Set try: self._satellite.cli.RepositorySet.enable( @@ -730,7 +738,7 @@ def _setup_org_for_a_rh_repo(self, options=None): } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to enable repository set\n{err.msg}') + raise CLIFactoryError(f'Failed to enable repository set\n{err.msg}') from err # Fetch repository info try: rhel_repo = self._satellite.cli.Repository.info( @@ -741,7 +749,7 @@ def _setup_org_for_a_rh_repo(self, options=None): } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to fetch repository info\n{err.msg}') + raise CLIFactoryError(f'Failed to fetch repository info\n{err.msg}') from err # Synchronize the RH repository try: self._satellite.cli.Repository.synchronize( @@ -752,7 +760,7 @@ def _setup_org_for_a_rh_repo(self, options=None): } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to synchronize repository\n{err.msg}') + raise CLIFactoryError(f'Failed to synchronize repository\n{err.msg}') from err # Create CV if needed and associate repo with it if options.get('content-view-id') is None: cv_id = self.make_content_view({'organization-id': org_id})['id'] @@ -763,24 +771,28 @@ def _setup_org_for_a_rh_repo(self, options=None): {'id': cv_id, 'organization-id': org_id, 'repository-id': rhel_repo['id']} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to add repository to content view\n{err.msg}') + raise CLIFactoryError(f'Failed to add repository to content view\n{err.msg}') from err # Publish a new version of CV try: self._satellite.cli.ContentView.publish({'id': cv_id}) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to publish new version of content view\n{err.msg}') + raise CLIFactoryError( + f'Failed to publish new version of content view\n{err.msg}' + ) from err # Get the version id try: cvv = self._satellite.cli.ContentView.info({'id': cv_id})['versions'][-1] except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to fetch content view info\n{err.msg}') + raise CLIFactoryError(f'Failed to fetch content view info\n{err.msg}') from err # Promote version1 to next env try: self._satellite.cli.ContentView.version_promote( {'id': cvv['id'], 'organization-id': org_id, 'to-lifecycle-environment-id': env_id} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to promote version to next environment\n{err.msg}') + raise CLIFactoryError( + f'Failed to promote version to next environment\n{err.msg}' + ) from err # Create activation key if needed and associate content view with it if options.get('activationkey-id') is None: activationkey_id = self.make_activation_key( @@ -799,7 +811,9 @@ def _setup_org_for_a_rh_repo(self, options=None): {'id': activationkey_id, 'organization-id': org_id, 'content-view-id': cv_id} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to associate activation-key with CV\n{err.msg}') + raise CLIFactoryError( + f'Failed to associate activation-key with CV\n{err.msg}' + ) from err # Add default subscription to activation-key, if SCA mode is disabled if self._satellite.is_sca_mode_enabled(org_id) is False: @@ -865,7 +879,7 @@ def setup_org_for_a_rh_repo( } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') + raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') from err # Add default subscription to activation-key, if SCA mode is disabled if self._satellite.is_sca_mode_enabled(result['organization-id']) is False: @@ -1075,7 +1089,7 @@ def setup_cdn_and_custom_repos_content( try: self._satellite.upload_manifest(org_id, interface='CLI') except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') + raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') from err custom_product, repos_info = self.setup_cdn_and_custom_repositories( org_id=org_id, repos=repos, download_policy=download_policy diff --git a/robottelo/host_helpers/repository_mixins.py b/robottelo/host_helpers/repository_mixins.py index b4b6849aeac..040afc05581 100644 --- a/robottelo/host_helpers/repository_mixins.py +++ b/robottelo/host_helpers/repository_mixins.py @@ -771,7 +771,7 @@ def setup_virtual_machine( patch_os_release_distro = self.os_repo.distro rh_repo_ids = [] if enable_rh_repos: - rh_repo_ids = [getattr(repo, 'rh_repository_id') for repo in self.rh_repos] + rh_repo_ids = [repo.rh_repository_id for repo in self.rh_repos] repo_labels = [] if enable_custom_repos: repo_labels = [ diff --git a/robottelo/host_helpers/satellite_mixins.py b/robottelo/host_helpers/satellite_mixins.py index 311b8a71b71..2370d7b21f7 100644 --- a/robottelo/host_helpers/satellite_mixins.py +++ b/robottelo/host_helpers/satellite_mixins.py @@ -1,10 +1,10 @@ import contextlib -from functools import cache import io import os import random import re +from cachetools import cachedmethod import requests from robottelo.cli.proxy import CapsuleTunnelError @@ -187,7 +187,7 @@ def publish_content_view(self, org, repo_list): :returns: A dictionary containing the details of the published content view. """ - repo = repo_list if type(repo_list) is list else [repo_list] + repo = repo_list if isinstance(repo_list, list) else [repo_list] content_view = self.api.ContentView(organization=org, repository=repo).create() content_view.publish() content_view = content_view.read() @@ -234,9 +234,9 @@ def available_capsule_port(self): :rtype: int """ port_pool_range = settings.fake_capsules.port_range - if type(port_pool_range) is str: + if isinstance(port_pool_range, str): port_pool_range = tuple(port_pool_range.split('-')) - if type(port_pool_range) is tuple and len(port_pool_range) == 2: + if isinstance(port_pool_range, tuple) and len(port_pool_range) == 2: port_pool = range(int(port_pool_range[0]), int(port_pool_range[1])) else: raise TypeError( @@ -262,14 +262,14 @@ def available_capsule_port(self): except ValueError: raise CapsuleTunnelError( f'Failed parsing the port numbers from stdout: {ss_cmd.stdout.splitlines()[:-1]}' - ) + ) from None try: # take the list of available ports and return randomly selected one return random.choice([port for port in port_pool if port not in used_ports]) except IndexError: raise CapsuleTunnelError( 'Failed to create ssh tunnel: No more ports available for mapping' - ) + ) from None @contextlib.contextmanager def default_url_on_new_port(self, oldport, newport): @@ -307,7 +307,7 @@ def validate_pulp_filepath( self, org, dir_path, - file_names=['*.json', '*.tar.gz'], + file_names=('*.json', '*.tar.gz'), ): """Checks the existence of certain files in a pulp dir""" extension_query = ' -o '.join([f'-name "{file}"' for file in file_names]) @@ -357,6 +357,6 @@ def api_factory(self): self._api_factory = APIFactory(self) return self._api_factory - @cache + @cachedmethod def ui_factory(self, session): return UIFactory(self, session=session) diff --git a/robottelo/host_helpers/ui_factory.py b/robottelo/host_helpers/ui_factory.py index cb85436051e..df156ad6d6f 100644 --- a/robottelo/host_helpers/ui_factory.py +++ b/robottelo/host_helpers/ui_factory.py @@ -18,12 +18,14 @@ def __init__(self, satellite, session=None): def create_fake_host( self, host, - interface_id=gen_string('alpha'), + interface_id=None, global_parameters=None, host_parameters=None, extra_values=None, new_host_details=False, ): + if interface_id is None: + interface_id = gen_string('alpha') if extra_values is None: extra_values = {} os_name = f'{host.operatingsystem.name} {host.operatingsystem.major}' diff --git a/robottelo/hosts.py b/robottelo/hosts.py index 70a8484a81d..35fbe4b63f7 100644 --- a/robottelo/hosts.py +++ b/robottelo/hosts.py @@ -16,6 +16,7 @@ from box import Box from broker import Broker from broker.hosts import Host +from cachetools import cachedmethod from dynaconf.vendor.box.exceptions import BoxKeyError from fauxfactory import gen_alpha, gen_string from nailgun import entities @@ -387,11 +388,11 @@ def power_control(self, state=VmState.RUNNING, ensure=True): try: vm_operation = POWER_OPERATIONS.get(state) workflow_name = settings.broker.host_workflows.power_control - except (AttributeError, KeyError): + except (AttributeError, KeyError) as err: raise NotImplementedError( 'No workflow in broker.host_workflows for power control, ' 'or VM operation not supported' - ) + ) from err assert ( # TODO read the kwarg name from settings too? Broker() @@ -498,10 +499,12 @@ def subscription_manager_status(self): def subscription_manager_list(self): return self.execute('subscription-manager list') - def subscription_manager_get_pool(self, sub_list=[]): + def subscription_manager_get_pool(self, sub_list=None): """ Return pool ids for the corresponding subscriptions in the list """ + if sub_list is None: + sub_list = [] pool_ids = [] for sub in sub_list: result = self.execute( @@ -513,10 +516,12 @@ def subscription_manager_get_pool(self, sub_list=[]): pool_ids.append(result) return pool_ids - def subscription_manager_attach_pool(self, pool_list=[]): + def subscription_manager_attach_pool(self, pool_list=None): """ Attach pool ids to the host and return the result """ + if pool_list is None: + pool_list = [] result = [] for pool in pool_list: result.append(self.execute(f'subscription-manager attach --pool={pool}')) @@ -589,8 +594,8 @@ def install_katello_agent(self): # We're in a traditional VM, so goferd should be running after katello-agent install try: wait_for(lambda: self.execute('service goferd status').status == 0) - except TimedOutError: - raise ContentHostError('katello-agent is not running') + except TimedOutError as err: + raise ContentHostError('katello-agent is not running') from err def install_katello_host_tools(self): """Installs Katello host tools on the broker virtual machine @@ -1435,8 +1440,10 @@ def install_tracer(self): raise ContentHostError('There was an error installing katello-host-tools-tracer') self.execute('katello-tracer-upload') - def register_to_cdn(self, pool_ids=[settings.subscription.rhn_poolid]): + def register_to_cdn(self, pool_ids=None): """Subscribe satellite to CDN""" + if pool_ids is None: + pool_ids = [settings.subscription.rhn_poolid] self.remove_katello_ca() cmd_result = self.register_contenthost( org=None, @@ -2255,7 +2262,7 @@ def get_rhsso_client_id(self): break return client_id - @lru_cache + @cachedmethod def get_rhsso_user_details(self, username): """Getter method to receive the user id""" result = self.execute( @@ -2264,7 +2271,7 @@ def get_rhsso_user_details(self, username): result_json = json.loads(result.stdout) return result_json[0] - @lru_cache + @cachedmethod def get_rhsso_groups_details(self, group_name): """Getter method to receive the group id""" result = self.execute(f"{KEY_CLOAK_CLI} get groups -r {settings.rhsso.realm}") diff --git a/robottelo/utils/decorators/func_locker.py b/robottelo/utils/decorators/func_locker.py index 2d575d8f560..4d82c3add28 100644 --- a/robottelo/utils/decorators/func_locker.py +++ b/robottelo/utils/decorators/func_locker.py @@ -227,8 +227,8 @@ def lock_function( def main_wrapper(func): - setattr(func, '__class_name__', class_name) - setattr(func, '__function_locked__', True) + func.__class_name__ = class_name + func.__function_locked__ = True @functools.wraps(func) def function_wrapper(*args, **kwargs): diff --git a/robottelo/utils/decorators/func_shared/shared.py b/robottelo/utils/decorators/func_shared/shared.py index 62e529b8d8a..072ebc3fb47 100644 --- a/robottelo/utils/decorators/func_shared/shared.py +++ b/robottelo/utils/decorators/func_shared/shared.py @@ -392,8 +392,7 @@ def __call__(self): # if was not able to restore the original exception raise this one raise SharedFunctionException( - 'Error generated by process: {} Exception: {}' - ' error: {}'.format(pid, error_class_name, error) + f'Error generated by process: {pid} Exception: {error_class_name} error: {error}' ) if not call_function and self._inject: diff --git a/robottelo/utils/ohsnap.py b/robottelo/utils/ohsnap.py index 74494766114..89eb6a97e22 100644 --- a/robottelo/utils/ohsnap.py +++ b/robottelo/utils/ohsnap.py @@ -90,7 +90,9 @@ def dogfood_repository( try: repository = next(r for r in res.json() if r['label'] == repo) except StopIteration: - raise RepositoryDataNotFound(f'Repository "{repo}" is not provided by the given product') + raise RepositoryDataNotFound( + f'Repository "{repo}" is not provided by the given product' + ) from None repository['baseurl'] = repository['baseurl'].replace('$basearch', arch) # If repo check is enabled, check that the repository actually exists on the remote server dogfood_req = requests.get(repository['baseurl']) diff --git a/robottelo/utils/virtwho.py b/robottelo/utils/virtwho.py index 8c7a6c48658..aaae57d7c97 100644 --- a/robottelo/utils/virtwho.py +++ b/robottelo/utils/virtwho.py @@ -301,9 +301,9 @@ def get_hypervisor_ahv_mapping(hypervisor_type): # Always check the last json section to get the host_uuid for item in mapping: if 'entities' in item: - for item in item['entities']: - if 'host_uuid' in item: - system_uuid = item['host_uuid'] + for _item in item['entities']: + if 'host_uuid' in _item: + system_uuid = _item['host_uuid'] break message = f"Host UUID {system_uuid} found for VM: {guest_uuid}" for line in logs.split('\n'): @@ -384,8 +384,8 @@ def deploy_configure_by_command_check(command): virtwho_cleanup() try: ret, stdout = runcmd(command) - except Exception: - raise VirtWhoError(f"Failed to deploy configure by {command}") + except Exception as err: + raise VirtWhoError(f"Failed to deploy configure by {command}") from err else: if ret != 0 or 'Finished successfully' not in stdout: raise VirtWhoError(f"Failed to deploy configure by {command}") @@ -410,7 +410,7 @@ def update_configure_option(option, value, config_file): :param value: set the option to the value :param config_file: path of virt-who config file """ - cmd = 'sed -i "s|^{0}.*|{0}={1}|g" {2}'.format(option, value, config_file) + cmd = f'sed -i "s|^{option}.*|{option}={value}|g" {config_file}' ret, output = runcmd(cmd) if ret != 0: raise VirtWhoError(f"Failed to set option {option} value to {value}") @@ -422,7 +422,7 @@ def delete_configure_option(option, config_file): :param option: the option you want to delete :param config_file: path of virt-who config file """ - cmd = 'sed -i "/^{0}/d" {1}; sed -i "/^#{0}/d" {1}'.format(option, config_file) + cmd = f'sed -i "/^{option}/d" {config_file}; sed -i "/^#{option}/d" {config_file}' ret, output = runcmd(cmd) if ret != 0: raise VirtWhoError(f"Failed to delete option {option}") @@ -437,11 +437,11 @@ def add_configure_option(option, value, config_file): """ try: get_configure_option(option, config_file) - except Exception: + except Exception as err: cmd = f'echo -e "\n{option}={value}" >> {config_file}' - ret, output = runcmd(cmd) + ret, _ = runcmd(cmd) if ret != 0: - raise VirtWhoError(f"Failed to add option {option}={value}") + raise VirtWhoError(f"Failed to add option {option}={value}") from err else: raise VirtWhoError(f"option {option} is already exist in {config_file}") @@ -456,9 +456,9 @@ def hypervisor_json_create(hypervisors, guests): :param guests: how many guests will be created """ hypervisors_list = [] - for i in range(hypervisors): + for _ in range(hypervisors): guest_list = [] - for c in range(guests): + for _ in range(guests): guest_list.append( { "guestId": str(uuid.uuid4()), diff --git a/tests/foreman/api/test_acs.py b/tests/foreman/api/test_acs.py index bb4cfaf125e..bdca5243454 100644 --- a/tests/foreman/api/test_acs.py +++ b/tests/foreman/api/test_acs.py @@ -130,7 +130,7 @@ def test_positive_run_bulk_actions(module_target_sat, module_yum_repo): """ acs_ids = [] - for i in range(3): + for _ in range(3): acs = module_target_sat.api.AlternateContentSource( name=gen_string('alpha'), alternate_content_source_type='simplified', diff --git a/tests/foreman/api/test_discoveredhost.py b/tests/foreman/api/test_discoveredhost.py index 7d260b3d5e8..948c68e0549 100644 --- a/tests/foreman/api/test_discoveredhost.py +++ b/tests/foreman/api/test_discoveredhost.py @@ -72,9 +72,9 @@ def _assert_discovered_host(host, channel=None, user_config=None): ]: try: dhcp_pxe = _wait_for_log(channel, pattern[0], timeout=10) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') + raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') from err groups = re.search('DHCPACK on (\\d.+) to', dhcp_pxe.out) assert len(groups.groups()) == 1, 'Unable to parse bootloader ip address' pxe_ip = groups.groups()[0] @@ -87,9 +87,9 @@ def _assert_discovered_host(host, channel=None, user_config=None): ]: try: _wait_for_log(channel, pattern[0], timeout=20) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for VM (tftp) to fetch {pattern[1]}') + raise AssertionError(f'Timed out waiting for VM (tftp) to fetch {pattern[1]}') from err # assert that server receives DHCP discover from FDI for pattern in [ ( @@ -100,9 +100,9 @@ def _assert_discovered_host(host, channel=None, user_config=None): ]: try: dhcp_fdi = _wait_for_log(channel, pattern[0], timeout=30) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') + raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') from err groups = re.search('DHCPACK on (\\d.+) to', dhcp_fdi.out) assert len(groups.groups()) == 1, 'Unable to parse FDI ip address' fdi_ip = groups.groups()[0] @@ -114,17 +114,17 @@ def _assert_discovered_host(host, channel=None, user_config=None): f'"/api/v2/discovered_hosts/facts" for {fdi_ip}', timeout=60, ) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError('Timed out waiting for /facts POST request') + raise AssertionError('Timed out waiting for /facts POST request') from err groups = re.search('\\[I\\|app\\|([a-z0-9]+)\\]', facts_fdi.out) assert len(groups.groups()) == 1, 'Unable to parse POST request UUID' req_id = groups.groups()[0] try: _wait_for_log(channel, f'\\[I\\|app\\|{req_id}\\] Completed 201 Created') - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError('Timed out waiting for "/facts" 201 response') + raise AssertionError('Timed out waiting for "/facts" 201 response') from err default_config = entity_mixins.DEFAULT_SERVER_CONFIG try: wait_for( @@ -138,8 +138,10 @@ def _assert_discovered_host(host, channel=None, user_config=None): delay=2, logger=logger, ) - except TimedOutError: - raise AssertionError('Timed out waiting for discovered_host to appear on satellite') + except TimedOutError as err: + raise AssertionError( + 'Timed out waiting for discovered_host to appear on satellite' + ) from err discovered_host = host.api.DiscoveredHost(user_config or default_config).search( query={'search': f'name={host.guest_name}'} ) @@ -153,8 +155,8 @@ def assert_discovered_host_provisioned(channel, ksrepo): try: log = _wait_for_log(channel, pattern, timeout=300, delay=10) assert pattern in log - except TimedOutError: - raise AssertionError(f'Timed out waiting for {pattern} from VM') + except TimedOutError as err: + raise AssertionError(f'Timed out waiting for {pattern} from VM') from err @pytest.fixture @@ -434,7 +436,7 @@ def test_positive_reboot_all_pxe_hosts( host.power_control(ensure=False) mac = host._broker_args['provisioning_nic_mac_addr'] wait_for( - lambda: sat.api.DiscoveredHost().search(query={'mac': mac}) != [], + lambda: sat.api.DiscoveredHost().search(query={'mac': mac}) != [], # noqa: B023 timeout=240, delay=20, ) diff --git a/tests/foreman/api/test_host.py b/tests/foreman/api/test_host.py index 15411a4741b..d495d6c7c24 100644 --- a/tests/foreman/api/test_host.py +++ b/tests/foreman/api/test_host.py @@ -998,7 +998,7 @@ def test_positive_read_content_source_id( 'lifecycle_environment_id': module_lce.id, }, ).create() - content_facet_attributes = getattr(host, 'content_facet_attributes') + content_facet_attributes = host.content_facet_attributes assert content_facet_attributes is not None content_source_id = content_facet_attributes.get('content_source_id') assert content_source_id is not None diff --git a/tests/foreman/api/test_media.py b/tests/foreman/api/test_media.py index 8fb0ec5af66..f68ea37bd72 100644 --- a/tests/foreman/api/test_media.py +++ b/tests/foreman/api/test_media.py @@ -41,7 +41,9 @@ def class_media(self, module_org, class_target_sat): @pytest.mark.upgrade @pytest.mark.parametrize( ('name', 'new_name'), - **parametrized(list(zip(valid_data_list().values(), valid_data_list().values()))) + **parametrized( + list(zip(valid_data_list().values(), valid_data_list().values(), strict=True)) + ) ) def test_positive_crud_with_name(self, module_org, name, new_name, module_target_sat): """Create, update, delete media with valid name only diff --git a/tests/foreman/api/test_organization.py b/tests/foreman/api/test_organization.py index 868ec7368b1..c407691804e 100644 --- a/tests/foreman/api/test_organization.py +++ b/tests/foreman/api/test_organization.py @@ -230,7 +230,7 @@ def test_positive_update_name(self, module_org, name): :parametrized: yes """ - setattr(module_org, 'name', name) + module_org.name = name module_org = module_org.update(['name']) assert module_org.name == name @@ -247,7 +247,7 @@ def test_positive_update_description(self, module_org, desc): :parametrized: yes """ - setattr(module_org, 'description', desc) + module_org.description = desc module_org = module_org.update(['description']) assert module_org.description == desc diff --git a/tests/foreman/api/test_partitiontable.py b/tests/foreman/api/test_partitiontable.py index 6d78ba1135b..fd0027cbc40 100644 --- a/tests/foreman/api/test_partitiontable.py +++ b/tests/foreman/api/test_partitiontable.py @@ -66,6 +66,7 @@ def test_positive_create_with_one_character_name(self, target_sat, name): zip( generate_strings_list(length=gen_integer(4, 30)), generate_strings_list(length=gen_integer(4, 30)), + strict=True, ) ) ), @@ -95,7 +96,8 @@ def test_positive_crud_with_name(self, target_sat, name, new_name): @pytest.mark.tier1 @pytest.mark.parametrize( - ('layout', 'new_layout'), **parametrized(list(zip(valid_data_list(), valid_data_list()))) + ('layout', 'new_layout'), + **parametrized(list(zip(valid_data_list(), valid_data_list(), strict=True))), ) def test_positive_create_update_with_layout(self, target_sat, layout, new_layout): """Create new and update partition tables using different inputs as a diff --git a/tests/foreman/api/test_provisioning.py b/tests/foreman/api/test_provisioning.py index cee77929db8..5e8c1ea133b 100644 --- a/tests/foreman/api/test_provisioning.py +++ b/tests/foreman/api/test_provisioning.py @@ -60,8 +60,8 @@ def assert_host_logs(channel, pattern): try: log = _wait_for_log(channel, pattern, timeout=300, delay=10) assert pattern in log - except TimedOutError: - raise AssertionError(f'Timed out waiting for {pattern} from VM') + except TimedOutError as err: + raise AssertionError(f'Timed out waiting for {pattern} from VM') from err @pytest.mark.e2e diff --git a/tests/foreman/api/test_repository.py b/tests/foreman/api/test_repository.py index 587df90e197..640f987875a 100644 --- a/tests/foreman/api/test_repository.py +++ b/tests/foreman/api/test_repository.py @@ -2519,6 +2519,17 @@ def test_positive_create_with_long_token( container_repo = container_repos[0] except IndexError: pytest.skip('No registries with "long_pass" set to true') + + to_clean = [] + + @request.addfinalizer + def clean_repos(): + for repo in to_clean: + try: + repo.delete(synchronous=False) + except Exception: + logger.exception(f'Exception cleaning up docker repo:\n{repo}') + for docker_repo_name in container_repo.repos_to_sync: repo_options = dict( content_type='docker', @@ -2536,13 +2547,7 @@ def test_positive_create_with_long_token( pytest.skip('The "long_pass" registry does not meet length requirement') repo = module_target_sat.api.Repository(**repo_options).create() - - @request.addfinalizer - def clean_repo(): - try: - repo.delete(synchronous=False) - except Exception: - logger.exception('Exception cleaning up docker repo:') + to_clean.append(repo) repo = repo.read() for field in 'name', 'docker_upstream_name', 'content_type', 'upstream_username': @@ -2569,6 +2574,17 @@ def test_positive_tag_whitelist( :expectedresults: multiple products and repos are created """ container_repo = getattr(settings.container_repo.registries, repo_key) + + to_clean = [] + + @request.addfinalizer + def clean_repos(): + for repo in to_clean: + try: + repo.delete(synchronous=False) + except Exception: + logger.exception(f'Exception cleaning up docker repo:\n{repo}') + for docker_repo_name in container_repo.repos_to_sync: repo_options = dict( content_type='docker', @@ -2583,13 +2599,7 @@ def test_positive_tag_whitelist( repo_options['product'] = module_product repo = module_target_sat.api.Repository(**repo_options).create() - - @request.addfinalizer - def clean_repo(): - try: - repo.delete(synchronous=False) - except Exception: - logger.exception('Exception cleaning up docker repo:') + to_clean.append(repo) for field in 'name', 'docker_upstream_name', 'content_type', 'upstream_username': assert getattr(repo, field) == repo_options[field] diff --git a/tests/foreman/api/test_role.py b/tests/foreman/api/test_role.py index df0bc677718..926a8f48fe8 100644 --- a/tests/foreman/api/test_role.py +++ b/tests/foreman/api/test_role.py @@ -37,7 +37,7 @@ class TestRole: @pytest.mark.upgrade @pytest.mark.parametrize( ('name', 'new_name'), - **parametrized(list(zip(generate_strings_list(), generate_strings_list()))), + **parametrized(list(zip(generate_strings_list(), generate_strings_list(), strict=True))), ) def test_positive_crud(self, name, new_name, target_sat): """Create, update and delete role with name ``name_generator()``. diff --git a/tests/foreman/api/test_webhook.py b/tests/foreman/api/test_webhook.py index 1bdafd10d8e..a0a34b6207a 100644 --- a/tests/foreman/api/test_webhook.py +++ b/tests/foreman/api/test_webhook.py @@ -61,8 +61,8 @@ def assert_event_triggered(channel, event): try: log = _wait_for_log(channel, pattern) assert pattern in log - except TimedOutError: - raise AssertionError(f'Timed out waiting for {pattern} from VM') + except TimedOutError as err: + raise AssertionError(f'Timed out waiting for {pattern} from VM') from err class TestWebhook: diff --git a/tests/foreman/cli/test_errata.py b/tests/foreman/cli/test_errata.py index d8e8bd93969..4a0900928da 100644 --- a/tests/foreman/cli/test_errata.py +++ b/tests/foreman/cli/test_errata.py @@ -115,8 +115,8 @@ def products_with_repos(orgs, module_target_sat): """Create and return a list of products. For each product, create and sync a single repo.""" products = [] # Create one product for each org, and a second product for the last org. - for org, params in zip(orgs + orgs[-1:], REPOS_WITH_ERRATA): - product = entities.Product(organization=org).create() + for org, params in zip(orgs + orgs[-1:], REPOS_WITH_ERRATA, strict=True): + product = module_target_sat.api.Product(organization=org).create() # Replace the organization entity returned by create(), which contains only the id, # with the one we already have. product.organization = org @@ -291,7 +291,7 @@ def check_errata(errata_ids, by_org=False): :param errata_ids: a list containing a list of errata ids for each repo :type errata_ids: list[list] """ - for ids, repo_with_errata in zip(errata_ids, REPOS_WITH_ERRATA): + for ids, repo_with_errata in zip(errata_ids, REPOS_WITH_ERRATA, strict=True): assert len(ids) == repo_with_errata['org_errata_count' if by_org else 'errata_count'] assert repo_with_errata['errata_id'] in ids diff --git a/tests/foreman/cli/test_host.py b/tests/foreman/cli/test_host.py index d08ef33ac96..1d515c34739 100644 --- a/tests/foreman/cli/test_host.py +++ b/tests/foreman/cli/test_host.py @@ -1757,11 +1757,11 @@ def test_positive_erratum_applicability( timeout=300, delay=5, ) - except TimedOutError: + except TimedOutError as err: raise TimedOutError( f"Timed out waiting for erratum \"{setup_custom_repo['security_errata']}\"" " to disappear from the list" - ) + ) from err @pytest.mark.cli_katello_host_tools diff --git a/tests/foreman/cli/test_model.py b/tests/foreman/cli/test_model.py index 0dfc2cb3ba1..a627a3158b7 100644 --- a/tests/foreman/cli/test_model.py +++ b/tests/foreman/cli/test_model.py @@ -40,7 +40,9 @@ def class_model(self, target_sat): @pytest.mark.upgrade @pytest.mark.parametrize( ('name', 'new_name'), - **parametrized(list(zip(valid_data_list().values(), valid_data_list().values()))) + **parametrized( + list(zip(valid_data_list().values(), valid_data_list().values(), strict=True)) + ) ) def test_positive_crud_with_name(self, name, new_name, module_target_sat): """Successfully creates, updates and deletes a Model. diff --git a/tests/foreman/cli/test_partitiontable.py b/tests/foreman/cli/test_partitiontable.py index fe863cfd42e..d32bd11bf03 100644 --- a/tests/foreman/cli/test_partitiontable.py +++ b/tests/foreman/cli/test_partitiontable.py @@ -55,6 +55,7 @@ def test_positive_create_with_one_character_name(self, name, target_sat): zip( generate_strings_list(length=randint(4, 30)), generate_strings_list(length=randint(4, 30)), + strict=True, ) ) ) diff --git a/tests/foreman/cli/test_ping.py b/tests/foreman/cli/test_ping.py index 3a59698fb82..68f6024372b 100644 --- a/tests/foreman/cli/test_ping.py +++ b/tests/foreman/cli/test_ping.py @@ -52,7 +52,7 @@ def test_positive_ping(target_sat, switch_user): # iterate over the lines grouping every 3 lines # example [1, 2, 3, 4, 5, 6] will return [(1, 2, 3), (4, 5, 6)] # only the status line is relevant for this test - for _, status, _ in zip(*[iter(result.stdout)] * 3): + for _, status, _ in zip(*[iter(result.stdout)] * 3, strict=False): # should this be strict? status_count += 1 if status.split(':')[1].strip().lower() == 'ok': diff --git a/tests/foreman/cli/test_remoteexecution.py b/tests/foreman/cli/test_remoteexecution.py index f94dc4d4668..f822c45022e 100644 --- a/tests/foreman/cli/test_remoteexecution.py +++ b/tests/foreman/cli/test_remoteexecution.py @@ -62,7 +62,7 @@ def assert_job_invocation_result( result = sat.cli.JobInvocation.info({'id': invocation_command_id}) try: assert result[expected_result] == '1' - except AssertionError: + except AssertionError as err: raise AssertionError( 'host output: {}'.format( ' '.join( @@ -71,7 +71,7 @@ def assert_job_invocation_result( ) ) ) - ) + ) from err def assert_job_invocation_status(sat, invocation_command_id, client_hostname, status): @@ -80,7 +80,7 @@ def assert_job_invocation_status(sat, invocation_command_id, client_hostname, st result = sat.cli.JobInvocation.info({'id': invocation_command_id}) try: assert result['status'] == status - except AssertionError: + except AssertionError as err: raise AssertionError( 'host output: {}'.format( ' '.join( @@ -89,7 +89,7 @@ def assert_job_invocation_status(sat, invocation_command_id, client_hostname, st ) ) ) - ) + ) from err class TestRemoteExecution: diff --git a/tests/foreman/cli/test_report.py b/tests/foreman/cli/test_report.py index 94946b88e66..34bae335913 100644 --- a/tests/foreman/cli/test_report.py +++ b/tests/foreman/cli/test_report.py @@ -84,7 +84,7 @@ def test_positive_install_configure_host( :BZ: 2126891, 2026239 """ puppet_infra_host = [session_puppet_enabled_sat, session_puppet_enabled_capsule] - for client, puppet_proxy in zip(content_hosts, puppet_infra_host): + for client, puppet_proxy in zip(content_hosts, puppet_infra_host, strict=True): client.configure_puppet(proxy_hostname=puppet_proxy.hostname) report = session_puppet_enabled_sat.cli.ConfigReport.list( {'search': f'host~{client.hostname},origin=Puppet'} diff --git a/tests/foreman/cli/test_repository.py b/tests/foreman/cli/test_repository.py index eeaf392c0a5..d625c5974d1 100644 --- a/tests/foreman/cli/test_repository.py +++ b/tests/foreman/cli/test_repository.py @@ -269,7 +269,7 @@ def test_positive_create_with_auth_yum_repo(self, repo_options, repo): for key in 'url', 'content-type': assert repo.get(key) == repo_options[key] repo = entities.Repository(id=repo['id']).read() - assert getattr(repo, 'upstream_username') == repo_options['upstream-username'] + assert repo.upstream_username == repo_options['upstream-username'] @pytest.mark.tier1 @pytest.mark.upgrade diff --git a/tests/foreman/cli/test_role.py b/tests/foreman/cli/test_role.py index 11474b42f1e..b3cd83c6d1f 100644 --- a/tests/foreman/cli/test_role.py +++ b/tests/foreman/cli/test_role.py @@ -35,7 +35,9 @@ class TestRole: @pytest.mark.parametrize( ('name', 'new_name'), **parametrized( - list(zip(generate_strings_list(length=10), generate_strings_list(length=10))) + list( + zip(generate_strings_list(length=10), generate_strings_list(length=10), strict=True) + ) ), ) def test_positive_crud_with_name(self, name, new_name, module_target_sat): diff --git a/tests/foreman/destructive/test_capsulecontent.py b/tests/foreman/destructive/test_capsulecontent.py index f7a6618ad3e..9def4e5c46b 100644 --- a/tests/foreman/destructive/test_capsulecontent.py +++ b/tests/foreman/destructive/test_capsulecontent.py @@ -71,7 +71,7 @@ def test_positive_sync_without_deadlock( cv = target_sat.publish_content_view(function_entitlement_manifest_org, repo) - for i in range(4): + for _ in range(4): copy_id = target_sat.api.ContentView(id=cv.id).copy(data={'name': gen_alpha()})['id'] copy_cv = target_sat.api.ContentView(id=copy_id).read() copy_cv.publish() diff --git a/tests/foreman/destructive/test_discoveredhost.py b/tests/foreman/destructive/test_discoveredhost.py index 4466747f2ed..cff2fac1f8f 100644 --- a/tests/foreman/destructive/test_discoveredhost.py +++ b/tests/foreman/destructive/test_discoveredhost.py @@ -70,9 +70,9 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): ]: try: dhcp_pxe = _wait_for_log(channel, pattern[0], timeout=10) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') + raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') from err groups = re.search('DHCPACK on (\\d.+) to', dhcp_pxe.out) assert len(groups.groups()) == 1, 'Unable to parse bootloader ip address' @@ -87,9 +87,9 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): ]: try: _wait_for_log(channel, pattern[0], timeout=20) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for VM (tftp) to fetch {pattern[1]}') + raise AssertionError(f'Timed out waiting for VM (tftp) to fetch {pattern[1]}') from err # assert that server receives DHCP discover from FDI for pattern in [ @@ -101,9 +101,9 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): ]: try: dhcp_fdi = _wait_for_log(channel, pattern[0], timeout=30) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') + raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') from err groups = re.search('DHCPACK on (\\d.+) to', dhcp_fdi.out) assert len(groups.groups()) == 1, 'Unable to parse FDI ip address' fdi_ip = groups.groups()[0] @@ -116,18 +116,18 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): f'"/api/v2/discovered_hosts/facts" for {fdi_ip}', timeout=60, ) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError('Timed out waiting for /facts POST request') + raise AssertionError('Timed out waiting for /facts POST request') from err groups = re.search('\\[I\\|app\\|([a-z0-9]+)\\]', facts_fdi.out) assert len(groups.groups()) == 1, 'Unable to parse POST request UUID' req_id = groups.groups()[0] try: _wait_for_log(channel, f'\\[I\\|app\\|{req_id}\\] Completed 201 Created') - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError('Timed out waiting for "/facts" 201 response') + raise AssertionError('Timed out waiting for "/facts" 201 response') from err default_config = entity_mixins.DEFAULT_SERVER_CONFIG @@ -143,8 +143,10 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): delay=2, logger=logger, ) - except TimedOutError: - raise AssertionError('Timed out waiting for discovered_host to appear on satellite') + except TimedOutError as err: + raise AssertionError( + 'Timed out waiting for discovered_host to appear on satellite' + ) from err discovered_host = sat.api.DiscoveredHost(user_config or default_config).search( query={'search': f'name={host.guest_name}'} ) diff --git a/tests/foreman/destructive/test_ldap_authentication.py b/tests/foreman/destructive/test_ldap_authentication.py index bc35a10bd83..9ce3cc07261 100644 --- a/tests/foreman/destructive/test_ldap_authentication.py +++ b/tests/foreman/destructive/test_ldap_authentication.py @@ -548,7 +548,7 @@ def test_user_permissions_rhsso_user_multiple_group( group_names = ['sat_users', 'sat_admins'] arguments = [{'roles': katello_role.name}, {'admin': 1}] external_auth_source = module_target_sat.cli.ExternalAuthSource.info({'name': "External"}) - for group_name, argument in zip(group_names, arguments): + for group_name, argument in zip(group_names, arguments, strict=True): # adding/creating rhsso groups default_sso_host.create_group(group_name=group_name) default_sso_host.update_rhsso_user(username, group_name=group_name) diff --git a/tests/foreman/destructive/test_remoteexecution.py b/tests/foreman/destructive/test_remoteexecution.py index 23cee12cff1..6bbb3853cdb 100644 --- a/tests/foreman/destructive/test_remoteexecution.py +++ b/tests/foreman/destructive/test_remoteexecution.py @@ -131,14 +131,14 @@ def test_positive_use_alternate_directory( result = target_sat.cli.JobInvocation.info({'id': invocation_command['id']}) try: assert result['success'] == '1' - except AssertionError: + except AssertionError as err: output = ' '.join( target_sat.cli.JobInvocation.get_output( {'id': invocation_command['id'], 'host': client.hostname} ) ) result = f'host output: {output}' - raise AssertionError(result) + raise AssertionError(result) from err task = target_sat.cli.Task.list_tasks({'search': command})[0] search = target_sat.cli.Task.list_tasks({'search': f'id={task["id"]}'}) diff --git a/tests/foreman/longrun/test_oscap.py b/tests/foreman/longrun/test_oscap.py index b5b7a269916..a6490efc7c6 100644 --- a/tests/foreman/longrun/test_oscap.py +++ b/tests/foreman/longrun/test_oscap.py @@ -215,12 +215,12 @@ def test_positive_oscap_run_via_ansible( try: result = target_sat.cli.JobInvocation.info({'id': job_id})['success'] assert result == '1' - except AssertionError: + except AssertionError as err: output = ' '.join( target_sat.cli.JobInvocation.get_output({'id': job_id, 'host': vm.hostname}) ) result = f'host output: {output}' - raise AssertionError(result) + raise AssertionError(result) from err result = vm.run('cat /etc/foreman_scap_client/config.yaml | grep profile') assert result.status == 0 # Runs the actual oscap scan on the vm/clients and @@ -320,12 +320,12 @@ def test_positive_oscap_run_via_ansible_bz_1814988( try: result = target_sat.cli.JobInvocation.info({'id': job_id})['success'] assert result == '1' - except AssertionError: + except AssertionError as err: output = ' '.join( target_sat.cli.JobInvocation.get_output({'id': job_id, 'host': vm.hostname}) ) result = f'host output: {output}' - raise AssertionError(result) + raise AssertionError(result) from err result = vm.run('cat /etc/foreman_scap_client/config.yaml | grep profile') assert result.status == 0 # Runs the actual oscap scan on the vm/clients and diff --git a/tests/foreman/maintain/test_advanced.py b/tests/foreman/maintain/test_advanced.py index 279ce48c4eb..5b105b4c0bb 100644 --- a/tests/foreman/maintain/test_advanced.py +++ b/tests/foreman/maintain/test_advanced.py @@ -27,8 +27,10 @@ def get_satellite_capsule_repos( - x_y_release=sat_x_y_release, product='satellite', os_major_ver=get_sat_rhel_version().major + x_y_release=sat_x_y_release, product='satellite', os_major_ver=None ): + if os_major_ver is None: + os_major_ver = get_sat_rhel_version().major if product == 'capsule': product = 'satellite-capsule' repos = [ diff --git a/tests/foreman/maintain/test_health.py b/tests/foreman/maintain/test_health.py index d1ce8ccd694..7eceff2f992 100644 --- a/tests/foreman/maintain/test_health.py +++ b/tests/foreman/maintain/test_health.py @@ -213,7 +213,7 @@ def test_negative_health_check_upstream_repository(sat_maintain, request): assert result.status == 0 assert 'System has upstream foreman_repo,puppet_repo repositories enabled' in result.stdout assert 'FAIL' in result.stdout - for name in upstream_url.keys(): + for name in upstream_url: result = sat_maintain.execute(f'cat /etc/yum.repos.d/{name}.repo') if name == 'fedorapeople_repo': assert 'enabled=1' in result.stdout @@ -222,7 +222,7 @@ def test_negative_health_check_upstream_repository(sat_maintain, request): @request.addfinalizer def _finalize(): - for name, url in upstream_url.items(): + for name in upstream_url: sat_maintain.execute(f'rm -fr /etc/yum.repos.d/{name}.repo') sat_maintain.execute('dnf clean all') diff --git a/tests/foreman/ui/test_computeresource_vmware.py b/tests/foreman/ui/test_computeresource_vmware.py index df02d618383..66bf85252c0 100644 --- a/tests/foreman/ui/test_computeresource_vmware.py +++ b/tests/foreman/ui/test_computeresource_vmware.py @@ -294,8 +294,8 @@ def test_positive_resource_vm_power_management(session): timeout=30, delay=2, ) - except TimedOutError: - raise AssertionError('Timed out waiting for VM to toggle power state') + except TimedOutError as err: + raise AssertionError('Timed out waiting for VM to toggle power state') from err @pytest.mark.tier2 diff --git a/tests/foreman/ui/test_contenthost.py b/tests/foreman/ui/test_contenthost.py index 735109cb666..19bf880b0db 100644 --- a/tests/foreman/ui/test_contenthost.py +++ b/tests/foreman/ui/test_contenthost.py @@ -1491,7 +1491,7 @@ def test_syspurpose_attributes_empty(session, default_location, vm_module_stream ] syspurpose_status = details['system_purpose_status'] assert syspurpose_status.lower() == 'not specified' - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spname in DEFAULT_SYSPURPOSE_ATTRIBUTES: assert details[spname] == '' @@ -1529,7 +1529,7 @@ def test_set_syspurpose_attributes_cli(session, default_location, vm_module_stre with session: session.location.select(default_location.name) # Set sypurpose attributes - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.values(): run_remote_command_on_content_host( f'syspurpose set-{spdata[0]} "{spdata[1]}"', vm_module_streams ) @@ -1574,11 +1574,11 @@ def test_unset_syspurpose_attributes_cli(session, default_location, vm_module_st :CaseImportance: High """ # Set sypurpose attributes... - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.values(): run_remote_command_on_content_host( f'syspurpose set-{spdata[0]} "{spdata[1]}"', vm_module_streams ) - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.values(): # ...and unset them. run_remote_command_on_content_host(f'syspurpose unset-{spdata[0]}', vm_module_streams) @@ -1587,7 +1587,7 @@ def test_unset_syspurpose_attributes_cli(session, default_location, vm_module_st details = session.contenthost.read(vm_module_streams.hostname, widget_names='details')[ 'details' ] - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spname in DEFAULT_SYSPURPOSE_ATTRIBUTES: assert details[spname] == '' diff --git a/tests/foreman/ui/test_dashboard.py b/tests/foreman/ui/test_dashboard.py index 672203ed7eb..44c09e217a1 100644 --- a/tests/foreman/ui/test_dashboard.py +++ b/tests/foreman/ui/test_dashboard.py @@ -86,7 +86,7 @@ def test_positive_host_configuration_status(session): else: assert dashboard_values['status_list'][criteria] == 0 - for criteria, search in zip(criteria_list, search_strings_list): + for criteria, search in zip(criteria_list, search_strings_list, strict=True): if criteria == 'Hosts with no reports': session.dashboard.action({'HostConfigurationStatus': {'status_list': criteria}}) values = session.host.read_all() diff --git a/tests/foreman/ui/test_host.py b/tests/foreman/ui/test_host.py index 46e32a62a45..8d776446167 100644 --- a/tests/foreman/ui/test_host.py +++ b/tests/foreman/ui/test_host.py @@ -803,7 +803,7 @@ def test_positive_search_by_parameter_with_different_values( for host in hosts: assert session.host.search(host.name)[0]['Name'] == host.name # Check that search by parameter returns only one host in the list - for param_value, host in zip(param_values, hosts): + for param_value, host in zip(param_values, hosts, strict=True): values = session.host.search(f'params.{param_name} = {param_value}') assert len(values) == 1 assert values[0]['Name'] == host.name @@ -1582,7 +1582,7 @@ def test_global_registration_token_restriction( for curl_cmd in (curl_users, curl_hosts): result = client.execute(curl_cmd) assert result.status == 0 - 'Unable to authenticate user' in result.stdout + assert 'Unable to authenticate user' in result.stdout @pytest.mark.tier4 diff --git a/tests/foreman/ui/test_hostcollection.py b/tests/foreman/ui/test_hostcollection.py index 2c60125c293..e01be3e49a4 100644 --- a/tests/foreman/ui/test_hostcollection.py +++ b/tests/foreman/ui/test_hostcollection.py @@ -705,9 +705,10 @@ def test_negative_hosts_limit( session.hostcollection.associate_host(hc_name, hosts[0].name) with pytest.raises(AssertionError) as context: session.hostcollection.associate_host(hc_name, hosts[1].name) - assert "cannot have more than 1 host(s) associated with host collection '{}'".format( - hc_name - ) in str(context.value) + assert ( + f"cannot have more than 1 host(s) associated with host collection '{hc_name}'" + in str(context.value) + ) @pytest.mark.tier3 diff --git a/tests/foreman/ui/test_sync.py b/tests/foreman/ui/test_sync.py index 30ccce8c95f..041a36a011d 100644 --- a/tests/foreman/ui/test_sync.py +++ b/tests/foreman/ui/test_sync.py @@ -80,7 +80,7 @@ def test_positive_sync_rh_repos(session, target_sat, module_entitlement_manifest distros = ['rhel6', 'rhel7'] repo_collections = [ target_sat.cli_factory.RepositoryCollection(distro=distro, repositories=[repo]) - for distro, repo in zip(distros, repos) + for distro, repo in zip(distros, repos, strict=True) ] for repo_collection in repo_collections: repo_collection.setup(module_entitlement_manifest_org.id, synchronize=False) diff --git a/tests/robottelo/test_func_shared.py b/tests/robottelo/test_func_shared.py index 7cc635e1aa0..4e70550a08d 100644 --- a/tests/robottelo/test_func_shared.py +++ b/tests/robottelo/test_func_shared.py @@ -538,7 +538,7 @@ def test_function_kw_scope(self): """ prefixes = [f'pre_{i}' for i in range(10)] suffixes = [f'suf_{i}' for i in range(10)] - for prefix, suffix in zip(prefixes, suffixes): + for prefix, suffix in zip(prefixes, suffixes, strict=True): counter_value = gen_integer(min_value=2, max_value=10000) inc_string = basic_shared_counter_string( prefix=prefix, suffix=suffix, counter=counter_value diff --git a/tests/robottelo/test_report.py b/tests/robottelo/test_report.py index e5f2090b21a..acdc41c6ad4 100644 --- a/tests/robottelo/test_report.py +++ b/tests/robottelo/test_report.py @@ -44,11 +44,13 @@ def test_junit_timestamps(exec_test, property_level): prop = [prop] try: assert 'start_time' in [p['@name'] for p in prop] - except KeyError as e: - raise AssertionError(f'Missing property node: "start_time": {e}') + except KeyError as err: + raise AssertionError(f'Missing property node: "start_time": {err}') from err try: for p in prop: if p['@name'] == 'start_time': datetime.datetime.strptime(p['@value'], XUNIT_TIME_FORMAT) - except ValueError as e: - raise AssertionError(f'Unable to parse datetime for "start_time" property node: {e}') + except ValueError as err: + raise AssertionError( + f'Unable to parse datetime for "start_time" property node: {err}' + ) from err