Skip to content

Commit

Permalink
Check bulk generate applicability task
Browse files Browse the repository at this point in the history
  • Loading branch information
damoore044 committed Nov 8, 2023
1 parent 365db43 commit effe759
Showing 1 changed file with 67 additions and 40 deletions.
107 changes: 67 additions & 40 deletions tests/foreman/ui/test_errata.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,31 +64,27 @@ def _generate_errata_applicability(hostname):
host.errata_applicability(synchronous=False)


def _install_client_package(client, package, errata_applicability=False):
def _install_client_package(client, package):
"""Install a package in virtual machine client.
Errata applicability regenerated after successful yum execution.
:param client: The Virtual machine client.
:param package: the package to install in virtual machine client.
:param errata_applicability: If True, force host to generate errata applicability.
:returns: True if package installed successfully, False otherwise.
"""
result = client.execute(f'yum install -y {package}')
if errata_applicability:
_generate_errata_applicability(client.hostname)
return result.status == 0


def _remove_client_package(client, package, errata_applicability=False):
def _remove_client_package(client, package):
"""Remove a package in virtual machine client.
Errata applicability regenerated after successful yum execution.
:param client: The Virtual machine client.
:param package: the package (full version, or name) to remove from virtual machine client.
:param errata_applicability: If True, force host to generate errata applicability.
:returns: True if a package was removed successfully, False otherwise.
"""
result = client.execute(f'yum remove -y {package}')
if errata_applicability:
_generate_errata_applicability(client.hostname)
result = client.execute(f'yum erase -y {package}')
return result.status == 0


Expand Down Expand Up @@ -178,7 +174,9 @@ def registered_contenthost(
rhel_contenthost,
module_org,
module_lce,
module_cv,
module_target_sat,
request,
repos=[CUSTOM_REPO_URL],
):
"""RHEL ContentHost registered in satellite,
Expand All @@ -187,35 +185,33 @@ def registered_contenthost(
:param repos: list of upstream URLs for custom repositories,
default to CUSTOM_REPO_URL
"""
content_view = module_target_sat.api.ContentView(
organization=module_org,
environment=[module_lce],
).create()

module_cv.environment = [module_lce]
module_cv.update(['environment'])
activation_key = module_target_sat.api.ActivationKey(
organization=module_org,
environment=module_lce,
).create()

custom_products = []
custom_repos = []
for repo_url in repos:
rhel_contenthost.create_custom_repos(custom_repo=repo_url)
# Publishes and promotes a new cvv to lce
# Associate org, ak, cv, with custom repo:
custom_repo_id = module_target_sat.cli_factory.setup_org_for_a_custom_repo(
custom_repo_info = module_target_sat.cli_factory.setup_org_for_a_custom_repo(
{
'url': repo_url,
'organization-id': module_org.id,
'lifecycle-environment-id': module_lce.id,
'activationkey-id': activation_key.id,
'content-view-id': content_view.id,
'content-view-id': module_cv.id,
}
)['repository-id']
custom_repos.append(custom_repo_id)
)
custom_products.append(custom_repo_info['product-id'])
custom_repos.append(custom_repo_info['repository-id'])

result = rhel_contenthost.register(
activation_keys=activation_key.name,
lifecycle_environment=module_lce,
target=module_target_sat,
org=module_org,
loc=None,
Expand All @@ -231,15 +227,18 @@ def registered_contenthost(
len(result['errors']) == 0
), f'Failed to sync custom repository [id: {custom_repo_id}]:\n{str(result["errors"])}'

# TODO: @request.addfinalizer: Cleanup to remove created ak,
# contentview versions, products & repositories.
return rhel_contenthost


@pytest.mark.e2e
@pytest.mark.tier3
@pytest.mark.rhel_ver_match('8')
@pytest.mark.rhel_ver_match('[^6]')
@pytest.mark.no_containers
def test_end_to_end(
session,
request,
module_org,
module_lce,
module_cv,
Expand Down Expand Up @@ -286,19 +285,22 @@ def test_end_to_end(
],
'module_stream_packages': [],
}
# Capture product and repository with the desired content
assert len(product_list := module_target_sat.api.Product(organization=module_org).search()) > 0
_UTC_format = '%Y-%m-%d %H:%M:%S UTC'
# Capture newest product and repository with the desired content
product_list = module_target_sat.api.Product(organization=module_org).search()
assert len(product_list) > 0
product_list.sort(key=lambda product: product.id)
_product = product_list[-1].read()
assert len(_product.repository) > 0
_repository = _product.repository[-1].read()
# Remove custom package if present, then install outdated version
# _product = module_target_sat.api.Product(id=product_id).read()
assert len(_product.repository) == 1
_repository = _product.repository[0].read()
# Remove custom package if present, install outdated version
_remove_client_package(registered_contenthost, FAKE_1_CUSTOM_PACKAGE_NAME)
assert _install_client_package(
registered_contenthost, FAKE_1_CUSTOM_PACKAGE, errata_applicability=True
)
assert _install_client_package(registered_contenthost, FAKE_1_CUSTOM_PACKAGE)
applicable_errata = registered_contenthost.applicable_errata_count
assert (
registered_contenthost.applicable_errata_count == 1
), 'Expected only one applicable errata after setup.'
applicable_errata == 1
), f'Expected 1 applicable errata: {CUSTOM_REPO_ERRATA_ID}, after setup. Got {applicable_errata}'

