diff --git a/etc-conf/subscription-manager.completion.sh b/etc-conf/subscription-manager.completion.sh index 88c2b55419..d8bbd6609c 100644 --- a/etc-conf/subscription-manager.completion.sh +++ b/etc-conf/subscription-manager.completion.sh @@ -94,11 +94,8 @@ _subscription_manager_identity() _subscription_manager_list() { - local opts="--afterdate --all --available --consumed --installed - --ondate --servicelevel - --match-installed --no-overlap + local opts="--installed --matches - --pool-only ${_subscription_manager_common_opts}" COMPREPLY=($(compgen -W "${opts}" -- ${1})) } diff --git a/man/subscription-manager.8 b/man/subscription-manager.8 index 1ae4d209fa..1f8403408d 100644 --- a/man/subscription-manager.8 +++ b/man/subscription-manager.8 @@ -443,61 +443,19 @@ Removes any previously set usage preference. .SS LIST OPTIONS The .B list -command lists all of the subscriptions that are compatible with a system. The options allow the list to be filtered by subscriptions that are used by the system or unused subscriptions that are available to the system. - -.TP -.B --afterdate=YYYY-MM-DD -Shows pools that are active on or after the given date. This is only used with the -.B --available -option. - -.TP -.B --all -Lists all possible subscriptions that have been purchased, even if they don't match the architecture of the system. This is used with the -.B --available -option. - -.TP -.B --available -Lists available subscriptions which are not yet attached to the system. - -.TP -.B --consumed -Lists all of the subscriptions currently attached to the system. +command lists all of the installed products on the system. The options allow the list to be filtered. .TP .B --installed -Lists products which are currently installed on the system which may (or may not) have subscriptions associated with them, as well as products with attached subscriptions which may (or may not) be installed. (default) - -.TP -.B --ondate=YYYY-MM-DD -Sets the date to use to search for active and available subscriptions. The default (if not explicitly passed) is today's date; using a later date looks for subscriptions which will be active then. This is only used with the -.B --available -option. - -.TP -.B --no-overlap -Shows pools which provide products that are not already covered; only used with -.B --available -option. - -.TP -.B --match-installed -Shows only subscriptions matching products that are currently installed; only used with -.B --available -option. +Lists products which are currently installed on the system. .TP .B --matches=SEARCH -Limits the output of --installed, --available and --consumed to only subscriptions or products which contain SEARCH in the subscription or product information, varying with the list requested and the server version. +Limits the output of installed products which contain SEARCH in product information. .br SEARCH may contain the wildcards ? or * to match a single character or zero or more characters, respectively. The wildcard characters may be escaped with a backslash to represent a literal question mark or asterisk. Likewise, to represent a backslash, it must be escaped with another backslash. -.TP -.B --pool-only -Limits the output of --available and --consumed such that only the pool IDs are displayed. No labels or errors will be printed if this option is specified. - .SS REFRESH OPTIONS The .B refresh diff --git a/src/subscription_manager/cli_command/list.py b/src/subscription_manager/cli_command/list.py index 3c0915fa90..1e2b004ed1 100644 --- a/src/subscription_manager/cli_command/list.py +++ b/src/subscription_manager/cli_command/list.py @@ -14,14 +14,9 @@ # granted to use or replicate Red Hat trademarks that are incorporated # in this software or its documentation. # -import datetime -import logging -import os -import sys - -from time import localtime, strftime, strptime -from rhsmlib.services import products, entitlement +import logging +from rhsmlib.services import products from subscription_manager.cert_sorter import ( FUTURE_SUBSCRIBED, @@ -31,14 +26,13 @@ PARTIALLY_SUBSCRIBED, UNKNOWN, ) -from subscription_manager.cli import system_exit + from subscription_manager.cli_command.cli import CliCommand from subscription_manager.i18n import ugettext as _ -from subscription_manager.jsonwrapper import PoolWrapper + from subscription_manager.printing_utils import ( columnize, none_wrap_columnize_callback, - highlight_by_filter_string_columnize_cb, echo_columnize_callback, ) from subscription_manager.utils import is_simple_content_access @@ -54,17 +48,6 @@ UNKNOWN: _("Unknown"), } -INSTALLED_PRODUCT_STATUS = [ - _("Product Name:"), - _("Product ID:"), - _("Version:"), - _("Arch:"), - _("Status:"), - _("Status Details:"), - _("Starts:"), - _("Ends:"), -] - INSTALLED_PRODUCT_STATUS_SCA = [ _("Product Name:"), _("Product ID:"), @@ -72,26 +55,6 @@ _("Arch:"), ] -AVAILABLE_SUBS_LIST = [ - _("Subscription Name:"), - _("Provides:"), - _("SKU:"), - _("Contract:"), - _("Pool ID:"), - _("Provides Management:"), - _("Available:"), - _("Suggested:"), - _("Service Type:"), - _("Roles:"), - _("Service Level:"), - _("Usage:"), - _("Add-ons:"), - _("Subscription Type:"), - _("Starts:"), - _("Ends:"), - _("Entitlement Type:"), -] - AVAILABLE_SUBS_MATCH_COLUMNS = [ _("Subscription Name:"), _("Provides:"), @@ -122,48 +85,6 @@ _("Key:"), ] -OLD_CONSUMED_LIST = [ - _("Subscription Name:"), - _("Provides:"), - _("SKU:"), - _("Contract:"), - _("Account:"), - _("Serial:"), - _("Pool ID:"), - _("Provides Management:"), - _("Active:"), - _("Quantity Used:"), - _("Service Type:"), - _("Service Level:"), - _("Status Details:"), - _("Subscription Type:"), - _("Starts:"), - _("Ends:"), - _("System Type:"), -] - -CONSUMED_LIST = [ - _("Subscription Name:"), - _("Provides:"), - _("SKU:"), - _("Contract:"), - _("Account:"), - _("Serial:"), - _("Pool ID:"), - _("Provides Management:"), - _("Active:"), - _("Quantity Used:"), - _("Service Type:"), - _("Roles:"), - _("Service Level:"), - _("Usage:"), - _("Add-ons:"), - _("Status Details:"), - _("Subscription Type:"), - _("Starts:"), - _("Ends:"), - _("Entitlement Type:"), -] log = logging.getLogger(__name__) @@ -207,59 +128,11 @@ class ListCommand(CliCommand): def __init__(self): shortdesc = _("List subscription and product information for this system") super(ListCommand, self).__init__("list", shortdesc, True) - self.available = None - self.consumed = None self.parser.add_argument( "--installed", action="store_true", help=_("list shows those products which are installed (default)"), ) - self.parser.add_argument( - "--available", - action="store_true", - help=_("show those subscriptions which are available"), - ) - self.parser.add_argument( - "--all", - action="store_true", - help=_("used with --available to ensure all subscriptions are returned"), - ) - self.parser.add_argument( - "--ondate", - dest="on_date", - help=_( - "date to search on, defaults to today's date, only used with --available (example: {example})" - ).format(example=strftime("%Y-%m-%d", localtime())), - ) - self.parser.add_argument( - "--consumed", - action="store_true", - help=_("show the subscriptions being consumed by this system"), - ) - self.parser.add_argument( - "--servicelevel", - dest="service_level", - help=_( - "shows only subscriptions matching the specified service level; " - "only used with --available and --consumed" - ), - ) - self.parser.add_argument( - "--no-overlap", - action="store_true", - help=_( - "shows pools which provide products that are not already covered; " - "only used with --available" - ), - ) - self.parser.add_argument( - "--match-installed", - action="store_true", - help=_( - "shows only subscriptions matching products that are currently installed; " - "only used with --available" - ), - ) self.parser.add_argument( "--matches", dest="filter_string", @@ -269,77 +142,6 @@ def __init__(self): "and the server version (case-insensitive)." ), ) - self.parser.add_argument( - "--pool-only", - dest="pid_only", - action="store_true", - help=_( - "lists only the pool IDs for applicable available or consumed subscriptions; " - "only used with --available and --consumed" - ), - ) - self.parser.add_argument( - "--afterdate", - dest="after_date", - help=_( - "show pools that are active on or after the given date; " - "only used with --available (example: {example})" - ).format(example=strftime("%Y-%m-%d", localtime())), - ) - - def _validate_options(self): - if self.options.all and not self.options.available: - system_exit(os.EX_USAGE, _("Error: --all is only applicable with --available")) - if self.options.on_date and not self.options.available: - system_exit(os.EX_USAGE, _("Error: --ondate is only applicable with --available")) - if self.options.service_level is not None and not (self.options.consumed or self.options.available): - system_exit( - os.EX_USAGE, _("Error: --servicelevel is only applicable with --available or --consumed") - ) - if not (self.options.available or self.options.consumed): - self.options.installed = True - if not self.options.available and self.options.match_installed: - system_exit(os.EX_USAGE, _("Error: --match-installed is only applicable with --available")) - if self.options.no_overlap and not self.options.available: - system_exit(os.EX_USAGE, _("Error: --no-overlap is only applicable with --available")) - if self.options.pid_only and self.options.installed: - system_exit( - os.EX_USAGE, _("Error: --pool-only is only applicable with --available and/or --consumed") - ) - if self.options.after_date and not self.options.available: - system_exit(os.EX_USAGE, _("Error: --afterdate is only applicable with --available")) - if self.options.after_date and self.options.on_date: - system_exit(os.EX_USAGE, _("Error: --afterdate cannot be used with --ondate")) - - def _parse_date(self, date): - """ - Turns a given date into a date object - :param date: Date string - :type date: str - :return: date - """ - try: - # doing it this ugly way for pre python 2.5 - return datetime.datetime(*(strptime(date, "%Y-%m-%d")[0:6])) - except Exception: - # Translators: dateexample is current date in format like 2014-11-31 - msg = _( - "Date entered is invalid. Date should be in YYYY-MM-DD format (example: {" "dateexample})" - ) - dateexample = strftime("%Y-%m-%d", localtime()) - system_exit(os.EX_DATAERR, msg.format(dateexample=dateexample)) - - def _split_mulit_value_field(self, values): - """ - REST API returns multi-value fields in string, where values are separated with comma, but - each value of multi-value field should be printed on new line. It is done automatically, when - values are in list - :param values: String containing multi-value string, where values are separated with comma - :return: list of values - """ - if values is None: - return "" - return [item.strip() for item in values.split(",")] def _do_command(self): """ @@ -347,206 +149,31 @@ def _do_command(self): """ self._validate_options() - if self.options.installed and not self.options.pid_only: - installed_products = products.InstalledProducts(self.cp).list(self.options.filter_string) - - if len(installed_products): - print("+-------------------------------------------+") - print(_(" Installed Product Status")) - print("+-------------------------------------------+") - - for product in installed_products: - if is_simple_content_access(self.cp, self.identity): - print( - columnize( - INSTALLED_PRODUCT_STATUS_SCA, - none_wrap_columnize_callback, - product[0], # Name - product[1], # ID - product[2], # Version - product[3], # Arch - ) - + "\n" - ) - else: - status = STATUS_MAP[product[4]] - print( - columnize( - INSTALLED_PRODUCT_STATUS, - none_wrap_columnize_callback, - product[0], # Name - product[1], # ID - product[2], # Version - product[3], # Arch - status, # Status - product[5], # Status details - product[6], # Start - product[7], # End - ) - + "\n" - ) - else: - if self.options.filter_string: - print( - _('No installed products were found matching the expression "{filter}".').format( - filter=self.options.filter_string - ) - ) - else: - print(_("No installed products to list")) - - if self.options.available: - self.assert_should_be_registered() - on_date = None - after_date = None - if self.options.on_date: - on_date = self._parse_date(self.options.on_date) - elif self.options.after_date: - after_date = self._parse_date(self.options.after_date) - - epools = entitlement.EntitlementService().get_available_pools( - show_all=self.options.all, - on_date=on_date, - no_overlap=self.options.no_overlap, - match_installed=self.options.match_installed, - matches=self.options.filter_string, - service_level=self.options.service_level, - after_date=after_date, - ) - - if len(epools): - if self.options.pid_only: - for data in epools: - print(data["id"]) - else: - print("+-------------------------------------------+") - print(" " + _("Available Subscriptions")) - print("+-------------------------------------------+") - - for data in epools: - if PoolWrapper(data).is_virt_only(): - entitlement_type = _("Virtual") - else: - entitlement_type = _("Physical") + installed_products = products.InstalledProducts(self.cp).list(self.options.filter_string) - if "management_enabled" in data and data["management_enabled"]: - data["management_enabled"] = _("Yes") - else: - data["management_enabled"] = _("No") + if len(installed_products): + print("+-------------------------------------------+") + print(_(" Installed Product Status")) + print("+-------------------------------------------+") - kwargs = { - "filter_string": self.options.filter_string, - "match_columns": AVAILABLE_SUBS_MATCH_COLUMNS, - "is_atty": sys.stdout.isatty(), - } - print( - columnize( - AVAILABLE_SUBS_LIST, - highlight_by_filter_string_columnize_cb, - data["productName"], - data["providedProducts"], - data["productId"], - data["contractNumber"] or "", - data["id"], - data["management_enabled"], - data["quantity"], - data["suggested"], - data["service_type"] or "", - self._split_mulit_value_field(data["roles"]), - data["service_level"] or "", - data["usage"] or "", - self._split_mulit_value_field(data["addons"]), - data["pool_type"], - data["startDate"], - data["endDate"], - entitlement_type, - **kwargs - ) - + "\n" - ) - elif not self.options.pid_only: - if self.options.filter_string and self.options.service_level: - print( - _( - "No available subscription pools were found matching the expression " - '"{filter}" and the service level "{level}".' - ).format(filter=self.options.filter_string, level=self.options.service_level) - ) - elif self.options.filter_string: - print( - _( - 'No available subscription pools were found matching the expression "{filter}".' - ).format(filter=self.options.filter_string) - ) - elif self.options.service_level: - print( - _( - 'No available subscription pools were found matching the service level "{level}".' - ).format(level=self.options.service_level) - ) - else: - print(_("No available subscription pools to list")) - - if self.options.consumed: - self.print_consumed( - service_level=self.options.service_level, - filter_string=self.options.filter_string, - pid_only=self.options.pid_only, - ) - - def print_consumed(self, service_level=None, filter_string=None, pid_only=False): - # list all certificates that have not yet expired, even those - # that are not yet active. - service = entitlement.EntitlementService() - certs = service.get_consumed_product_pools(service_level=service_level, matches=filter_string) - - # Process and display our (filtered) certs: - if len(certs): - if pid_only: - for cert in certs: - print(cert.pool_id) - else: - print("+-------------------------------------------+") - print(" " + _("Consumed Subscriptions")) - print("+-------------------------------------------+") - - for cert in certs: - kwargs = { - "filter_string": filter_string, - "match_columns": AVAILABLE_SUBS_MATCH_COLUMNS, - "is_atty": sys.stdout.isatty(), - } - if hasattr(cert, "roles") and hasattr(cert, "usage") and hasattr(cert, "addons"): - print( - columnize(CONSUMED_LIST, highlight_by_filter_string_columnize_cb, *cert, **kwargs) - + "\n" - ) - else: - print( - columnize( - OLD_CONSUMED_LIST, highlight_by_filter_string_columnize_cb, *cert, **kwargs - ) - + "\n" - ) - elif not pid_only: - if filter_string and service_level: + for product in installed_products: print( - _( - 'No consumed subscription pools were found matching the expression "{filter}" ' - 'and the service level "{level}".' - ).format(filter=filter_string, level=service_level) - ) - elif filter_string: - print( - _('No consumed subscription pools were found matching the expression "{filter}".').format( - filter=filter_string + columnize( + INSTALLED_PRODUCT_STATUS_SCA, + none_wrap_columnize_callback, + product[0], # Name + product[1], # ID + product[2], # Version + product[3], # Arch ) + + "\n" ) - elif service_level: + else: + if self.options.filter_string: print( - _( - 'No consumed subscription pools were found matching the service level "{level}".' - ).format(level=service_level) + _('No installed products were found matching the expression "{filter}".').format( + filter=self.options.filter_string + ) ) else: - print(_("No consumed subscription pools were found.")) + print(_("No installed products to list")) diff --git a/test/cli_command/test_list.py b/test/cli_command/test_list.py index 95576ef86d..7f572a0280 100644 --- a/test/cli_command/test_list.py +++ b/test/cli_command/test_list.py @@ -1,4 +1,3 @@ -import os import sys from ..test_managercli import TestCliProxyCommand @@ -6,10 +5,10 @@ from subscription_manager.entcertlib import CONTENT_ACCESS_CERT_TYPE from subscription_manager.injection import provide, CERT_SORTER -from ..stubs import StubProductCertificate, StubEntitlementCertificate, StubProduct, StubCertSorter, StubPool +from ..stubs import StubProductCertificate, StubEntitlementCertificate, StubProduct, StubCertSorter from ..fixture import Capture -from unittest.mock import patch, Mock, MagicMock +from unittest.mock import patch class TestListCommand(TestCliProxyCommand): @@ -30,160 +29,6 @@ def setUp(self): argv_patcher.start() self.addCleanup(argv_patcher.stop) - def _test_afterdate_option(self, argv, method, should_exit=True, expected_exit_code=0): - msg = "" - with patch.object(sys, "argv", argv): - try: - method() - except SystemExit as e: - self.assertEqual( - e.code, - expected_exit_code, - """Cli should have exited with code '{}', got '{}'""".format(expected_exit_code, e.code), - ) - fail = False - except Exception as e: - fail = True - msg = "Expected SystemExit, got '''{}'''".format(e) - else: - fail = should_exit - if fail: - msg = "Expected SystemExit, No Exception was raised" - - if fail: - self.fail(msg) - - def test_afterdate_option_bad_date(self): - argv = ["subscription-manager", "list", "--all", "--available", "--afterdate", "not_a_real_date"] - self._test_afterdate_option(argv, self.cc.main, expected_exit_code=os.EX_DATAERR) - - def test_afterdate_option_no_date(self): - argv = ["subscription-manager", "list", "--all", "--available", "--afterdate"] - # Error code of 2 is expected from optparse in this case. - self._test_afterdate_option(argv, self.cc.main, expected_exit_code=2) - - def test_afterdate_option_missing_options(self): - # Just missing "available" - argv = ["subscription-manager", "list", "--afterdate", self.valid_date, "--all"] - self._test_afterdate_option(argv, self.cc.main, expected_exit_code=os.EX_USAGE) - - # Missing both - argv = ["subscription-manager", "list", "--afterdate", self.valid_date] - self._test_afterdate_option(argv, self.cc.main, expected_exit_code=os.EX_USAGE) - - def test_afterdate_option_with_ondate(self): - argv = ["subscription-manager", "list", "--afterdate", self.valid_date, "--ondate", self.valid_date] - self._test_afterdate_option(argv, self.cc.main, expected_exit_code=os.EX_USAGE) - - @patch("subscription_manager.managerlib.get_available_entitlements") - def test_afterdate_option_valid(self, es): - def create_pool_list(*args, **kwargs): - return [ - { - "productName": "dummy-name", - "productId": "dummy-id", - "providedProducts": [], - "id": "888888888888", - "management_enabled": True, - "attributes": [{"name": "is_virt_only", "value": "false"}], - "pool_type": "Some Type", - "quantity": "4", - "service_type": "", - "roles": "awsome server", - "service_level": "", - "usage": "Testing", - "addons": "ADDON1", - "contractNumber": "5", - "multi-entitlement": "false", - "startDate": "", - "endDate": "", - "suggested": "2", - } - ] - - es.return_value = create_pool_list() - - argv = ["subscription-manager", "list", "--all", "--available", "--afterdate", self.valid_date] - self._test_afterdate_option(argv, self.cc.main, should_exit=False) - - @patch("subscription_manager.managerlib.get_available_entitlements") - def test_none_wrap_available_pool_id(self, mget_ents): - list_command = managercli.ListCommand() - - def create_pool_list(*args, **kwargs): - return [ - { - "productName": "dummy-name", - "productId": "dummy-id", - "providedProducts": [], - "id": "888888888888", - "management_enabled": True, - "attributes": [{"name": "is_virt_only", "value": "false"}], - "pool_type": "Some Type", - "quantity": "4", - "service_type": "", - "roles": "awesome server", - "service_level": "", - "usage": "Production", - "addons": "", - "contractNumber": "5", - "multi-entitlement": "false", - "startDate": "", - "endDate": "", - "suggested": "2", - } - ] - - mget_ents.return_value = create_pool_list() - - with Capture() as cap: - list_command.main(["--available"]) - self.assertTrue("888888888888" in cap.out) - - @patch("subscription_manager.managerlib.get_available_entitlements") - def test_available_syspurpose_attr(self, mget_ents): - list_command = managercli.ListCommand() - - def create_pool_list(*args, **kwargs): - return [ - { - "productName": "dummy-name", - "productId": "dummy-id", - "providedProducts": [], - "id": "888888888888", - "management_enabled": True, - "attributes": [{"name": "is_virt_only", "value": "false"}], - "pool_type": "Some Type", - "quantity": "4", - "service_type": "", - "roles": "Awesome Server, Cool Server", - "service_level": "Premium", - "usage": "Production", - "addons": "ADDON1,ADDON2", - "contractNumber": "5", - "multi-entitlement": "false", - "startDate": "", - "endDate": "", - "suggested": "2", - } - ] - - mget_ents.return_value = create_pool_list() - - with Capture() as cap: - list_command.main(["--available"]) - self.assertTrue("ADDON1\n" in cap.out) - self.assertTrue("Awesome Server\n" in cap.out) - self.assertTrue("Production" in cap.out) - self.assertTrue("Premium" in cap.out) - - def test_print_consumed_no_ents(self): - with Capture() as captured: - self.cc.print_consumed() - - lines = captured.out.split("\n") - self.assertEqual(len(lines) - 1, 1, "Error output consists of more than one line.") - def test_list_installed(self): """ Test output of 'subscription-manager list --installed' @@ -208,44 +53,6 @@ def test_list_installed(self): assert "Product ID:" in captured.out assert "Version:" in captured.out assert "Arch:" in captured.out - assert "Status:" in captured.out - assert "Status Details:" in captured.out - assert "Starts:" in captured.out - assert "Ends:" in captured.out - - @patch("subscription_manager.cli_command.list.is_simple_content_access") - def test_list_installed_sca_mode(self, is_simple_content_access_mock): - """ - Test output of 'subscription-manager list --installed', when SCA mode is used - """ - is_simple_content_access_mock.return_value = True - - installed_product_certs = [ - StubProductCertificate(product=StubProduct(name="test product", product_id="8675309")), - StubProductCertificate(product=StubProduct(name="another test product", product_id="123456")), - ] - - stub_sorter = StubCertSorter() - - for product_cert in installed_product_certs: - product = product_cert.products[0] - stub_sorter.installed_products[product.id] = product_cert - - provide(CERT_SORTER, stub_sorter) - - with Capture() as captured: - list_command = managercli.ListCommand() - list_command.main(["--installed"]) - assert "Product Name:" in captured.out - assert "Product ID:" in captured.out - assert "Version:" in captured.out - assert "Arch:" in captured.out - # Following attributes should not be printed in SCA mode, because it does not make - # any sense to print them in SCA mode - assert "Status:" not in captured.out - assert "Status Details:" not in captured.out - assert "Starts:" not in captured.out - assert "Ends:" not in captured.out def test_list_installed_with_ctfilter(self): installed_product_certs = [ @@ -294,191 +101,3 @@ def test_list_installed_with_ctfilter(self): installed_product_certs[index].name in captured.out, "Unexpected product was found in output for test data %i" % test_num, ) - - def test_list_consumed_with_ctfilter(self): - consumed = [ - StubEntitlementCertificate( - product=StubProduct(name="Test Entitlement 1", product_id="123"), - provided_products=["test product a", "beta product 1", "shared product", "troll* product?"], - ), - StubEntitlementCertificate( - product=StubProduct(name="Test Entitlement 2", product_id="456"), - provided_products=["test product b", "beta product 1", "shared product", "back\\slash"], - ), - ] - - test_data = [ - ("", (False, False)), - ("test entitlement ?", (True, True)), - ("*entitlement 1", (True, False)), - ("*entitlement 2", (False, True)), - ("input string", (False, False)), - ("*product", (True, True)), - ("*product*", (True, True)), - ("shared pro*nopenopenope", (False, False)), - ("*another*", (False, False)), - ("*product\\?", (True, False)), - ("*product ?", (True, True)), - ("*product?*", (True, True)), - ("*\\?*", (True, False)), - ("*\\\\*", (False, True)), - ("*k\\s*", (False, True)), - ("*23", (True, False)), - ("45?", (False, True)), - ] - - for stubby in consumed: - self.ent_dir.certs.append(stubby) - - for test_num, data in enumerate(test_data): - with Capture() as captured: - list_command = managercli.ListCommand() - list_command.main(["--consumed", "--matches", data[0]]) - - for index, expected in enumerate(data[1]): - if expected: - self.assertTrue( - consumed[index].order.name in captured.out, - "Expected product was not found in output for test data %i" % test_num, - ) - else: - self.assertFalse( - consumed[index].order.name in captured.out, - "Unexpected product was found in output for test data %i" % test_num, - ) - - def test_print_consumed_one_ent_one_product(self): - product = StubProduct("product1") - self.ent_dir.certs.append(StubEntitlementCertificate(product)) - self.cc.sorter = Mock() - self.cc.sorter.get_subscription_reasons_map = Mock() - self.cc.sorter.get_subscription_reasons_map.return_value = {} - self.cc.print_consumed() - - def test_print_consumed_one_ent_no_product(self): - self.ent_dir.certs.append(StubEntitlementCertificate(product=None)) - self.cc.sorter = Mock() - self.cc.sorter.get_subscription_reasons_map = Mock() - self.cc.sorter.get_subscription_reasons_map.return_value = {} - self.cc.print_consumed() - - def test_print_consumed_prints_nothing_with_no_service_level_match(self): - self.ent_dir.certs.append(self.cert_with_service_level) - - with Capture() as captured: - self.cc.print_consumed(service_level="NotFound") - - lines = captured.out.split("\n") - self.assertEqual(len(lines) - 1, 1, "Error output consists of more than one line.") - - def test_print_consumed_prints_enitlement_with_service_level_match(self): - self.ent_dir.certs.append(self.cert_with_service_level) - self.cc.sorter = Mock() - self.cc.sorter.get_subscription_reasons_map = Mock() - self.cc.sorter.get_subscription_reasons_map.return_value = {} - self.cc.print_consumed(service_level="Premium") - - def test_print_consumed_ignores_content_access_cert(self): - self.ent_dir.certs.append(self.cert_with_content_access) - with Capture() as captured: - self.cc.print_consumed(service_level="NotFound") - - lines = captured.out.split("\n") - self.assertEqual(len(lines) - 1, 1, "Error output consists of more than one line.") - - def test_list_installed_with_pidonly(self): - installed_product_certs = [ - StubProductCertificate(product=StubProduct(name="test product*", product_id="8675309")), - StubProductCertificate(product=StubProduct(name="another(?) test\\product", product_id="123456")), - ] - - stub_sorter = StubCertSorter() - - for product_cert in installed_product_certs: - product = product_cert.products[0] - stub_sorter.installed_products[product.id] = product_cert - - provide(CERT_SORTER, stub_sorter) - - try: - with Capture() as captured: - list_command = managercli.ListCommand() - list_command.main(["--installed", "--pool-only"]) - - self.fail("Expected error did not occur") - except SystemExit: - for cert in installed_product_certs: - self.assertFalse(cert.products[0].id in captured.out) - - def test_list_consumed_with_pidonly(self): - consumed = [ - StubEntitlementCertificate( - product=StubProduct(name="Test Entitlement 1", product_id="123"), - pool=StubPool("abc"), - provided_products=["test product a", "beta product 1", "shared product", "troll* product?"], - ), - StubEntitlementCertificate( - product=StubProduct(name="Test Entitlement 2", product_id="456"), - pool=StubPool("def"), - provided_products=["test product b", "beta product 1", "shared product", "back\\slash"], - ), - ] - - for stubby in consumed: - self.ent_dir.certs.append(stubby) - - with Capture() as captured: - list_command = managercli.ListCommand() - list_command.main(["--consumed", "--pool-only"]) - - for cert in consumed: - self.assertFalse(cert.order.name in captured.out) - self.assertTrue(cert.pool.id in captured.out) - - def test_list_consumed_syspurpose_attr_version34(self): - """ - When version of entitlement certificate is 3.4, then subscription-manager should print syspurpose - attributes from the certificate. - """ - product = StubProduct("product1") - ent_cert = StubEntitlementCertificate(product) - ent_cert.order.usage = "Development" - ent_cert.order.roles = ["SP Server", "SP Starter"] - ent_cert.order.addons = ["ADDON1", "ADDON2"] - ent_cert.version = MagicMock() - ent_cert.version.major = 3 - ent_cert.version.minor = 4 - self.ent_dir.certs.append(ent_cert) - self.cc.sorter = Mock() - self.cc.sorter.get_subscription_reasons_map = Mock() - self.cc.sorter.get_subscription_reasons_map.return_value = {} - with Capture() as captured: - self.cc.print_consumed() - self.assertTrue("Add-ons:" in captured.out) - self.assertTrue("ADDON1" in captured.out) - self.assertTrue("ADDON2" in captured.out) - self.assertTrue("Usage:" in captured.out) - self.assertTrue("Development" in captured.out) - self.assertTrue("Roles:" in captured.out) - self.assertTrue("SP Server" in captured.out) - self.assertTrue("SP Starter" in captured.out) - - def test_list_consumed_no_syspurpose_attr_version33(self): - """ - When the version of certificate is older then 3.4, then do not print syspurpose attributes, because - there cannot be any. - """ - product = StubProduct("product1") - ent_cert = StubEntitlementCertificate(product) - ent_cert.version = MagicMock() - ent_cert.version.major = 3 - ent_cert.version.minor = 3 - self.ent_dir.certs.append(ent_cert) - self.cc.sorter = Mock() - self.cc.sorter.get_subscription_reasons_map = Mock() - self.cc.sorter.get_subscription_reasons_map.return_value = {} - with Capture() as captured: - self.cc.print_consumed() - self.assertFalse("Add-ons:" in captured.out) - self.assertFalse("Usage:" in captured.out) - self.assertFalse("Roles:" in captured.out)