diff --git a/etc-conf/dbus/system.d/com.redhat.RHSM1.conf b/etc-conf/dbus/system.d/com.redhat.RHSM1.conf
index 4c04437aa9..33f60e13a0 100644
--- a/etc-conf/dbus/system.d/com.redhat.RHSM1.conf
+++ b/etc-conf/dbus/system.d/com.redhat.RHSM1.conf
@@ -14,9 +14,6 @@
-
-
diff --git a/example-plugins/dbus_event.py b/example-plugins/dbus_event.py
index 8d2d54530c..175b1d915a 100644
--- a/example-plugins/dbus_event.py
+++ b/example-plugins/dbus_event.py
@@ -89,9 +89,3 @@ def pre_subscribe_hook(self, conduit):
def post_subscribe_hook(self, conduit):
self._dbus_event("post_subscribe", conduit)
-
- def pre_auto_attach_hook(self, conduit):
- self._dbus_event("pre_auto_attach", conduit)
-
- def post_auto_attach_hook(self, conduit):
- self._dbus_event("post_auto_attach", conduit)
diff --git a/example-plugins/subscribe.py b/example-plugins/subscribe.py
index 7fe604993c..08ac839af8 100644
--- a/example-plugins/subscribe.py
+++ b/example-plugins/subscribe.py
@@ -37,9 +37,3 @@ def post_subscribe_hook(self, conduit):
conduit: A PostSubscriptionConduit()
"""
conduit.log.debug("post subscribe called")
-
- def pre_auto_attach_hook(self, conduit):
- conduit.log.debug("pre auto attach called")
-
- def post_auto_attach_hook(self, conduit):
- conduit.log.debug("post auto attach called")
diff --git a/man/rhsm.conf.5 b/man/rhsm.conf.5
index 925f4b5c73..3a8a65291f 100644
--- a/man/rhsm.conf.5
+++ b/man/rhsm.conf.5
@@ -242,7 +242,7 @@ daemon
.PP
splay
.RS 4
-1 to enable splay. 0 to disable splay. If enabled, this feature delays the initial auto attach and cert check by an amount between 0 seconds and the interval given for the action being delayed. For example if the
+1 to enable splay. 0 to disable splay. If enabled, this feature delays the initial cert check by an amount between 0 seconds and the interval given for the action being delayed. For example if the
.B certCheckInterval
were set to 3 minutes, the initial cert check would begin somewhere between 2 minutes after start up (minimum delay) and 5 minutes after start up. This is useful to reduce peak load on the Satellite or entitlement service used by a large number of machines.
.RE
diff --git a/src/plugins/libdnf/README.md b/src/plugins/libdnf/README.md
index a9ed69817e..a3ae69e11d 100644
--- a/src/plugins/libdnf/README.md
+++ b/src/plugins/libdnf/README.md
@@ -115,12 +115,6 @@ To use testing repository you have to do several steps:
subscription-manager register --username admin --password admin --org admin
```
-* Attach some subscription:
-
- ```
- subscription-manager attach --pool
- ```
-
* Choose some repository from output of `subscription-manager repos --list` and
enable the repository:
diff --git a/src/rhsm/connection.py b/src/rhsm/connection.py
index 163b8ef055..93fb6ed021 100644
--- a/src/rhsm/connection.py
+++ b/src/rhsm/connection.py
@@ -1903,63 +1903,6 @@ def getAccessibleContent(self, consumerId: str, if_modified_since: datetime.date
method, headers=headers, description=_("Fetching content for a certificate")
)
- def bindByEntitlementPool(self, consumerId: str, poolId: str, quantity: int = None) -> List[dict]:
- """
- Subscribe consumer to a subscription by pool ID
- :param consumerId: consumer UUID
- :param poolId: pool ID
- :param quantity: the desired quantity of subscription to be consumed
- """
- method = "/consumers/%s/entitlements?pool=%s" % (self.sanitize(consumerId), self.sanitize(poolId))
- if quantity:
- method = "%s&quantity=%s" % (method, quantity)
- return self.conn.request_post(method, description=_("Updating subscriptions"))
-
- def bind(self, consumerId: str, entitle_date: datetime.datetime = None) -> List[dict]:
- """
- Same as bindByProduct, but assume the server has a list of the
- system's products. This is useful for autosubscribe. Note that this is
- done on a best-effort basis, and there are cases when the server will
- not be able to fulfill the client's product certs with entitlements
- :param consumerId: consumer UUID
- :param entitle_date: The date, when subscription will be valid
- """
- method = "/consumers/%s/entitlements" % (self.sanitize(consumerId))
-
- # add the optional date to the url
- if entitle_date:
- method = "%s?entitle_date=%s" % (method, self.sanitize(entitle_date.isoformat(), plus=True))
-
- return self.conn.request_post(method, description=_("Updating subscriptions"))
-
- def unbindBySerial(self, consumerId: str, serial: str) -> bool:
- """
- Try to remove consumed pool by serial number
- :param consumerId: consumer UUID
- :param serial: serial number of consumed pool
- """
- method = "/consumers/%s/certificates/%s" % (self.sanitize(consumerId), self.sanitize(str(serial)))
- return self.conn.request_delete(method, description=_("Unsubscribing")) is None
-
- def unbindByPoolId(self, consumer_uuid: str, pool_id: str) -> bool:
- """
- Try to remove consumed pool by pool ID
- :param consumer_uuid: consumer UUID
- :param pool_id: pool ID
- :return: None
- """
- method = "/consumers/%s/entitlements/pool/%s" % (self.sanitize(consumer_uuid), self.sanitize(pool_id))
- return self.conn.request_delete(method, description=_("Unsubscribing")) is None
-
- def unbindAll(self, consumerId: str) -> dict:
- """
- Try to remove all consumed pools
- :param consumerId: consumer UUID
- :return: Dictionary containing statistics about removed pools
- """
- method = "/consumers/%s/entitlements" % self.sanitize(consumerId)
- return self.conn.request_delete(method, description=_("Unsubscribing"))
-
def getPoolsList(
self,
consumer: str = None,
@@ -2152,22 +2095,6 @@ def deleteContentOverrides(self, consumerId: str, params: List[dict] = None) ->
params = []
return self.conn.request_delete(method, params, description=_("Removing content overrides"))
- def activateMachine(self, consumerId: str, email: str, lang: str = None) -> Union[dict, None]:
- """
- Activate a subscription by machine, information is located in the consumer facts
- :param consumerId: consumer UUID
- :param email: The email for sending notification. The notification will be sent by candlepin server
- :param lang: The locale specifies the language of notification email
- :return When activation was successful, then dictionary is returned. Otherwise, None is returned.
- """
- method = "/subscriptions?consumer_uuid=%s" % consumerId
- method += "&email=%s" % self.sanitize(email)
- if (not lang) and (locale.getdefaultlocale()[0] is not None):
- lang = locale.getdefaultlocale()[0].lower().replace("_", "-")
- if lang:
- method += "&email_locale=%s" % self.sanitize(lang)
- return self.conn.request_post(method, description=_("Activating"))
-
# used by virt-who
def getJob(self, job_id: str) -> str:
"""
diff --git a/src/rhsmlib/client_info.py b/src/rhsmlib/client_info.py
index aed83ffbd5..7dbe113153 100644
--- a/src/rhsmlib/client_info.py
+++ b/src/rhsmlib/client_info.py
@@ -14,7 +14,7 @@
"""
This module holds information about current state of client application
like sender of D-bus method, current subscription-manager command (register,
-attach, ...), dnf command, etc.
+status...), dnf command, etc.
"""
import logging
diff --git a/src/rhsmlib/dbus/constants.py b/src/rhsmlib/dbus/constants.py
index 6215467dbc..98ddc33a7b 100644
--- a/src/rhsmlib/dbus/constants.py
+++ b/src/rhsmlib/dbus/constants.py
@@ -24,8 +24,6 @@
"UNREGISTER_DBUS_PATH",
"CONFIG_INTERFACE",
"CONFIG_DBUS_PATH",
- "ATTACH_INTERFACE",
- "ATTACH_DBUS_PATH",
"PRODUCTS_INTERFACE",
"PRODUCTS_DBUS_PATH",
"ENTITLEMENT_INTERFACE",
@@ -69,9 +67,6 @@
CONFIG_INTERFACE = "%s.%s" % (INTERFACE_BASE, "Config")
CONFIG_DBUS_PATH = "%s/%s" % (ROOT_DBUS_PATH, "Config")
-ATTACH_INTERFACE = "%s.%s" % (INTERFACE_BASE, "Attach")
-ATTACH_DBUS_PATH = "%s/%s" % (ROOT_DBUS_PATH, "Attach")
-
PRODUCTS_INTERFACE = "%s.%s" % (INTERFACE_BASE, "Products")
PRODUCTS_DBUS_PATH = "%s/%s" % (ROOT_DBUS_PATH, "Products")
diff --git a/src/rhsmlib/dbus/objects/entitlement.py b/src/rhsmlib/dbus/objects/entitlement.py
index 07cfa62620..889862e048 100644
--- a/src/rhsmlib/dbus/objects/entitlement.py
+++ b/src/rhsmlib/dbus/objects/entitlement.py
@@ -11,7 +11,7 @@
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
import datetime
-from typing import List, Union
+from typing import Union
import dbus
import json
@@ -65,35 +65,6 @@ def get_pools(self, options: dict, proxy_options: dict) -> dict:
return pools
- def remove_all_entitlements(self, proxy_options: dict) -> dict:
- uep: UEPConnection = self.build_uep(proxy_options, proxy_only=True)
- service = EntitlementService(uep)
- result: dict = service.remove_all_entitlements()
-
- return result
-
- def remove_entitlements_by_pool_ids(self, pool_ids: List[str], proxy_options: dict) -> List[str]:
- """Remove entitlements by Pool IDs
-
- :return: List of removed serials.
- """
- uep: UEPConnection = self.build_uep(proxy_options, proxy_only=True)
- service = EntitlementService(uep)
- _, _, removed_serials = service.remove_entitlements_by_pool_ids(pool_ids)
-
- return removed_serials
-
- def remove_entitlements_by_serials(self, serials: List[str], proxy_options: dict) -> List[str]:
- """Remove entitlements by serials.
-
- :return: List of removed serials.
- """
- uep: UEPConnection = self.build_uep(proxy_options, proxy_only=True)
- service = EntitlementService(uep)
- removed_serials, _ = service.remove_entitlements_by_serials(serials)
-
- return removed_serials
-
def _parse_date(self, date_string: str) -> datetime.datetime:
"""
Return new datetime parsed from date.
@@ -187,79 +158,6 @@ def GetPools(self, options, proxy_options, locale, sender=None):
pools: dict = self.impl.get_pools(options, proxy_options)
return json.dumps(pools)
- @util.dbus_service_method(
- constants.ENTITLEMENT_INTERFACE,
- in_signature="a{sv}s",
- out_signature="s",
- )
- @util.dbus_handle_sender
- @util.dbus_handle_exceptions
- def RemoveAllEntitlements(self, proxy_options, locale, sender=None):
- """
- Try to remove all entitlements (subscriptions) from the system
- :param proxy_options: Settings of proxy
- :param locale: String with locale (e.g. de_DE.UTF-8)
- :param sender: Not used argument
- :return: Json string containing response
- """
- proxy_options = dbus_utils.dbus_to_python(proxy_options, expected_type=dict)
- locale = dbus_utils.dbus_to_python(locale, expected_type=str)
-
- Locale.set(locale)
-
- result: dict = self.impl.remove_all_entitlements(proxy_options)
- return json.dumps(result)
-
- @util.dbus_service_method(
- constants.ENTITLEMENT_INTERFACE,
- in_signature="asa{sv}s",
- out_signature="s",
- )
- @util.dbus_handle_sender
- @util.dbus_handle_exceptions
- def RemoveEntitlementsByPoolIds(self, pool_ids, proxy_options, locale, sender=None):
- """
- Try to remove entitlements (subscriptions) by pool_ids
- :param pool_ids: List of pool IDs
- :param proxy_options: Settings of proxy
- :param locale: String with locale (e.g. de_DE.UTF-8)
- :param sender: Not used argument
- :return: Json string representing list of serial numbers
- """
- pool_ids = dbus_utils.dbus_to_python(pool_ids, expected_type=list)
- proxy_options = dbus_utils.dbus_to_python(proxy_options, expected_type=dict)
- locale = dbus_utils.dbus_to_python(locale, expected_type=str)
-
- Locale.set(locale)
-
- removed_serials = self.impl.remove_entitlements_by_pool_ids(pool_ids, proxy_options)
- return json.dumps(removed_serials)
-
- @util.dbus_service_method(
- constants.ENTITLEMENT_INTERFACE,
- in_signature="asa{sv}s",
- out_signature="s",
- )
- @util.dbus_handle_sender
- @util.dbus_handle_exceptions
- def RemoveEntitlementsBySerials(self, serials, proxy_options, locale, sender=None):
- """
- Try to remove entitlements (subscriptions) by serials
- :param serials: List of serial numbers of subscriptions
- :param proxy_options: Settings of proxy
- :param locale: String with locale (e.g. de_DE.UTF-8)
- :param sender: Not used argument
- :return: Json string representing list of serial numbers
- """
- serials = dbus_utils.dbus_to_python(serials, expected_type=list)
- proxy_options = dbus_utils.dbus_to_python(proxy_options, expected_type=dict)
- locale = dbus_utils.dbus_to_python(locale, expected_type=str)
-
- Locale.set(locale)
-
- removed_serials = self.impl.remove_entitlements_by_serials(serials, proxy_options)
- return json.dumps(removed_serials)
-
@staticmethod
def reload():
entitlement_service = EntitlementService()
diff --git a/src/rhsmlib/services/entitlement.py b/src/rhsmlib/services/entitlement.py
index e6db2a12da..d3d1d89ec5 100644
--- a/src/rhsmlib/services/entitlement.py
+++ b/src/rhsmlib/services/entitlement.py
@@ -15,7 +15,7 @@
import datetime
import logging
import time
-from typing import Union, Callable
+from typing import Union
from subscription_manager import injection as inj
from subscription_manager.i18n import ugettext as _
@@ -518,77 +518,6 @@ def validate_options(self, options: dict) -> None:
elif not self.identity.is_valid() and "available" in options["pool_subsets"]:
raise exceptions.ValidationError(_("Error: this system is not registered"))
- def _unbind_ids(self, unbind_method: Callable, consumer_uuid: str, ids: list) -> tuple:
- """
- Method for unbinding entitlements
- :param unbind_method: unbindByPoolId or unbindBySerial
- :param consumer_uuid: UUID of consumer
- :param ids: List of serials or pool_ids
- :return: Tuple of two lists containing unbinded and not-unbinded subscriptions
- """
- success = []
- failure = []
- for id_ in ids:
- try:
- unbind_method(consumer_uuid, id_)
- success.append(id_)
- except connection.RestlibException as re:
- if re.code == 410:
- raise
- failure.append(id_)
- log.error(re)
- return success, failure
-
- def remove_all_entitlements(self) -> dict:
- """
- Try to remove all entitlements
- :return: Result of REST API call
- """
-
- response = self.cp.unbindAll(self.identity.uuid)
- self.entcertlib.update()
-
- return response
-
- def remove_entitlements_by_pool_ids(self, pool_ids: list) -> tuple:
- """
- Try to remove entitlements by pool IDs
- :param pool_ids: List of pool IDs
- :return: List of serial numbers of removed subscriptions
- """
-
- removed_serials = []
- _pool_ids = utils.unique_list_items(pool_ids) # Don't allow duplicates
- # FIXME: the cache of CertificateDirectory should be smart enough and refreshing
- # should not be necessary. I vote for i-notify to be used there somehow.
- self.entitlement_dir.refresh()
- pool_id_to_serials = self.entitlement_dir.list_serials_for_pool_ids(_pool_ids)
- removed_pools, unremoved_pools = self._unbind_ids(
- self.cp.unbindByPoolId, self.identity.uuid, _pool_ids
- )
- if removed_pools:
- for pool_id in removed_pools:
- removed_serials.extend(pool_id_to_serials[pool_id])
- self.entcertlib.update()
-
- return removed_pools, unremoved_pools, removed_serials
-
- def remove_entitlements_by_serials(self, serials: list) -> tuple:
- """
- Try to remove pools by Serial numbers
- :param serials: List of serial numbers
- :return: Tuple of two items: list of serial numbers of already removed subscriptions and list
- of not removed serials
- """
-
- _serials = utils.unique_list_items(serials) # Don't allow duplicates
- removed_serials, unremoved_serials = self._unbind_ids(
- self.cp.unbindBySerial, self.identity.uuid, _serials
- )
- self.entcertlib.update()
-
- return removed_serials, unremoved_serials
-
@staticmethod
def reload() -> None:
"""
diff --git a/src/subscription_manager/action_client.py b/src/subscription_manager/action_client.py
index 66098b257b..e8b431ddba 100644
--- a/src/subscription_manager/action_client.py
+++ b/src/subscription_manager/action_client.py
@@ -47,8 +47,7 @@ def _get_libset(self) -> List["BaseActionInvoker"]:
self.syspurposelib = SyspurposeSyncActionInvoker()
# WARNING: order is important here, we need to update a number
- # of things before attempting to autoheal, and we need to autoheal
- # before attempting to fetch our certificates:
+ # of things before attempting to fetch our certificates:
lib_set: List[BaseActionInvoker] = [
self.entcertlib,
self.idcertlib,
diff --git a/src/subscription_manager/certdirectory.py b/src/subscription_manager/certdirectory.py
index 891d4d758a..c3138d44f1 100644
--- a/src/subscription_manager/certdirectory.py
+++ b/src/subscription_manager/certdirectory.py
@@ -336,15 +336,6 @@ def list_for_pool_id(self, pool_id: str) -> List["EntitlementCertificate"]:
]
return entitlements
- def list_serials_for_pool_ids(self, pool_ids: List[str]) -> Dict[str, List[str]]:
- """
- Returns a dict of all entitlement certificate serials for each pool_id in the list provided
- """
- pool_id_to_serials = {}
- for pool_id in pool_ids:
- pool_id_to_serials[pool_id] = [str(cert.serial) for cert in self.list_for_pool_id(pool_id)]
- return pool_id_to_serials
-
class Path:
# Used during Anaconda install by the yum pidplugin to ensure we operate
diff --git a/src/subscription_manager/entcertlib.py b/src/subscription_manager/entcertlib.py
index 84d3f0fc9d..c2ac33b516 100644
--- a/src/subscription_manager/entcertlib.py
+++ b/src/subscription_manager/entcertlib.py
@@ -108,9 +108,7 @@ def perform(self) -> "EntCertUpdateReport":
log.info("certs updated:\n%s", self.report)
self.syslog_results()
- # We call EntCertlibActionInvoker.update() solo from
- # the 'attach' cli instead of an ActionClient. So
- # we need to refresh the ent_dir object before calling
+ # We need to refresh the ent_dir object before calling
# content updating actions.
self.ent_dir.refresh()
diff --git a/src/subscription_manager/managerlib.py b/src/subscription_manager/managerlib.py
index df2deb9975..e291ddc9e3 100644
--- a/src/subscription_manager/managerlib.py
+++ b/src/subscription_manager/managerlib.py
@@ -18,7 +18,6 @@
import logging
import os
import grp
-import re
import shutil
import stat
import syslog
@@ -26,7 +25,6 @@
from rhsm.config import get_config_parser
-from rhsm.certificate import Key, CertificateException, create_from_pem
import subscription_manager.cache as cache
from subscription_manager.cert_sorter import StackingGroupSorter, ComplianceManager
@@ -749,166 +747,6 @@ def lookup_provided_products(self, pool_id: str) -> Optional[List[Tuple[str, str
return provided_products
-class ImportFileExtractor:
- """
- Responsible for checking an import file and pulling cert and key from it.
- An import file may include only the certificate, but may also include its
- key.
-
- An import file is processed looking for:
-
- -----BEGIN -----
-
- ..
- -----END -----
-
- and will only process if it finds CERTIFICATE or KEY in the text.
-
- For example the following would locate a key and cert.
-
- -----BEGIN CERTIFICATE-----
-
- -----END CERTIFICATE-----
- -----BEGIN PUBLIC KEY-----
-
- -----END PUBLIC KEY-----
-
- """
-
- _REGEX_START_GROUP = "start"
- _REGEX_CONTENT_GROUP = "content"
- _REGEX_END_GROUP = "end"
- _REGEX = r"(?P<%s>[-]*BEGIN[\w\ ]*[-]*)(?P<%s>[^-]*)(?P<%s>[-]*END[\w\ ]*[-]*)" % (
- _REGEX_START_GROUP,
- _REGEX_CONTENT_GROUP,
- _REGEX_END_GROUP,
- )
- _PATTERN = re.compile(_REGEX)
-
- _CERT_DICT_TAG = "CERTIFICATE"
- _KEY_DICT_TAG = "KEY"
- _ENT_DICT_TAG = "ENTITLEMENT"
- _SIG_DICT_TAG = "RSA SIGNATURE"
-
- def __init__(self, cert_file_path: str):
- self.path = cert_file_path
- self.file_name = os.path.basename(cert_file_path)
-
- content = self._read(cert_file_path)
- self.parts = self._process_content(content)
-
- def _read(self, file_path: str) -> str:
- fd = open(file_path, "r")
- file_content = fd.read()
- fd.close()
- return file_content
-
- def _process_content(self, content: str) -> Dict[str, str]:
- part_dict = {}
- matches = self._PATTERN.finditer(content)
- for match in matches:
- start = match.group(self._REGEX_START_GROUP)
- meat = match.group(self._REGEX_CONTENT_GROUP)
- end = match.group(self._REGEX_END_GROUP)
-
- dict_key = None
- if not start.find(self._KEY_DICT_TAG) < 0:
- dict_key = self._KEY_DICT_TAG
- elif not start.find(self._CERT_DICT_TAG) < 0:
- dict_key = self._CERT_DICT_TAG
- elif not start.find(self._ENT_DICT_TAG) < 0:
- dict_key = self._ENT_DICT_TAG
- elif not start.find(self._SIG_DICT_TAG) < 0:
- dict_key = self._SIG_DICT_TAG
-
- if dict_key is None:
- continue
-
- part_dict[dict_key] = start + meat + end
- return part_dict
-
- def contains_key_content(self) -> bool:
- return self._KEY_DICT_TAG in self.parts
-
- def get_key_content(self) -> Optional[str]:
- key_content = None
- if self._KEY_DICT_TAG in self.parts:
- key_content = self.parts[self._KEY_DICT_TAG]
- return key_content
-
- def get_cert_content(self) -> str:
- cert_content = ""
- if self._CERT_DICT_TAG in self.parts:
- cert_content = self.parts[self._CERT_DICT_TAG]
- if self._ENT_DICT_TAG in self.parts:
- cert_content = cert_content + os.linesep + self.parts[self._ENT_DICT_TAG]
- if self._SIG_DICT_TAG in self.parts:
- cert_content = cert_content + os.linesep + self.parts[self._SIG_DICT_TAG]
- return cert_content
-
- def verify_valid_entitlement(self) -> bool:
- """
- Verify that a valid entitlement was processed.
-
- @return: True if valid, False otherwise.
- """
- try:
- cert = self.get_cert()
- # Don't want to check class explicitly, instead we'll look for
- # order info, which only an entitlement cert could have:
- if not hasattr(cert, "order"):
- return False
- except CertificateException:
- return False
- ent_key = Key(self.get_key_content())
- if ent_key.bogus():
- return False
- return True
-
- # TODO: rewrite to use certlib.EntitlementCertBundleInstall?
- def write_to_disk(self) -> None:
- """
- Write/copy cert to the entitlement cert dir.
- """
- self._ensure_entitlement_dir_exists()
- dest_file_path = os.path.join(ENT_CONFIG_DIR, self._create_filename_from_cert_serial_number())
-
- # Write the key/cert content to new files
- log.debug("Writing certificate file: %s" % (dest_file_path))
- cert_content = self.get_cert_content()
- self._write_file(dest_file_path, cert_content)
-
- if self.contains_key_content():
- dest_key_file_path = self._get_key_path_from_dest_cert_path(dest_file_path)
- log.debug("Writing key file: %s" % (dest_key_file_path))
- self._write_file(dest_key_file_path, self.get_key_content())
-
- def _write_file(self, target_path: str, content: str) -> None:
- new_file = open(target_path, "w")
- try:
- new_file.write(content)
- finally:
- new_file.close()
-
- def _ensure_entitlement_dir_exists(self) -> None:
- if not os.access(ENT_CONFIG_DIR, os.R_OK):
- os.mkdir(ENT_CONFIG_DIR)
-
- def _get_key_path_from_dest_cert_path(self, dest_cert_path: str) -> str:
- file_parts = os.path.splitext(dest_cert_path)
- return file_parts[0] + "-key" + file_parts[1]
-
- def _create_filename_from_cert_serial_number(self) -> str:
- "create from serial"
- ent_cert = self.get_cert()
- return "%s.pem" % (ent_cert.serial)
-
- def get_cert(self) -> "EntitlementCertificate":
- cert_content: str = self.get_cert_content()
- ent_cert: EntitlementCertificate = create_from_pem(cert_content)
- return ent_cert
-
-
def _sub_dict(datadict: dict, subkeys: Iterable[str], default: Optional[object] = None) -> dict:
"""Return a dict that is a subset of datadict matching only the keys in subkeys"""
return dict([(k, datadict.get(k, default)) for k in subkeys])
diff --git a/src/subscription_manager/plugins.py b/src/subscription_manager/plugins.py
index 650539100c..947464b23b 100644
--- a/src/subscription_manager/plugins.py
+++ b/src/subscription_manager/plugins.py
@@ -379,7 +379,6 @@ def __init__(self, clazz: Type[SubManPlugin], consumer_uuid: str, pool_id: str,
consumer_uuid: the UUID of the consumer being subscribed
pool_id: the id of the pool the subscription will come from (None if 'auto' is False)
quantity: the quantity to consume from the pool (None if 'auto' is False).
- auto: is this an auto-attach/healing event.
"""
super(SubscriptionConduit, self).__init__(clazz)
self.consumer_uuid: str = consumer_uuid
@@ -402,33 +401,6 @@ def __init__(self, clazz: Type[SubManPlugin], consumer_uuid: str, entitlement_da
self.entitlement_data: Dict = entitlement_data
-class AutoAttachConduit(BaseConduit):
- slots = ["pre_auto_attach"]
-
- def __init__(self, clazz: Type[SubManPlugin], consumer_uuid: str):
- """
- init for AutoAttachConduit
-
- Args:
- consumer_uuid: the UUID of the consumer being auto-subscribed
- """
- super(AutoAttachConduit, self).__init__(clazz)
- self.consumer_uuid: str = consumer_uuid
-
-
-class PostAutoAttachConduit(PostSubscriptionConduit):
- slots = ["post_auto_attach"]
-
- def __init__(self, clazz: Type[SubManPlugin], consumer_uuid: str, entitlement_data: Dict):
- """init for PostAutoAttachConduit
-
- Args:
- consumer_uuid: the UUID of the consumer subscribed
- entitlement_data: the data returned by the server
- """
- super(PostAutoAttachConduit, self).__init__(clazz, consumer_uuid, entitlement_data)
-
-
class PluginConfig:
"""Represents configuation for each rhsm plugin.
@@ -917,8 +889,6 @@ def _get_conduits(self) -> List[type(BaseConduit)]:
SubscriptionConduit,
UpdateContentConduit,
PostSubscriptionConduit,
- AutoAttachConduit,
- PostAutoAttachConduit,
]
def _get_modules(self):
diff --git a/src/subscription_manager/syspurposelib.py b/src/subscription_manager/syspurposelib.py
index d26482fc05..0278747c86 100644
--- a/src/subscription_manager/syspurposelib.py
+++ b/src/subscription_manager/syspurposelib.py
@@ -42,31 +42,6 @@
syspurpose = None
-def save_sla_to_syspurpose_metadata(uep: "UEPConnection", consumer_uuid: str, service_level: str):
- """
- Saves the provided service-level value to the local Syspurpose Metadata (syspurpose.json) file.
- If the service level provided is null or empty, the sla value to the local syspurpose file is set to null.
- when uep and consumer_uuid is not None, then service_level is also synced with candlepin server
-
- :param uep: The object with uep connection (connection to candlepin server)
- :param consumer_uuid: Consumer UUID
- :param service_level: The service-level value to be saved in the syspurpose file.
- """
-
- if "SyncedStore" in globals() and SyncedStore is not None:
- synced_store = SyncedStore(uep=uep, consumer_uuid=consumer_uuid)
-
- # if empty, set it to null
- if service_level is None or service_level == "":
- service_level = None
-
- synced_store.set("service_level_agreement", service_level)
- synced_store.finish()
- log.debug("Syspurpose SLA value successfully saved locally.")
- else:
- log.error("SyspurposeStore could not be imported. Syspurpose SLA value not saved locally.")
-
-
def get_sys_purpose_store() -> Optional[SyncedStore]:
"""
:return: Returns a singleton instance of the syspurpose store if it was imported.
diff --git a/src/subscription_manager/utils.py b/src/subscription_manager/utils.py
index 7f67c8a278..ce19fd97fe 100644
--- a/src/subscription_manager/utils.py
+++ b/src/subscription_manager/utils.py
@@ -24,7 +24,7 @@
import socket
import syslog
import uuid
-from typing import Callable, Dict, Iterable, Iterator, Optional, Tuple, TYPE_CHECKING
+from typing import Dict, Iterable, Iterator, Optional, Tuple, TYPE_CHECKING
import urllib
@@ -570,24 +570,6 @@ def print_error(message: str) -> None:
sys.stderr.write("\n")
-def unique_list_items(items: Iterable, hash_function: Callable = lambda x: x) -> list:
- """
- Accepts a list of items.
- Returns a list of the unique items in the input.
- Maintains order.
- """
- observed = set()
- unique_items = []
- for item in items:
- item_key = hash_function(item)
- if item_key in observed:
- continue
- else:
- unique_items.append(item)
- observed.add(item_key)
- return unique_items
-
-
def generate_correlation_id() -> str:
return str(uuid.uuid4()).replace("-", "") # FIXME cp should accept -
diff --git a/test/data/anaconda-ks.cfg b/test/data/anaconda-ks.cfg
index b98f7a434c..1f843b4547 100644
--- a/test/data/anaconda-ks.cfg
+++ b/test/data/anaconda-ks.cfg
@@ -59,9 +59,6 @@ clearpart --all --initlabel --drives=sda
# Yeah, strip()'ing passwords seems like a bad idea.
password = password
- auto-attach = True
- # If we should attempt to auto-attach
-
servicelevel = Premium
%end
diff --git a/test/functional_tests/README.md b/test/functional_tests/README.md
index 02c5e2cf7b..231e8b1562 100644
--- a/test/functional_tests/README.md
+++ b/test/functional_tests/README.md
@@ -24,7 +24,6 @@ How to run functional tests?
cd ./ansible_playbooks
ansible-playbook ./configure_package_manager.yml --extra-vars="candlepin_hostname="
ansible-playbook ./register_system.yml
- ansible-playbook ./attach_subscriptions.yml
ansible-playbook ./test_install_remove_packages.yml
ansible-playbook ./test_not_remove_prod_cert_for_disabled_repo.yml
```
diff --git a/test/functional_tests/ansible_playbooks/attach_subscriptions.yml b/test/functional_tests/ansible_playbooks/attach_subscriptions.yml
deleted file mode 100644
index e0bb6152d9..0000000000
--- a/test/functional_tests/ansible_playbooks/attach_subscriptions.yml
+++ /dev/null
@@ -1,23 +0,0 @@
----
-- hosts: clients
- vars:
- subscription_skus: ['awesomeos-x86_64', 'awesomeos-all-x86-cont']
- remote_user: root
- tasks:
-
- # Try to find pool ids
- - name: try to find pool ids
- shell: subscription-manager list --available --matches={{ item }} | gawk '/Pool ID:/{ print $3; exit }'
- register: cmd_output
- with_items: "{{ subscription_skus }}"
-
- - set_fact:
- pool_ids: "{{ cmd_output | json_query('results[*].stdout_lines[0]') }}"
-
- - debug:
- msg: "{{ pool_ids }}"
-
- - name: attach pools
- shell: subscription-manager attach --pool {{ item }}
- with_items: "{{ pool_ids }}"
-
diff --git a/test/rhsm/functional/test_connection.py b/test/rhsm/functional/test_connection.py
index 7660ed5ca5..4f7d38f445 100644
--- a/test/rhsm/functional/test_connection.py
+++ b/test/rhsm/functional/test_connection.py
@@ -22,7 +22,6 @@
from rhsm.connection import (
ContentConnection,
UEPConnection,
- BaseRestLib,
UnauthorizedException,
ForbiddenException,
RestlibException,
@@ -121,40 +120,6 @@ def tearDown(self):
self.cp.unregisterConsumer(self.consumer_uuid)
-@subman_marker_functional
-class BindRequestTests(unittest.TestCase):
- def setUp(self):
- self.cp = UEPConnection(username="admin", password="admin", insecure=True)
-
- consumerInfo = self.cp.registerConsumer("test-consumer", "system", owner="admin")
- self.consumer_uuid = consumerInfo["uuid"]
-
- @patch.object(BaseRestLib, "validateResult")
- @patch("rhsm.connection.drift_check", return_value=False)
- @patch("httplib.HTTPSConnection", autospec=True)
- def test_bind_no_args(self, mock_conn, mock_drift, mock_validate):
- self.cp.bind(self.consumer_uuid)
-
- # verify we called request() with kwargs that include 'body' as None
- # Specifically, we are checking that passing in "" to post_request, as
- # it does by default, results in None here. bin() passes no args there
- # so we use the default, "". See bz #907536
- for name, args, kwargs in mock_conn.mock_calls:
- if name == "().request":
- self.assertEqual(None, kwargs["body"])
-
- @patch.object(BaseRestLib, "validateResult")
- @patch("rhsm.connection.drift_check", return_value=False)
- @patch("httplib.HTTPSConnection", autospec=True)
- def test_bind_by_pool(self, mock_conn, mock_drift, mock_validate):
- # this test is just to verify we make the httplib connection with
- # right args, we don't validate the bind here
- self.cp.bindByEntitlementPool(self.consumer_uuid, "123121111", "1")
- for name, args, kwargs in mock_conn.mock_calls:
- if name == "().request":
- self.assertEqual(None, kwargs["body"])
-
-
@subman_marker_functional
class ContentConnectionTests(unittest.TestCase):
def testInsecure(self):
diff --git a/test/rhsm/unit/test_connection.py b/test/rhsm/unit/test_connection.py
index e57da87a91..be562b5fd8 100644
--- a/test/rhsm/unit/test_connection.py
+++ b/test/rhsm/unit/test_connection.py
@@ -43,7 +43,6 @@
import subscription_manager.injection as inj
from unittest.mock import Mock, patch, mock_open
-from datetime import date
from rhsm import ourjson as json
from collections import namedtuple
@@ -207,24 +206,6 @@ def test_has_proper_language_header_not_utf8(self, mock_locale):
self.cp.conn._set_accept_language_in_header()
self.assertEqual(self.cp.conn.headers["Accept-Language"], "ja-jp")
- def test_entitle_date(self):
- self.cp.conn = Mock()
- self.cp.conn.request_post = Mock(return_value=[])
- self.cp.bind("abcd", date(2011, 9, 2))
- self.cp.conn.request_post.assert_called_with(
- "/consumers/abcd/entitlements?entitle_date=2011-09-02",
- description="Updating subscriptions",
- )
-
- def test_no_entitle_date(self):
- self.cp.conn = Mock()
- self.cp.conn.request_post = Mock(return_value=[])
- self.cp.bind("abcd")
- self.cp.conn.request_post.assert_called_with(
- "/consumers/abcd/entitlements",
- description="Updating subscriptions",
- )
-
def test_clean_up_prefix(self):
self.assertTrue(self.cp.handler == "/Test/")
diff --git a/test/rhsmlib/dbus/test_entitlement.py b/test/rhsmlib/dbus/test_entitlement.py
index 21fb477c5d..fc0ddad0cb 100644
--- a/test/rhsmlib/dbus/test_entitlement.py
+++ b/test/rhsmlib/dbus/test_entitlement.py
@@ -34,77 +34,3 @@ def test_get_status(self):
result = self.impl.get_status("")
self.assertEqual(expected, result)
-
- def test_remove_entitlements_by_serials(self):
- remove_entitlements_by_serials_patch = mock.patch(
- "rhsmlib.services.entitlement.EntitlementService.remove_entitlements_by_serials",
- name="remove_entitlements_by_serials",
- )
- self.patches["remove_entitlements_by_serials"] = remove_entitlements_by_serials_patch.start()
- self.addCleanup(remove_entitlements_by_serials_patch.stop)
-
- removed_nonremoved = (["123"], [])
- self.patches["remove_entitlements_by_serials"].return_value = removed_nonremoved
-
- expected = removed_nonremoved[0]
- result = self.impl.remove_entitlements_by_serials(["123"], {})
- self.assertEqual(expected, result)
-
- def test_remove_entitlements_by_serials__multiple(self):
- remove_entitlements_by_serials_patch = mock.patch(
- "rhsmlib.services.entitlement.EntitlementService.remove_entitlements_by_serials",
- name="remove_entitlements_by_serials",
- )
- self.patches["remove_entitlements_by_serials"] = remove_entitlements_by_serials_patch.start()
- self.addCleanup(remove_entitlements_by_serials_patch.stop)
-
- removed_nonremoved = (["123", "456"], [])
- self.patches["remove_entitlements_by_serials"].return_value = removed_nonremoved
-
- expected = removed_nonremoved[0]
- result = self.impl.remove_entitlements_by_serials(["123", "456"], {})
- self.assertEqual(expected, result)
-
- def test_remove_entitlements_by_serials__good_and_bad(self):
- remove_entitlements_by_serials_patch = mock.patch(
- "rhsmlib.services.entitlement.EntitlementService.remove_entitlements_by_serials",
- name="remove_entitlements_by_serials",
- )
- self.patches["remove_entitlements_by_serials"] = remove_entitlements_by_serials_patch.start()
- self.addCleanup(remove_entitlements_by_serials_patch.stop)
-
- removed_nonremoved = (["123"], ["456"])
- self.patches["remove_entitlements_by_serials"].return_value = removed_nonremoved
-
- expected = removed_nonremoved[0]
- result = self.impl.remove_entitlements_by_serials(["123", "789"], {})
- self.assertEqual(expected, result)
-
- def test_remove_entitlements_by_pool_ids(self):
- remove_entitlements_by_pool_ids_patch = mock.patch(
- "rhsmlib.services.entitlement.EntitlementService.remove_entitlements_by_pool_ids",
- name="remove_entitlements_by_pool_ids",
- )
- self.patches["remove_entitlements_by_pool_ids"] = remove_entitlements_by_pool_ids_patch.start()
- self.addCleanup(remove_entitlements_by_pool_ids_patch.stop)
-
- removed_nonremoved_serials = (["123"], [], ["456"])
- self.patches["remove_entitlements_by_pool_ids"].return_value = removed_nonremoved_serials
-
- expected = removed_nonremoved_serials[2]
- result = self.impl.remove_entitlements_by_pool_ids(["123"], {})
- self.assertEqual(expected, result)
-
- def test_remove_all_entitlements(self):
- remove_all_entitlements_patch = mock.patch(
- "rhsmlib.services.entitlement.EntitlementService.remove_all_entitlements",
- name="remove_all_entitlements",
- )
- self.patches["remove_all_entitlements"] = remove_all_entitlements_patch.start()
- self.addCleanup(remove_all_entitlements_patch.stop)
-
- records = {"deletedRecords": 1}
- self.patches["remove_all_entitlements"].return_value = records
-
- result = self.impl.remove_all_entitlements({})
- self.assertEqual(records, result)
diff --git a/test/rhsmlib/services/test_entitlement.py b/test/rhsmlib/services/test_entitlement.py
index 21beb85fbe..0c5a754ef2 100644
--- a/test/rhsmlib/services/test_entitlement.py
+++ b/test/rhsmlib/services/test_entitlement.py
@@ -289,172 +289,6 @@ def test_no_pool_with_specified_filter(self, mock_managerlib):
filtered = service.get_available_pools(service_level="NotFound")
self.assertEqual(0, len(filtered))
- def test_remove_all_pools(self):
- """
- Test of removing all pools
- """
- ent_service = EntitlementService(self.mock_cp)
- ent_service.entcertlib = mock.Mock().return_value
- ent_service.entcertlib.update = mock.Mock()
- ent_service.cp.unbindAll = mock.Mock(return_value="[]")
-
- response = ent_service.remove_all_entitlements()
- self.assertEqual(response, "[]")
-
- def test_remove_all_pools_by_id(self):
- """
- Test of removing all pools by IDs of pool
- """
- ent_service = EntitlementService(self.mock_cp)
- ent_service.cp.unbindByPoolId = mock.Mock()
- ent_service.entitlement_dir.list_serials_for_pool_ids = mock.Mock(
- return_value={
- "4028fa7a5dea087d015dea0b025003f6": ["6219625278114868779"],
- "4028fa7a5dea087d015dea0adf560152": ["3573249574655121394"],
- }
- )
- ent_service.entcertlib = mock.Mock().return_value
- ent_service.entcertlib.update = mock.Mock()
-
- removed_pools, unremoved_pools, removed_serials = ent_service.remove_entitlements_by_pool_ids(
- ["4028fa7a5dea087d015dea0b025003f6", "4028fa7a5dea087d015dea0adf560152"]
- )
-
- expected_removed_serials = ["6219625278114868779", "3573249574655121394"]
- expected_removed_pools = ["4028fa7a5dea087d015dea0b025003f6", "4028fa7a5dea087d015dea0adf560152"]
-
- self.assertEqual(expected_removed_serials, removed_serials)
- self.assertEqual(expected_removed_pools, removed_pools)
- self.assertEqual([], unremoved_pools)
-
- def test_remove_dupli_pools_by_id(self):
- """
- Test of removing pools specified with duplicities
- (one pool id is set twice)
- """
- ent_service = EntitlementService(self.mock_cp)
- ent_service.cp.unbindByPoolId = mock.Mock()
- ent_service.entitlement_dir.list_serials_for_pool_ids = mock.Mock(
- return_value={
- "4028fa7a5dea087d015dea0b025003f6": ["6219625278114868779"],
- "4028fa7a5dea087d015dea0adf560152": ["3573249574655121394"],
- }
- )
- ent_service.entcertlib = mock.Mock().return_value
- ent_service.entcertlib.update = mock.Mock()
-
- removed_pools, unremoved_pools, removed_serials = ent_service.remove_entitlements_by_pool_ids(
- [
- "4028fa7a5dea087d015dea0b025003f6",
- "4028fa7a5dea087d015dea0b025003f6",
- "4028fa7a5dea087d015dea0adf560152",
- ]
- )
-
- expected_removed_serials = ["6219625278114868779", "3573249574655121394"]
- expected_removed_pools = ["4028fa7a5dea087d015dea0b025003f6", "4028fa7a5dea087d015dea0adf560152"]
-
- self.assertEqual(expected_removed_serials, removed_serials)
- self.assertEqual(expected_removed_pools, removed_pools)
- self.assertEqual([], unremoved_pools)
-
- def test_remove_some_pools_by_id(self):
- """
- Test of removing only some pools, because one pool ID is not valid
- """
- ent_service = EntitlementService(self.mock_cp)
-
- def stub_unbind(uuid, pool_id):
- if pool_id == "does_not_exist_d015dea0adf560152":
- raise connection.RestlibException(400, "Error")
-
- ent_service.cp.unbindByPoolId = mock.Mock(side_effect=stub_unbind)
- ent_service.entitlement_dir.list_serials_for_pool_ids = mock.Mock(
- return_value={
- "4028fa7a5dea087d015dea0b025003f6": ["6219625278114868779"],
- "4028fa7a5dea087d015dea0adf560152": ["3573249574655121394"],
- }
- )
- ent_service.entcertlib = mock.Mock().return_value
- ent_service.entcertlib.update = mock.Mock()
-
- removed_pools, unremoved_pools, removed_serials = ent_service.remove_entitlements_by_pool_ids(
- ["4028fa7a5dea087d015dea0b025003f6", "does_not_exist_d015dea0adf560152"]
- )
-
- expected_removed_serials = ["6219625278114868779"]
- expected_removed_pools = ["4028fa7a5dea087d015dea0b025003f6"]
- expected_unremoved_pools = ["does_not_exist_d015dea0adf560152"]
-
- self.assertEqual(expected_removed_serials, removed_serials)
- self.assertEqual(expected_removed_pools, removed_pools)
- self.assertEqual(expected_unremoved_pools, unremoved_pools)
-
- def test_remove_all_pools_by_serial(self):
- """
- Test of removing all pools by serial numbers
- """
- ent_service = EntitlementService(self.mock_cp)
- ent_service.cp.unbindBySerial = mock.Mock()
-
- ent_service.entcertlib = mock.Mock().return_value
- ent_service.entcertlib.update = mock.Mock()
-
- removed_serial, unremoved_serials = ent_service.remove_entitlements_by_serials(
- ["6219625278114868779", "3573249574655121394"]
- )
-
- expected_removed_serials = ["6219625278114868779", "3573249574655121394"]
-
- self.assertEqual(expected_removed_serials, removed_serial)
- self.assertEqual([], unremoved_serials)
-
- def test_remove_dupli_pools_by_serial(self):
- """
- Test of removing pools specified with duplicities
- (one serial number is set twice)
- """
- ent_service = EntitlementService(self.mock_cp)
- ent_service.cp.unbindBySerial = mock.Mock()
-
- ent_service.entcertlib = mock.Mock().return_value
- ent_service.entcertlib.update = mock.Mock()
-
- removed_serial, unremoved_serials = ent_service.remove_entitlements_by_serials(
- ["6219625278114868779", "6219625278114868779", "3573249574655121394"]
- )
-
- expected_removed_serials = ["6219625278114868779", "3573249574655121394"]
-
- self.assertEqual(expected_removed_serials, removed_serial)
- self.assertEqual([], unremoved_serials)
-
- def test_remove_some_pools_by_serial(self):
- """
- Test of removing some of pools by serial numbers, because one serial
- number is not valid.
- """
- ent_service = EntitlementService(self.mock_cp)
-
- def stub_unbind(uuid, serial):
- if serial == "does_not_exist_1394":
- raise connection.RestlibException(400, "Error")
-
- ent_service.cp.unbindBySerial = mock.Mock(side_effect=stub_unbind)
-
- ent_service.entcertlib = mock.Mock().return_value
- ent_service.entcertlib.update = mock.Mock()
-
- removed_serial, unremoved_serials = ent_service.remove_entitlements_by_serials(
- ["6219625278114868779", "does_not_exist_1394"]
- )
-
- expected_removed_serials = ["6219625278114868779"]
- expected_unremoved_serials = ["does_not_exist_1394"]
-
- self.assertEqual(expected_removed_serials, removed_serial)
- self.assertEqual(expected_unremoved_serials, unremoved_serials)
-
def test_parse_valid_date(self):
"""
Test parsing valid date
diff --git a/test/smoke.sh b/test/smoke.sh
index 9130825723..eb8e9a5b34 100755
--- a/test/smoke.sh
+++ b/test/smoke.sh
@@ -226,7 +226,6 @@ run_sm "0" list --available
run_sm "0" service-level
run_sm "0" service-level --list
run_sm "0" repos
-run_sm "0" attach
# Note: with current test data, the awesome-os repos will never be enabled
run_yum "0" repolist
@@ -282,7 +281,6 @@ run_rhsmcertd "0"
run_rhsmcertd "0" -n
run_rhsmcertd_worker "0"
-run_rhsmcertd_worker "0" --autoheal
# too slow
# run_rhsm_debug "0" system
@@ -319,8 +317,6 @@ run_sm "0" repos --list
# fully entitled, hence the '1'
run_sm "1" register --activationkey "${ACTIVATION_KEY}" --org "${ORG}" --force
run_sm "0" unregister
-run_sm "64" register --activationkey "${ACTIVATION_KEY}" --org "${ORG}" --force --auto-attach
-run_sm "1" unregister
run_sm "0" clean
diff --git a/test/stubs.py b/test/stubs.py
index dd93be76e9..b595c87c9c 100644
--- a/test/stubs.py
+++ b/test/stubs.py
@@ -482,8 +482,6 @@ def __init__(
self.environment_list = []
self.called_unregister_uuid = None
self.called_unbind_uuid = None
- self.called_unbind_serial = []
- self.called_unbind_pool_id = []
self.username = username
self.password = password
self._capabilities = []
@@ -494,8 +492,6 @@ def __init__(
def reset(self):
self.called_unregister_uuid = None
self.called_unbind_uuid = None
- self.called_unbind_serial = []
- self.called_unbind_pool_id = []
def has_capability(self, capability):
return capability in self._capabilities
@@ -562,12 +558,6 @@ def getConsumer(self, consumerId):
def unbindAll(self, consumer):
self.called_unbind_uuid = consumer
- def unbindBySerial(self, consumer, serial):
- self.called_unbind_serial.append(serial)
-
- def unbindByPoolId(self, consumer_uuid, pool_id):
- self.called_unbind_pool_id.append(pool_id)
-
def getCertificateSerials(self, consumer):
return []
diff --git a/test/test_certmgr.py b/test/test_certmgr.py
index 99586f07f6..7c5013706e 100644
--- a/test/test_certmgr.py
+++ b/test/test_certmgr.py
@@ -38,7 +38,7 @@
"releaseVer": {"id": 1, "releaseVer": "123123"},
"serviceLevel": "Pro Turbo HD Plus Ultra",
"owner": {"key": "admin"},
- "autoheal": 1,
+ "autoheal": True,
"idCert": {"serial": {"serial": 3787455826750723380}},
}
diff --git a/test/test_managercli.py b/test/test_managercli.py
index 9b921a459f..1412c266b0 100644
--- a/test/test_managercli.py
+++ b/test/test_managercli.py
@@ -212,7 +212,7 @@ def test_unknown_args_cause_exit(self):
sys,
"argv",
# test with some subcommand; sub-man prints help without it
- ["subscription-manager", "attach", "--foo", "bar", "baz"],
+ ["subscription-manager", "register", "--foo", "bar", "baz"],
):
try:
self.cc.main()
diff --git a/test/test_managerlib.py b/test/test_managerlib.py
index f7ec64a526..975e0d067b 100644
--- a/test/test_managerlib.py
+++ b/test/test_managerlib.py
@@ -715,201 +715,6 @@ def MockSystemLog(self, message, priority):
EXPECTED_CONTENT_V3 = EXPECTED_CERT_CONTENT_V3 + os.linesep + EXPECTED_KEY_CONTENT_V3
-class ExtractorStub(managerlib.ImportFileExtractor):
- def __init__(self, content, file_path="test/file/path"):
- self.content = content
- self.writes = []
- managerlib.ImportFileExtractor.__init__(self, file_path)
-
- # Stub out any file system access
- def _read(self, file_path):
- return self.content
-
- def _write_file(self, target, content):
- self.writes.append((target, content))
-
- def _ensure_entitlement_dir_exists(self):
- # Do nothing but stub out the dir check to avoid file system access.
- pass
-
-
-class TestImportFileExtractor(unittest.TestCase):
- def test_contains_key_content_when_key_and_cert_exists_in_import_file(self):
- extractor = ExtractorStub(EXPECTED_CONTENT)
- self.assertTrue(extractor.contains_key_content())
-
- def test_contains_key_content_when_key_and_cert_exists_in_import_file_v3(self):
- extractor = ExtractorStub(EXPECTED_CONTENT_V3)
- self.assertTrue(extractor.contains_key_content())
-
- def test_does_not_contain_key_when_key_does_not_exist_in_import_file(self):
- extractor = ExtractorStub(EXPECTED_CERT_CONTENT)
- self.assertFalse(extractor.contains_key_content())
-
- def test_does_not_contain_key_when_key_does_not_exist_in_import_file_v3(self):
- extractor = ExtractorStub(EXPECTED_CERT_CONTENT_V3)
- self.assertFalse(extractor.contains_key_content())
-
- def test_get_key_content_when_key_exists(self):
- extractor = ExtractorStub(EXPECTED_CONTENT, file_path="12345.pem")
- self.assertTrue(extractor.contains_key_content())
- self.assertEqual(EXPECTED_KEY_CONTENT, extractor.get_key_content())
-
- def test_get_key_content_when_key_exists_v3(self):
- extractor = ExtractorStub(EXPECTED_CONTENT_V3, file_path="12345.pem")
- self.assertTrue(extractor.contains_key_content())
- self.assertEqual(EXPECTED_KEY_CONTENT_V3, extractor.get_key_content())
-
- def test_get_key_content_returns_None_when_key_does_not_exist(self):
- extractor = ExtractorStub(EXPECTED_CERT_CONTENT, file_path="12345.pem")
- self.assertFalse(extractor.get_key_content())
-
- def test_get_key_content_returns_None_when_key_does_not_exist_v3(self):
- extractor = ExtractorStub(EXPECTED_CERT_CONTENT_V3, file_path="12345.pem")
- self.assertFalse(extractor.get_key_content())
-
- def test_get_cert_content(self):
- extractor = ExtractorStub(EXPECTED_CONTENT, file_path="12345.pem")
- self.assertTrue(extractor.contains_key_content())
- self.assertEqual(EXPECTED_CERT_CONTENT, extractor.get_cert_content())
-
- def test_get_cert_content_v3(self):
- extractor = ExtractorStub(EXPECTED_CONTENT_V3, file_path="12345.pem")
- self.assertTrue(extractor.contains_key_content())
- self.assertEqual(EXPECTED_CERT_CONTENT_V3, extractor.get_cert_content())
-
- def test_get_cert_content_returns_None_when_cert_does_not_exist(self):
- extractor = ExtractorStub(EXPECTED_KEY_CONTENT, file_path="12345.pem")
- self.assertFalse(extractor.get_cert_content())
-
- def test_get_cert_content_returns_None_when_cert_does_not_exist_v3(self):
- extractor = ExtractorStub(EXPECTED_KEY_CONTENT_V3, file_path="12345.pem")
- self.assertFalse(extractor.get_cert_content())
-
- def test_verify_valid_entitlement_for_invalid_cert(self):
- extractor = ExtractorStub(EXPECTED_KEY_CONTENT, file_path="12345.pem")
- self.assertFalse(extractor.verify_valid_entitlement())
-
- def test_verify_valid_entitlement_for_invalid_cert_v3(self):
- extractor = ExtractorStub(EXPECTED_KEY_CONTENT_V3, file_path="12345.pem")
- self.assertFalse(extractor.verify_valid_entitlement())
-
- def test_verify_valid_entitlement_for_invalid_cert_bundle(self):
- # Use a bundle of cert + key, but the cert is not an entitlement cert:
- extractor = ExtractorStub(IDENTITY_CERT_WITH_KEY, file_path="12345.pem")
- self.assertFalse(extractor.verify_valid_entitlement())
-
- def test_verify_valid_entitlement_for_no_key(self):
- extractor = ExtractorStub(EXPECTED_CERT_CONTENT, file_path="12345.pem")
- self.assertFalse(extractor.verify_valid_entitlement())
-
- def test_verify_valid_entitlement_for_no_key_v3(self):
- extractor = ExtractorStub(EXPECTED_CERT_CONTENT_V3, file_path="12345.pem")
- self.assertFalse(extractor.verify_valid_entitlement())
-
- def test_verify_valid_entitlement_for_no_cert_content(self):
- extractor = ExtractorStub("", file_path="12345.pem")
- self.assertFalse(extractor.verify_valid_entitlement())
-
- def test_write_cert_only(self):
- expected_cert_file = "%d.pem" % (EXPECTED_CERT.serial)
- extractor = ExtractorStub(EXPECTED_CERT_CONTENT, file_path=expected_cert_file)
- extractor.write_to_disk()
-
- self.assertEqual(1, len(extractor.writes))
-
- write_one = extractor.writes[0]
- self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_cert_file), write_one[0])
- self.assertEqual(EXPECTED_CERT_CONTENT, write_one[1])
-
- def test_write_cert_only_v3(self):
- expected_cert_file = "%d.pem" % (EXPECTED_CERT_V3.serial)
- extractor = ExtractorStub(EXPECTED_CERT_CONTENT_V3, file_path=expected_cert_file)
- extractor.write_to_disk()
-
- self.assertEqual(1, len(extractor.writes))
-
- write_one = extractor.writes[0]
- self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_cert_file), write_one[0])
- self.assertEqual(EXPECTED_CERT_CONTENT_V3, write_one[1])
-
- def test_write_key_and_cert(self):
- filename = "%d.pem" % (EXPECTED_CERT.serial)
- self._assert_correct_cert_and_key_files_generated_with_filename(filename)
-
- def test_write_key_and_cert_v3(self):
- filename = "%d.pem" % (EXPECTED_CERT_V3.serial)
- self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
-
- def test_file_renamed_when_imported_with_serial_no_and_custom_extension(self):
- filename = "%d.cert" % (EXPECTED_CERT.serial)
- self._assert_correct_cert_and_key_files_generated_with_filename(filename)
-
- def test_file_renamed_when_imported_with_serial_no_and_custom_extension_v3(self):
- filename = "%d.cert" % (EXPECTED_CERT_V3.serial)
- self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
-
- def test_file_renamed_when_imported_with_serial_no_and_no_extension(self):
- filename = str(EXPECTED_CERT.serial)
- self._assert_correct_cert_and_key_files_generated_with_filename(filename)
-
- def test_file_renamed_when_imported_with_serial_no_and_no_extension_v3(self):
- filename = str(EXPECTED_CERT_V3.serial)
- self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
-
- def test_file_renamed_when_imported_with_custom_name_and_pem_extension(self):
- filename = "entitlement.pem"
- self._assert_correct_cert_and_key_files_generated_with_filename(filename)
-
- def test_file_renamed_when_imported_with_custom_name_and_pem_extension_v3(self):
- filename = "entitlement.pem"
- self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
-
- def test_file_renamed_when_imported_with_custom_name_no_extension(self):
- filename = "entitlement"
- self._assert_correct_cert_and_key_files_generated_with_filename(filename)
-
- def test_file_renamed_when_imported_with_custom_name_no_extension_v3(self):
- filename = "entitlement"
- self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
-
- def _assert_correct_cert_and_key_files_generated_with_filename(self, filename):
- expected_file_prefix = "%d" % (EXPECTED_CERT.serial)
- expected_cert_file = expected_file_prefix + ".pem"
- expected_key_file = expected_file_prefix + "-key.pem"
-
- extractor = ExtractorStub(EXPECTED_CONTENT, file_path=filename)
- extractor.write_to_disk()
-
- self.assertEqual(2, len(extractor.writes))
-
- write_one = extractor.writes[0]
- self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_cert_file), write_one[0])
- self.assertEqual(EXPECTED_CERT_CONTENT, write_one[1])
-
- write_two = extractor.writes[1]
- self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_key_file), write_two[0])
- self.assertEqual(EXPECTED_KEY_CONTENT, write_two[1])
-
- def _assert_correct_cert_and_key_files_generated_with_filename_v3(self, filename):
- expected_file_prefix = "%d" % (EXPECTED_CERT_V3.serial)
- expected_cert_file = expected_file_prefix + ".pem"
- expected_key_file = expected_file_prefix + "-key.pem"
-
- extractor = ExtractorStub(EXPECTED_CONTENT_V3, file_path=filename)
- extractor.write_to_disk()
-
- self.assertEqual(2, len(extractor.writes))
-
- write_one = extractor.writes[0]
- self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_cert_file), write_one[0])
- self.assertEqual(EXPECTED_CERT_CONTENT_V3, write_one[1])
-
- write_two = extractor.writes[1]
- self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_key_file), write_two[0])
- self.assertEqual(EXPECTED_KEY_CONTENT_V3, write_two[1])
-
-
class TestMergedPoolsStackingGroupSorter(unittest.TestCase):
def test_sorter_adds_group_for_non_stackable_entitlement(self):
pool = self._create_pool("test-prod-1", "Test Prod 1")
diff --git a/test/test_plugins.py b/test/test_plugins.py
index e58c0827df..6a1fa9288f 100644
--- a/test/test_plugins.py
+++ b/test/test_plugins.py
@@ -1075,19 +1075,6 @@ def test_post_subscription_conduit(self):
self.assertEqual({}, conduit.entitlement_data)
-class TestAutoAttachConduit(unittest.TestCase):
- def test_auto_attach_conduit(self):
- conduit = plugins.AutoAttachConduit(StubPluginClass, "a-consumer-uuid")
- self.assertEqual("a-consumer-uuid", conduit.consumer_uuid)
-
-
-class TestPostAutoAttachConduit(unittest.TestCase):
- def test_post_auto_attach_conduit(self):
- conduit = plugins.PostAutoAttachConduit(StubPluginClass, "a-consumer-uuid", {})
- self.assertEqual("a-consumer-uuid", conduit.consumer_uuid)
- self.assertEqual({}, conduit.entitlement_data)
-
-
class BasePluginException(unittest.TestCase):
"""At least create and raise all the exceptions."""
diff --git a/test/test_syspurposestore_interface.py b/test/test_syspurposestore_interface.py
deleted file mode 100644
index f733e34fe1..0000000000
--- a/test/test_syspurposestore_interface.py
+++ /dev/null
@@ -1,134 +0,0 @@
-# Copyright (c) 2018 Red Hat, Inc.
-#
-# This software is licensed to you under the GNU General Public License,
-# version 2 (GPLv2). There is NO WARRANTY for this software, express or
-# implied, including the implied warranties of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
-# along with this software; if not, see
-# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
-#
-# Red Hat trademarks are not licensed under GPLv2. No permission is
-# granted to use or replicate Red Hat trademarks that are incorporated
-# in this software or its documentation.
-#
-
-import json
-import shutil
-import tempfile
-import unittest
-from .fixture import set_up_mock_sp_store
-
-import os
-from unittest import mock
-
-
-class SyspurposeStoreInterfaceTests(unittest.TestCase):
- def setUp(self):
- temp_dir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, temp_dir)
- mock_syspurpose_file = os.path.join(temp_dir, "mock_syspurpose.json")
- syspurpose_values = {}
- with open(mock_syspurpose_file, "w") as f:
- json.dump(syspurpose_values, f)
- f.flush()
-
- from subscription_manager import syspurposelib
-
- self.syspurposelib = syspurposelib
- syspurposelib.USER_SYSPURPOSE = mock_syspurpose_file
-
- syspurpose_patch = mock.patch("subscription_manager.syspurposelib.SyncedStore")
- self.mock_sp_store = syspurpose_patch.start()
- self.mock_sp_store, self.mock_sp_store_contents = set_up_mock_sp_store(self.mock_sp_store)
- self.addCleanup(syspurpose_patch.stop)
-
- def _set_up_mock_sp_store(self):
- """
- Sets up the mock syspurpose store with methods that are mock versions of the real deal.
- Allows us to test in the absence of the syspurpose module.
- :return:
- """
- contents = {}
- self.mock_sp_store_contents = contents
-
- def set(item, value):
- contents[item] = value
-
- def read(path, raise_on_error=False):
- return self.mock_sp_store
-
- def unset(item):
- contents[item] = None
-
- def add(item, value):
- current = contents.get(item, [])
- if value not in current:
- current.append(value)
- contents[item] = current
-
- def remove(item, value):
- current = contents.get(item)
- if current is not None and isinstance(current, list) and value in current:
- current.remove(value)
-
- self.mock_sp_store.set = mock.Mock(side_effect=set)
- self.mock_sp_store.read = mock.Mock(side_effect=read)
- self.mock_sp_store.unset = mock.Mock(side_effect=unset)
- self.mock_sp_store.add = mock.Mock(side_effect=add)
- self.mock_sp_store.remove = mock.Mock(side_effect=remove)
- self.mock_sp_store.contents = self.mock_sp_store_contents
-
- def tearDown(self):
- self.syspurposelib.USER_SYSPURPOSE = "/etc/rhsm/syspurpose/syspurpose.json"
-
- def test_save_sla_to_syspurpose_metadata_sla_is_set_when_syspurpose_module_exists(self):
- """
- Tests that the syspurpose sla is set through the syspurposestore interface
- when the syspurpose module is available for import.
- """
- self.syspurposelib.save_sla_to_syspurpose_metadata(None, None, "Freemium")
-
- contents = self.syspurposelib.read_syspurpose()
- self.assertEqual(contents.get("service_level_agreement"), "Freemium")
-
- def test_save_sla_to_syspurpose_metadata_sla_is_not_set_when_None_is_provided(self):
- """
- Tests that the syspurpose sla is not set through the syspurposestore interface
- when None is passed to save_sla_to_syspurpose_metadata method.
- """
- self.syspurposelib.save_sla_to_syspurpose_metadata(None, None, None)
-
- contents = self.syspurposelib.read_syspurpose()
- self.assertEqual(contents.get("service_level_agreement"), None)
-
- def test_save_sla_to_syspurpose_metadata_sla_is_not_set_when_empty_string_is_provided(self):
- """
- Tests that the syspurpose sla is not set through the syspurposestore interface
- when an empty string is passed to save_sla_to_syspurpose_metadata method.
- """
- self.syspurposelib.save_sla_to_syspurpose_metadata(None, None, "")
-
- contents = self.syspurposelib.read_syspurpose()
- self.assertEqual(contents.get("service_level_agreement"), None)
-
- def test_save_sla_to_syspurpose_metadata_sla_is_not_set_when_syspurpose_module_does_not_exist(self):
- """
- Tests that the syspurpose sla is NOT set through the syspurposestore interface
- when the syspurpose module is not available for import.
- """
- # Remove SyspurposeStore and USER_SYSPURPOSE from syspurposestore_interface's scope temporarily
- # to simulate that importing them failed.
- tmp_syspurpose_store = self.syspurposelib.SyncedStore
- tmp_user_syspurpose = self.syspurposelib.USER_SYSPURPOSE
- del self.syspurposelib.SyncedStore
- del self.syspurposelib.USER_SYSPURPOSE
-
- self.syspurposelib.save_sla_to_syspurpose_metadata(None, None, "Freemium")
-
- # Add SyspurposeStore and USER_SYSPURPOSE back to syspurposestore_interface's scope
- self.syspurposelib.SyncedStore = tmp_syspurpose_store
- self.syspurposelib.USER_SYSPURPOSE = tmp_user_syspurpose
-
- # Check that the contents of the syspurpose.json are empty (thus sla was not set)
- contents = self.syspurposelib.read_syspurpose()
- self.assertFalse(contents)
diff --git a/test/test_utils.py b/test/test_utils.py
index 264bfc7af1..57b7c69505 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -19,7 +19,6 @@
format_baseurl,
get_version,
get_client_versions,
- unique_list_items,
get_server_versions,
friendly_join,
is_true_value,
@@ -585,22 +584,6 @@ def test_false_value(self):
self.assertFalse(is_true_value("f"))
-class TestUniqueListItems(fixture.SubManFixture):
- def test_preserves_order(self):
- input_list = [1, 1, 2, 2, 3, 3]
- expected = [1, 2, 3]
- self.assertEqual(expected, unique_list_items(input_list))
-
- def test_hash_function(self):
- mock_item_1 = Mock()
- mock_item_1.value = 1
- mock_item_2 = Mock()
- mock_item_2.value = 2
- input_list = [mock_item_1, mock_item_1, mock_item_2, mock_item_2]
- expected = [mock_item_1, mock_item_2]
- self.assertEqual(expected, unique_list_items(input_list, lambda x: x.value))
-
-
class TestProductCertificateFilter(fixture.SubManFixture):
def test_set_filter_string(self):
test_data = [
diff --git a/test/zypper/test_serviceplugin.py b/test/zypper/test_serviceplugin.py
index 16472d525c..f4d858edcf 100644
--- a/test/zypper/test_serviceplugin.py
+++ b/test/zypper/test_serviceplugin.py
@@ -40,9 +40,6 @@ def test_provides_subman_repos_if_registered_and_subscribed(self):
"--serverurl={RHSM_URL}".format(sub_man=self.SUB_MAN, **os.environ),
shell=True,
)
- subprocess.call(
- "{sub_man} attach --pool={RHSM_POOL}".format(sub_man=self.SUB_MAN, **os.environ), shell=True
- )
self.assertTrue(self.has_subman_repos())
def test_can_download_rpm(self):
@@ -51,9 +48,6 @@ def test_can_download_rpm(self):
"--serverurl={RHSM_URL}".format(sub_man=self.SUB_MAN, **os.environ),
shell=True,
)
- subprocess.check_call(
- "{sub_man} attach --pool={RHSM_POOL}".format(sub_man=self.SUB_MAN, **os.environ), shell=True
- )
subprocess.check_call(
"{sub_man} repos --enable={RHSM_TEST_REPO}".format(sub_man=self.SUB_MAN, **os.environ), shell=True
)