Skip to content

Commit

Permalink
Incremental update errata coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
damoore044 committed Feb 6, 2024
1 parent 2d62072 commit 46fca98
Showing 1 changed file with 304 additions and 16 deletions.
320 changes: 304 additions & 16 deletions tests/foreman/api/test_errata.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from robottelo.config import settings
from robottelo.constants import (
DEFAULT_ARCHITECTURE,
DEFAULT_CV,
FAKE_1_CUSTOM_PACKAGE,
FAKE_2_CUSTOM_PACKAGE,
FAKE_2_CUSTOM_PACKAGE_NAME,
Expand Down Expand Up @@ -132,6 +133,11 @@ def _validate_errata_counts(host, errata_type, expected_value, timeout=120):
)


def _fetch_library_environment_for_org(sat, org):
search = {'search': f'name="Library" and organization_id={org.id}'}
return sat.api.LifecycleEnvironment().search(query=search)[0].read()


def _fetch_available_errata(host, expected_amount=None, timeout=120):
"""Fetch available errata for host."""
errata = host.errata()
Expand Down Expand Up @@ -414,40 +420,72 @@ def package_applicability_changed_as_expected(
return True


def cv_publish_promote(sat, org, cv, lce=None, needs_publish=True):
def cv_publish_promote(sat, org, cv, lce=None, needs_publish=True, force=False):
"""Publish & promote Content View Version with all content visible in org.
:param lce: if None, default to 'Library',
pass a single instance of lce, or list of instances.
pass a single environment :id or instance.
Or pass a list of environment :ids or isntances.
:param bool needs_publish: if False, skip publish of a new version
:return dictionary:
'content-view': instance of updated cv
'content-view-version': instance of newest cv version
"""
# Default to 'Library' lce, if None passed
# Take a single instance of lce, or list of instances
lce_ids = 'Library'
# Take a single lce id, of list of ids
# or Take a single instance of lce, or list of instances
lce_library = _fetch_library_environment_for_org(sat, org)
lce_ids = [lce_library.id]
if lce is not None:
if not isinstance(lce, list):
lce_ids = [lce.id]
# a list was not passed, just single lce or id
if isinstance(lce, int):
lce_ids = [lce]
else:
# will fail here if the single lce is not int or has no id
lce_ids = [lce.id]
else:
lce_ids = [_lce.id for _lce in lce]

# a list of ids or instances was passed
if isinstance(lce[0], int):
lce_ids = lce
else:
# will fail here if list of lces is not int or has no id
lce_ids = [env.id for env in lce]

for _id in lce_ids:
# entries in list of ids now just be int
assert isinstance(_id, int)

# multiple ids in list, sort:
if len(lce_ids) > 1:
lce_ids = sorted(lce_ids)

# Publish by default, or skip and use latest existing version
if needs_publish is True:
_publish_and_wait(sat, org, cv)
# Content-view must have at least one published version
cv = sat.api.ContentView(id=cv.id).read()
assert cv.version, f'No version(s) are published to the Content-View: {cv.id}'
# Find highest version id, will be the latest
cvv_id = max(cvv.id for cvv in cv.version)
# Promote to lifecycle-environment(s)
if lce_ids == 'Library':
library_lce = cv.environment[0].read()
# Promote to lifecycle-environment(s):
# when we promote to other environments, 'Library'/:id should not be passed,
# as it will be promoted to automatically.
if len(lce_ids) > 1 and lce_library.id in lce_ids:
# remove library.id from list if list contains more than just library
lce_ids.remove(lce_library.id)
if lce is None or (len(lce_ids) == 1 and lce_library.id in lce_ids):
# only Library in list, or lce passed was None, promote only to Library,
# promoting out of order may require force to bypass
lce_library = lce_library.read()
sat.api.ContentViewVersion(id=cvv_id).promote(
data={'environment_ids': library_lce.id, 'force': 'True'}
data={'environment_ids': [lce_library.id], 'force': force}
)
else:
sat.api.ContentViewVersion(id=cvv_id).promote(data={'environment_ids': lce_ids})
# promote to any environment ids remaining in list
sat.api.ContentViewVersion(id=cvv_id).promote(
data={'environment_ids': lce_ids, 'force': force}
)
_result = {
'content-view': sat.api.ContentView(id=cv.id).read(),
'content-view-version': sat.api.ContentViewVersion(id=cvv_id).read(),
Expand All @@ -458,18 +496,18 @@ def cv_publish_promote(sat, org, cv, lce=None, needs_publish=True):
return _result


def _publish_and_wait(sat, org, cv):
def _publish_and_wait(sat, org, cv, search_rate=5, max_tries=12):
"""Publish a new version of content-view to organization, wait for task(s) completion."""
task_id = sat.api.ContentView(id=cv.id).publish({'id': cv.id, 'organization': org})['id']
assert task_id, f'No task was invoked to publish the Content-View: {cv.id}.'
# Should take < 1 minute, check in 5s intervals
sat.wait_for_tasks(
search_query=(f'label = Actions::Katello::ContentView::Publish and id = {task_id}'),
search_rate=5,
max_tries=12,
search_rate=search_rate,
max_tries=max_tries,
), (
f'Failed to publish the Content-View: {cv.id}, in time.'
f'Task: {task_id} failed, or timed out (60s).'
f'Task: {task_id} failed, or timed out ({search_rate*max_tries}s).'
)


Expand Down Expand Up @@ -1430,3 +1468,253 @@ def rh_repo_module_manifest(module_sca_manifest_org, module_target_sat):
rh_repo = module_target_sat.api.Repository(id=rh_repo_id).read()
rh_repo.sync()
return rh_repo


@pytest.mark.tier3
def test_positive_incremental_update_apply_to_envs_cvs(
target_sat,
module_sca_manifest_org,
rhel8_contenthost,
module_product,
):
"""With multiple environments and content views, register a host to one,
apply a CV filter to the content-view, and query available incremental update(s).
Then, execute the available update with security errata, inspect the environment(s) and
content-view with the new incremental version. Check the errata and packages available on host.
:id: ce8bd9ed-8fbc-40f2-8e58-e9b520fe94a3
:Setup:
1. Security Errata synced on satellite server from custom repo.
2. Multiple content-views promoted to multiple environments.
3. Register a RHEL host to the content-view with activation key.
4. Install outdated packages, some applicable to the erratum and some not.
:Steps:
1. Add an inclusive Erratum filter to the host content-view
2. POST /api/hosts/bulk/available_incremental_updates
3. POST /katello/api/content_view_versions/incremental_update
:expectedresults:
1. Incremental update is available to expected content-view in
expected environment(s), applicable to expected host.
2. A new content-view incremental version is created and promoted,
the applicable errata are then available to the host.
3. We can install packages and see updated applicable errata within the
incremental version of the content-view.
"""
# any existing custom CVs in org, except Default CV
prior_cv_count = (
len(target_sat.api.ContentView(organization=module_sca_manifest_org).search()) - 1
)
# Number to be Created: new LCE's for org, new CV's per LCE.
number_of_lces = 3 # Does not include 'Library'
number_of_cvs = 3 # Does not include 'Default Content View'
lce_library = _fetch_library_environment_for_org(target_sat, module_sca_manifest_org)
lce_list = [lce_library]
# custom repo with errata
custom_repo = target_sat.api.Repository(
product=module_product, content_type='yum', url=CUSTOM_REPO_URL
).create()
custom_repo.sync()

# create multiple linked environments
for n in range(number_of_lces):
new_lce = target_sat.api.LifecycleEnvironment(
organization=module_sca_manifest_org,
prior=lce_list[n],
).create()
lce_list.append(new_lce)
assert len(lce_list) == number_of_lces + 1
# collect default CV for org
default_cv = (
target_sat.api.ContentView(
organization=module_sca_manifest_org,
name=DEFAULT_CV,
)
.search()[0]
.read()
)
cv_list = list([default_cv])
# for each environment including 'Library'
for _lce in lce_list:
# create some new CVs with some content
for _i in range(number_of_cvs):
new_cv = target_sat.api.ContentView(
organization=module_sca_manifest_org,
repository=[custom_repo],
).create()
# lces to be promoted to, omit newer than _lce in loop
env_ids = sorted([lce.id for lce in lce_list if lce.id <= _lce.id])
# when the only lce to publish to is Library, pass None to default
if len(env_ids) == 1 and env_ids[0] == lce_library.id:
env_ids = None
# we may initially promote out of order, use force to bypass
new_cv = cv_publish_promote(
target_sat,
module_sca_manifest_org,
new_cv,
lce=env_ids,
force=True,
)['content-view']
cv_list.append(new_cv)

# total amount of CVs created matches expected and search results
assert len(cv_list) == 1 + (number_of_cvs * (number_of_lces + 1))
assert prior_cv_count + len(cv_list) == len(
target_sat.api.ContentView(organization=module_sca_manifest_org).search()
)
# one ak with newest CV and lce
host_lce = lce_list[-1].read()
host_cv = cv_list[-1].read()
ak = target_sat.api.ActivationKey(
organization=module_sca_manifest_org,
environment=host_lce,
content_view=host_cv,
).create()
# content host, global registration
result = rhel8_contenthost.register(
org=module_sca_manifest_org,
activation_keys=ak.name,
target=target_sat,
loc=None,
)
assert result.status == 0, f'Failed to register the host: {rhel8_contenthost.hostname}'
assert rhel8_contenthost.subscribed
rhel8_contenthost.execute(r'subscription-manager repos --enable \*')
# Installing all outdated packages
pkgs = ' '.join(FAKE_9_YUM_OUTDATED_PACKAGES)
assert rhel8_contenthost.execute(f'yum install -y {pkgs}').status == 0
rhel8_contenthost.execute('subscription-manager repos')
# After installing packages, check available incremental updates
host = rhel8_contenthost.nailgun_host.read()
response = target_sat.api.Host().bulk_available_incremental_updates(
data={
'organization_id': module_sca_manifest_org.id,
'included': {'ids': [host.id]},
'errata_ids': FAKE_9_YUM_SECURITY_ERRATUM,
},
)
# expecting no available updates before CV change
assert (
response == []
), f'No incremental updates should currently be available to host: {rhel8_contenthost.hostname}.'

# New Erratum CV filter created for host view
target_sat.api.ErratumContentViewFilter(content_view=host_cv, inclusion=True).create()
host_cv = target_sat.api.ContentView(id=host_cv.id).read()
lce_ids = sorted([lce.id for lce in lce_list])
# publish version with filter and promote
host_cvv = cv_publish_promote(
target_sat,
module_sca_manifest_org,
host_cv,
lce_ids,
)['content-view-version']

# cv is not updated to host yet, applicable errata should be zero
rhel8_contenthost.execute('subscription-manager repos')
host_app_errata = rhel8_contenthost.applicable_errata_count
assert host_app_errata == 0
# After adding filter to cv, check available incremental updates
host_app_packages = rhel8_contenthost.applicable_package_count
response = target_sat.api.Host().bulk_available_incremental_updates(
data={
'organization_id': module_sca_manifest_org.id,
'included': {'ids': [host.id]},
'errata_ids': FAKE_9_YUM_SECURITY_ERRATUM,
},
)
assert (
response
), f'Expected one incremental update, but found none, for host: {rhel8_contenthost.hostname}.'
# find that only expected CV version has incremental update available
assert (
len(response) == 1
), f'Incremental update should currently be available to only one host: {rhel8_contenthost.hostname}.'
next_version = float(response[0]['next_version'])
assert float(host_cvv.version) + 0.1 == next_version # example: 2.0 > 2.1
assert response[0]['content_view_version']['id'] == host_cvv.id
assert response[0]['content_view_version']['content_view']['id'] == host_cv.id

# Perform Incremental Update with host cv version
host_cvv = target_sat.api.ContentViewVersion(id=host_cvv.id).read()
# Apply incremental update adding the applicable security erratum
response = target_sat.api.ContentViewVersion().incremental_update(
data={
'content_view_version_environments': [
{
'content_view_version_id': host_cvv.id,
'environment_ids': [host_lce.id],
}
],
'add_content': {'errata_ids': FAKE_9_YUM_SECURITY_ERRATUM},
}
)
assert response['result'] == 'success'
assert (
response['action']
== 'Incremental Update of 1 Content View Version(s) with 12 Package(s), and 3 Errata'
)

# only the hosts's CV was modified, new version made, check output details
assert len(response['output']['changed_content']) == 1
created_version_id = response['output']['changed_content'][0]['content_view_version']['id']
# host source CV version was changed to the new one
host_version_id = host.read().get_values()['content_facet_attributes'][
'content_view_version_id'
]
assert host_version_id == created_version_id
# get newest version by highest id
version_ids = sorted([ver.id for ver in host_cv.read().version])
assert created_version_id == version_ids[-1]
# latest sat and host version matches incremental one
host_version_number = float(
host.read().get_values()['content_facet_attributes']['content_view_version']
)
assert host_version_number == next_version
host_cvv = target_sat.api.ContentViewVersion(id=created_version_id).read()
assert float(host_cvv.version) == next_version
rhel8_contenthost.execute('subscription-manager repos')
# expected errata from FAKE_9 Security list added
added_errata = response['output']['changed_content'][0]['added_units']['erratum']
assert set(added_errata) == set(FAKE_9_YUM_SECURITY_ERRATUM)
# applicable errata count increased by length of security ids list
assert rhel8_contenthost.applicable_errata_count == host_app_errata + len(
FAKE_9_YUM_SECURITY_ERRATUM
)
# newly added errata from incremental version are now applicable to host
post_app_errata_ids = errata_id_set(
_fetch_available_errata_instances(target_sat, rhel8_contenthost)
)
assert set(FAKE_9_YUM_SECURITY_ERRATUM).issubset(post_app_errata_ids)
# expected packages from the security erratum were added to host
added_packages = response['output']['changed_content'][0]['added_units']['rpm']
assert 12 == len(added_packages)
# expected that not all of the added packages will be applicable
assert 8 == host_app_packages == rhel8_contenthost.applicable_package_count
# install all of the newly added packages, recalculate applicability
for pkg in added_packages:
assert rhel8_contenthost.run(f'yum install -y {pkg}').status == 0
rhel8_contenthost.execute('subscription-manager repos')
# security errata should not be applicable after installing updated packages
post_app_errata_ids = errata_id_set(
_fetch_available_errata_instances(target_sat, rhel8_contenthost)
)
assert set(FAKE_9_YUM_SECURITY_ERRATUM).isdisjoint(post_app_errata_ids)
assert rhel8_contenthost.applicable_errata_count == 0

# after applying the incremental update, check for any more available
response = target_sat.api.Host().bulk_available_incremental_updates(
data={
'organization_id': module_sca_manifest_org.id,
'included': {'ids': [host.id]},
'errata_ids': FAKE_9_YUM_SECURITY_ERRATUM,
},
)
# expect no remaining updates, after applying the only one
assert (
response == []
), f'No incremental updates should currently be available to host: {rhel8_contenthost.hostname}.'

0 comments on commit 46fca98

Please sign in to comment.