From 6037cf8ead06cd71df8a7892d1302d50e11e44fe Mon Sep 17 00:00:00 2001 From: Jiri Hnidek Date: Mon, 3 Jun 2024 12:49:04 +0200 Subject: [PATCH] feat: Removed attach command and CLI option related to attach * Card ID: CCT-425 * Removed attach command * Removed --auto-attach and --autosubscribe CLI options of register command * Removed --servicelevel CLI option of register command too, because this CLI option was possible to use only with --auto-attach CLI option. Thus, it does not make sense to keep this option * Removed unit tests for attach command * Modified unit tests for register command. Only SCA mode is considered. --- .../cli_command/attach.py | 306 ------------------ .../cli_command/register.py | 66 +--- src/subscription_manager/managercli.py | 2 - test/cli_command/test_attach.py | 204 ------------ 4 files changed, 2 insertions(+), 576 deletions(-) delete mode 100644 src/subscription_manager/cli_command/attach.py delete mode 100644 test/cli_command/test_attach.py diff --git a/src/subscription_manager/cli_command/attach.py b/src/subscription_manager/cli_command/attach.py deleted file mode 100644 index 1796b164aa..0000000000 --- a/src/subscription_manager/cli_command/attach.py +++ /dev/null @@ -1,306 +0,0 @@ -# -# Subscription manager command line utility. -# -# Copyright (c) 2021 Red Hat, Inc. -# -# This software is licensed to you under the GNU General Public License, -# version 2 (GPLv2). There is NO WARRANTY for this software, express or -# implied, including the implied warranties of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 -# along with this software; if not, see -# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. -# -# Red Hat trademarks are not licensed under GPLv2. No permission is -# granted to use or replicate Red Hat trademarks that are incorporated -# in this software or its documentation. -# -import fileinput -import logging -import os -import re - -import rhsm.connection as connection -import subscription_manager.injection as inj - -from rhsmlib.services import attach, products - -from subscription_manager.action_client import ProfileActionClient, ActionClient -from subscription_manager.cli import system_exit -from subscription_manager.cli_command.cli import CliCommand, handle_exception -from subscription_manager.cli_command.list import show_autosubscribe_output -from subscription_manager.exceptions import ExceptionMapper -from subscription_manager.i18n import ugettext as _ -from subscription_manager.packageprofilelib import PackageProfileActionInvoker -from subscription_manager.syspurposelib import save_sla_to_syspurpose_metadata -from subscription_manager.utils import is_simple_content_access, get_current_owner - -log = logging.getLogger(__name__) - - -class AttachCommand(CliCommand): - def __init__(self): - super(AttachCommand, self).__init__(self._command_name(), self._short_description(), self._primary()) - - self.product = None - self.substoken = None - self.auto_attach = True - self.parser.add_argument( - "--pool", - dest="pool", - action="append", - help=_("The ID of the pool to attach (can be specified more than once)"), - ) - self.parser.add_argument( - "--quantity", - dest="quantity", - type=int, - help=_("Number of subscriptions to attach. May not be used with an auto-attach."), - ) - self.parser.add_argument( - "--auto", - action="store_true", - help=_( - "Automatically attach the best-matched compatible subscriptions to this system. " - "This is the default action." - ), - ) - self.parser.add_argument( - "--servicelevel", - dest="service_level", - help=_( - "Automatically attach only subscriptions matching the specified service level; " - "only used with --auto" - ), - ) - self.parser.add_argument( - "--file", - dest="file", - help=_( - "A file from which to read pool IDs. If a hyphen is provided, pool IDs will be " - "read from stdin." - ), - ) - - # re bz #864207 - _("All installed products are covered by valid entitlements.") - _("No need to update subscriptions at this time.") - - def _read_pool_ids(self, f): - if not self.options.pool: - self.options.pool = [] - - for line in fileinput.input(f): - for pool in filter(bool, re.split(r"\s+", line.strip())): - self.options.pool.append(pool) - - def _short_description(self): - return _( - "Attach a specified subscription to the registered system, when system does not use " - "Simple Content Access mode" - ) - - def _command_name(self): - return "attach" - - def _primary(self): - return True - - def _validate_options(self): - if self.options.pool or self.options.file: - if self.options.auto: - system_exit(os.EX_USAGE, _("Error: --auto may not be used when specifying pools.")) - if self.options.service_level: - system_exit( - os.EX_USAGE, _("Error: The --servicelevel option cannot be used when specifying pools.") - ) - - # Quantity must be positive - if self.options.quantity is not None: - if self.options.quantity <= 0: - system_exit(os.EX_USAGE, _("Error: Quantity must be a positive integer.")) - elif self.options.auto or not (self.options.pool or self.options.file): - system_exit(os.EX_USAGE, _("Error: --quantity may not be used with an auto-attach")) - - # If a pools file was specified, process its contents and append it to options.pool - if self.options.file: - self.options.file = os.path.expanduser(self.options.file) - if self.options.file == "-" or os.path.isfile(self.options.file): - self._read_pool_ids(self.options.file) - - if len(self.options.pool) < 1: - if self.options.file == "-": - system_exit(os.EX_DATAERR, _("Error: Received data does not contain any pool IDs.")) - else: - system_exit( - os.EX_DATAERR, - _('Error: The file "{file}" does not contain any pool IDs.').format( - file=self.options.file - ), - ) - else: - system_exit( - os.EX_DATAERR, - _('Error: The file "{file}" does not exist or cannot be read.').format( - file=self.options.file - ), - ) - - def _print_ignore_attach_message(self): - """ - Print message about ignoring attach request - :return: None - """ - owner = get_current_owner(self.cp, self.identity) - owner_id = owner["key"] - print( - _( - "Ignoring the request to attach. " - 'Attaching subscriptions is disabled for organization "{owner_id}" ' - "because Simple Content Access (SCA) is enabled." - ).format(owner_id=owner_id) - ) - - def _do_command(self): - """ - Executes the command. - """ - self.assert_should_be_registered() - self._validate_options() - - # --pool or --file turns off default auto attach - if self.options.pool or self.options.file: - self.auto_attach = False - - # Do not try to do auto-attach, when simple content access mode is used - # BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1826300 - # - if is_simple_content_access(uep=self.cp, identity=self.identity): - if self.auto_attach is True: - self._print_ignore_auto_attach_message() - else: - self._print_ignore_attach_message() - return 0 - - installed_products_num = 0 - return_code = 0 - report = None - - # TODO: change to if self.auto_attach: else: pool/file stuff - try: - cert_action_client = ActionClient(skips=[PackageProfileActionInvoker]) - cert_action_client.update() - cert_update = True - - attach_service = attach.AttachService(self.cp) - if self.options.pool: - subscribed = False - - for pool in self.options.pool: - # odd html strings will cause issues, reject them here. - if pool.find("#") >= 0: - system_exit(os.EX_USAGE, _("Please enter a valid numeric pool ID.")) - - try: - ents = attach_service.attach_pool(pool, self.options.quantity) - # Usually just one, but may as well be safe: - for ent in ents: - pool_json = ent["pool"] - print( - _("Successfully attached a subscription for: {name}").format( - name=pool_json["productName"] - ) - ) - log.debug( - "Attached a subscription for {name}".format(name=pool_json["productName"]) - ) - subscribed = True - except connection.RestlibException as re: - log.exception(re) - - exception_mapper = ExceptionMapper() - mapped_message = exception_mapper.get_message(re) - - if re.code == 403: - print(mapped_message) # already subscribed. - elif re.code == 400 or re.code == 404: - print(mapped_message) # no such pool. - else: - system_exit(os.EX_SOFTWARE, mapped_message) # some other error.. don't try again - if not subscribed: - return_code = 1 - # must be auto - else: - installed_products_num = len(products.InstalledProducts(self.cp).list()) - # if we are green, we don't need to go to the server - self.sorter = inj.require(inj.CERT_SORTER) - - if self.sorter.is_valid(): - if not installed_products_num: - print(_("No Installed products on system. " "No need to attach subscriptions.")) - else: - print( - _( - "All installed products are covered by valid entitlements. " - "No need to update subscriptions at this time." - ) - ) - cert_update = False - else: - # If service level specified, make an additional request to - # verify service levels are supported on the server: - if self.options.service_level: - consumer = self.cp.getConsumer(self.identity.uuid) - if "serviceLevel" not in consumer: - system_exit( - os.EX_UNAVAILABLE, - _( - "Error: The --servicelevel option is not " - "supported by the server. Did not " - "complete your request." - ), - ) - - attach_service.attach_auto(self.options.service_level) - if self.options.service_level is not None: - # RHBZ 1632797 we should only save the sla if the sla was actually - # specified. The uep and consumer_uuid are None, because service_level was sent - # to candlepin server using attach_service.attach_auto() - save_sla_to_syspurpose_metadata( - uep=None, consumer_uuid=None, service_level=self.options.service_level - ) - print(_("Service level set to: {}").format(self.options.service_level)) - - if cert_update: - report = self.entcertlib.update() - - profile_action_client = ProfileActionClient() - profile_action_client.update() - - if report and report.exceptions(): - print(_("Entitlement Certificate(s) update failed due to the following reasons:")) - for e in report.exceptions(): - print("\t-", str(e)) - elif self.auto_attach: - if not installed_products_num: - return_code = 1 - else: - self.sorter.force_cert_check() - # Make sure that we get fresh status of installed products - status_cache = inj.require(inj.ENTITLEMENT_STATUS_CACHE) - status_cache.load_status( - self.sorter.cp_provider.get_consumer_auth_cp(), - self.sorter.identity.uuid, - self.sorter.on_date, - ) - self.sorter.load() - # run this after entcertlib update, so we have the new entitlements - return_code = show_autosubscribe_output(self.cp, self.identity) - - except Exception as e: - handle_exception("Unable to attach: {e}".format(e=e), e) - - # it is okay to call this no matter what happens above, - # it's just a notification to perform a check - self._request_validity_check() - - return return_code diff --git a/src/subscription_manager/cli_command/register.py b/src/subscription_manager/cli_command/register.py index 0c7ae9bd46..8ba27731be 100644 --- a/src/subscription_manager/cli_command/register.py +++ b/src/subscription_manager/cli_command/register.py @@ -29,7 +29,7 @@ from rhsm.utils import LiveStatusMessage from rhsmlib.facts.hwprobe import ClassicCheck -from rhsmlib.services import attach, unregister, register, exceptions +from rhsmlib.services import unregister, register, exceptions from subscription_manager import identity from subscription_manager.branding import get_branding @@ -42,14 +42,11 @@ from subscription_manager.i18n import ugettext as _ from subscription_manager.utils import ( restart_virt_who, - print_error, get_supported_resources, - is_simple_content_access, is_interactive, is_process_running, ) from subscription_manager.cli_command.environments import check_set_environment_names -from subscription_manager.exceptions import ExceptionMapper log = logging.getLogger(__name__) @@ -106,17 +103,6 @@ def __init__(self): dest="release", help=_("set a release version"), ) - self.parser.add_argument( - "--autosubscribe", - action="store_true", - help=_("Deprecated, see --auto-attach"), - ) - self.parser.add_argument( - "--auto-attach", - action="store_true", - dest="autoattach", - help=_("automatically attach compatible subscriptions to this system"), - ) self.parser.add_argument( "--force", action="store_true", @@ -128,14 +114,8 @@ def __init__(self): dest="activation_keys", help=_("activation key to use for registration (can be specified more than once)"), ) - self.parser.add_argument( - "--servicelevel", - dest="service_level", - help=_("system preference used when subscribing automatically, requires --auto-attach"), - ) def _validate_options(self): - self.autoattach = self.options.autosubscribe or self.options.autoattach if self.is_registered() and not self.options.force: system_exit(os.EX_USAGE, _("This system is already registered. Use --force to override")) elif self.options.consumername == "": @@ -148,13 +128,9 @@ def _validate_options(self): ) elif self.options.environments and self.options.activation_keys: system_exit(os.EX_USAGE, _("Error: Activation keys do not allow environments to be specified.")) - elif self.autoattach and self.options.activation_keys: - system_exit(os.EX_USAGE, _("Error: Activation keys cannot be used with --auto-attach.")) # 746259: Don't allow the user to pass in an empty string as an activation key elif self.options.activation_keys and "" in self.options.activation_keys: system_exit(os.EX_USAGE, _("Error: Must specify an activation key")) - elif self.options.service_level and not self.autoattach: - system_exit(os.EX_USAGE, _("Error: Must use --auto-attach with --servicelevel.")) elif self.options.activation_keys and not self.options.org: system_exit(os.EX_USAGE, _("Error: Must provide --org with activation keys.")) elif self.options.force and self.options.consumerid: @@ -182,37 +158,6 @@ def persist_server_options(self): """ return True - def _do_auto_attach(self, consumer): - """ - Try to do auto-attach, when it was requested using --auto-attach CLI option - :return: None - """ - - # Do not try to do auto-attach, when simple content access mode is used - # Only print info message to stdout - if is_simple_content_access(uep=self.cp, identity=self.identity): - self._print_ignore_auto_attach_message() - return - - if "serviceLevel" not in consumer and self.options.service_level: - system_exit( - os.EX_UNAVAILABLE, - _( - "Error: The --servicelevel option is not supported " - "by the server. Did not complete your request." - ), - ) - try: - # We don't call auto_attach with self.option.service_level, because it has been already - # set during service.register() call - attach.AttachService(self.cp).attach_auto(service_level=None) - except connection.RestlibException as rest_lib_err: - mapped_message: str = ExceptionMapper().get_message(rest_lib_err) - print_error(mapped_message) - except Exception: - log.exception("Auto-attach failed") - raise - def _upload_profile_blocking(self, consumer: dict) -> None: """ Try to upload DNF profile to server @@ -355,7 +300,6 @@ def _do_command(self): force=self.options.force, name=self.options.consumername, consumer_type=self.options.consumertype, - service_level=self.options.service_level, ) except (connection.RestlibException, exceptions.ServiceError) as re: log.exception(re) @@ -367,8 +311,6 @@ def _do_command(self): consumer_info = identity.ConsumerIdentity(consumer["idCert"]["key"], consumer["idCert"]["cert"]) print(_("The system has been registered with ID: {id}").format(id=consumer_info.getConsumerId())) print(_("The registered system name is: {name}").format(name=consumer_info.getConsumerName())) - if self.options.service_level: - print(_("Service level set to: {level}").format(level=self.options.service_level)) # We have new credentials, restart virt-who restart_virt_who() @@ -402,13 +344,9 @@ def _do_command(self): # TODO: grab the list of valid options, and check self.cp.updateConsumer(consumer["uuid"], release=self.options.release) - if self.autoattach: - self._do_auto_attach(consumer) - if ( self.options.consumerid or self.options.activation_keys - or self.autoattach or self.cp.has_capability(CONTENT_ACCESS_CERT_CAPABILITY) ): log.debug("System registered, updating entitlements if needed") @@ -419,7 +357,7 @@ def _do_command(self): self._upload_profile(consumer) subscribed = 0 - if self.options.activation_keys or self.autoattach: + if self.options.activation_keys: # update with the latest cert info self.sorter = inj.require(inj.CERT_SORTER) self.sorter.force_cert_check() diff --git a/src/subscription_manager/managercli.py b/src/subscription_manager/managercli.py index 64957ebc31..808d7ba767 100644 --- a/src/subscription_manager/managercli.py +++ b/src/subscription_manager/managercli.py @@ -24,7 +24,6 @@ from subscription_manager import managerlib from subscription_manager.cli import CLI from subscription_manager.cli_command.addons import AddonsCommand -from subscription_manager.cli_command.attach import AttachCommand from subscription_manager.cli_command.autoheal import AutohealCommand from subscription_manager.cli_command.clean import CleanCommand from subscription_manager.cli_command.config import ConfigCommand @@ -76,7 +75,6 @@ def __init__(self): ServiceLevelCommand, VersionCommand, RemoveCommand, - AttachCommand, PluginsCommand, AutohealCommand, OverrideCommand, diff --git a/test/cli_command/test_attach.py b/test/cli_command/test_attach.py deleted file mode 100644 index 693e928683..0000000000 --- a/test/cli_command/test_attach.py +++ /dev/null @@ -1,204 +0,0 @@ -import contextlib -import os -import sys -import tempfile - -from ..test_managercli import TestCliProxyCommand -from subscription_manager import managercli - -from unittest.mock import patch - - -# Test Attach and Subscribe are the same -class TestAttachCommand(TestCliProxyCommand): - command_class = managercli.AttachCommand - tempdir = None - tempfiles = [] - - @classmethod - def setUpClass(cls): - # Create temp file(s) for processing pool IDs - cls.tempdir = tempfile.TemporaryDirectory() - cls.tempfiles = [ - tempfile.mkstemp(dir=cls.tempdir.name), - tempfile.mkstemp(dir=cls.tempdir.name), - tempfile.mkstemp(dir=cls.tempdir.name), - ] - - os.write(cls.tempfiles[0][0], b"pool1 pool2 pool3 \npool4\npool5\r\npool6\t\tpool7\n pool8\n\n\n") - os.close(cls.tempfiles[0][0]) - os.write(cls.tempfiles[1][0], b"pool1 pool2 pool3 \npool4\npool5\r\npool6\t\tpool7\n pool8\n\n\n") - os.close(cls.tempfiles[1][0]) - # The third temp file intentionally left empty for testing empty sets of data - os.close(cls.tempfiles[2][0]) - - @classmethod - def tearDownClass(cls): - cls.tempdir = None - cls.tempfiles = [] - - def setUp(self): - super(TestAttachCommand, self).setUp() - argv_patcher = patch.object(sys, "argv", ["subscription-manager", "attach"]) - argv_patcher.start() - self.addCleanup(argv_patcher.stop) - - def _test_quantity_exception(self, arg): - try: - self.cc.main(["--pool", "test-pool-id", "--quantity", arg]) - self.cc._validate_options() - except SystemExit as e: - self.assertEqual(e.code, os.EX_USAGE) - else: - self.fail("No Exception Raised") - - def _test_auto_and_quantity_exception(self): - try: - self.cc.main(["--auto", "--quantity", "6"]) - self.cc._validate_options() - except SystemExit as e: - self.assertEqual(e.code, os.EX_USAGE) - else: - self.fail("No Exception Raised") - - def _test_auto_default_and_quantity_exception(self): - try: - self.cc.main(["--quantity", "3"]) - self.cc._validate_options() - except SystemExit as e: - self.assertEqual(e.code, os.EX_USAGE) - else: - self.fail("No Exception Raised") - - def test_zero_quantity(self): - self._test_quantity_exception("0") - - def test_negative_quantity(self): - self._test_quantity_exception("-1") - - def test_text_quantity(self): - try: - self.cc.main(["--quantity", "JarJarBinks"]) - self.cc._validate_options() - except SystemExit as e: - self.assertEqual(e.code, 2) - else: - self.fail("No Exception Raised") - - def test_positive_quantity(self): - self.cc.main(["--pool", "test-pool-id", "--quantity", "1"]) - self.cc._validate_options() - - def test_positive_quantity_with_plus(self): - self.cc.main(["--pool", "test-pool-id", "--quantity", "+1"]) - self.cc._validate_options() - - def test_positive_quantity_as_float(self): - try: - self.cc.main(["--quantity", "2.0"]) - self.cc._validate_options() - except SystemExit as e: - self.assertEqual(e.code, 2) - else: - self.fail("No Exception Raised") - - def _test_pool_file_processing(self, f, expected): - self.cc.main(["--file", f]) - self.cc._validate_options() - - self.assertEqual(expected, self.cc.options.pool) - - def test_pool_option_or_auto_option(self): - self.cc.main(["--auto", "--pool", "1234"]) - self.assertRaises(SystemExit, self.cc._validate_options) - - def test_servicelevel_option_but_no_auto_option(self): - with self.mock_stdin(open(self.tempfiles[1][1])): - self.cc.main(["--servicelevel", "Super", "--file", "-"]) - self.assertRaises(SystemExit, self.cc._validate_options) - - def test_servicelevel_option_with_pool_option(self): - self.cc.main(["--servicelevel", "Super", "--pool", "1232342342313"]) - # need a assertRaises that checks a SystemsExit code and message - self.assertRaises(SystemExit, self.cc._validate_options) - - def test_just_pools_option(self): - self.cc.main(["--pool", "1234"]) - self.cc._validate_options() - - def test_just_auto_option(self): - self.cc.main(["--auto"]) - self.cc._validate_options() - - def test_no_options_defaults_to_auto(self): - self.cc.main([]) - self.cc._validate_options() - - @contextlib.contextmanager - def mock_stdin(self, fileobj): - org_stdin = sys.stdin - sys.stdin = fileobj - - try: - yield - finally: - sys.stdin = org_stdin - - def test_pool_stdin_processing(self): - with self.mock_stdin(open(self.tempfiles[1][1])): - self._test_pool_file_processing( - "-", ["pool1", "pool2", "pool3", "pool4", "pool5", "pool6", "pool7", "pool8"] - ) - - def test_pool_stdin_empty(self): - try: - with self.mock_stdin(open(self.tempfiles[2][1])): - self.cc.main(["--file", "-"]) - self.cc._validate_options() - - except SystemExit as e: - self.assertEqual(e.code, os.EX_DATAERR) - else: - self.fail("No Exception Raised") - - def test_pool_file_processing(self): - self._test_pool_file_processing( - self.tempfiles[0][1], ["pool1", "pool2", "pool3", "pool4", "pool5", "pool6", "pool7", "pool8"] - ) - - def test_pool_file_empty(self): - try: - self.cc.main(["--file", self.tempfiles[2][1]]) - self.cc._validate_options() - - except SystemExit as e: - self.assertEqual(e.code, os.EX_DATAERR) - else: - self.fail("No Exception Raised") - - def test_pool_file_invalid(self): - try: - self.cc.main(["--file", "nonexistant_file.nope"]) - self.cc._validate_options() - except SystemExit as e: - self.assertEqual(e.code, os.EX_DATAERR) - else: - self.fail("No Exception Raised") - - @patch("subscription_manager.cli_command.attach.is_simple_content_access") - def test_auto_attach_sca_mode(self, mock_is_simple_content_access): - """ - Test the case, when SCA mode is used. Auto-attach is not possible in this case - """ - mock_is_simple_content_access.return_value = True - self.cc.main(["--auto"]) - self.cc._validate_options() - - @patch("subscription_manager.cli_command.attach.is_simple_content_access") - def test_attach_sca_mode(self, mock_is_simple_content_access): - """ - Test the case, when SCA mode is used. Attaching of pool is not possible in this case - """ - mock_is_simple_content_access.return_value = True - self.cc.main(["--pool", "123456789"]) - self.cc._validate_options()