with session:

Expand All @@ -309,10 +311,10 @@ def test_end_to_end(
assert session.errata.search_content_hosts(
CUSTOM_REPO_ERRATA_ID, registered_contenthost.hostname, environment=module_lce.name
), 'Errata ID not found on registered contenthost or the host lifecycle-environment.'
assert (errata := session.errata.read(CUSTOM_REPO_ERRATA_ID))
# Check all tabs of Errata Details page
errata = session.errata.read(CUSTOM_REPO_ERRATA_ID)
assert errata['repositories']['table'][-1]['Name'] == _repository.name
assert errata['repositories']['table'][-1]['Product'] == _product.name
# Check all tabs of Errata Details page
assert (
not ERRATA_DETAILS.items() - errata['details'].items()
), 'Errata details do not match expected values.'
Expand All @@ -326,37 +328,62 @@ def test_end_to_end(
== ERRATA_PACKAGES['module_stream_packages']
)

# Install Errata, find and assert REX task
# Apply Errata, find REX install task
session.host_new.apply_erratas(
entity_name=registered_contenthost.hostname,
search=f"errata_id == {CUSTOM_REPO_ERRATA_ID}",
)
result = module_target_sat.wait_for_tasks(
results = module_target_sat.wait_for_tasks(
search_query=(
f'"Install errata errata_id == {CUSTOM_REPO_ERRATA_ID}'
f' on {registered_contenthost.hostname}"'
),
search_rate=2,
max_tries=60,
)
task_status = module_target_sat.api.ForemanTask(id=result[0].id).poll()
results.sort(key=lambda res: res.id)
task_status = module_target_sat.api.ForemanTask(id=results[-1].id).poll()
assert (
task_status['result'] == 'success'
), f'Errata Installation task failed:\n{task_status}'
assert (
registered_contenthost.applicable_errata_count == 0
), 'Unexpected applicable errata found after install.'
# UTC timing for install task and session
install_start = datetime.strptime(task_status['started_at'], _UTC_format)
install_end = datetime.strptime(task_status['ended_at'], _UTC_format)
assert (install_end - install_start).total_seconds() <= 60
assert (install_end - datetime_utc_start).total_seconds() <= 600

# Find bulk generate applicability task
results = module_target_sat.wait_for_tasks(
search_query=(
f'"Bulk generate applicability for host {registered_contenthost.hostname}"'
),
search_rate=2,
max_tries=60,
)
results.sort(key=lambda res: res.id)
task_status = module_target_sat.api.ForemanTask(id=results[-1].id).poll()
assert (
task_status['result'] == 'success'
), f'Bulk Generate Errata Applicability task failed:\n{task_status}'
# UTC timing for generate applicability task
bulk_gen_start = datetime.strptime(task_status['started_at'], _UTC_format)
bulk_gen_end = datetime.strptime(task_status['ended_at'], _UTC_format)
assert (bulk_gen_start - install_end).total_seconds() <= 30
assert (bulk_gen_end - bulk_gen_start).total_seconds() <= 60

task_start = datetime.strptime(task_status['started_at'], '%Y-%m-%d %H:%M:%S UTC')
task_end = datetime.strptime(task_status['ended_at'], '%Y-%m-%d %H:%M:%S UTC')
# Checking UTC timing for install task and session
assert (task_end - task_start).total_seconds() <= 60
assert (task_end - datetime_utc_start).total_seconds() <= 600
# Errata should still be visible on satellite, but not on contenthost
assert session.errata.read(CUSTOM_REPO_ERRATA_ID)
assert not session.errata.search_content_hosts(
CUSTOM_REPO_ERRATA_ID, registered_contenthost.hostname, environment=module_lce.name
)
# Check package version was updated on contenthost
_package_version = registered_contenthost.execute(
f'rpm -q {FAKE_1_CUSTOM_PACKAGE_NAME}'
).stdout
assert FAKE_2_CUSTOM_PACKAGE in _package_version


@pytest.mark.tier2
Expand Down

0 comments on commit effe759

Please sign in to comment.