Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a test case for capsule content counts granularity #16697

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 212 additions & 0 deletions tests/foreman/cli/test_capsulecontent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from robottelo.constants import (
CONTAINER_REGISTRY_HUB,
CONTAINER_UPSTREAM_NAME,
FAKE_0_CUSTOM_PACKAGE_NAME,
PULP_EXPORT_DIR,
)
from robottelo.constants.repos import ANSIBLE_GALAXY, CUSTOM_FILE_REPO
Expand Down Expand Up @@ -124,6 +125,32 @@ def module_capsule_artifact_cleanup(
)


def _dictionarize_counts(data):
"""From CLI content counts usually come as a dict with several nested dicts using numbers as
keys which is not practical for assertions. This helper refactors the dict with entity names
for keys and grabs just the content counts.
"""
refactored_data = {}

for lce_value in data['lifecycle-environments'].values():
lce_name = lce_value['name']
cvs_dict = {}

for cv_value in lce_value['content-views'].values():
cv_name = cv_value['name']['name']
repo_dict = {}

for repo_value in cv_value['repositories'].values():
repo_name = repo_value['repository-name']
repo_dict[repo_name] = repo_value['content-counts']

cvs_dict[cv_name] = repo_dict

refactored_data[lce_name] = cvs_dict

return refactored_data


