diff --git a/pulp_rpm/tests/functional/api/test_rbac_crud.py b/pulp_rpm/tests/functional/api/test_rbac_crud.py index 7771e3542..0c608307c 100644 --- a/pulp_rpm/tests/functional/api/test_rbac_crud.py +++ b/pulp_rpm/tests/functional/api/test_rbac_crud.py @@ -1,4 +1,5 @@ import pytest +import uuid from pulpcore.client.pulp_rpm import RpmRepositorySyncURL, RpmRpmPublication, RpmRpmDistribution from pulpcore.client.pulp_rpm.exceptions import ApiException @@ -10,12 +11,9 @@ RPM_FIXTURE_SUMMARY, ) -from pulp_smash.pulp3.bindings import monitor_task -from pulp_smash.pulp3.utils import gen_repo -from pulp_smash.utils import uuid4 - -def test_rbac_repositories(gen_user, rpm_repository_api): +@pytest.mark.parallel +def test_rbac_repositories(gen_user, rpm_repository_factory, rpm_repository_api, monitor_task): """ Test creation of repository with user with permissions and without. """ @@ -27,26 +25,26 @@ def test_rbac_repositories(gen_user, rpm_repository_api): # Create repository with user_creator: - repo = rpm_repository_api.create(gen_repo()) + repo = rpm_repository_factory() assert rpm_repository_api.read(repo.pulp_href) with user_viewer, pytest.raises(ApiException): - rpm_repository_api.create(gen_repo()) + rpm_repository_factory() with user_no, pytest.raises(ApiException): - rpm_repository_api.create(gen_repo()) + rpm_repository_factory() # List & retrieve repository with user_creator: - assert rpm_repository_api.list().count == 1 + assert rpm_repository_api.list(name=repo.name).count == 1 assert repo.pulp_href == rpm_repository_api.read(repo.pulp_href).pulp_href with user_viewer: - assert rpm_repository_api.list().count == 1 + assert rpm_repository_api.list(name=repo.name).count == 1 assert repo.pulp_href == rpm_repository_api.read(repo.pulp_href).pulp_href with user_no, pytest.raises(ApiException) as exc: - assert rpm_repository_api.list().count == 0 + assert rpm_repository_api.list(name=repo.name).count == 0 rpm_repository_api.read(repo.pulp_href) assert exc.value.status == 404 @@ -86,10 +84,13 @@ def test_rbac_repositories(gen_user, rpm_repository_api): with user_creator: response = rpm_repository_api.delete(repo.pulp_href) monitor_task(response.task) - assert rpm_repository_api.list().count == 0 + assert rpm_repository_api.list(name=repo.name).count == 0 -def test_rbac_remotes_and_sync(gen_user, rpm_rpmremote_api, rpm_repository_api): +@pytest.mark.parallel +def test_rbac_remotes_and_sync( + gen_user, rpm_rpmremote_api, rpm_repository_api, rpm_repository_factory, monitor_task +): """ Test creation of remotes with user with permissions and without. @@ -118,7 +119,7 @@ def test_rbac_remotes_and_sync(gen_user, rpm_rpmremote_api, rpm_repository_api): with user_creator: remote = rpm_rpmremote_api.create(remote_data) - assert rpm_rpmremote_api.list().count == 1 + assert rpm_rpmremote_api.list(name=remote.name).count == 1 # Update remote_data_update = gen_rpm_remote(RPM_UNSIGNED_FIXTURE_URL) @@ -133,26 +134,28 @@ def test_rbac_remotes_and_sync(gen_user, rpm_rpmremote_api, rpm_repository_api): assert exc.value.status == 403 with user_creator: - rpm_rpmremote_api.update(remote.pulp_href, remote_data_update) + res = rpm_rpmremote_api.update(remote.pulp_href, remote_data_update) + monitor_task(res.task) remote = rpm_rpmremote_api.read(remote.pulp_href) - assert rpm_rpmremote_api.list().count == 1 - assert rpm_rpmremote_api.list().results[0].url == RPM_UNSIGNED_FIXTURE_URL + remotes = rpm_rpmremote_api.list(name=remote_data_update["name"]).results + assert len(remotes) == 1 + assert remotes[0].url == RPM_UNSIGNED_FIXTURE_URL # Sync the remote with user_no, pytest.raises(ApiException) as exc: - repo = rpm_repository_api.create(gen_repo(remote=remote.pulp_href)) + repo = rpm_repository_factory(remote=remote.pulp_href) sync_url = RpmRepositorySyncURL(remote=remote.pulp_href) rpm_repository_api.sync(repo.pulp_href, sync_url) assert exc.value.status == 403 with user_viewer, pytest.raises(ApiException) as exc: - repo = rpm_repository_api.create(gen_repo(remote=remote.pulp_href)) + repo = rpm_repository_factory(remote=remote.pulp_href) sync_url = RpmRepositorySyncURL(remote=remote.pulp_href) rpm_repository_api.sync(repo.pulp_href, sync_url) assert exc.value.status == 403 with user_creator: - repo = rpm_repository_api.create(gen_repo(remote=remote.pulp_href)) + repo = rpm_repository_factory(remote=remote.pulp_href) sync_url = RpmRepositorySyncURL(remote=remote.pulp_href) sync_res = rpm_repository_api.sync(repo.pulp_href, sync_url) monitor_task(sync_res.task) @@ -173,11 +176,12 @@ def test_rbac_remotes_and_sync(gen_user, rpm_rpmremote_api, rpm_repository_api): monitor_task(response.task) response = rpm_repository_api.delete(repo.pulp_href) monitor_task(response.task) - assert rpm_rpmremote_api.list().count == 0 - assert rpm_repository_api.list().count == 0 + assert rpm_rpmremote_api.list(name=remote.name).count == 0 + assert rpm_repository_api.list(name=repo.name).count == 0 -def test_rbac_acs(gen_user, rpm_acs_api, rpm_rpmremote_api): +@pytest.mark.parallel +def test_rbac_acs(gen_user, rpm_acs_api, rpm_rpmremote_api, monitor_task): """Test RPM ACS CRUD.""" user_creator = gen_user( model_roles=[ @@ -202,7 +206,7 @@ def test_rbac_acs(gen_user, rpm_acs_api, rpm_rpmremote_api): remote = rpm_rpmremote_api.create(remote_data) acs_data = { - "name": uuid4(), + "name": str(uuid.uuid4()), "remote": remote.pulp_href, } @@ -217,7 +221,7 @@ def test_rbac_acs(gen_user, rpm_acs_api, rpm_rpmremote_api): with user_creator: acs = rpm_acs_api.create(acs_data) - assert rpm_acs_api.list().count == 1 + assert rpm_acs_api.list(name=acs.name).count == 1 # Update & Read with user_no, pytest.raises(ApiException) as exc: @@ -235,7 +239,7 @@ def test_rbac_acs(gen_user, rpm_acs_api, rpm_rpmremote_api): acs_to_update.paths[0] = "files/" response = rpm_acs_api.update(acs_to_update.pulp_href, acs_to_update) monitor_task(response.task) - assert rpm_acs_api.list().count == 1 + assert rpm_acs_api.list(name=acs.name).count == 1 assert "files/" in rpm_acs_api.read(acs.pulp_href).paths # Remove @@ -250,10 +254,18 @@ def test_rbac_acs(gen_user, rpm_acs_api, rpm_rpmremote_api): with user_creator: rpm_acs_api.delete(acs.pulp_href) rpm_rpmremote_api.delete(remote.pulp_href) - assert rpm_acs_api.list().count == 0 + assert rpm_acs_api.list(name=acs.name).count == 0 -def test_rbac_publication(gen_user, rpm_rpmremote_api, rpm_repository_api, rpm_publication_api): +@pytest.mark.parallel +def test_rbac_publication( + gen_user, + rpm_rpmremote_api, + rpm_repository_api, + rpm_repository_factory, + rpm_publication_api, + monitor_task, +): """Test RPM publication CRUD.""" user_creator = gen_user( model_roles=[ @@ -279,7 +291,7 @@ def test_rbac_publication(gen_user, rpm_rpmremote_api, rpm_repository_api, rpm_p publication = None remote_data = gen_rpm_remote(RPM_UNSIGNED_FIXTURE_URL) remote = rpm_rpmremote_api.create(remote_data) - repo = rpm_repository_api.create(gen_repo()) + repo = rpm_repository_factory() sync_url = RpmRepositorySyncURL(remote=remote.pulp_href) sync_res = rpm_repository_api.sync(repo.pulp_href, sync_url) monitor_task(sync_res.task) @@ -291,7 +303,7 @@ def test_rbac_publication(gen_user, rpm_rpmremote_api, rpm_repository_api, rpm_p publish_response = rpm_publication_api.create(publish_data) created_resources = monitor_task(publish_response.task).created_resources publication = rpm_publication_api.read(created_resources[0]) - assert rpm_publication_api.list().count == 1 + assert rpm_publication_api.list(repository=repository.pulp_href).count == 1 with user_viewer, pytest.raises(ApiException) as exc: publish_data = RpmRpmPublication(repository=repo.pulp_href) @@ -318,11 +330,19 @@ def test_rbac_publication(gen_user, rpm_rpmremote_api, rpm_repository_api, rpm_p monitor_task(res.task) res = rpm_rpmremote_api.delete(remote.pulp_href) monitor_task(res.task) - assert rpm_publication_api.list().count == 0 + publications = rpm_publication_api.list().results + assert not any(p.repository != repository.pulp_href for p in publications) +@pytest.mark.parallel def test_rbac_distribution( - gen_user, rpm_repository_api, rpm_rpmremote_api, rpm_publication_api, rpm_distribution_api + gen_user, + rpm_repository_api, + rpm_repository_factory, + rpm_rpmremote_api, + rpm_publication_api, + rpm_distribution_api, + monitor_task, ): """Test RPM distribution CRUD.""" user_creator = gen_user( @@ -352,7 +372,7 @@ def test_rbac_distribution( distribution = None remote_data = gen_rpm_remote(RPM_UNSIGNED_FIXTURE_URL) remote = rpm_rpmremote_api.create(remote_data) - repo = rpm_repository_api.create(gen_repo()) + repo = rpm_repository_factory() sync_url = RpmRepositorySyncURL(remote=remote.pulp_href) sync_res = rpm_repository_api.sync(repo.pulp_href, sync_url) monitor_task(sync_res.task) @@ -363,7 +383,7 @@ def test_rbac_distribution( # Create dist_data = RpmRpmDistribution( - name=uuid4(), publication=publication.pulp_href, base_path=uuid4() + name=str(uuid.uuid4()), publication=publication.pulp_href, base_path=str(uuid.uuid4()) ) with user_no, pytest.raises(ApiException) as exc: rpm_distribution_api.create(dist_data) @@ -376,11 +396,11 @@ def test_rbac_distribution( with user_creator: res = rpm_distribution_api.create(dist_data) distribution = rpm_distribution_api.read(monitor_task(res.task).created_resources[0]) - assert rpm_distribution_api.list().count == 1 + assert rpm_distribution_api.list(name=distribution.name).count == 1 # Update dist_data_to_update = rpm_distribution_api.read(distribution.pulp_href) - new_name = uuid4() + new_name = str(uuid.uuid4()) dist_data_to_update.name = new_name with user_no, pytest.raises(ApiException) as exc: @@ -394,8 +414,7 @@ def test_rbac_distribution( with user_creator: res = rpm_distribution_api.update(distribution.pulp_href, dist_data_to_update) monitor_task(res.task) - assert rpm_distribution_api.list().count == 1 - assert new_name in rpm_distribution_api.list().results[0].name + assert rpm_distribution_api.list(name=new_name).count == 1 # Remove with user_no, pytest.raises(ApiException) as exc: @@ -416,11 +435,12 @@ def test_rbac_distribution( res = rpm_rpmremote_api.delete(remote.pulp_href) monitor_task(res.task) - assert rpm_distribution_api.list().count == 0 - assert rpm_repository_api.list().count == 0 - assert rpm_rpmremote_api.list().count == 0 + assert rpm_distribution_api.list(name=distribution.name).count == 0 + assert rpm_repository_api.list(name=repo.name).count == 0 + assert rpm_rpmremote_api.list(name=remote.name).count == 0 +@pytest.mark.parallel def test_rbac_content_scoping( gen_user, rpm_advisory_api, @@ -429,7 +449,9 @@ def test_rbac_content_scoping( rpm_package_groups_api, rpm_package_lang_packs_api, rpm_repository_api, + rpm_repository_factory, rpm_rpmremote_api, + monitor_task, ): """ Test creation of remotes with user with permissions and without. @@ -455,39 +477,51 @@ def test_rbac_content_scoping( # Sync the remote with user_creator: - repo = rpm_repository_api.create(gen_repo(remote=remote.pulp_href)) + repo = rpm_repository_factory(remote=remote.pulp_href) sync_url = RpmRepositorySyncURL(remote=remote.pulp_href) sync_res = rpm_repository_api.sync(repo.pulp_href, sync_url) monitor_task(sync_res.task) repo = rpm_repository_api.read(repo.pulp_href) assert repo.latest_version_href.endswith("/1/") + def _assert_listed_content(): + packages_count = rpm_package_api.list(repository_version=repo.latest_version_href).count + assert RPM_FIXTURE_SUMMARY["rpm.package"] == packages_count + + advisories_count = rpm_advisory_api.list(repository_version=repo.latest_version_href).count + assert RPM_FIXTURE_SUMMARY["rpm.advisory"] == advisories_count + + package_categories_count = rpm_package_category_api.list( + repository_version=repo.latest_version_href + ).count + assert RPM_FIXTURE_SUMMARY["rpm.packagecategory"] == package_categories_count + + package_groups_count = rpm_package_groups_api.list( + repository_version=repo.latest_version_href + ).count + assert RPM_FIXTURE_SUMMARY["rpm.packagegroup"] == package_groups_count + + package_lang_packs_count = rpm_package_lang_packs_api.list( + repository_version=repo.latest_version_href + ).count + assert RPM_FIXTURE_SUMMARY["rpm.packagelangpacks"] == package_lang_packs_count + # Test content visibility # TODO: modules with user_creator: - assert RPM_FIXTURE_SUMMARY["rpm.package"] == rpm_package_api.list().count - assert RPM_FIXTURE_SUMMARY["rpm.advisory"] == rpm_advisory_api.list().count - assert RPM_FIXTURE_SUMMARY["rpm.packagecategory"] == rpm_package_category_api.list().count - assert RPM_FIXTURE_SUMMARY["rpm.packagegroup"] == rpm_package_groups_api.list().count - assert ( - RPM_FIXTURE_SUMMARY["rpm.packagelangpacks"] == rpm_package_lang_packs_api.list().count - ) + _assert_listed_content() with user_viewer: - assert RPM_FIXTURE_SUMMARY["rpm.package"] == rpm_package_api.list().count - assert RPM_FIXTURE_SUMMARY["rpm.advisory"] == rpm_advisory_api.list().count - assert RPM_FIXTURE_SUMMARY["rpm.packagecategory"] == rpm_package_category_api.list().count - assert RPM_FIXTURE_SUMMARY["rpm.packagegroup"] == rpm_package_groups_api.list().count - assert ( - RPM_FIXTURE_SUMMARY["rpm.packagelangpacks"] == rpm_package_lang_packs_api.list().count - ) + _assert_listed_content() with user_no: - assert 0 == rpm_package_api.list().count - assert 0 == rpm_advisory_api.list().count - assert 0 == rpm_package_category_api.list().count - assert 0 == rpm_package_groups_api.list().count - assert 0 == rpm_package_lang_packs_api.list().count + assert 0 == rpm_package_api.list(repository_version=repo.latest_version_href).count + assert 0 == rpm_advisory_api.list(repository_version=repo.latest_version_href).count + assert 0 == rpm_package_category_api.list(repository_version=repo.latest_version_href).count + assert 0 == rpm_package_groups_api.list(repository_version=repo.latest_version_href).count + assert ( + 0 == rpm_package_lang_packs_api.list(repository_version=repo.latest_version_href).count + ) # Remove with user_creator: @@ -495,5 +529,5 @@ def test_rbac_content_scoping( monitor_task(response.task) response = rpm_repository_api.delete(repo.pulp_href) monitor_task(response.task) - assert rpm_rpmremote_api.list().count == 0 - assert rpm_repository_api.list().count == 0 + assert rpm_rpmremote_api.list(name=remote.name).count == 0 + assert rpm_repository_api.list(name=repo.name).count == 0