diff --git a/pyproject.toml b/pyproject.toml index 7988aa2a58d..61982c48c08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ target-version = "py311" fixable = ["ALL"] select = [ + "B", # bugbear # "C90", # mccabe "E", # pycodestyle "F", # flake8 diff --git a/robottelo/cli/base.py b/robottelo/cli/base.py index 2d3d7974696..2d3e6dbf850 100644 --- a/robottelo/cli/base.py +++ b/robottelo/cli/base.py @@ -169,15 +169,9 @@ def _get_username_password(cls, username=None, password=None): """ if username is None: - try: - username = getattr(cls, 'foreman_admin_username') - except AttributeError: - username = settings.server.admin_username + username = getattr(cls, 'foreman_admin_username', settings.server.admin_username) if password is None: - try: - password = getattr(cls, 'foreman_admin_password') - except AttributeError: - password = settings.server.admin_password + password = getattr(cls, 'foreman_admin_password', settings.server.admin_password) return (username, password) diff --git a/robottelo/cli/hammer.py b/robottelo/cli/hammer.py index f6ddb11b4f3..661042050e6 100644 --- a/robottelo/cli/hammer.py +++ b/robottelo/cli/hammer.py @@ -42,7 +42,7 @@ def parse_csv(output): # Generate the key names, spaces will be converted to dashes "-" keys = [_normalize(header) for header in next(reader)] # For each entry, create a dict mapping each key with each value - return [dict(zip(keys, values)) for values in reader if len(values) > 0] + return [dict(zip(keys, values, strict=True)) for values in reader if len(values) > 0] def parse_help(output): diff --git a/robottelo/host_helpers/api_factory.py b/robottelo/host_helpers/api_factory.py index 92f40652d34..0fff2579352 100644 --- a/robottelo/host_helpers/api_factory.py +++ b/robottelo/host_helpers/api_factory.py @@ -468,14 +468,14 @@ def create_role_permissions( if entity_permission.name != name: raise self._satellite.api.APIResponseError( 'the returned permission is different from the' - ' requested one "{} != {}"'.format(entity_permission.name, name) + f' requested one "{entity_permission.name} != {name}"' ) permissions_entities.append(entity_permission) else: if not permissions_name: raise ValueError( - 'resource type "{}" empty. You must select at' - ' least one permission'.format(resource_type) + f'resource type "{resource_type}" empty. You must select at' + ' least one permission' ) resource_type_permissions_entities = self._satellite.api.Permission().search( @@ -575,8 +575,8 @@ def satellite_setting(self, key_val: str): setting = self._satellite.api.Setting().search( query={'search': f'name={name.strip()}'} )[0] - except IndexError: - raise KeyError(f'The setting {name} in not available in satellite.') + except IndexError as err: + raise KeyError(f'The setting {name} in not available in satellite.') from err old_value = setting.value setting.value = value.strip() setting.update({'value'}) diff --git a/robottelo/host_helpers/capsule_mixins.py b/robottelo/host_helpers/capsule_mixins.py index 0a4c88ddf6c..f726b921efc 100644 --- a/robottelo/host_helpers/capsule_mixins.py +++ b/robottelo/host_helpers/capsule_mixins.py @@ -61,10 +61,12 @@ def wait_for_tasks( raise AssertionError(f"No task was found using query '{search_query}'") return tasks - def wait_for_sync(self, timeout=600, start_time=datetime.utcnow()): + def wait_for_sync(self, timeout=600, start_time=None): """Wait for capsule sync to finish and assert the sync task succeeded""" # Assert that a task to sync lifecycle environment to the capsule # is started (or finished already) + if start_time is None: + start_time = datetime.utcnow() logger.info(f"Waiting for capsule {self.hostname} sync to finish ...") sync_status = self.nailgun_capsule.content_get_sync() logger.info(f"Active tasks {sync_status['active_sync_tasks']}") diff --git a/robottelo/host_helpers/cli_factory.py b/robottelo/host_helpers/cli_factory.py index 7ff8eb086ef..ba82a80e7ff 100644 --- a/robottelo/host_helpers/cli_factory.py +++ b/robottelo/host_helpers/cli_factory.py @@ -4,7 +4,7 @@ example: my_satellite.cli_factory.make_org() """ import datetime -from functools import lru_cache, partial +from functools import partial import inspect import os from os import chmod @@ -14,6 +14,7 @@ from time import sleep from box import Box +from cachetools import cachedmethod from fauxfactory import ( gen_alpha, gen_alphanumeric, @@ -59,9 +60,9 @@ def create_object(cli_object, options, values=None, credentials=None): 'Failed to create {} with data:\n{}\n{}'.format( cli_object.__name__, pprint.pformat(options, indent=2), err.msg ) - ) + ) from err # Sometimes we get a list with a dictionary and not a dictionary. - if type(result) is list and len(result) > 0: + if isinstance(result, list) and len(result) > 0: result = result[0] return Box(result) @@ -294,7 +295,7 @@ def _evaluate_functions(self, iterable): if not key.startswith('_') } - @lru_cache + @cachedmethod def _find_entity_class(self, entity_name): entity_name = entity_name.replace('_', '').lower() for name, class_obj in self._satellite.cli.__dict__.items(): @@ -394,8 +395,8 @@ def make_product_wait(self, options=None, wait_for=5): product = self._satellite.cli.Product.info( {'name': options.get('name'), 'organization-id': options.get('organization-id')} ) - except CLIReturnCodeError: - raise err + except CLIReturnCodeError as nested_err: + raise nested_err from err if not product: raise err return product @@ -503,7 +504,7 @@ def make_proxy(self, options=None): args['url'] = url return create_object(self._satellite.cli.Proxy, args, options) except CapsuleTunnelError as err: - raise CLIFactoryError(f'Failed to create ssh tunnel: {err}') + raise CLIFactoryError(f'Failed to create ssh tunnel: {err}') from None args['url'] = options['url'] return create_object(self._satellite.cli.Proxy, args, options) @@ -569,7 +570,7 @@ def activationkey_add_subscription_to_repo(self, options=None): except CLIReturnCodeError as err: raise CLIFactoryError( f'Failed to add subscription to activation key\n{err.msg}' - ) + ) from err def setup_org_for_a_custom_repo(self, options=None): """Sets up Org for the given custom repo by: @@ -608,7 +609,7 @@ def setup_org_for_a_custom_repo(self, options=None): try: self._satellite.cli.Repository.synchronize({'id': custom_repo['id']}) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to synchronize repository\n{err.msg}') + raise CLIFactoryError(f'Failed to synchronize repository\n{err.msg}') from err # Create CV if needed and associate repo with it if options.get('content-view-id') is None: cv_id = self.make_content_view({'organization-id': org_id})['id'] @@ -619,12 +620,14 @@ def setup_org_for_a_custom_repo(self, options=None): {'id': cv_id, 'organization-id': org_id, 'repository-id': custom_repo['id']} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to add repository to content view\n{err.msg}') + raise CLIFactoryError(f'Failed to add repository to content view\n{err.msg}') from err # Publish a new version of CV try: self._satellite.cli.ContentView.publish({'id': cv_id}) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to publish new version of content view\n{err.msg}') + raise CLIFactoryError( + f'Failed to publish new version of content view\n{err.msg}' + ) from err # Get the version id cv_info = self._satellite.cli.ContentView.info({'id': cv_id}) assert len(cv_info['versions']) > 0 @@ -642,7 +645,9 @@ def setup_org_for_a_custom_repo(self, options=None): } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to promote version to next environment\n{err.msg}') + raise CLIFactoryError( + f'Failed to promote version to next environment\n{err.msg}' + ) from err # Create activation key if needed and associate content view with it if options.get('activationkey-id') is None: activationkey_id = self.make_activation_key( @@ -661,7 +666,9 @@ def setup_org_for_a_custom_repo(self, options=None): {'content-view-id': cv_id, 'id': activationkey_id, 'organization-id': org_id} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to associate activation-key with CV\n{err.msg}') + raise CLIFactoryError( + f'Failed to associate activation-key with CV\n{err.msg}' + ) from err # Add custom_product subscription to activation-key, if SCA mode is disabled if self._satellite.is_sca_mode_enabled(org_id) is False: @@ -720,10 +727,13 @@ def _setup_org_for_a_rh_repo(self, options=None): # If manifest does not exist, clone and upload it if len(self._satellite.cli.Subscription.exists({'organization-id': org_id})) == 0: with clone() as manifest: - try: - self._satellite.upload_manifest(org_id, manifest.content) - except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') + self._satellite.put(manifest.path, manifest.name) + try: + self._satellite.cli.Subscription.upload( + {'file': manifest.name, 'organization-id': org_id} + ) + except CLIReturnCodeError as err: + raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') from err # Enable repo from Repository Set try: self._satellite.cli.RepositorySet.enable( @@ -736,7 +746,7 @@ def _setup_org_for_a_rh_repo(self, options=None): } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to enable repository set\n{err.msg}') + raise CLIFactoryError(f'Failed to enable repository set\n{err.msg}') from err # Fetch repository info try: rhel_repo = self._satellite.cli.Repository.info( @@ -747,7 +757,7 @@ def _setup_org_for_a_rh_repo(self, options=None): } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to fetch repository info\n{err.msg}') + raise CLIFactoryError(f'Failed to fetch repository info\n{err.msg}') from err # Synchronize the RH repository try: self._satellite.cli.Repository.synchronize( @@ -758,7 +768,7 @@ def _setup_org_for_a_rh_repo(self, options=None): } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to synchronize repository\n{err.msg}') + raise CLIFactoryError(f'Failed to synchronize repository\n{err.msg}') from err # Create CV if needed and associate repo with it if options.get('content-view-id') is None: cv_id = self.make_content_view({'organization-id': org_id})['id'] @@ -769,24 +779,28 @@ def _setup_org_for_a_rh_repo(self, options=None): {'id': cv_id, 'organization-id': org_id, 'repository-id': rhel_repo['id']} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to add repository to content view\n{err.msg}') + raise CLIFactoryError(f'Failed to add repository to content view\n{err.msg}') from err # Publish a new version of CV try: self._satellite.cli.ContentView.publish({'id': cv_id}) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to publish new version of content view\n{err.msg}') + raise CLIFactoryError( + f'Failed to publish new version of content view\n{err.msg}' + ) from err # Get the version id try: cvv = self._satellite.cli.ContentView.info({'id': cv_id})['versions'][-1] except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to fetch content view info\n{err.msg}') + raise CLIFactoryError(f'Failed to fetch content view info\n{err.msg}') from err # Promote version1 to next env try: self._satellite.cli.ContentView.version_promote( {'id': cvv['id'], 'organization-id': org_id, 'to-lifecycle-environment-id': env_id} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to promote version to next environment\n{err.msg}') + raise CLIFactoryError( + f'Failed to promote version to next environment\n{err.msg}' + ) from err # Create activation key if needed and associate content view with it if options.get('activationkey-id') is None: activationkey_id = self.make_activation_key( @@ -805,7 +819,9 @@ def _setup_org_for_a_rh_repo(self, options=None): {'id': activationkey_id, 'organization-id': org_id, 'content-view-id': cv_id} ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to associate activation-key with CV\n{err.msg}') + raise CLIFactoryError( + f'Failed to associate activation-key with CV\n{err.msg}' + ) from err # Add default subscription to activation-key, if SCA mode is disabled if self._satellite.is_sca_mode_enabled(org_id) is False: @@ -876,7 +892,7 @@ def setup_org_for_a_rh_repo( } ) except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') + raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') from err # Add default subscription to activation-key, if SCA mode is disabled if self._satellite.is_sca_mode_enabled(result['organization-id']) is False: @@ -1086,7 +1102,7 @@ def setup_cdn_and_custom_repos_content( try: self._satellite.upload_manifest(org_id, interface='CLI') except CLIReturnCodeError as err: - raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') + raise CLIFactoryError(f'Failed to upload manifest\n{err.msg}') from err custom_product, repos_info = self.setup_cdn_and_custom_repositories( org_id=org_id, repos=repos, download_policy=download_policy diff --git a/robottelo/host_helpers/repository_mixins.py b/robottelo/host_helpers/repository_mixins.py index 968b0ffea4d..aeb06a0c97e 100644 --- a/robottelo/host_helpers/repository_mixins.py +++ b/robottelo/host_helpers/repository_mixins.py @@ -826,7 +826,7 @@ def setup_virtual_machine( patch_os_release_distro = self.os_repo.distro rh_repo_ids = [] if enable_rh_repos: - rh_repo_ids = [getattr(repo, 'rh_repository_id') for repo in self.rh_repos] + rh_repo_ids = [repo.rh_repository_id for repo in self.rh_repos] repo_labels = [] if enable_custom_repos: repo_labels = [ diff --git a/robottelo/host_helpers/satellite_mixins.py b/robottelo/host_helpers/satellite_mixins.py index 751e9607ffc..312ec7e49f4 100644 --- a/robottelo/host_helpers/satellite_mixins.py +++ b/robottelo/host_helpers/satellite_mixins.py @@ -1,10 +1,10 @@ import contextlib -from functools import cache import io import os import random import re +from cachetools import cachedmethod import requests from robottelo.cli.proxy import CapsuleTunnelError @@ -188,7 +188,7 @@ def publish_content_view(self, org, repo_list): :returns: A dictionary containing the details of the published content view. """ - repo = repo_list if type(repo_list) is list else [repo_list] + repo = repo_list if isinstance(repo_list, list) else [repo_list] content_view = self.api.ContentView(organization=org, repository=repo).create() content_view.publish() content_view = content_view.read() @@ -235,9 +235,9 @@ def available_capsule_port(self): :rtype: int """ port_pool_range = settings.fake_capsules.port_range - if type(port_pool_range) is str: + if isinstance(port_pool_range, list): port_pool_range = tuple(port_pool_range.split('-')) - if type(port_pool_range) is tuple and len(port_pool_range) == 2: + if isinstance(port_pool_range, tuple) and len(port_pool_range) == 2: port_pool = range(int(port_pool_range[0]), int(port_pool_range[1])) else: raise TypeError( @@ -263,14 +263,14 @@ def available_capsule_port(self): except ValueError: raise CapsuleTunnelError( f'Failed parsing the port numbers from stdout: {ss_cmd.stdout.splitlines()[:-1]}' - ) + ) from None try: # take the list of available ports and return randomly selected one return random.choice([port for port in port_pool if port not in used_ports]) except IndexError: raise CapsuleTunnelError( 'Failed to create ssh tunnel: No more ports available for mapping' - ) + ) from None @contextlib.contextmanager def default_url_on_new_port(self, oldport, newport): @@ -308,7 +308,7 @@ def validate_pulp_filepath( self, org, dir_path, - file_names=['*.json', '*.tar.gz'], + file_names=('*.json', '*.tar.gz'), ): """Checks the existence of certain files in a pulp dir""" extension_query = ' -o '.join([f'-name "{file}"' for file in file_names]) @@ -358,6 +358,6 @@ def api_factory(self): self._api_factory = APIFactory(self) return self._api_factory - @cache + @cachedmethod def ui_factory(self, session): return UIFactory(self, session=session) diff --git a/robottelo/host_helpers/ui_factory.py b/robottelo/host_helpers/ui_factory.py index cb85436051e..df156ad6d6f 100644 --- a/robottelo/host_helpers/ui_factory.py +++ b/robottelo/host_helpers/ui_factory.py @@ -18,12 +18,14 @@ def __init__(self, satellite, session=None): def create_fake_host( self, host, - interface_id=gen_string('alpha'), + interface_id=None, global_parameters=None, host_parameters=None, extra_values=None, new_host_details=False, ): + if interface_id is None: + interface_id = gen_string('alpha') if extra_values is None: extra_values = {} os_name = f'{host.operatingsystem.name} {host.operatingsystem.major}' diff --git a/robottelo/hosts.py b/robottelo/hosts.py index 439b5012a97..ed313eaeede 100644 --- a/robottelo/hosts.py +++ b/robottelo/hosts.py @@ -17,6 +17,7 @@ from box import Box from broker import Broker from broker.hosts import Host +from cachetools import cachedmethod from dynaconf.vendor.box.exceptions import BoxKeyError from fauxfactory import gen_alpha, gen_string from manifester import Manifester @@ -414,11 +415,11 @@ def power_control(self, state=VmState.RUNNING, ensure=True): try: vm_operation = POWER_OPERATIONS.get(state) workflow_name = settings.broker.host_workflows.power_control - except (AttributeError, KeyError): + except (AttributeError, KeyError) as err: raise NotImplementedError( 'No workflow in broker.host_workflows for power control, ' 'or VM operation not supported' - ) + ) from err assert ( # TODO read the kwarg name from settings too? Broker() @@ -525,10 +526,12 @@ def subscription_manager_status(self): def subscription_manager_list(self): return self.execute('subscription-manager list') - def subscription_manager_get_pool(self, sub_list=[]): + def subscription_manager_get_pool(self, sub_list=None): """ Return pool ids for the corresponding subscriptions in the list """ + if sub_list is None: + sub_list = [] pool_ids = [] for sub in sub_list: result = self.execute( @@ -540,10 +543,12 @@ def subscription_manager_get_pool(self, sub_list=[]): pool_ids.append(result) return pool_ids - def subscription_manager_attach_pool(self, pool_list=[]): + def subscription_manager_attach_pool(self, pool_list=None): """ Attach pool ids to the host and return the result """ + if pool_list is None: + pool_list = [] result = [] for pool in pool_list: result.append(self.execute(f'subscription-manager attach --pool={pool}')) @@ -621,6 +626,7 @@ def install_katello_ca(self, satellite): warnings.warn( message='The install_katello_ca method is deprecated, use the register method instead.', category=DeprecationWarning, + stacklevel=2, ) self._satellite = satellite self.execute( @@ -673,6 +679,7 @@ def install_capsule_katello_ca(self, capsule=None): 'use the register method instead.' ), category=DeprecationWarning, + stacklevel=2, ) url = urlunsplit(('http', capsule, 'pub/', '', '')) ca_url = urljoin(url, 'katello-ca-consumer-latest.noarch.rpm') @@ -1444,8 +1451,10 @@ def install_tracer(self): raise ContentHostError('There was an error installing katello-host-tools-tracer') self.execute('katello-tracer-upload') - def register_to_cdn(self, pool_ids=[settings.subscription.rhn_poolid]): + def register_to_cdn(self, pool_ids=None): """Subscribe satellite to CDN""" + if pool_ids is None: + pool_ids = [settings.subscription.rhn_poolid] self.remove_katello_ca() cmd_result = self.register_contenthost( org=None, @@ -2274,7 +2283,7 @@ def get_rhsso_client_id(self): break return client_id - @lru_cache + @cachedmethod def get_rhsso_user_details(self, username): """Getter method to receive the user id""" result = self.execute( @@ -2283,7 +2292,7 @@ def get_rhsso_user_details(self, username): result_json = json.loads(result.stdout) return result_json[0] - @lru_cache + @cachedmethod def get_rhsso_groups_details(self, group_name): """Getter method to receive the group id""" result = self.execute(f"{KEY_CLOAK_CLI} get groups -r {settings.rhsso.realm}") diff --git a/robottelo/utils/decorators/func_locker.py b/robottelo/utils/decorators/func_locker.py index 2d575d8f560..4d82c3add28 100644 --- a/robottelo/utils/decorators/func_locker.py +++ b/robottelo/utils/decorators/func_locker.py @@ -227,8 +227,8 @@ def lock_function( def main_wrapper(func): - setattr(func, '__class_name__', class_name) - setattr(func, '__function_locked__', True) + func.__class_name__ = class_name + func.__function_locked__ = True @functools.wraps(func) def function_wrapper(*args, **kwargs): diff --git a/robottelo/utils/decorators/func_shared/shared.py b/robottelo/utils/decorators/func_shared/shared.py index 62e529b8d8a..072ebc3fb47 100644 --- a/robottelo/utils/decorators/func_shared/shared.py +++ b/robottelo/utils/decorators/func_shared/shared.py @@ -392,8 +392,7 @@ def __call__(self): # if was not able to restore the original exception raise this one raise SharedFunctionException( - 'Error generated by process: {} Exception: {}' - ' error: {}'.format(pid, error_class_name, error) + f'Error generated by process: {pid} Exception: {error_class_name} error: {error}' ) if not call_function and self._inject: diff --git a/robottelo/utils/ohsnap.py b/robottelo/utils/ohsnap.py index 74494766114..89eb6a97e22 100644 --- a/robottelo/utils/ohsnap.py +++ b/robottelo/utils/ohsnap.py @@ -90,7 +90,9 @@ def dogfood_repository( try: repository = next(r for r in res.json() if r['label'] == repo) except StopIteration: - raise RepositoryDataNotFound(f'Repository "{repo}" is not provided by the given product') + raise RepositoryDataNotFound( + f'Repository "{repo}" is not provided by the given product' + ) from None repository['baseurl'] = repository['baseurl'].replace('$basearch', arch) # If repo check is enabled, check that the repository actually exists on the remote server dogfood_req = requests.get(repository['baseurl']) diff --git a/robottelo/utils/virtwho.py b/robottelo/utils/virtwho.py index 8c7a6c48658..aaae57d7c97 100644 --- a/robottelo/utils/virtwho.py +++ b/robottelo/utils/virtwho.py @@ -301,9 +301,9 @@ def get_hypervisor_ahv_mapping(hypervisor_type): # Always check the last json section to get the host_uuid for item in mapping: if 'entities' in item: - for item in item['entities']: - if 'host_uuid' in item: - system_uuid = item['host_uuid'] + for _item in item['entities']: + if 'host_uuid' in _item: + system_uuid = _item['host_uuid'] break message = f"Host UUID {system_uuid} found for VM: {guest_uuid}" for line in logs.split('\n'): @@ -384,8 +384,8 @@ def deploy_configure_by_command_check(command): virtwho_cleanup() try: ret, stdout = runcmd(command) - except Exception: - raise VirtWhoError(f"Failed to deploy configure by {command}") + except Exception as err: + raise VirtWhoError(f"Failed to deploy configure by {command}") from err else: if ret != 0 or 'Finished successfully' not in stdout: raise VirtWhoError(f"Failed to deploy configure by {command}") @@ -410,7 +410,7 @@ def update_configure_option(option, value, config_file): :param value: set the option to the value :param config_file: path of virt-who config file """ - cmd = 'sed -i "s|^{0}.*|{0}={1}|g" {2}'.format(option, value, config_file) + cmd = f'sed -i "s|^{option}.*|{option}={value}|g" {config_file}' ret, output = runcmd(cmd) if ret != 0: raise VirtWhoError(f"Failed to set option {option} value to {value}") @@ -422,7 +422,7 @@ def delete_configure_option(option, config_file): :param option: the option you want to delete :param config_file: path of virt-who config file """ - cmd = 'sed -i "/^{0}/d" {1}; sed -i "/^#{0}/d" {1}'.format(option, config_file) + cmd = f'sed -i "/^{option}/d" {config_file}; sed -i "/^#{option}/d" {config_file}' ret, output = runcmd(cmd) if ret != 0: raise VirtWhoError(f"Failed to delete option {option}") @@ -437,11 +437,11 @@ def add_configure_option(option, value, config_file): """ try: get_configure_option(option, config_file) - except Exception: + except Exception as err: cmd = f'echo -e "\n{option}={value}" >> {config_file}' - ret, output = runcmd(cmd) + ret, _ = runcmd(cmd) if ret != 0: - raise VirtWhoError(f"Failed to add option {option}={value}") + raise VirtWhoError(f"Failed to add option {option}={value}") from err else: raise VirtWhoError(f"option {option} is already exist in {config_file}") @@ -456,9 +456,9 @@ def hypervisor_json_create(hypervisors, guests): :param guests: how many guests will be created """ hypervisors_list = [] - for i in range(hypervisors): + for _ in range(hypervisors): guest_list = [] - for c in range(guests): + for _ in range(guests): guest_list.append( { "guestId": str(uuid.uuid4()), diff --git a/tests/foreman/api/test_acs.py b/tests/foreman/api/test_acs.py index 8294f363668..7821db07fc5 100644 --- a/tests/foreman/api/test_acs.py +++ b/tests/foreman/api/test_acs.py @@ -123,7 +123,7 @@ def test_positive_run_bulk_actions(module_target_sat, module_yum_repo): 2. Only the proper ACSes are deleted on bulk destroy. """ acs_ids = [] - for i in range(3): + for _ in range(3): acs = module_target_sat.api.AlternateContentSource( name=gen_string('alpha'), alternate_content_source_type='simplified', diff --git a/tests/foreman/api/test_ansible.py b/tests/foreman/api/test_ansible.py index eca139145d2..730e469b0e0 100644 --- a/tests/foreman/api/test_ansible.py +++ b/tests/foreman/api/test_ansible.py @@ -240,10 +240,14 @@ def test_add_and_remove_ansible_role_hostgroup(target_sat): for role in ROLE_NAMES ] target_sat.api.HostGroup(id=hg.id).assign_ansible_roles(data={'ansible_role_ids': ROLES[:2]}) - for r1, r2 in zip(target_sat.api.HostGroup(id=hg.id).list_ansible_roles(), ROLE_NAMES[:2]): + for r1, r2 in zip( + target_sat.api.HostGroup(id=hg.id).list_ansible_roles(), ROLE_NAMES[:2], strict=True + ): assert r1['name'] == r2 target_sat.api.HostGroup(id=hg.id).add_ansible_role(data={'ansible_role_id': ROLES[2]}) - for r1, r2 in zip(target_sat.api.HostGroup(id=hg.id).list_ansible_roles(), ROLE_NAMES): + for r1, r2 in zip( + target_sat.api.HostGroup(id=hg.id).list_ansible_roles(), ROLE_NAMES, strict=True + ): assert r1['name'] == r2 for role in ROLES: diff --git a/tests/foreman/api/test_discoveredhost.py b/tests/foreman/api/test_discoveredhost.py index 5ecc02e1e54..df8444c951b 100644 --- a/tests/foreman/api/test_discoveredhost.py +++ b/tests/foreman/api/test_discoveredhost.py @@ -67,9 +67,9 @@ def _assert_discovered_host(host, channel=None, user_config=None): ]: try: dhcp_pxe = _wait_for_log(channel, pattern[0], timeout=10) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') + raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') from err groups = re.search('DHCPACK on (\\d.+) to', dhcp_pxe.out) assert len(groups.groups()) == 1, 'Unable to parse bootloader ip address' pxe_ip = groups.groups()[0] @@ -82,9 +82,9 @@ def _assert_discovered_host(host, channel=None, user_config=None): ]: try: _wait_for_log(channel, pattern[0], timeout=20) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for VM (tftp) to fetch {pattern[1]}') + raise AssertionError(f'Timed out waiting for VM (tftp) to fetch {pattern[1]}') from err # assert that server receives DHCP discover from FDI for pattern in [ ( @@ -95,9 +95,9 @@ def _assert_discovered_host(host, channel=None, user_config=None): ]: try: dhcp_fdi = _wait_for_log(channel, pattern[0], timeout=30) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') + raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') from err groups = re.search('DHCPACK on (\\d.+) to', dhcp_fdi.out) assert len(groups.groups()) == 1, 'Unable to parse FDI ip address' fdi_ip = groups.groups()[0] @@ -109,17 +109,17 @@ def _assert_discovered_host(host, channel=None, user_config=None): f'"/api/v2/discovered_hosts/facts" for {fdi_ip}', timeout=60, ) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError('Timed out waiting for /facts POST request') + raise AssertionError('Timed out waiting for /facts POST request') from err groups = re.search('\\[I\\|app\\|([a-z0-9]+)\\]', facts_fdi.out) assert len(groups.groups()) == 1, 'Unable to parse POST request UUID' req_id = groups.groups()[0] try: _wait_for_log(channel, f'\\[I\\|app\\|{req_id}\\] Completed 201 Created') - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError('Timed out waiting for "/facts" 201 response') + raise AssertionError('Timed out waiting for "/facts" 201 response') from err default_config = entity_mixins.DEFAULT_SERVER_CONFIG try: wait_for( @@ -133,8 +133,10 @@ def _assert_discovered_host(host, channel=None, user_config=None): delay=2, logger=logger, ) - except TimedOutError: - raise AssertionError('Timed out waiting for discovered_host to appear on satellite') + except TimedOutError as err: + raise AssertionError( + 'Timed out waiting for discovered_host to appear on satellite' + ) from err discovered_host = host.api.DiscoveredHost(user_config or default_config).search( query={'search': f'name={host.guest_name}'} ) @@ -148,8 +150,8 @@ def assert_discovered_host_provisioned(channel, ksrepo): try: log = _wait_for_log(channel, pattern, timeout=300, delay=10) assert pattern in log - except TimedOutError: - raise AssertionError(f'Timed out waiting for {pattern} from VM') + except TimedOutError as err: + raise AssertionError(f'Timed out waiting for {pattern} from VM') from err @pytest.fixture @@ -429,7 +431,7 @@ def test_positive_reboot_all_pxe_hosts( host.power_control(ensure=False) mac = host._broker_args['provisioning_nic_mac_addr'] wait_for( - lambda: sat.api.DiscoveredHost().search(query={'mac': mac}) != [], + lambda: sat.api.DiscoveredHost().search(query={'mac': mac}) != [], # noqa: B023 timeout=240, delay=20, ) diff --git a/tests/foreman/api/test_host.py b/tests/foreman/api/test_host.py index 28dcda95179..94730d56925 100644 --- a/tests/foreman/api/test_host.py +++ b/tests/foreman/api/test_host.py @@ -955,7 +955,7 @@ def test_positive_read_content_source_id( 'lifecycle_environment_id': module_lce.id, }, ).create() - content_facet_attributes = getattr(host, 'content_facet_attributes') + content_facet_attributes = host.content_facet_attributes assert content_facet_attributes is not None content_source_id = content_facet_attributes.get('content_source_id') assert content_source_id is not None diff --git a/tests/foreman/api/test_media.py b/tests/foreman/api/test_media.py index b29339448c8..79c5cb7f20e 100644 --- a/tests/foreman/api/test_media.py +++ b/tests/foreman/api/test_media.py @@ -36,7 +36,9 @@ def class_media(self, module_org, class_target_sat): @pytest.mark.upgrade @pytest.mark.parametrize( ('name', 'new_name'), - **parametrized(list(zip(valid_data_list().values(), valid_data_list().values()))) + **parametrized( + list(zip(valid_data_list().values(), valid_data_list().values(), strict=True)) + ) ) def test_positive_crud_with_name(self, module_org, name, new_name, module_target_sat): """Create, update, delete media with valid name only diff --git a/tests/foreman/api/test_notifications.py b/tests/foreman/api/test_notifications.py index c2f1582f7d7..6ab89e9f4f4 100644 --- a/tests/foreman/api/test_notifications.py +++ b/tests/foreman/api/test_notifications.py @@ -134,11 +134,11 @@ def wait_for_mail(sat_obj, mailbox_file, contains_string, timeout=300, delay=5): timeout=timeout, delay=delay, ) - except TimedOutError: + except TimedOutError as err: raise AssertionError( f'No e-mail with text "{contains_string}" has arrived to mailbox {mailbox_file} ' f'after {timeout} seconds.' - ) + ) from err return True diff --git a/tests/foreman/api/test_organization.py b/tests/foreman/api/test_organization.py index 4ec6a3e8502..89988935f98 100644 --- a/tests/foreman/api/test_organization.py +++ b/tests/foreman/api/test_organization.py @@ -225,7 +225,7 @@ def test_positive_update_name(self, module_org, name): :parametrized: yes """ - setattr(module_org, 'name', name) + module_org.name = name module_org = module_org.update(['name']) assert module_org.name == name @@ -242,7 +242,7 @@ def test_positive_update_description(self, module_org, desc): :parametrized: yes """ - setattr(module_org, 'description', desc) + module_org.description = desc module_org = module_org.update(['description']) assert module_org.description == desc diff --git a/tests/foreman/api/test_partitiontable.py b/tests/foreman/api/test_partitiontable.py index 2a9012e1a6b..9b8e2a29d2c 100644 --- a/tests/foreman/api/test_partitiontable.py +++ b/tests/foreman/api/test_partitiontable.py @@ -61,6 +61,7 @@ def test_positive_create_with_one_character_name(self, target_sat, name): zip( generate_strings_list(length=gen_integer(4, 30)), generate_strings_list(length=gen_integer(4, 30)), + strict=True, ) ) ), @@ -90,7 +91,8 @@ def test_positive_crud_with_name(self, target_sat, name, new_name): @pytest.mark.tier1 @pytest.mark.parametrize( - ('layout', 'new_layout'), **parametrized(list(zip(valid_data_list(), valid_data_list()))) + ('layout', 'new_layout'), + **parametrized(list(zip(valid_data_list(), valid_data_list(), strict=True))), ) def test_positive_create_update_with_layout(self, target_sat, layout, new_layout): """Create new and update partition tables using different inputs as a diff --git a/tests/foreman/api/test_provisioning.py b/tests/foreman/api/test_provisioning.py index 481dd84f1ff..47dcc8d54a3 100644 --- a/tests/foreman/api/test_provisioning.py +++ b/tests/foreman/api/test_provisioning.py @@ -55,8 +55,8 @@ def assert_host_logs(channel, pattern): try: log = _wait_for_log(channel, pattern, timeout=300, delay=10) assert pattern in log - except TimedOutError: - raise AssertionError(f'Timed out waiting for {pattern} from VM') + except TimedOutError as err: + raise AssertionError(f'Timed out waiting for {pattern} from VM') from err @pytest.mark.e2e diff --git a/tests/foreman/api/test_repository.py b/tests/foreman/api/test_repository.py index dd7ce7bb3a8..9b908dbada9 100644 --- a/tests/foreman/api/test_repository.py +++ b/tests/foreman/api/test_repository.py @@ -2540,6 +2540,17 @@ def test_positive_create_with_long_token( container_repo = container_repos[0] except IndexError: pytest.skip('No registries with "long_pass" set to true') + + to_clean = [] + + @request.addfinalizer + def clean_repos(): + for repo in to_clean: + try: + repo.delete(synchronous=False) + except Exception: + logger.exception(f'Exception cleaning up docker repo:\n{repo}') + for docker_repo_name in container_repo.repos_to_sync: repo_options = dict( content_type='docker', @@ -2557,13 +2568,7 @@ def test_positive_create_with_long_token( pytest.skip('The "long_pass" registry does not meet length requirement') repo = module_target_sat.api.Repository(**repo_options).create() - - @request.addfinalizer - def clean_repo(): - try: - repo.delete(synchronous=False) - except Exception: - logger.exception('Exception cleaning up docker repo:') + to_clean.append(repo) repo = repo.read() for field in 'name', 'docker_upstream_name', 'content_type', 'upstream_username': @@ -2590,6 +2595,17 @@ def test_positive_tag_whitelist( :expectedresults: multiple products and repos are created """ container_repo = getattr(settings.container_repo.registries, repo_key) + + to_clean = [] + + @request.addfinalizer + def clean_repos(): + for repo in to_clean: + try: + repo.delete(synchronous=False) + except Exception: + logger.exception(f'Exception cleaning up docker repo:\n{repo}') + for docker_repo_name in container_repo.repos_to_sync: repo_options = dict( content_type='docker', @@ -2604,13 +2620,7 @@ def test_positive_tag_whitelist( repo_options['product'] = module_product repo = module_target_sat.api.Repository(**repo_options).create() - - @request.addfinalizer - def clean_repo(): - try: - repo.delete(synchronous=False) - except Exception: - logger.exception('Exception cleaning up docker repo:') + to_clean.append(repo) for field in 'name', 'docker_upstream_name', 'content_type', 'upstream_username': assert getattr(repo, field) == repo_options[field] diff --git a/tests/foreman/api/test_role.py b/tests/foreman/api/test_role.py index 2499d485596..fe6807734e0 100644 --- a/tests/foreman/api/test_role.py +++ b/tests/foreman/api/test_role.py @@ -32,7 +32,7 @@ class TestRole: @pytest.mark.upgrade @pytest.mark.parametrize( ('name', 'new_name'), - **parametrized(list(zip(generate_strings_list(), generate_strings_list()))), + **parametrized(list(zip(generate_strings_list(), generate_strings_list(), strict=True))), ) def test_positive_crud(self, name, new_name, target_sat): """Create, update and delete role with name ``name_generator()``. diff --git a/tests/foreman/api/test_webhook.py b/tests/foreman/api/test_webhook.py index dbe0e57523d..b6cb3036b57 100644 --- a/tests/foreman/api/test_webhook.py +++ b/tests/foreman/api/test_webhook.py @@ -56,8 +56,8 @@ def assert_event_triggered(channel, event): try: log = _wait_for_log(channel, pattern) assert pattern in log - except TimedOutError: - raise AssertionError(f'Timed out waiting for {pattern} from VM') + except TimedOutError as err: + raise AssertionError(f'Timed out waiting for {pattern} from VM') from err class TestWebhook: diff --git a/tests/foreman/cli/test_errata.py b/tests/foreman/cli/test_errata.py index d1975547de5..b08e2882ddb 100644 --- a/tests/foreman/cli/test_errata.py +++ b/tests/foreman/cli/test_errata.py @@ -108,7 +108,7 @@ def products_with_repos(orgs, module_target_sat): """Create and return a list of products. For each product, create and sync a single repo.""" products = [] # Create one product for each org, and a second product for the last org. - for org, params in zip(orgs + orgs[-1:], REPOS_WITH_ERRATA): + for org, params in zip(orgs + orgs[-1:], REPOS_WITH_ERRATA, strict=True): product = module_target_sat.api.Product(organization=org).create() # Replace the organization entity returned by create(), which contains only the id, # with the one we already have. @@ -334,7 +334,7 @@ def check_errata(errata_ids, by_org=False): :param errata_ids: a list containing a list of errata ids for each repo :type errata_ids: list[list] """ - for ids, repo_with_errata in zip(errata_ids, REPOS_WITH_ERRATA): + for ids, repo_with_errata in zip(errata_ids, REPOS_WITH_ERRATA, strict=True): assert len(ids) == repo_with_errata['org_errata_count' if by_org else 'errata_count'] assert repo_with_errata['errata_id'] in ids diff --git a/tests/foreman/cli/test_host.py b/tests/foreman/cli/test_host.py index a3a59102d6b..026168b8f68 100644 --- a/tests/foreman/cli/test_host.py +++ b/tests/foreman/cli/test_host.py @@ -1719,11 +1719,11 @@ def test_positive_erratum_applicability( timeout=300, delay=5, ) - except TimedOutError: + except TimedOutError as err: raise TimedOutError( f"Timed out waiting for erratum \"{setup_custom_repo['security_errata']}\"" " to disappear from the list" - ) + ) from err @pytest.mark.cli_katello_host_tools diff --git a/tests/foreman/cli/test_model.py b/tests/foreman/cli/test_model.py index d06e392b6b7..8fc51be9f62 100644 --- a/tests/foreman/cli/test_model.py +++ b/tests/foreman/cli/test_model.py @@ -35,7 +35,9 @@ def class_model(self, target_sat): @pytest.mark.upgrade @pytest.mark.parametrize( ('name', 'new_name'), - **parametrized(list(zip(valid_data_list().values(), valid_data_list().values()))) + **parametrized( + list(zip(valid_data_list().values(), valid_data_list().values(), strict=True)) + ) ) def test_positive_crud_with_name(self, name, new_name, module_target_sat): """Successfully creates, updates and deletes a Model. diff --git a/tests/foreman/cli/test_partitiontable.py b/tests/foreman/cli/test_partitiontable.py index 352602eef86..35e52cbbe65 100644 --- a/tests/foreman/cli/test_partitiontable.py +++ b/tests/foreman/cli/test_partitiontable.py @@ -50,6 +50,7 @@ def test_positive_create_with_one_character_name(self, name, target_sat): zip( generate_strings_list(length=randint(4, 30)), generate_strings_list(length=randint(4, 30)), + strict=True, ) ) ) diff --git a/tests/foreman/cli/test_ping.py b/tests/foreman/cli/test_ping.py index 69727dd3d0e..0a288143233 100644 --- a/tests/foreman/cli/test_ping.py +++ b/tests/foreman/cli/test_ping.py @@ -47,7 +47,7 @@ def test_positive_ping(target_sat, switch_user): # iterate over the lines grouping every 3 lines # example [1, 2, 3, 4, 5, 6] will return [(1, 2, 3), (4, 5, 6)] # only the status line is relevant for this test - for _, status, _ in zip(*[iter(result.stdout)] * 3): + for _, status, _ in zip(*[iter(result.stdout)] * 3, strict=False): # should this be strict? status_count += 1 if status.split(':')[1].strip().lower() == 'ok': diff --git a/tests/foreman/cli/test_remoteexecution.py b/tests/foreman/cli/test_remoteexecution.py index def7f8688e4..45c2183e9dc 100644 --- a/tests/foreman/cli/test_remoteexecution.py +++ b/tests/foreman/cli/test_remoteexecution.py @@ -75,7 +75,7 @@ def assert_job_invocation_result( result = sat.cli.JobInvocation.info({'id': invocation_command_id}) try: assert result[expected_result] == '1' - except AssertionError: + except AssertionError as err: raise AssertionError( 'host output: {}'.format( ' '.join( @@ -84,7 +84,7 @@ def assert_job_invocation_result( ) ) ) - ) + ) from err def assert_job_invocation_status(sat, invocation_command_id, client_hostname, status): @@ -93,7 +93,7 @@ def assert_job_invocation_status(sat, invocation_command_id, client_hostname, st result = sat.cli.JobInvocation.info({'id': invocation_command_id}) try: assert result['status'] == status - except AssertionError: + except AssertionError as err: raise AssertionError( 'host output: {}'.format( ' '.join( @@ -102,7 +102,7 @@ def assert_job_invocation_status(sat, invocation_command_id, client_hostname, st ) ) ) - ) + ) from err class TestRemoteExecution: diff --git a/tests/foreman/cli/test_report.py b/tests/foreman/cli/test_report.py index 7ef9e7fc9e7..b46c3881a15 100644 --- a/tests/foreman/cli/test_report.py +++ b/tests/foreman/cli/test_report.py @@ -79,7 +79,7 @@ def test_positive_install_configure_host( :BZ: 2126891, 2026239 """ puppet_infra_host = [session_puppet_enabled_sat, session_puppet_enabled_capsule] - for client, puppet_proxy in zip(content_hosts, puppet_infra_host): + for client, puppet_proxy in zip(content_hosts, puppet_infra_host, strict=True): client.configure_puppet(proxy_hostname=puppet_proxy.hostname) report = session_puppet_enabled_sat.cli.ConfigReport.list( {'search': f'host~{client.hostname},origin=Puppet'} diff --git a/tests/foreman/cli/test_repository.py b/tests/foreman/cli/test_repository.py index 184536c02c5..b43da6e4e81 100644 --- a/tests/foreman/cli/test_repository.py +++ b/tests/foreman/cli/test_repository.py @@ -265,7 +265,7 @@ def test_positive_create_with_auth_yum_repo(self, repo_options, repo): for key in 'url', 'content-type': assert repo.get(key) == repo_options[key] repo = entities.Repository(id=repo['id']).read() - assert getattr(repo, 'upstream_username') == repo_options['upstream-username'] + assert repo.upstream_username == repo_options['upstream-username'] @pytest.mark.tier1 @pytest.mark.upgrade diff --git a/tests/foreman/cli/test_role.py b/tests/foreman/cli/test_role.py index 90f12c6ce4f..5eb4e8569f8 100644 --- a/tests/foreman/cli/test_role.py +++ b/tests/foreman/cli/test_role.py @@ -30,7 +30,9 @@ class TestRole: @pytest.mark.parametrize( ('name', 'new_name'), **parametrized( - list(zip(generate_strings_list(length=10), generate_strings_list(length=10))) + list( + zip(generate_strings_list(length=10), generate_strings_list(length=10), strict=True) + ) ), ) def test_positive_crud_with_name(self, name, new_name, module_target_sat): diff --git a/tests/foreman/destructive/test_capsulecontent.py b/tests/foreman/destructive/test_capsulecontent.py index e746b2cc8d5..12f3455c08d 100644 --- a/tests/foreman/destructive/test_capsulecontent.py +++ b/tests/foreman/destructive/test_capsulecontent.py @@ -65,7 +65,7 @@ def test_positive_sync_without_deadlock( cv = target_sat.publish_content_view(function_entitlement_manifest_org, repo) - for i in range(4): + for _ in range(4): copy_id = target_sat.api.ContentView(id=cv.id).copy(data={'name': gen_alpha()})['id'] copy_cv = target_sat.api.ContentView(id=copy_id).read() copy_cv.publish() diff --git a/tests/foreman/destructive/test_discoveredhost.py b/tests/foreman/destructive/test_discoveredhost.py index 79645a1eb79..bcf73a14ed7 100644 --- a/tests/foreman/destructive/test_discoveredhost.py +++ b/tests/foreman/destructive/test_discoveredhost.py @@ -65,9 +65,9 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): ]: try: dhcp_pxe = _wait_for_log(channel, pattern[0], timeout=10) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') + raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') from err groups = re.search('DHCPACK on (\\d.+) to', dhcp_pxe.out) assert len(groups.groups()) == 1, 'Unable to parse bootloader ip address' @@ -82,9 +82,9 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): ]: try: _wait_for_log(channel, pattern[0], timeout=20) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for VM (tftp) to fetch {pattern[1]}') + raise AssertionError(f'Timed out waiting for VM (tftp) to fetch {pattern[1]}') from err # assert that server receives DHCP discover from FDI for pattern in [ @@ -96,9 +96,9 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): ]: try: dhcp_fdi = _wait_for_log(channel, pattern[0], timeout=30) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') + raise AssertionError(f'Timed out waiting for {pattern[1]} from VM') from err groups = re.search('DHCPACK on (\\d.+) to', dhcp_fdi.out) assert len(groups.groups()) == 1, 'Unable to parse FDI ip address' fdi_ip = groups.groups()[0] @@ -111,18 +111,18 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): f'"/api/v2/discovered_hosts/facts" for {fdi_ip}', timeout=60, ) - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError('Timed out waiting for /facts POST request') + raise AssertionError('Timed out waiting for /facts POST request') from err groups = re.search('\\[I\\|app\\|([a-z0-9]+)\\]', facts_fdi.out) assert len(groups.groups()) == 1, 'Unable to parse POST request UUID' req_id = groups.groups()[0] try: _wait_for_log(channel, f'\\[I\\|app\\|{req_id}\\] Completed 201 Created') - except TimedOutError: + except TimedOutError as err: # raise assertion error - raise AssertionError('Timed out waiting for "/facts" 201 response') + raise AssertionError('Timed out waiting for "/facts" 201 response') from err default_config = entity_mixins.DEFAULT_SERVER_CONFIG @@ -138,8 +138,10 @@ def _assert_discovered_host(host, channel=None, user_config=None, sat=None): delay=2, logger=logger, ) - except TimedOutError: - raise AssertionError('Timed out waiting for discovered_host to appear on satellite') + except TimedOutError as err: + raise AssertionError( + 'Timed out waiting for discovered_host to appear on satellite' + ) from err discovered_host = sat.api.DiscoveredHost(user_config or default_config).search( query={'search': f'name={host.guest_name}'} ) diff --git a/tests/foreman/destructive/test_ldap_authentication.py b/tests/foreman/destructive/test_ldap_authentication.py index 89f312fb99c..ecedf803e00 100644 --- a/tests/foreman/destructive/test_ldap_authentication.py +++ b/tests/foreman/destructive/test_ldap_authentication.py @@ -540,7 +540,7 @@ def test_user_permissions_rhsso_user_multiple_group( group_names = ['sat_users', 'sat_admins'] arguments = [{'roles': katello_role.name}, {'admin': 1}] external_auth_source = module_target_sat.cli.ExternalAuthSource.info({'name': "External"}) - for group_name, argument in zip(group_names, arguments): + for group_name, argument in zip(group_names, arguments, strict=True): # adding/creating rhsso groups default_sso_host.create_group(group_name=group_name) default_sso_host.update_rhsso_user(username, group_name=group_name) diff --git a/tests/foreman/destructive/test_remoteexecution.py b/tests/foreman/destructive/test_remoteexecution.py index ce19cc6bb69..c9c6bdb8b22 100644 --- a/tests/foreman/destructive/test_remoteexecution.py +++ b/tests/foreman/destructive/test_remoteexecution.py @@ -126,14 +126,14 @@ def test_positive_use_alternate_directory( result = target_sat.cli.JobInvocation.info({'id': invocation_command['id']}) try: assert result['success'] == '1' - except AssertionError: + except AssertionError as err: output = ' '.join( target_sat.cli.JobInvocation.get_output( {'id': invocation_command['id'], 'host': client.hostname} ) ) result = f'host output: {output}' - raise AssertionError(result) + raise AssertionError(result) from err task = target_sat.cli.Task.list_tasks({'search': command})[0] search = target_sat.cli.Task.list_tasks({'search': f'id={task["id"]}'}) diff --git a/tests/foreman/installer/test_installer.py b/tests/foreman/installer/test_installer.py index f1e00331934..4dc4680905d 100644 --- a/tests/foreman/installer/test_installer.py +++ b/tests/foreman/installer/test_installer.py @@ -1354,9 +1354,10 @@ def install_satellite(satellite, installer_args, enable_fapolicyd=False): snap=settings.server.version.snap, ) if enable_fapolicyd: - satellite.execute( - 'dnf -y install fapolicyd && systemctl enable --now fapolicyd' - ).status == 0 + assert ( + satellite.execute('dnf -y install fapolicyd && systemctl enable --now fapolicyd').status + == 0 + ) satellite.execute('dnf -y module enable satellite:el8 && dnf -y install satellite') if enable_fapolicyd: assert satellite.execute('rpm -q foreman-fapolicyd').status == 0 diff --git a/tests/foreman/longrun/test_oscap.py b/tests/foreman/longrun/test_oscap.py index be5d38f389e..fa3654f8a48 100644 --- a/tests/foreman/longrun/test_oscap.py +++ b/tests/foreman/longrun/test_oscap.py @@ -210,12 +210,12 @@ def test_positive_oscap_run_via_ansible( try: result = target_sat.cli.JobInvocation.info({'id': job_id})['success'] assert result == '1' - except AssertionError: + except AssertionError as err: output = ' '.join( target_sat.cli.JobInvocation.get_output({'id': job_id, 'host': vm.hostname}) ) result = f'host output: {output}' - raise AssertionError(result) + raise AssertionError(result) from err result = vm.run('cat /etc/foreman_scap_client/config.yaml | grep profile') assert result.status == 0 # Runs the actual oscap scan on the vm/clients and @@ -315,12 +315,12 @@ def test_positive_oscap_run_via_ansible_bz_1814988( try: result = target_sat.cli.JobInvocation.info({'id': job_id})['success'] assert result == '1' - except AssertionError: + except AssertionError as err: output = ' '.join( target_sat.cli.JobInvocation.get_output({'id': job_id, 'host': vm.hostname}) ) result = f'host output: {output}' - raise AssertionError(result) + raise AssertionError(result) from err result = vm.run('cat /etc/foreman_scap_client/config.yaml | grep profile') assert result.status == 0 # Runs the actual oscap scan on the vm/clients and diff --git a/tests/foreman/maintain/test_advanced.py b/tests/foreman/maintain/test_advanced.py index b264cb339c1..2771994d568 100644 --- a/tests/foreman/maintain/test_advanced.py +++ b/tests/foreman/maintain/test_advanced.py @@ -22,8 +22,10 @@ def get_satellite_capsule_repos( - x_y_release=sat_x_y_release, product='satellite', os_major_ver=get_sat_rhel_version().major + x_y_release=sat_x_y_release, product='satellite', os_major_ver=None ): + if os_major_ver is None: + os_major_ver = get_sat_rhel_version().major if product == 'capsule': product = 'satellite-capsule' repos = [ diff --git a/tests/foreman/maintain/test_health.py b/tests/foreman/maintain/test_health.py index 4313c372052..e927d85b5a5 100644 --- a/tests/foreman/maintain/test_health.py +++ b/tests/foreman/maintain/test_health.py @@ -209,7 +209,7 @@ def test_negative_health_check_upstream_repository(sat_maintain, request): assert result.status == 0 assert 'System has upstream foreman_repo,puppet_repo repositories enabled' in result.stdout assert 'FAIL' in result.stdout - for name in upstream_url.keys(): + for name in upstream_url: result = sat_maintain.execute(f'cat /etc/yum.repos.d/{name}.repo') if name == 'fedorapeople_repo': assert 'enabled=1' in result.stdout @@ -218,7 +218,7 @@ def test_negative_health_check_upstream_repository(sat_maintain, request): @request.addfinalizer def _finalize(): - for name, url in upstream_url.items(): + for name in upstream_url: sat_maintain.execute(f'rm -fr /etc/yum.repos.d/{name}.repo') sat_maintain.execute('dnf clean all') diff --git a/tests/foreman/ui/test_computeresource_vmware.py b/tests/foreman/ui/test_computeresource_vmware.py index f8a7ac97763..e85a406e141 100644 --- a/tests/foreman/ui/test_computeresource_vmware.py +++ b/tests/foreman/ui/test_computeresource_vmware.py @@ -282,8 +282,8 @@ def test_positive_resource_vm_power_management(session): timeout=30, delay=2, ) - except TimedOutError: - raise AssertionError('Timed out waiting for VM to toggle power state') + except TimedOutError as err: + raise AssertionError('Timed out waiting for VM to toggle power state') from err @pytest.mark.tier2 @@ -554,8 +554,8 @@ def test_positive_virt_card(session, target_sat, module_location, module_org): timeout=30, delay=2, ) - except TimedOutError: - raise AssertionError('Timed out waiting for VM to toggle power state') + except TimedOutError as err: + raise AssertionError('Timed out waiting for VM to toggle power state') from err virt_card = session.host_new.get_virtualization(host_name)['details'] assert virt_card['datacenter'] == settings.vmware.datacenter diff --git a/tests/foreman/ui/test_contenthost.py b/tests/foreman/ui/test_contenthost.py index ec2c6de469c..ffbe571c5b2 100644 --- a/tests/foreman/ui/test_contenthost.py +++ b/tests/foreman/ui/test_contenthost.py @@ -1505,7 +1505,7 @@ def test_syspurpose_attributes_empty(session, default_location, vm_module_stream ] syspurpose_status = details['system_purpose_status'] assert syspurpose_status.lower() == 'not specified' - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spname in DEFAULT_SYSPURPOSE_ATTRIBUTES: assert details[spname] == '' @@ -1541,7 +1541,7 @@ def test_set_syspurpose_attributes_cli(session, default_location, vm_module_stre with session: session.location.select(default_location.name) # Set sypurpose attributes - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.values(): run_remote_command_on_content_host( f'syspurpose set-{spdata[0]} "{spdata[1]}"', vm_module_streams ) @@ -1584,11 +1584,11 @@ def test_unset_syspurpose_attributes_cli(session, default_location, vm_module_st :CaseImportance: High """ # Set sypurpose attributes... - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.values(): run_remote_command_on_content_host( f'syspurpose set-{spdata[0]} "{spdata[1]}"', vm_module_streams ) - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.values(): # ...and unset them. run_remote_command_on_content_host(f'syspurpose unset-{spdata[0]}', vm_module_streams) @@ -1597,7 +1597,7 @@ def test_unset_syspurpose_attributes_cli(session, default_location, vm_module_st details = session.contenthost.read(vm_module_streams.hostname, widget_names='details')[ 'details' ] - for spname, spdata in DEFAULT_SYSPURPOSE_ATTRIBUTES.items(): + for spname in DEFAULT_SYSPURPOSE_ATTRIBUTES: assert details[spname] == '' diff --git a/tests/foreman/ui/test_dashboard.py b/tests/foreman/ui/test_dashboard.py index 0dc1bb0fcc9..b55891475ef 100644 --- a/tests/foreman/ui/test_dashboard.py +++ b/tests/foreman/ui/test_dashboard.py @@ -77,7 +77,7 @@ def test_positive_host_configuration_status(session, target_sat): else: assert dashboard_values['status_list'][criteria] == 0 - for criteria, search in zip(criteria_list, search_strings_list): + for criteria, search in zip(criteria_list, search_strings_list, strict=True): if criteria == 'Hosts with no reports': session.dashboard.action({'HostConfigurationStatus': {'status_list': criteria}}) values = session.host.read_all() diff --git a/tests/foreman/ui/test_errata.py b/tests/foreman/ui/test_errata.py index 7e0a300b32b..d7773533a89 100644 --- a/tests/foreman/ui/test_errata.py +++ b/tests/foreman/ui/test_errata.py @@ -147,7 +147,7 @@ def registered_contenthost( module_cv, module_target_sat, request, - repos=[CUSTOM_REPO_URL], + repos=None, ): """RHEL ContentHost registered in satellite, Using SCA and global registration. @@ -155,6 +155,8 @@ def registered_contenthost( :param repos: list of upstream URLs for custom repositories, default to CUSTOM_REPO_URL """ + if not repos: + repos = [CUSTOM_REPO_URL] activation_key = module_target_sat.api.ActivationKey( organization=module_org, environment=module_lce, diff --git a/tests/foreman/ui/test_host.py b/tests/foreman/ui/test_host.py index 6cbb3a47ffc..65dfb73d688 100644 --- a/tests/foreman/ui/test_host.py +++ b/tests/foreman/ui/test_host.py @@ -785,7 +785,7 @@ def test_positive_search_by_parameter_with_different_values( for host in hosts: assert session.host.search(host.name)[0]['Name'] == host.name # Check that search by parameter returns only one host in the list - for param_value, host in zip(param_values, hosts): + for param_value, host in zip(param_values, hosts, strict=True): values = session.host.search(f'params.{param_name} = {param_value}') assert len(values) == 1 assert values[0]['Name'] == host.name diff --git a/tests/foreman/ui/test_hostcollection.py b/tests/foreman/ui/test_hostcollection.py index a480db53850..546e70faae0 100644 --- a/tests/foreman/ui/test_hostcollection.py +++ b/tests/foreman/ui/test_hostcollection.py @@ -673,9 +673,10 @@ def test_negative_hosts_limit( session.hostcollection.associate_host(hc_name, hosts[0].name) with pytest.raises(AssertionError) as context: session.hostcollection.associate_host(hc_name, hosts[1].name) - assert "cannot have more than 1 host(s) associated with host collection '{}'".format( - hc_name - ) in str(context.value) + assert ( + f"cannot have more than 1 host(s) associated with host collection '{hc_name}'" + in str(context.value) + ) @pytest.mark.tier3 diff --git a/tests/foreman/ui/test_sync.py b/tests/foreman/ui/test_sync.py index b7fdc4eb576..c5ea76e0fa4 100644 --- a/tests/foreman/ui/test_sync.py +++ b/tests/foreman/ui/test_sync.py @@ -73,7 +73,7 @@ def test_positive_sync_rh_repos(session, target_sat, module_entitlement_manifest distros = ['rhel6', 'rhel7'] repo_collections = [ target_sat.cli_factory.RepositoryCollection(distro=distro, repositories=[repo]) - for distro, repo in zip(distros, repos) + for distro, repo in zip(distros, repos, strict=True) ] for repo_collection in repo_collections: repo_collection.setup(module_entitlement_manifest_org.id, synchronize=False) diff --git a/tests/robottelo/test_func_shared.py b/tests/robottelo/test_func_shared.py index 7cc635e1aa0..4e70550a08d 100644 --- a/tests/robottelo/test_func_shared.py +++ b/tests/robottelo/test_func_shared.py @@ -538,7 +538,7 @@ def test_function_kw_scope(self): """ prefixes = [f'pre_{i}' for i in range(10)] suffixes = [f'suf_{i}' for i in range(10)] - for prefix, suffix in zip(prefixes, suffixes): + for prefix, suffix in zip(prefixes, suffixes, strict=True): counter_value = gen_integer(min_value=2, max_value=10000) inc_string = basic_shared_counter_string( prefix=prefix, suffix=suffix, counter=counter_value diff --git a/tests/robottelo/test_report.py b/tests/robottelo/test_report.py index e5f2090b21a..acdc41c6ad4 100644 --- a/tests/robottelo/test_report.py +++ b/tests/robottelo/test_report.py @@ -44,11 +44,13 @@ def test_junit_timestamps(exec_test, property_level): prop = [prop] try: assert 'start_time' in [p['@name'] for p in prop] - except KeyError as e: - raise AssertionError(f'Missing property node: "start_time": {e}') + except KeyError as err: + raise AssertionError(f'Missing property node: "start_time": {err}') from err try: for p in prop: if p['@name'] == 'start_time': datetime.datetime.strptime(p['@value'], XUNIT_TIME_FORMAT) - except ValueError as e: - raise AssertionError(f'Unable to parse datetime for "start_time" property node: {e}') + except ValueError as err: + raise AssertionError( + f'Unable to parse datetime for "start_time" property node: {err}' + ) from err