@pytest.mark.parametrize(
'repos_collection',
[
Expand Down Expand Up @@ -309,6 +336,191 @@ def test_positive_update_counts(target_sat, module_capsule_configured):
)


@pytest.mark.parametrize('setting_update', ['automatic_content_count_updates=True'], indirect=True)
def test_positive_content_counts_granularity(
request,
module_target_sat,
module_capsule_configured,
setting_update,
function_org,
function_product,
):
"""Verify the Capsule content counts are updated separately for each content view when it is
promoted to the Capsule's LCEs.

:id: 14f9dbba-6d12-4faa-b33f-7ba1f328f8b6

:parametrized: yes

:verifies: SAT-28337

:setup:
1. Satellite with registered external Capsule.

:steps:
1. Create two LCEs, assign them to the Capsule.
2. Create and sync a repo, publish it in two CVs, promote both CVs to both LCEs.
3. Ensure full counts were calculated for both CVs, both LCEs.
4. Disable automatic content counts update.
5. Create a filter for both CVs to change the counts of next version.
6. Publish new version of both CVs and promote to both LCEs.
After capsule sync completes we should have changes in all publictions.
7. Ensure the counts were invalidated in both CVs, both LCEs.
8. Enable automatic content counts update.
9. Publish new version of the first CV, promote it to the first LCE.
10. Ensure counts of the first CV were updated in the first LCE only, nothing else.
11. Promote the first CV to the second LCE.
12. Ensure counts for the first CV were updated in the second LCE, no changes for second CV.

:expectedresults:
1. Content counts are updated only for particular CVs promoted to particular LCEs.
"""
wait_query = (
'label = Actions::Katello::ContentView::CapsuleSync OR '
'Actions::Katello::CapsuleContent::UpdateContentCounts '
'AND started_at >= "{}"'
)

# Create two LCEs, assign them to the Capsule.
lce1 = module_target_sat.api.LifecycleEnvironment(organization=function_org).create()
lce2 = module_target_sat.api.LifecycleEnvironment(
organization=function_org, prior=lce1
).create()
module_capsule_configured.nailgun_capsule.content_add_lifecycle_environment(
data={'environment_id': [lce1.id, lce2.id]}
)

# Create and sync a repo, publish it in two CVs, promote both CVs to both LCEs.
repo = module_target_sat.api.Repository(
product=function_product,
).create()
repo.sync()
repo = repo.read()
cvs = []
for _ in range(2):
cv = module_target_sat.api.ContentView(
organization=function_org, repository=[repo]
).create()
cv.publish()
cv = cv.read()
cv.version[0].promote(data={'environment_ids': [lce1.id, lce2.id]})
cvs.append(cv.read())
cv1, cv2 = cvs
module_target_sat.wait_for_tasks(
search_query=wait_query.format(datetime.utcnow().replace(microsecond=0)),
search_rate=5,
max_tries=5,
)

# Ensure full counts were calculated for both CVs, both LCEs.
info = module_target_sat.cli.Capsule.content_info(
{'id': module_capsule_configured.nailgun_capsule.id, 'organization-id': function_org.id}
)
info = _dictionarize_counts(info)

for lce in [lce1, lce2]:
for cv in cvs:
assert int(info[lce.name][cv.name][repo.name]['packages']) == repo.content_counts['rpm']
assert (
int(info[lce.name][cv.name][repo.name]['package-groups'])
== repo.content_counts['package_group']
)
assert (
int(info[lce.name][cv.name][repo.name]['errata']) == repo.content_counts['erratum']
)

# Disable automatic content counts update.
setting_update.value = False
setting_update = setting_update.update({'value'})
ogajduse marked this conversation as resolved.
Show resolved Hide resolved

# Create a filter for both CVs to change the counts of next version.
for cv in cvs:
cvf = module_target_sat.cli_factory.make_content_view_filter(
{
'content-view-id': cv.id,
'inclusion': 'true',
'type': 'rpm',
},
)
module_target_sat.cli_factory.content_view_filter_rule(
{
'content-view-filter-id': cvf['filter-id'],
'name': FAKE_0_CUSTOM_PACKAGE_NAME,
}
)

# Publish new version of both CVs and promote to both LCEs.
# After capsule sync completes we should have changes in all publictions.
for cv in cvs:
cv.publish()
cv = cv.read()
cv.version[0].promote(data={'environment_ids': [lce1.id, lce2.id]})
cv = cv.read()
module_target_sat.wait_for_tasks(
search_query=wait_query.format(datetime.utcnow().replace(microsecond=0)),
search_rate=5,
max_tries=5,
)

# Ensure the counts were invalidated in both CVs, both LCEs.
info = module_target_sat.cli.Capsule.content_info(
{'id': module_capsule_configured.nailgun_capsule.id, 'organization-id': function_org.id}
)
info = _dictionarize_counts(info)
assert all(info[lce.name][cv.name][repo.name] == {} for cv in cvs for lce in [lce1, lce2])

# Enable automatic content counts update.
setting_update.value = True
setting_update = setting_update.update({'value'})

# Publish new version of the first CV, promote it to the first LCE.
cv1.publish()
cv1 = cv1.read()
cv1.version[0].promote(data={'environment_ids': lce1.id})
cv1 = cv1.read()
module_target_sat.wait_for_tasks(
search_query=wait_query.format(datetime.utcnow().replace(microsecond=0)),
search_rate=5,
max_tries=5,
)

# Ensure counts of the first CV were updated in the first LCE only, nothing else.
info = module_target_sat.cli.Capsule.content_info(
{'id': module_capsule_configured.nailgun_capsule.id, 'organization-id': function_org.id}
)
info = _dictionarize_counts(info)

assert int(info[lce1.name][cv1.name][repo.name]['packages']) == 1
assert int(info[lce1.name][cv1.name][repo.name]['package-groups']) == 1
assert int(info[lce1.name][cv1.name][repo.name]['errata']) == 1
assert all(
info[lce.name][cv.name][repo.name] == {}
for cv in cvs
for lce in [lce1, lce2]
if (cv.name != cv1.name or lce.name != lce1.name)
)

# Promote the first CV to the second LCE.
cv1.version[0].promote(data={'environment_ids': lce2.id})
cv1 = cv1.read()
module_target_sat.wait_for_tasks(
search_query=wait_query.format(datetime.utcnow().replace(microsecond=0)),
search_rate=5,
max_tries=5,
)

# Ensure counts for the first CV were updated in the second LCE, no changes for second CV.
info = module_target_sat.cli.Capsule.content_info(
{'id': module_capsule_configured.nailgun_capsule.id, 'organization-id': function_org.id}
)
info = _dictionarize_counts(info)

assert int(info[lce2.name][cv1.name][repo.name]['packages']) == 1
assert int(info[lce2.name][cv1.name][repo.name]['package-groups']) == 1
assert int(info[lce2.name][cv1.name][repo.name]['errata']) == 1
assert all(info[lce.name][cv2.name][repo.name] == {} for lce in [lce1, lce2])


@pytest.mark.tier4
@pytest.mark.skip_if_not_set('capsule')
def test_positive_exported_imported_content_sync(
Expand Down
Loading