diff --git a/src/daemons/rhsmcertd.c b/src/daemons/rhsmcertd.c index 29790747ac..b7568af375 100644 --- a/src/daemons/rhsmcertd.c +++ b/src/daemons/rhsmcertd.c @@ -67,8 +67,6 @@ typedef enum { #define N_(x) x #define CONFIG_KEY_NOT_FOUND (0) -#define MAX_AUTO_REGISTER_ATTEMPTS 3 - #if defined(__linux) # if LINUX_VERSION_CODE >= KERNEL_VERSION(3,17,0) # ifdef HAVE_LINUX_GETRANDOM @@ -93,7 +91,6 @@ static gint arg_reg_interval_minutes = -1; static gboolean arg_no_splay = FALSE; static gboolean arg_auto_registration = FALSE; static int fd_lock = -1; -static int auto_register_attempt = 0; struct CertCheckData { int interval_seconds; @@ -419,14 +416,8 @@ auto_register(gpointer data) exit (EXIT_FAILURE); } if (pid == 0) { - if (auto_register_attempt < MAX_AUTO_REGISTER_ATTEMPTS) { - debug ("(Auto-registration) executing: %s --auto-register", WORKER); - execl (WORKER, WORKER_NAME, "--auto-register", NULL); - } else { - warn ("(Auto-registration) the number of attempts reached the max limit: %d", MAX_AUTO_REGISTER_ATTEMPTS); - // Return False to not repeat this again - return false; - } + debug ("(Auto-registration) executing: %s --auto-register", WORKER); + execl (WORKER, WORKER_NAME, "--auto-register", NULL); } waitpid (pid, &status, 0); @@ -434,19 +425,10 @@ auto_register(gpointer data) if (status == 0) { info ("(Auto-registration) performed successfully."); - // No need to repeat this action again return false; } else { - auto_register_attempt++; - if (auto_register_attempt < MAX_AUTO_REGISTER_ATTEMPTS) { - warn ("(Auto-registration) failed (%d), retry will occur on next run.", status); - } else { - warn ("(Auto-registration) failed (%d), the number of attempts reached the max limit: %d", - status, MAX_AUTO_REGISTER_ATTEMPTS); - // Return False to not repeat this again - return false; - } - return true; + warn ("(Auto-registration) failed (%d)", status); + return false; } } @@ -488,30 +470,6 @@ cert_check (gboolean heal) return TRUE; } - -static gboolean 
-initial_auto_register (gpointer data) -{ - struct CertCheckData *cert_data = data; - gboolean repeat_attempts; - - repeat_attempts = auto_register(cert_data); - - // When first attempt was not successful, then try to do other - // auto-registration attempts - if (repeat_attempts == true) { - // Add the timeout to begin waiting on interval but offset by the initial - // delay. - g_timeout_add(cert_data->interval_seconds * 1000, - (GSourceFunc) auto_register, cert_data); - // Update timestamp - log_update(cert_data->interval_seconds, cert_data->next_update_file); - } - // Return false so that the timer does - // not run this again. - return false; -} - static gboolean initial_cert_check (gpointer data) { @@ -915,13 +873,11 @@ main (int argc, char *argv[]) // NOTE: We put the initial checks on a timer so that in the case of systemd, // we can ensure that the network interfaces are all up before the initial // checks are done. - int auto_reg_initial_delay = 0; int auto_attach_initial_delay = 0; int cert_check_initial_delay = 0; if (run_now) { info ("Initial checks will be run now!"); } else { - int auto_reg_offset = 0; int auto_attach_offset = 0; int cert_check_offset = 0; if (splay_enabled == true) { @@ -960,16 +916,10 @@ main (int argc, char *argv[]) } #endif srand((unsigned int) seed); - auto_reg_offset = gen_random(auto_reg_interval_seconds); auto_attach_offset = gen_random(heal_interval_seconds); cert_check_offset = gen_random(cert_interval_seconds); } - if (auto_reg_enabled) { - auto_reg_initial_delay = INITIAL_DELAY_SECONDS + auto_reg_offset; - info ("Waiting %.1f minutes plus %d splay seconds [%d seconds total] before performing first auto-register", - INITIAL_DELAY_SECONDS / 60.0, auto_reg_offset, auto_reg_initial_delay); - } auto_attach_initial_delay = INITIAL_DELAY_SECONDS + auto_attach_offset; info ("Waiting %.1f minutes plus %d splay seconds [%d seconds total] before performing first auto-attach.", INITIAL_DELAY_SECONDS / 60.0, auto_attach_offset, 
auto_attach_initial_delay); @@ -994,8 +944,7 @@ main (int argc, char *argv[]) auto_attach_data.next_update_file = NEXT_AUTO_ATTACH_UPDATE_FILE; if (auto_reg_enabled) { - g_timeout_add(auto_reg_initial_delay * 1000, - (GSourceFunc) initial_auto_register, (gpointer) &auto_register_data); + auto_register((gpointer) &auto_register_data); } g_timeout_add (cert_check_initial_delay * 1000, (GSourceFunc) initial_cert_check, (gpointer) &cert_check_data); @@ -1006,7 +955,7 @@ main (int argc, char *argv[]) // time. This works for most users, since the cert_interval aligns with // runs of heal_interval (i.e., heal_interval % cert_interval = 0) if (auto_reg_enabled) { - log_update (auto_reg_initial_delay, NEXT_AUTO_REGISTER_UPDATE_FILE); + log_update (0, NEXT_AUTO_REGISTER_UPDATE_FILE); } log_update (cert_check_initial_delay, NEXT_CERT_UPDATE_FILE); log_update (auto_attach_initial_delay, NEXT_AUTO_ATTACH_UPDATE_FILE); diff --git a/src/plugins/dnf/subscription_manager.py b/src/plugins/dnf/subscription_manager.py index 38d213abf2..199bb1db05 100644 --- a/src/plugins/dnf/subscription_manager.py +++ b/src/plugins/dnf/subscription_manager.py @@ -32,6 +32,13 @@ from configparser import ConfigParser +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from subscription_manager.certdirectory import EntitlementDirectory + from subscription_manager.identity import Identity + + expired_warning = _( """ *** WARNING *** @@ -137,12 +144,15 @@ def _update(cache_only): Update entitlement certificates and redhat.repo :param cache_only: is True, when rhsm.full_refresh_on_yum is set to 0 in rhsm.conf """ - logger.info(_("Updating Subscription Management repositories.")) - - identity = inj.require(inj.IDENTITY) - - if not identity.is_valid(): + identity: Identity = inj.require(inj.IDENTITY) + ent_dir: EntitlementDirectory = inj.require(inj.ENT_DIR) + + # During first phase of anonymous cloud registration the system has + # valid entitlement certificates, but does not yet have any identity. 
+ # We have access to the content, so we shouldn't be reporting missing + # identity certificate. + if not identity.is_valid() and len(ent_dir.list_valid()) == 0: logger.info(_("Unable to read consumer identity")) if config.in_container(): diff --git a/src/rhsm/connection.py b/src/rhsm/connection.py index fe9c03b4ab..629a47cf0e 100644 --- a/src/rhsm/connection.py +++ b/src/rhsm/connection.py @@ -1442,11 +1442,12 @@ def get_supported_resources(self) -> dict: return self.resources - def supports_resource(self, resource_name: str) -> bool: - """ - Check if the server we're connecting too supports a particular - resource. For our use cases this is generally the plural form - of the resource. + def supports_resource(self, resource_name: Optional[str]) -> bool: + """Check if the server supports a particular resource. + + :param resource_name: + Resource to be requested. + When `None`, API call 'GET /' is made to cache all supported resources. """ if self.resources is None: self._load_supported_resources() @@ -1488,31 +1489,27 @@ def has_capability(self, capability: str) -> bool: def ping(self, *args, **kwargs) -> Any: return self.conn.request_get("/status/", description=_("Checking connection status")) - def getJWToken(self, cloud_id: str, metadata: str, signature: str) -> Any: - """ - When automatic registration is enabled in rhsm.conf and it was possible - to gather cloud metadata, then it is possible to try to get JSON Web Token - for automatic registration. When candlepin does not provide automatic - registration, then raise exception. - :param cloud_id: ID of cloud provider, e.g. "aws", "azure", "gcp" - :param metadata: string with base64 encoded metadata - :param signature: string with base64 encoded signature - :return: string with JWT + def getCloudJWT(self, cloud_id: str, metadata: str, signature: str) -> Dict[str, Any]: + """Obtain cloud JWT. + + This method is part of the Cloud registration v2: standard or anonymous flow. 
+ + :param cloud_id: Cloud provider, e.g. 'aws', 'azure' or 'gcp'. + :param metadata: Base64 encoded public cloud metadata. + :param signature: Base64 encoded public cloud signature. """ - params = { + data = { "type": cloud_id, "metadata": metadata, "signature": signature, } - # "Accept" http header has to be text/plain, because candlepin return - # token as simple text and it is not wrapped in json document headers = { - "Content-type": "application/json", - "Accept": "text/plain", + "Content-Type": "application/json", } + return self.conn.request_post( - method="/cloud/authorize", - params=params, + method="/cloud/authorize?version=2", + params=data, headers=headers, description=_("Fetching cloud token"), ) @@ -1844,18 +1841,29 @@ def unregisterConsumer(self, consumerId: str) -> bool: method = "/consumers/%s" % self.sanitize(consumerId) return self.conn.request_delete(method, description=_("Unregistering system")) is None - def getCertificates(self, consumer_uuid: str, serials: Optional[list] = None) -> List[dict]: + def getCertificates( + self, + consumer_uuid: str, + serials: Optional[list] = None, + jwt: Optional[str] = None, + ) -> List[dict]: """ Fetch all entitlement certificates for this consumer. 
Specify a list of serial numbers to filter if desired + :param consumer_uuid: consumer UUID :param serials: list of entitlement serial numbers + :param jwt: JWT identifying an anonymous system """ method = "/consumers/%s/certificates" % (self.sanitize(consumer_uuid)) if serials: serials_str = ",".join(serials) method = "%s?serials=%s" % (method, serials_str) - return self.conn.request_get(method, description=_("Fetching certificates")) + headers: Dict[str, str] = {} + if jwt: + headers["Authorization"] = f"Bearer {jwt}" + + return self.conn.request_get(method, headers=headers, description=_("Fetching certificates")) def getCertificateSerials(self, consumerId: str) -> List[dict]: """ diff --git a/src/rhsmlib/services/register.py b/src/rhsmlib/services/register.py index ebc1e1bfb8..aa3d451844 100644 --- a/src/rhsmlib/services/register.py +++ b/src/rhsmlib/services/register.py @@ -13,7 +13,7 @@ import logging import socket -from typing import Callable +from typing import Callable, Optional from rhsm.connection import UEPConnection @@ -37,7 +37,7 @@ def __init__(self, cp: UEPConnection) -> None: def register( self, - org: str, + org: Optional[str], activation_keys: list = None, environments: list = None, force: bool = False, diff --git a/src/subscription_manager/cache.py b/src/subscription_manager/cache.py index 68b0bb798a..6e24486eb2 100644 --- a/src/subscription_manager/cache.py +++ b/src/subscription_manager/cache.py @@ -17,6 +17,7 @@ this with the current state, and perform an update on the server if necessary. 
""" +import base64 import datetime import logging import os @@ -27,6 +28,7 @@ if TYPE_CHECKING: from rhsm.certificate2 import EntitlementCertificate, Product + from rhsm.connection import UEPConnection from subscription_manager.certdirectory import ProductDirectory, EntitlementDirectory from subscription_manager.cp_provider import CPProvider from subscription_manager.identity import Identity @@ -1173,3 +1175,97 @@ def _load_data(self, open_file: TextIO) -> Optional[Dict]: except ValueError: # Ignore json file parse error pass + + +class CloudTokenCache: + """A cache for Candlepin's JWT used during automatic registration. + + This is used by rhsmcertd worker. + """ + + CACHE_FILE = "/var/lib/rhsm/cache/cloud_token_cache.json" + + @classmethod + def get(cls, uep: "UEPConnection", cloud_id: str, metadata: str, signature: str) -> Dict[str, str]: + """Get a JWT from the Candlepin server. + + If cached token already exists and is still valid, it will be used + without contacting the server. + """ + try: + token: Dict[str, str] = cls._get_from_file() + log.debug("JWT cache contains valid token, no need to contact the server.") + return token + except LookupError as exc: + log.debug(f"JWT cache doesn't contain valid token, contacting the server ({exc}).") + except Exception as exc: + log.debug(f"JWT cache couldn't be read (got {type(exc).__name__}), contacting the server.") + + token: Dict[str, str] = cls._get_from_server(uep, cloud_id, metadata, signature) + cls._save_to_file(token) + return token + + @classmethod + def is_valid(cls) -> bool: + """Check if the cached JWT is valid. + + :returns: + `True` if the locally cached JWT is valid, + `False` if the locally cached JWT is expired or does not exist. + """ + # 'exp' key: https://www.rfc-editor.org/rfc/rfc7519#section-4.1.4 + expiration: int = cls._get_payload()["exp"] + + now = int(time.time()) + return expiration > now + + @classmethod + def _get_payload(cls) -> Dict: + """Get the body of the JWT. 
+ + :returns: The body of the JWT as a dictionary. + :raises Exception: The file is missing or malformed. + """ + with open(cls.CACHE_FILE, "r") as f: + content: Dict[str, str] = json.load(f) + + payload: str = content["token"].split(".")[1] + # JWT does not use the padding, base64.b64decode requires it. + payload = f"{payload}===" + + return json.loads(base64.b64decode(payload).decode("utf-8")) + + @classmethod + def _get_from_file(cls) -> Dict[str, str]: + """Get a JWT from a cache file. + + :raises LookupError: The token is expired. + :raises Exception: The file is missing or malformed. + """ + if not cls.is_valid(): + cls.delete_cache() + raise LookupError("Candlepin JWT is expired.") + + with open(cls.CACHE_FILE, "r") as f: + return json.load(f) + + @classmethod + def _get_from_server( + cls, uep: "UEPConnection", cloud_id: str, metadata: str, signature: str + ) -> Dict[str, str]: + """Get a JWT from the Candlepin server.""" + log.debug("Obtaining Candlepin JWT.") + result: Dict[str, str] = uep.getCloudJWT(cloud_id, metadata, signature) + return result + + @classmethod + def delete_cache(cls): + if os.path.exists(cls.CACHE_FILE): + os.remove(cls.CACHE_FILE) + log.debug(f"Candlepin JWT cache file ({cls.CACHE_FILE}) was deleted.") + + @classmethod + def _save_to_file(cls, token: Dict[str, str]) -> None: + with open(cls.CACHE_FILE, "w") as f: + json.dump(token, f) + log.debug(f"Candlepin JWT was saved to a cache file ({cls.CACHE_FILE}).") diff --git a/src/subscription_manager/entcertlib.py b/src/subscription_manager/entcertlib.py index 2dff2cbc8c..84d3f0fc9d 100644 --- a/src/subscription_manager/entcertlib.py +++ b/src/subscription_manager/entcertlib.py @@ -26,6 +26,7 @@ from subscription_manager import rhelentbranding import subscription_manager.injection as inj +from subscription_manager import repolib from subscription_manager.i18n import ungettext, ugettext as _ if TYPE_CHECKING: @@ -455,3 +456,54 @@ def __str__(self) -> str: self.write(s, _("Added (new)"), 
self.added) self.write(s, _("Deleted (rogue):"), self.rogue) return "\n".join(s) + + +class AnonymousCertificateManager: + """Manage anonymous entitlement certificates. + + Anonymous certificate can be obtained from Candlepin via JWT/bearer token + when the system is deployed as a cloud VM. + + These certificates are short-lived and are meant to be replaced by a proper + certificate in a short time. + """ + + def __init__(self, uep: "UEPConnection"): + self.uep = uep + + def install_temporary_certificates(self, uuid: str, jwt: str) -> None: + """Obtain temporary entitlement certificates. + + - Download and install temporary entitlement certificates and keys + without obtaining an identity certificate. + - Generate 'redhat.repo' file out of them. + + :param uuid: The anonymous UUID assigned by Candlepin. + :param jwt: The Bearer token sent by Candlepin. + """ + log.debug("Obtaining anonymous entitlement certificates and keys.") + certificates: List[Dict] = self.uep.getCertificates(consumer_uuid=uuid, jwt=jwt) + if not len(certificates): + log.debug("No anonymous entitlement certificates were received.") + return + + log.debug("Installing anonymous entitlement certificates and keys.") + report = EntCertUpdateReport() + installer = EntitlementCertBundlesInstaller(report=report) + entitlement_ids: List[int] = installer.install(cert_bundles=certificates) + + log.debug( + "The following anonymous entitlement certificates and keys were installed: " + + ", ".join(str(c) for c in entitlement_ids) + ) + + update_repo = repolib.RepoUpdateActionCommand() + update_repo_report: Optional[repolib.RepoActionReport] = update_repo.perform() + + if update_repo_report is None: + log.debug("Anonymous entitlement certificate did not cause repository updates.") + else: + log.debug( + "Anonymous entitlement certificate caused " + f"{update_repo_report.updates()} repositories to be updated." 
+ ) diff --git a/src/subscription_manager/identity.py b/src/subscription_manager/identity.py index e6234eee5d..1473c8dc5c 100644 --- a/src/subscription_manager/identity.py +++ b/src/subscription_manager/identity.py @@ -96,6 +96,15 @@ def getConsumerName(self) -> str: def getSerialNumber(self) -> int: return self.x509.serial + def getConsumerOwner(self) -> Optional[str]: + """Get the name of the organization of the consumer. + + The value is stored in the 'Subject Name' > 'O (Organization)' field + of the consumer certificate. + """ + subject: dict = self.x509.subject + return subject.get("O") + # TODO: we're using a Certificate which has it's own write/delete, no idea # why this landed in a parallel disjoint class wrapping the actual cert. def write(self) -> None: @@ -134,6 +143,7 @@ def __init__(self): self._lock = threading.Lock() self._name: Optional[str] = None self._uuid: Optional[str] = None + self._owner: Optional[str] = None self._cert_dir_path: str = conf["rhsm"]["consumerCertDir"] self.reload() @@ -160,6 +170,7 @@ def reload(self) -> None: if self.consumer is not None: self._name = self.consumer.getConsumerName() self._uuid = self.consumer.getConsumerId() + self._owner = self.consumer.getConsumerOwner() # since Identity gets dep injected, lets look up # the cert dir on the active id instead of the global config self._cert_dir_path = self.consumer.PATH @@ -188,6 +199,11 @@ def uuid(self) -> Optional[str]: _uuid = self._uuid return _uuid + @property + def owner(self) -> Optional[str]: + with self._lock: + return self._owner + @property def cert_dir_path(self) -> str: with self._lock: diff --git a/src/subscription_manager/identitycertlib.py b/src/subscription_manager/identitycertlib.py index ecb396d8b5..3057a050e1 100644 --- a/src/subscription_manager/identitycertlib.py +++ b/src/subscription_manager/identitycertlib.py @@ -13,7 +13,7 @@ # import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List from subscription_manager import 
certlib from subscription_manager import injection as inj @@ -21,7 +21,7 @@ if TYPE_CHECKING: from rhsm.connection import UEPConnection from subscription_manager.cp_provider import CPProvider - from subscription_manager.identity import ConsumerIdentity, Identity + from subscription_manager.identity import Identity log = logging.getLogger(__name__) @@ -70,16 +70,23 @@ def _update_cert(self, identity: "Identity") -> certlib.ActionReport: # FIXME: move persist stuff here from subscription_manager import managerlib - idcert: ConsumerIdentity = identity.consumer + local_serial: int = identity.consumer.getSerialNumber() + local_owner: str = identity.owner - consumer: dict = self._get_consumer(identity) + consumer: dict = self.uep.getConsumer(identity.uuid) + actual_serial: int = consumer["idCert"]["serial"]["serial"] + actual_owner: str = consumer.get("owner", {}).get("key", "") + + # Only update the certificate if the serial has changed + if local_serial != actual_serial: + diff: List[str] = [f"{local_serial} => {actual_serial}"] + if local_owner != actual_owner: + diff += [f"{local_owner} => {actual_owner}"] - # only write the cert if the serial has changed - # FIXME: this would be a good place to have a Consumer/ConsumerCert - # model. - # FIXME: and this would be a ConsumerCert model '!=' - if idcert.getSerialNumber() != consumer["idCert"]["serial"]["serial"]: - log.debug("identity certificate changed, writing new one") + log.info( + f"Serial number of the identity certificate changed ({', '.join(diff)}), " + "new identity certificate will be saved." + ) # FIXME: should be in this module? 
managerlib is an odd place managerlib.persist_consumer_cert(consumer) @@ -87,8 +94,3 @@ def _update_cert(self, identity: "Identity") -> certlib.ActionReport: # updated the cert, or at least checked self.report._status = 1 return self.report - - def _get_consumer(self, identity: "Identity") -> dict: - # FIXME: not much for error handling here - consumer: dict = self.uep.getConsumer(identity.uuid) - return consumer diff --git a/src/subscription_manager/managerlib.py b/src/subscription_manager/managerlib.py index 16a1a04cc3..b61755f72e 100644 --- a/src/subscription_manager/managerlib.py +++ b/src/subscription_manager/managerlib.py @@ -1012,6 +1012,7 @@ def clean_all_data(backup: bool = True) -> None: require(POOL_STATUS_CACHE).delete_cache() require(OVERRIDE_STATUS_CACHE).delete_cache() require(RELEASE_STATUS_CACHE).delete_cache() + cache.CloudTokenCache.delete_cache() RepoActionInvoker.delete_repo_file() log.debug("Cleaned local data") diff --git a/src/subscription_manager/scripts/rhsmcertd_worker.py b/src/subscription_manager/scripts/rhsmcertd_worker.py index 71126adf80..3a56e0380c 100644 --- a/src/subscription_manager/scripts/rhsmcertd_worker.py +++ b/src/subscription_manager/scripts/rhsmcertd_worker.py @@ -10,54 +10,115 @@ # Red Hat trademarks are not licensed under GPLv2. No permission is # granted to use or replicate Red Hat trademarks that are incorporated # in this software or its documentation. 
-# - -# hack to allow bytes/strings to be interpolated w/ unicode values (gettext gives us bytes) -# Without this, for example, "Формат: %s\n" % "foobar" will fail with UnicodeDecodeError -# See http://stackoverflow.com/a/29832646/6124862 for more details -import sys - -import signal -import logging -import dbus.mainloop.glib import base64 -from typing import Union +import enum +import logging +import random +import signal +import sys +import time +from argparse import SUPPRESS +from typing import Dict, List, Union, TYPE_CHECKING -import subscription_manager.injection as inj +from cloud_what.provider import detect_cloud_provider, CLOUD_PROVIDERS, BaseCloudProvider from rhsm import connection, config, logutil -from subscription_manager.injectioninit import init_dep_injection +from rhsmlib.services.register import RegisterService -from subscription_manager.action_client import HealingActionClient, ActionClient +import subscription_manager.utils +import subscription_manager.injection as inj +from subscription_manager import cache +from subscription_manager import entcertlib from subscription_manager import managerlib -from subscription_manager.identity import ConsumerIdentity +from subscription_manager.action_client import HealingActionClient, ActionClient +from subscription_manager.i18n import ugettext as _ from subscription_manager.i18n_argparse import ArgumentParser, USAGE -from argparse import SUPPRESS -from subscription_manager.utils import generate_correlation_id +from subscription_manager.identity import Identity, ConsumerIdentity +from subscription_manager.injectioninit import init_dep_injection -from subscription_manager.i18n import ugettext as _ -from cloud_what.provider import detect_cloud_provider, CLOUD_PROVIDERS, BaseCloudProvider -from rhsmlib.services.register import RegisterService +if TYPE_CHECKING: + import argparse + from rhsm.config import RhsmConfigParser + from rhsm.connection import UEPConnection + from subscription_manager.cp_provider import 
CPProvider + + +class ExitStatus(enum.IntEnum): + """Well-known exit codes. + + WARNING: THIS IS NOT A PUBLIC API. + Values of these errors should not be used by any external applications, they are subject + to change at any time without warning. + External applications should only differentiate between zero and non-zero exit codes. + """ + + OK = 0 + + RHSMCERTD_DISABLED = 5 + """rhsmcertd has been disabled through the config file.""" + LOCAL_CORRUPTION = 6 + """Local data have been corrupted.""" + + NO_CLOUD_PROVIDER = 10 + """No public cloud provider has been detected.""" + NO_CLOUD_METADATA = 11 + """Public cloud provider has been detected, but metadata could not be obtained.""" + + NO_REGISTRATION_TOKEN = 20 + """Registration token could not be obtained: server or cache are unavailable or broken.""" + BAD_TOKEN_TYPE = 21 + """Registration token was received, but is not recognized.""" + + REGISTRATION_FAILED = 30 + """The system registration was not successful.""" + + UNKNOWN_ERROR = -1 + """An unknown error occurred.""" init_dep_injection() +log = logging.getLogger(f"rhsm-app.{__name__}") + def exit_on_signal(_signumber, _stackframe): - sys.exit(0) + sys.exit(ExitStatus.OK) + + +def _is_enabled() -> bool: + """Check if rhsmcertd is enabled or disabled.""" + cfg: RhsmConfigParser = config.get_config_parser() + if cfg.get("rhsmcertd", "disable") == "1": + return False + return True + + +def _is_registered() -> bool: + """Check if the system is registered.""" + identity: Identity = inj.require(inj.IDENTITY) + return identity.is_valid() + + +def _create_cp_provider() -> "CPProvider": + """Create a CPProvider with unique correlation ID.""" + provider: CPProvider = inj.require(inj.CP_PROVIDER) + provider.set_correlation_id(correlation_id=subscription_manager.utils.generate_correlation_id()) + log.debug(f"X-Correlation-ID: {provider.correlation_id}") + return provider -def _collect_cloud_info(cloud_list: list, log) -> dict: +def _collect_cloud_info(cloud_list: 
List[str]) -> dict: """ Try to collect cloud information: metadata and signature provided by cloud provider. :param cloud_list: The list of detected cloud providers. In most cases the list contains only one item. - :param log: logging object - :return: The dictionary with metadata and signature (when signature is provided by cloud provider). - Metadata and signature are base64 encoded. Empty dictionary is returned, when it wasn't - possible to collect any metadata + :return: + Dictionary with 'metadata' and 'signature' (if it is provided by cloud provider), + both encoded in base64. + Empty dictionary is returned if metadata cannot be collected. """ + log.debug(f"Collecting metadata from cloud provider(s): {cloud_list}") # Create dispatcher dictionary from the list of supported cloud providers cloud_providers = {provider_cls.CLOUD_PROVIDER_ID: provider_cls for provider_cls in CLOUD_PROVIDERS} @@ -89,84 +150,169 @@ def _collect_cloud_info(cloud_list: list, log) -> dict: log.info(f"Metadata and signature gathered for cloud provider: {cloud_provider_id}") - # Encode metadata and signature using base64 encoding. Because base64.b64encode - # returns values as bytes, then we decode it to string using ASCII encoding. 
- b64_metadata: str = base64.b64encode(bytes(metadata, "utf-8")).decode("ascii") - b64_signature: str = base64.b64encode(bytes(signature, "utf-8")).decode("ascii") - result = { "cloud_id": cloud_provider_id, - "metadata": b64_metadata, - "signature": b64_signature, + "metadata": base64.b64encode(bytes(metadata, "utf-8")).decode("ascii"), + "signature": base64.b64encode(bytes(signature, "utf-8")).decode("ascii"), } break return result -def _auto_register(cp_provider, log): - """ - Try to perform auto-registration - :param cp_provider: provider of connection to candlepin server - :param log: logging object - :return: None - """ - log.debug("Trying to do auto-registration of this system") +def _auto_register(cp_provider: "CPProvider") -> ExitStatus: + """Try to perform automatic registration. - identity = inj.require(inj.IDENTITY) - if identity.is_valid() is True: - log.debug("System already registered. Skipping auto-registration") - return - - log.debug("Trying to detect cloud provider") + :param cp_provider: Provider of connection to Candlepin. + :returns: ExitStatus describing the result of the registration. + """ + log.info("Starting automatic registration.") - # Try to detect cloud provider first. Use lower threshold in this case, - # because we want to have more sensitive detection in this case - # (automatic registration is more important than reporting of facts) + log.debug("Detecting cloud provider.") + # Try to detect cloud provider first. We use lower threshold in this case, not the + # default, because we want to have more sensitive detection in this case; + # automatic registration is more important than reporting of facts. cloud_list = detect_cloud_provider(threshold=0.3) if len(cloud_list) == 0: - log.warning("This system does not run on any supported cloud provider. Skipping auto-registration") - sys.exit(-1) + log.warning( + "This system does not run on any supported cloud provider. " + "Automatic registration cannot be performed."
+ ) + return ExitStatus.NO_CLOUD_PROVIDER - # When some cloud provider(s) was detected, then try to collect metadata - # and signature - cloud_info = _collect_cloud_info(cloud_list, log) + # When some cloud provider(s) were detected, then try to collect metadata and signature + cloud_info = _collect_cloud_info(cloud_list) if len(cloud_info) == 0: - log.warning("It was not possible to collect any cloud metadata. Unable to perform auto-registration") - sys.exit(-1) + log.warning("Cloud metadata could not be collected. Unable to perform automatic registration.") + return ExitStatus.NO_CLOUD_METADATA # Get connection not using any authentication - cp = cp_provider.get_no_auth_cp() + uep: UEPConnection = cp_provider.get_no_auth_cp() - # Try to get JWT token from candlepin (cloud registration adapter) + # Obtain automatic registration token try: - jwt_token = cp.getJWToken( + token: Dict[str, str] = cache.CloudTokenCache.get( + uep=uep, cloud_id=cloud_info["cloud_id"], metadata=cloud_info["metadata"], signature=cloud_info["signature"], ) - except Exception as err: - log.error("Unable to get JWT token: {err}".format(err=str(err))) - log.warning("Canceling auto-registration") - sys.exit(-1) - - # Try to register using JWT token - register_service = RegisterService(cp=cp) - # Organization ID is set to None, because organization ID is - # included in JWT token - try: - register_service.register(org=None, jwt_token=jwt_token) - except Exception as err: - log.error("Unable to auto-register: {err}".format(err=err)) - sys.exit(-1) + except Exception: + log.exception("Cloud token could not be obtained. 
Unable to perform automatic registration.") + return ExitStatus.NO_REGISTRATION_TOKEN + + if token["tokenType"] == "CP-Cloud-Registration": + try: + _auto_register_standard(uep=uep, token=token) + except Exception: + log.exception("Standard automatic registration failed.") + return ExitStatus.REGISTRATION_FAILED + else: + log.info("Standard automatic registration was successful.") + return ExitStatus.OK + + if token["tokenType"] == "CP-Anonymous-Cloud-Registration": + try: + _auto_register_anonymous(uep=uep, token=token) + cache.CloudTokenCache.delete_cache() + except Exception: + log.exception("Anonymous automatic registration failed.") + return ExitStatus.REGISTRATION_FAILED + else: + log.info("Anonymous automatic registration was successful.") + return ExitStatus.OK + + log.error(f"Unsupported token type for automatic registration: {token['tokenType']}.") + return ExitStatus.BAD_TOKEN_TYPE + + +def _auto_register_standard(uep: "UEPConnection", token: Dict[str, str]) -> None: + """Perform standard automatic registration. + + The service will download identity certificate and entitlement certificates. + + :raises Exception: The system could not be registered. + """ + log.debug("Registering the system through standard automatic registration.") + + service = RegisterService(cp=uep) + service.register(org=None, jwt_token=token["token"]) + + +def _auto_register_anonymous(uep: "UEPConnection", token: Dict[str, str]) -> None: + """Perform anonymous automatic registration. + + First we download the anonymous entitlement certificates and install them. + + Then we wait the 'splay' period. This makes sure we give the cloud backend + enough time to create all the various objects in the Candlepin database. + + Then we perform the registration to obtain identity certificate and proper + entitlement certificates. + + :raises TimeoutError: JWT expired, the system could not be registered. + :raises Exception: The system could not be registered.
+ """ + log.debug("Registering the system through anonymous automatic registration.") + + # Step 1: Get the anonymous entitlement certificates + manager = entcertlib.AnonymousCertificateManager(uep=uep) + manager.install_temporary_certificates(uuid=token["anonymousConsumerUuid"], jwt=token["token"]) + + # Step 2: Wait + cfg = config.get_config_parser() + if cfg.get("rhsmcertd", "splay") == "0": + log.debug("Trying to obtain the identity immediately, splay is disabled.") else: - log.debug("Auto-registration performed successfully") - sys.exit(0) + registration_interval = int(cfg.get("rhsmcertd", "auto_registration_interval")) + splay_interval: int = random.randint(60, registration_interval * 60) + log.debug( + f"Waiting a period of {splay_interval} seconds " + f"(about {splay_interval // 60} minutes) before attempting to obtain the identity." + ) + time.sleep(splay_interval) + + # Step 3: Obtain the identity certificate + log.debug("Obtaining system identity") + + service = RegisterService(cp=uep) + while cache.CloudTokenCache.is_valid(): + # While the server prepares the identity, it keeps sending status code 429 + # and a Retry-After header. + try: + service.register(org=None, jwt_token=token["token"]) + cache.CloudTokenCache.delete_cache() + # The anonymous organization will have different entitlement certificates, + # we need to refresh them. + log.debug( + "Replacing anonymous entitlement certificates " + "with entitlement certificates linked to an anonymous organization." + ) + report = entcertlib.EntCertUpdateAction().perform() + log.debug(report) + return + except connection.RateLimitExceededException as exc: + if exc.headers.get("Retry-After", None) is None: + raise + delay = int(exc.headers["Retry-After"]) + log.debug( + f"Got response with status code {exc.code} and Retry-After header, " + f"will try again in {delay} seconds." 
+ ) + time.sleep(delay) + except Exception: + raise + + # In theory, this should not happen, it means that something has gone wrong server-side. + raise TimeoutError("The Candlepin JWT expired before we were able to register the system.") -def _main(options, log): - # Set default mainloop - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) +def _main(args: "argparse.Namespace"): + if not _is_enabled() and not args.force: + log.info("The rhsmcertd process has been disabled by configuration.") + sys.exit(ExitStatus.RHSMCERTD_DISABLED) + + log.debug("Running rhsmcertd worker.") # exit on SIGTERM, otherwise finally statements don't run # (one explanation: http://stackoverflow.com/a/41840796) @@ -174,35 +320,32 @@ def _main(options, log): # without finally statements, we get confusing behavior (ex. see bz#1431659) signal.signal(signal.SIGTERM, exit_on_signal) - cp_provider = inj.require(inj.CP_PROVIDER) - correlation_id = generate_correlation_id() - log.debug("X-Correlation-ID: %s", correlation_id) - cp_provider.set_correlation_id(correlation_id) - cfg = config.get_config_parser() - - log.debug("check for rhsmcertd disable") - if "1" == cfg.get("rhsmcertd", "disable") and not options.force: - log.warning("The rhsmcertd process has been disabled by configuration.") - sys.exit(-1) + cp_provider: CPProvider = _create_cp_provider() - # Was script executed with --auto-register option - if options.auto_register is True: - _auto_register(cp_provider, log) + if args.auto_register is True: + if _is_registered(): + print(_("This system is already registered, ignoring request to automatically register.")) + log.debug("This system is already registered, skipping automatic registration.") + else: + print(_("Registering the system")) + status: ExitStatus = _auto_register(cp_provider) + sys.exit(status.value) if not ConsumerIdentity.existsAndValid(): log.error( "Either the consumer is not registered or the certificates" + " are corrupted. Certificate update using daemon failed." 
) - sys.exit(-1) - print(_("Updating entitlement certificates & repositories")) + sys.exit(ExitStatus.LOCAL_CORRUPTION) + + print(_("Updating entitlement certificates & repositories.")) - cp = cp_provider.get_consumer_auth_cp() - # pre-load supported resources; serves as a way of failing before locking the repos - cp.supports_resource(None) + uep: UEPConnection = cp_provider.get_consumer_auth_cp() + # preload supported resources; serves as a way of failing before locking the repos + uep.supports_resource(None) try: - if options.autoheal: + if args.autoheal: action_client = HealingActionClient() else: action_client = ActionClient() @@ -215,51 +358,27 @@ def _main(options, log): print(update_report) except connection.ExpiredIdentityCertException as e: - log.critical(_("Your identity certificate has expired")) + log.critical("System's identity certificate has expired.") raise e except connection.GoneException as ge: uuid = ConsumerIdentity.read().getConsumerId() - - # This code is to prevent an errant 410 response causing consumer cert deletion. - # - # If a server responds with a 410, we want to very that it's not just a 410 http status, but - # also that the response is from candlepin, and include the right info about the consumer. - # - # A connection to the entitlement server could get an unintentional 410 response. A common - # cause for that kind of error would be a bug or crash or misconfiguration of a reverse proxy - # in front of candlepin. Most error codes we treat as temporary and transient, and they don't - # cause any action to be taken (aside from error handling). But since consumer deletion is tied - # to the 410 status code, and that is difficult to recover from, we try to be a little bit - # more paranoid about that case. + # The GoneException carries information about a consumer deleted on the server. # - # So we look for both the 410 status, and the expected response body. 
If we get those - # then python-rhsm will create a GoneException that includes the deleted_id. If we get - # A GoneException and the deleted_id matches, then we actually delete the consumer. - # - # However... If we get a GoneException and it's deleted_id does not match the current - # consumer uuid, we do not delete the consumer. That would require using a valid consumer - # cert, but making a request for a different consumer uuid, so unlikely. Could register - # with --consumerid get there? + # If this exception is raised and the `deleted_id` matches the current UUID, + # we clean up the system. In theory, we could use a valid consumer certificate + # to make a request for a different consumer UUID. if ge.deleted_id == uuid: - log.critical( - ( - "Consumer profile '%s' has been deleted from the server. " - "Its local certificates will now be archived" - ), - uuid, + log.info( + f"Consumer profile '{uuid}' has been deleted from the server. " + "Its local certificates will be archived to '/etc/pki/consumer.old/'." ) managerlib.clean_all_data() - log.critical( - "Certificates archived to '/etc/pki/consumer.old'. " - "Contact your system administrator if you need more information." - ) raise ge def main(): logutil.init_logger() - log = logging.getLogger("rhsm-app." + __name__) parser = ArgumentParser(usage=USAGE) parser.add_argument( @@ -278,21 +397,22 @@ def main(): help="perform auto-registration", ) + options: argparse.Namespace + args: List[str] (options, args) = parser.parse_known_args() try: - _main(options, log) + _main(options) except SystemExit as se: # sys.exit triggers an exception in older Python versions, which # in this case we can safely ignore as we do not want to log the # stack trace. We need to check the code, since we want to signal # exit with failure to the caller. 
Otherwise, we will exit with 0 if se.code: - sys.exit(-1) - except Exception as e: - log.error("Error while updating certificates using daemon") + sys.exit(ExitStatus.UNKNOWN_ERROR) + except Exception: + log.exception("Error while updating certificates using daemon") print(_("Unable to update entitlement certificates and repositories")) - log.exception(e) - sys.exit(-1) + sys.exit(ExitStatus.UNKNOWN_ERROR) if __name__ == "__main__": diff --git a/test/test_auto_registration.py b/test/test_auto_registration.py index 734090cbd2..a6d4cef6bb 100644 --- a/test/test_auto_registration.py +++ b/test/test_auto_registration.py @@ -133,7 +133,7 @@ def test_collect_cloud_info_one_cloud_provider_detected(self): aws.AWSCloudProvider._instance._session = mock_session cloud_list = ["aws"] - cloud_info = _collect_cloud_info(cloud_list, Mock()) + cloud_info = _collect_cloud_info(cloud_list) self.assertIsNotNone(cloud_info) self.assertTrue(len(cloud_info) > 0) @@ -166,7 +166,7 @@ def test_collect_cloud_info_more_cloud_providers_detected(self): # More cloud providers detected cloud_list = ["azure", "aws"] - cloud_info = _collect_cloud_info(cloud_list, Mock()) + cloud_info = _collect_cloud_info(cloud_list) self.assertIsNotNone(cloud_info) self.assertTrue(len(cloud_info) > 0) diff --git a/test/test_cache.py b/test/test_cache.py index 23f504cee9..76640603fd 100644 --- a/test/test_cache.py +++ b/test/test_cache.py @@ -9,6 +9,7 @@ # have received a copy of GPLv2 along with this software; if not, see # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
# +import base64 import unittest import datetime @@ -20,6 +21,7 @@ import tempfile import time from unittest.mock import Mock, patch, mock_open +from typing import Dict # used to get a user readable cfg class for test cases from .stubs import ( @@ -52,6 +54,7 @@ from rhsm.profile import Package, RPMProfile, EnabledReposProfile, ModulesProfile from rhsm.connection import ( + UEPConnection, RestlibException, UnauthorizedException, RateLimitExceededException, @@ -1360,3 +1363,142 @@ def test_status_on_failed_load(self): self.assertEqual(self.syspurpose_cache.get_overall_status(), "Unknown") self.assertEqual(self.syspurpose_cache.get_overall_status_code(), "unknown") self.assertIsNone(self.syspurpose_cache.get_status_reasons()) + + +class TestCloudTokenCache(SubManFixture): + original_path = None + + @classmethod + def setUpClass(cls): + cls.original_path = cache.CloudTokenCache.CACHE_FILE + + @classmethod + def tearDownClass(cls): + # Ensure we have cleaned up properly + assert ( + cls.original_path == cache.CloudTokenCache.CACHE_FILE + ), f"{cls.__name__} did not clean up after itself" + + def setUp(self): + self.actual_path: str = cache.CloudTokenCache.CACHE_FILE + self.test_path = tempfile.NamedTemporaryFile(prefix="cloud_token_cache.", suffix=".json") + cache.CloudTokenCache.CACHE_FILE = self.test_path.name + + def tearDown(self): + cache.CloudTokenCache.CACHE_FILE = self.actual_path + del self.actual_path + del self.test_path + + @staticmethod + def _get_uep(jwt_response: dict) -> Mock: + """Create mock UEPConnection object with custom `getCloudJWT()`.""" + uep = Mock(spec=UEPConnection) + uep.getCloudJWT.return_value = jwt_response + return uep + + @staticmethod + def _create_jwt(expired: bool = False) -> Dict[str, str]: + """Create fake JWT. + + :param expired: When True, the token will have the expiration date in the past. 
+ """ + now = int(time.time()) + expiration: int = (now + 60) if not expired else (now - 60) + + jwt_header: str = base64.b64encode(b"{}").decode("utf-8") + jwt_body: str = base64.b64encode(json.dumps({"exp": expiration}).encode("utf-8")).decode("utf-8") + jwt_signature: str = base64.b64encode(b"{}").decode("utf-8") + + return { + "anonymousConsumerUuid": "fake-uuid", + "token": f"{jwt_header}.{jwt_body}.{jwt_signature}".replace("=", ""), + "tokenType": "Fake-CP-Cloud-Registration", + "__is_expired": expired, + } + + def test_no_cache(self): + """Call the server if we don't have any cache file. Cache the result.""" + expected = self._create_jwt(expired=False) + uep = self._get_uep(jwt_response=expected) + jwt = cache.CloudTokenCache.get( + uep=uep, cloud_id="fake-cloud", metadata="fake metadata", signature="fake signature" + ) + + # We have called the server + self.assertTrue(uep.getCloudJWT.called) + uep.getCloudJWT.assert_called_once_with("fake-cloud", "fake metadata", "fake signature") + # The cache returned the response + self.assertIsNotNone(jwt) + # The response was cached into a file + self.assertTrue(os.path.isfile(cache.CloudTokenCache.CACHE_FILE)) + with open(cache.CloudTokenCache.CACHE_FILE, "r") as f: + self.assertEqual(json.load(f), expected) + + def test_is_valid__when_valid(self): + """Check that cache is reported as valid if the expiration date is in the future.""" + cache.CloudTokenCache._save_to_file(token=self._create_jwt(expired=False)) + self.assertTrue(cache.CloudTokenCache.is_valid()) + + def test_is_valid__when_invalid(self): + """Check that cache is reported as invalid if the expiration date is in the past.""" + cache.CloudTokenCache._save_to_file(token=self._create_jwt(expired=True)) + self.assertFalse(cache.CloudTokenCache.is_valid()) + + def test_expired_cache(self): + """Call the server if we have a cache file, but its content is expired.""" + # Simulate old expired cache + 
cache.CloudTokenCache._save_to_file(token=self._create_jwt(expired=True)) + + expected = self._create_jwt(expired=False) + uep = self._get_uep(jwt_response=expected) + jwt = cache.CloudTokenCache.get( + uep=uep, cloud_id="fake-cloud", metadata="fake metadata", signature="fake signature" + ) + + # We have called the server + self.assertTrue(uep.getCloudJWT.called) + uep.getCloudJWT.assert_called_once_with("fake-cloud", "fake metadata", "fake signature") + # The cache returned the response + self.assertIsNotNone(jwt) + # The response was cached into a file + self.assertTrue(os.path.isfile(cache.CloudTokenCache.CACHE_FILE)) + with open(cache.CloudTokenCache.CACHE_FILE, "r") as f: + self.assertEqual(json.load(f), expected) + + def test_valid_cache(self): + """Return the cached value.""" + expected = self._create_jwt(expired=False) + # Simulate existing cache + cache.CloudTokenCache._save_to_file(token=expected) + + uep = self._get_uep(jwt_response={}) + jwt = cache.CloudTokenCache.get( + uep=uep, cloud_id="fake-cloud", metadata="fake metadata", signature="fake signature" + ) + + # We have not called the server + self.assertFalse(uep.getCloudJWT.called) + # The cache returned the response + self.assertIsNotNone(jwt) + # The cache file still exists + self.assertTrue(os.path.isfile(cache.CloudTokenCache.CACHE_FILE)) + with open(cache.CloudTokenCache.CACHE_FILE, "r") as f: + self.assertEqual(json.load(f), expected) + + def test_delete(self): + """Delete existing cache file.""" + # Simulate existing cache + cache.CloudTokenCache._save_to_file(token=self._create_jwt(expired=False)) + + cache.CloudTokenCache.delete_cache() + + # The cache file does not exist + self.assertFalse(os.path.isfile(cache.CloudTokenCache.CACHE_FILE)) + + def test_delete_no_cache(self): + """Try to delete cache file that does not exist.""" + # Delete the empty file + os.remove(cache.CloudTokenCache.CACHE_FILE) + + cache.CloudTokenCache.delete_cache() + 
self.assertFalse(os.path.isfile(cache.CloudTokenCache.CACHE_FILE))