diff --git a/src/daemons/rhsmcertd.c b/src/daemons/rhsmcertd.c index 7bb2c616c9..70f50085a2 100644 --- a/src/daemons/rhsmcertd.c +++ b/src/daemons/rhsmcertd.c @@ -67,7 +67,6 @@ typedef enum { #define N_(x) x #define CONFIG_KEY_NOT_FOUND (0) -#define MAX_AUTO_REGISTER_ATTEMPTS 3 #if !GLIB_CHECK_VERSION(2, 58, 0) #define G_SOURCE_FUNC(f) ((GSourceFunc) (void (*)(void)) (f)) @@ -97,7 +96,6 @@ static gint arg_reg_interval_minutes = -1; static gboolean arg_no_splay = FALSE; static gboolean arg_auto_registration = FALSE; static int fd_lock = -1; -static int auto_register_attempt = 0; struct CertCheckData { int interval_seconds; @@ -423,14 +421,8 @@ auto_register(gpointer data) exit (EXIT_FAILURE); } if (pid == 0) { - if (auto_register_attempt < MAX_AUTO_REGISTER_ATTEMPTS) { - debug ("(Auto-registration) executing: %s --auto-register", WORKER); - execl (WORKER, WORKER_NAME, "--auto-register", NULL); - } else { - warn ("(Auto-registration) the number of attempts reached the max limit: %d", MAX_AUTO_REGISTER_ATTEMPTS); - // Return False to not repeat this again - return false; - } + debug ("(Auto-registration) executing: %s --auto-register", WORKER); + execl (WORKER, WORKER_NAME, "--auto-register", NULL); } waitpid (pid, &status, 0); @@ -438,19 +430,10 @@ auto_register(gpointer data) if (status == 0) { info ("(Auto-registration) performed successfully."); - // No need to repeat this action again return false; } else { - auto_register_attempt++; - if (auto_register_attempt < MAX_AUTO_REGISTER_ATTEMPTS) { - warn ("(Auto-registration) failed (%d), retry will occur on next run.", status); - } else { - warn ("(Auto-registration) failed (%d), the number of attempts reached the max limit: %d", - status, MAX_AUTO_REGISTER_ATTEMPTS); - // Return False to not repeat this again - return false; - } - return true; + warn ("(Auto-registration) failed (%d)", status); + return false; } } @@ -492,30 +475,6 @@ cert_check (gboolean heal) return TRUE; } - -static gboolean 
-initial_auto_register (gpointer data) -{ - struct CertCheckData *cert_data = data; - gboolean repeat_attempts; - - repeat_attempts = auto_register(cert_data); - - // When first attempt was not successful, then try to do other - // auto-registration attempts - if (repeat_attempts == true) { - // Add the timeout to begin waiting on interval but offset by the initial - // delay. - g_timeout_add(cert_data->interval_seconds * 1000, - (GSourceFunc) auto_register, cert_data); - // Update timestamp - log_update(cert_data->interval_seconds, cert_data->next_update_file); - } - // Return false so that the timer does - // not run this again. - return false; -} - static gboolean initial_cert_check (gpointer data) { @@ -919,13 +878,11 @@ main (int argc, char *argv[]) // NOTE: We put the initial checks on a timer so that in the case of systemd, // we can ensure that the network interfaces are all up before the initial // checks are done. - int auto_reg_initial_delay = 0; int auto_attach_initial_delay = 0; int cert_check_initial_delay = 0; if (run_now) { info ("Initial checks will be run now!"); } else { - int auto_reg_offset = 0; int auto_attach_offset = 0; int cert_check_offset = 0; if (splay_enabled == true) { @@ -964,16 +921,10 @@ main (int argc, char *argv[]) } #endif srand((unsigned int) seed); - auto_reg_offset = gen_random(auto_reg_interval_seconds); auto_attach_offset = gen_random(heal_interval_seconds); cert_check_offset = gen_random(cert_interval_seconds); } - if (auto_reg_enabled) { - auto_reg_initial_delay = INITIAL_DELAY_SECONDS + auto_reg_offset; - info ("Waiting %.1f minutes plus %d splay seconds [%d seconds total] before performing first auto-register", - INITIAL_DELAY_SECONDS / 60.0, auto_reg_offset, auto_reg_initial_delay); - } auto_attach_initial_delay = INITIAL_DELAY_SECONDS + auto_attach_offset; info ("Waiting %.1f minutes plus %d splay seconds [%d seconds total] before performing first auto-attach.", INITIAL_DELAY_SECONDS / 60.0, auto_attach_offset, 
auto_attach_initial_delay); @@ -998,8 +949,7 @@ main (int argc, char *argv[]) auto_attach_data.next_update_file = NEXT_AUTO_ATTACH_UPDATE_FILE; if (auto_reg_enabled) { - g_timeout_add(auto_reg_initial_delay * 1000, - (GSourceFunc) initial_auto_register, (gpointer) &auto_register_data); + auto_register((gpointer) &auto_register_data); } g_timeout_add (cert_check_initial_delay * 1000, (GSourceFunc) initial_cert_check, (gpointer) &cert_check_data); @@ -1010,7 +960,7 @@ main (int argc, char *argv[]) // time. This works for most users, since the cert_interval aligns with // runs of heal_interval (i.e., heal_interval % cert_interval = 0) if (auto_reg_enabled) { - log_update (auto_reg_initial_delay, NEXT_AUTO_REGISTER_UPDATE_FILE); + log_update (0, NEXT_AUTO_REGISTER_UPDATE_FILE); } log_update (cert_check_initial_delay, NEXT_CERT_UPDATE_FILE); log_update (auto_attach_initial_delay, NEXT_AUTO_ATTACH_UPDATE_FILE); diff --git a/src/dnf-plugins/subscription-manager.py b/src/dnf-plugins/subscription-manager.py index 5d080bd7ff..ea0db71b72 100644 --- a/src/dnf-plugins/subscription-manager.py +++ b/src/dnf-plugins/subscription-manager.py @@ -38,6 +38,13 @@ else: from ConfigParser import ConfigParser +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from subscription_manager.certdirectory import EntitlementDirectory + from subscription_manager.identity import Identity + + expired_warning = _(""" *** WARNING *** The subscription for following product(s) has expired: @@ -124,12 +131,15 @@ def _update(cache_only): Update entitlement certificates and redhat.repo :param cache_only: is True, when rhsm.full_refresh_on_yum is set to 0 in rhsm.conf """ - - logger.info(_('Updating Subscription Management repositories.')) - - identity = inj.require(inj.IDENTITY) - - if not identity.is_valid(): + logger.info(_("Updating Subscription Management repositories.")) + identity: Identity = inj.require(inj.IDENTITY) + ent_dir: EntitlementDirectory = inj.require(inj.ENT_DIR) + + # During first 
phase of anonymous cloud registration the system has + # valid entitlement certificates, but does not yet have any identity. + # We have access to the content, so we shouldn't be reporting missing + # identity certificate. + if not identity.is_valid() and len(ent_dir.list_valid()) == 0: logger.info(_("Unable to read consumer identity")) if config.in_container(): diff --git a/src/rhsm/connection.py b/src/rhsm/connection.py index b0788cf1f7..4c7a069bb1 100644 --- a/src/rhsm/connection.py +++ b/src/rhsm/connection.py @@ -28,7 +28,7 @@ import sys import time import traceback -from typing import Optional +from typing import Any, Dict, List, Optional from pathlib import Path from email.utils import format_datetime @@ -453,7 +453,11 @@ class RateLimitExceededException(RestlibException): def __init__(self, code, msg=None, headers=None): super(RateLimitExceededException, self).__init__(code, msg) self.headers = headers or {} - self.retry_after = safe_int(self.headers.get('retry-after')) + self.retry_after = None + for header, value in self.headers.items(): + if header.lower() == "retry-after": + self.retry_after = safe_int(value) + break self.msg = msg or "Access rate limit exceeded" if self.retry_after is not None: self.msg += ", retry access after: %s seconds." % self.retry_after @@ -1107,11 +1111,12 @@ def get_supported_resources(self): return self.resources - def supports_resource(self, resource_name): - """ - Check if the server we're connecting too supports a particular - resource. For our use cases this is generally the plural form - of the resource. + def supports_resource(self, resource_name: Optional[str]): + """Check if the server supports a particular resource. + + :param resource_name: + Resource to be requested. + When `None`, API call `GET /` is made to cache all supported resources. 
""" if self.resources is None: self._load_supported_resources() @@ -1151,32 +1156,28 @@ def shutDown(self): def ping(self, username=None, password=None): return self.conn.request_get("/status/") - def getJWToken(self, cloud_id, metadata, signature): - """ - When automatic registration is enabled in rhsm.conf and it was possible - to gather cloud metadata, then it is possible to try to get JSON Web Token - for automatic registration. When candlepin does not provide automatic - registration, then raise exception. - :param cloud_id: ID of cloud provider, e.g. "aws", "azure", "gcp" - :param metadata: string with base64 encoded metadata - :param signature: string with base64 encoded signature - :return: string with JWT + def getCloudJWT(self, cloud_id: str, metadata: str, signature: str) -> Dict[str, Any]: + """Obtain cloud JWT. + + This method is part of the Cloud registration v2: standard or anonymous flow. + + :param cloud_id: Cloud provider, e.g. 'aws', 'azure' or 'gcp'. + :param metadata: Base64 encoded public cloud metadata. + :param signature: Base64 encoded public cloud signature. 
""" - params = { + data = { "type": cloud_id, "metadata": metadata, - "signature": signature + "signature": signature, } - # "Accept" http header has to be text/plain, because candlepin return - # token as simple text and it is not wrapped in json document headers = { - "Content-type": "application/json", - "Accept": "text/plain" + "Content-Type": "application/json", } + return self.conn.request_post( - method="/cloud/authorize", - params=params, - headers=headers + method="/cloud/authorize?version=2", + params=data, + headers=headers, ) def registerConsumer(self, name="unknown", type="system", facts={}, @@ -1513,16 +1514,29 @@ def unregisterConsumer(self, consumerId): method = '/consumers/%s' % self.sanitize(consumerId) return self.conn.request_delete(method) - def getCertificates(self, consumer_uuid, serials=[]): + def getCertificates( + self, + consumer_uuid: str, + serials: Optional[list] = None, + jwt: Optional[str] = None, + ) -> List[dict]: """ Fetch all entitlement certificates for this consumer. Specify a list of serial numbers to filter if desired. + + :param consumer_uuid: consumer UUID + :param serials: list of entitlement serial numbers + :param jwt: JWT identifying an anonymous system """ method = '/consumers/%s/certificates' % (self.sanitize(consumer_uuid)) - if len(serials) > 0: + if serials: serials_str = ','.join(serials) method = "%s?serials=%s" % (method, serials_str) - return self.conn.request_get(method) + headers = {} + if jwt: + headers["Authorization"] = "Bearer {jwt}".format(jwt=jwt) + + return self.conn.request_get(method, headers=headers) def getCertificateSerials(self, consumerId): """ diff --git a/src/subscription_manager/cache.py b/src/subscription_manager/cache.py index 621cc9b8e2..4c68659aa8 100644 --- a/src/subscription_manager/cache.py +++ b/src/subscription_manager/cache.py @@ -20,6 +20,7 @@ this with the current state, and perform an update on the server if necessary. 
""" +import base64 import logging import os import socket @@ -41,6 +42,11 @@ from subscription_manager.i18n import ugettext as _ +from typing import TYPE_CHECKING, Dict +if TYPE_CHECKING: + from rhsm.connection import UEPConnection + + log = logging.getLogger(__name__) PACKAGES_RESOURCE = "packages" @@ -1146,3 +1152,97 @@ def _load_data(self, open_file): except ValueError: # Ignore json file parse error pass + + +class CloudTokenCache: + """A cache for Candlepin's JWT used during automatic registration. + + This is used by rhsmcertd worker. + """ + + CACHE_FILE = "/var/lib/rhsm/cache/cloud_token_cache.json" + + @classmethod + def get(cls, uep: "UEPConnection", cloud_id: str, metadata: str, signature: str) -> Dict[str, str]: + """Get a JWT from the Candlepin server. + + If cached token already exists and is still valid, it will be used + without contacting the server. + """ + try: + token: Dict[str, str] = cls._get_from_file() + log.debug("JWT cache contains valid token, no need to contact the server.") + return token + except LookupError as exc: + log.debug(f"JWT cache doesn't contain valid token, contacting the server ({exc}).") + except Exception as exc: + log.debug(f"JWT cache couldn't be read (got {type(exc).__name__}), contacting the server.") + + token: Dict[str, str] = cls._get_from_server(uep, cloud_id, metadata, signature) + cls._save_to_file(token) + return token + + @classmethod + def is_valid(cls) -> bool: + """Check if the cached JWT is valid. + + :returns: + `True` if the locally cached JWT is valid, + `False` if the locally cached JWT is expired or does not exist. + """ + # 'exp' key: https://www.rfc-editor.org/rfc/rfc7519#section-4.1.4 + expiration: int = cls._get_payload()["exp"] + + now = int(time.time()) + return expiration > now + + @classmethod + def _get_payload(cls) -> Dict: + """Get the body of the JWT. + + :returns: The body of the JWT as a dictionary. + :raises Exception: The file is missing or malformed. 
+ """ + with open(cls.CACHE_FILE, "r") as f: + content: Dict[str, str] = json.load(f) + + payload: str = content["token"].split(".")[1] + # JWT segments are unpadded base64url (RFC 7515); the decoder requires padding. + payload = f"{payload}===" + + return json.loads(base64.urlsafe_b64decode(payload).decode("utf-8")) + + @classmethod + def _get_from_file(cls) -> Dict[str, str]: + """Get a JWT from a cache file. + + :raises LookupError: The token is expired. + :raises Exception: The file is missing or malformed. + """ + if not cls.is_valid(): + cls.delete_cache() + raise LookupError("Candlepin JWT is expired.") + + with open(cls.CACHE_FILE, "r") as f: + return json.load(f) + + @classmethod + def _get_from_server( + cls, uep: "UEPConnection", cloud_id: str, metadata: str, signature: str + ) -> Dict[str, str]: + """Get a JWT from the Candlepin server.""" + log.debug("Obtaining Candlepin JWT.") + result: Dict[str, str] = uep.getCloudJWT(cloud_id, metadata, signature) + return result + + @classmethod + def delete_cache(cls): + if os.path.exists(cls.CACHE_FILE): + os.remove(cls.CACHE_FILE) + log.debug(f"Candlepin JWT cache file ({cls.CACHE_FILE}) was deleted.") + + @classmethod + def _save_to_file(cls, token: Dict[str, str]) -> None: + with open(cls.CACHE_FILE, "w") as f: + json.dump(token, f) + log.debug(f"Candlepin JWT was saved to a cache file ({cls.CACHE_FILE}).") diff --git a/src/subscription_manager/entcertlib.py b/src/subscription_manager/entcertlib.py index 91c3e82475..c0aca2dfe7 100644 --- a/src/subscription_manager/entcertlib.py +++ b/src/subscription_manager/entcertlib.py @@ -28,8 +28,13 @@ from subscription_manager import rhelentbranding import subscription_manager.injection as inj +from subscription_manager import repolib from subscription_manager.i18n import ungettext, ugettext as _ +from typing import TYPE_CHECKING, Dict, List, Optional +if TYPE_CHECKING: + from rhsm.connection import UEPConnection + log = logging.getLogger(__name__) CONTENT_ACCESS_CERT_CAPABILITY = 
"org_level_content_access" @@ -477,3 +482,54 @@ def __str__(self): self.write(s, _('Added (new)'), self.added) self.write(s, _('Deleted (rogue):'), self.rogue) return '\n'.join(s) + + +class AnonymousCertificateManager: + """Manage anonymous entitlement certificates. + + Anonymous certificate can be obtained from Candlepin via JWT/bearer token + when the system is deployed as a cloud VM. + + These certificates are short-lived and are meant to be replaced by a proper + certificate in a short time. + """ + + def __init__(self, uep: "UEPConnection"): + self.uep = uep + + def install_temporary_certificates(self, uuid: str, jwt: str) -> None: + """Obtain temporary entitlement certificates. + + - Download and install temporary entitlement certificates and keys + without obtaining an identity certificate. + - Generate 'redhat.repo' file out of them. + + :param uuid: The anonymous UUID assigned by Candlepin. + :param jwt: The Bearer token sent by Candlepin. + """ + log.debug("Obtaining anonymous entitlement certificates and keys.") + certificates: List[Dict] = self.uep.getCertificates(consumer_uuid=uuid, jwt=jwt) + if not len(certificates): + log.debug("No anonymous entitlement certificates were received.") + return + + log.debug("Installing anonymous entitlement certificates and keys.") + report = EntCertUpdateReport() + installer = EntitlementCertBundlesInstaller(report=report) + entitlement_ids: List[int] = installer.install(cert_bundles=certificates) + + log.debug( + "The following anonymous entitlement certificates and keys were installed: " + + ", ".join(str(c) for c in entitlement_ids) + ) + + update_repo = repolib.RepoUpdateActionCommand() + update_repo_report: Optional[repolib.RepoActionReport] = update_repo.perform() + + if update_repo_report is None: + log.debug("Anonymous entitlement certificate did not cause repository updates.") + else: + log.debug( + "Anonymous entitlement certificate caused " + f"{update_repo_report.updates()} repositories to be updated." 
+ ) diff --git a/src/subscription_manager/identity.py b/src/subscription_manager/identity.py index 616e506866..cf09b3aa6a 100644 --- a/src/subscription_manager/identity.py +++ b/src/subscription_manager/identity.py @@ -19,6 +19,8 @@ import os import errno import threading +from typing import Dict, Optional + import six from rhsm.certificate import create_from_pem @@ -94,6 +96,15 @@ def getConsumerName(self): def getSerialNumber(self): return self.x509.serial + def getConsumerOwner(self) -> Optional[str]: + """Get the name of the organization of the consumer. + + The value is stored in the 'Subject Name' > 'O (Organization)' field + of the consumer certificate. + """ + subject: Dict = self.x509.subject + return subject.get("O") + # TODO: we're using a Certificate which has it's own write/delete, no idea # why this landed in a parallel disjoint class wrapping the actual cert. def write(self): @@ -132,6 +143,7 @@ def __init__(self): self._lock = threading.Lock() self._name = None self._uuid = None + self._owner = None self._cert_dir_path = conf['rhsm']['consumerCertDir'] self.reload() @@ -159,6 +171,7 @@ def reload(self): if self.consumer is not None: self._name = self.consumer.getConsumerName() self._uuid = self.consumer.getConsumerId() + self._owner = self.consumer.getConsumerOwner() # since Identity gets dep injected, lets look up # the cert dir on the active id instead of the global config self._cert_dir_path = self.consumer.PATH @@ -187,6 +200,11 @@ def uuid(self): _uuid = self._uuid return _uuid + @property + def owner(self) -> Optional[str]: + with self._lock: + return self._owner + @property def cert_dir_path(self): with self._lock: diff --git a/src/subscription_manager/identitycertlib.py b/src/subscription_manager/identitycertlib.py index a1ab02a86a..ecb8cd85ab 100644 --- a/src/subscription_manager/identitycertlib.py +++ b/src/subscription_manager/identitycertlib.py @@ -16,7 +16,7 @@ # import logging - +from typing import List from subscription_manager import 
certlib from subscription_manager import injection as inj @@ -68,16 +68,23 @@ def _update_cert(self, identity): # FIXME: move persist stuff here from subscription_manager import managerlib - idcert = identity.consumer + local_serial: int = identity.consumer.getSerialNumber() + local_owner: str = identity.owner + + consumer: dict = self.uep.getConsumer(identity.uuid) + actual_serial: int = consumer["idCert"]["serial"]["serial"] + actual_owner: str = consumer.get("owner", {}).get("key", "") - consumer = self._get_consumer(identity) + # Only update the certificate if the serial has changed + if local_serial != actual_serial: + diff: List[str] = [f"{local_serial} => {actual_serial}"] + if local_owner != actual_owner: + diff += [f"{local_owner} => {actual_owner}"] - # only write the cert if the serial has changed - # FIXME: this would be a good place to have a Consumer/ConsumerCert - # model. - # FIXME: and this would be a ConsumerCert model '!=' - if idcert.getSerialNumber() != consumer['idCert']['serial']['serial']: - log.debug('identity certificate changed, writing new one') + log.info( + f"Serial number of the identity certificate changed ({', '.join(diff)}), " + "new identity certificate will be saved." + ) # FIXME: should be in this module? 
managerlib is an odd place managerlib.persist_consumer_cert(consumer) @@ -85,8 +92,3 @@ def _update_cert(self, identity): # updated the cert, or at least checked self.report._status = 1 return self.report - - def _get_consumer(self, identity): - # FIXME: not much for error handling here - consumer = self.uep.getConsumer(identity.uuid) - return consumer diff --git a/src/subscription_manager/managerlib.py b/src/subscription_manager/managerlib.py index 635035cf52..5abcb97c23 100644 --- a/src/subscription_manager/managerlib.py +++ b/src/subscription_manager/managerlib.py @@ -933,6 +933,7 @@ def clean_all_data(backup=True): require(POOL_STATUS_CACHE).delete_cache() require(OVERRIDE_STATUS_CACHE).delete_cache() require(RELEASE_STATUS_CACHE).delete_cache() + cache.CloudTokenCache.delete_cache() RepoActionInvoker.delete_repo_file() log.debug("Cleaned local data") diff --git a/src/subscription_manager/repolib.py b/src/subscription_manager/repolib.py index 103517db4d..4edc1a5e18 100644 --- a/src/subscription_manager/repolib.py +++ b/src/subscription_manager/repolib.py @@ -411,7 +411,7 @@ def perform(self): log.info("Removing %s due to manage_repos configuration." % repo_file.path) RepoActionInvoker.delete_repo_file() - return 0 + return None repo_pairs = [] for repo_class, server_val_repo_class in get_repo_file_classes(): diff --git a/src/subscription_manager/scripts/rhsmcertd_worker.py b/src/subscription_manager/scripts/rhsmcertd_worker.py index 7c33a5c6b9..1b6a2b4d0f 100644 --- a/src/subscription_manager/scripts/rhsmcertd_worker.py +++ b/src/subscription_manager/scripts/rhsmcertd_worker.py @@ -16,60 +16,120 @@ # Red Hat trademarks are not licensed under GPLv2. No permission is # granted to use or replicate Red Hat trademarks that are incorporated # in this software or its documentation. 
-# - -# hack to allow bytes/strings to be interpolated w/ unicode values (gettext gives us bytes) -# Without this, for example, "Формат: %s\n" % u"foobar" will fail with UnicodeDecodeError -# See http://stackoverflow.com/a/29832646/6124862 for more details +import base64 +import enum +import logging +import random +import signal import sys +import time +from argparse import SUPPRESS +from typing import Dict, List, Union, TYPE_CHECKING -import signal -import logging import dbus.mainloop.glib -import base64 -from typing import Union -import subscription_manager.injection as inj +from cloud_what.provider import detect_cloud_provider, CLOUD_PROVIDERS, BaseCloudProvider from rhsm import connection, config, logutil -from subscription_manager import ga_loader -ga_loader.init_ga() - -from subscription_manager.injectioninit import init_dep_injection -init_dep_injection() +from rhsmlib.services.register import RegisterService -from subscription_manager.action_client import HealingActionClient, ActionClient +import subscription_manager.utils +import subscription_manager.injection as inj +from subscription_manager import cache +from subscription_manager import entcertlib from subscription_manager import managerlib -from subscription_manager.identity import ConsumerIdentity +from subscription_manager.action_client import HealingActionClient, ActionClient +from subscription_manager.i18n import ugettext as _ from subscription_manager.i18n_argparse import ArgumentParser, USAGE -from argparse import SUPPRESS -from subscription_manager.utils import generate_correlation_id +from subscription_manager.identity import Identity, ConsumerIdentity +from subscription_manager.injectioninit import init_dep_injection -from subscription_manager.i18n import ugettext as _ -from cloud_what.provider import detect_cloud_provider, CLOUD_PROVIDERS, BaseCloudProvider -from rhsmlib.services.register import RegisterService +if TYPE_CHECKING: + import argparse + from rhsm.config import RhsmConfigParser + 
from rhsm.connection import UEPConnection + from subscription_manager.cp_provider import CPProvider + + +class ExitStatus(enum.IntEnum): + """Well-known exit codes. + + WARNING: THIS IS NOT A PUBLIC API. + Values of these errors should not be used by any external applications, they are subject + to change at any time without warning. + External applications should only differentiate between zero and non-zero exit codes. + """ + + OK = 0 + + RHSMCERTD_DISABLED = 5 + """rhsmcertd has been disabled through the config file.""" + LOCAL_CORRUPTION = 6 + """Local data have been corrupted.""" + + NO_CLOUD_PROVIDER = 10 + """No public cloud provider has been detected.""" + NO_CLOUD_METADATA = 11 + """Public cloud provider has been detected, but metadata could not be obtained.""" + + NO_REGISTRATION_TOKEN = 20 + """Registration token could not be obtained: server or cache are unavailable or broken.""" + BAD_TOKEN_TYPE = 21 + """Registration token was received, but is not recognized.""" + + REGISTRATION_FAILED = 30 + """The system registration was not successful.""" + + UNKNOWN_ERROR = -1 + """An unknown error occurred.""" + + +init_dep_injection() + +log = logging.getLogger(f"rhsm-app.{__name__}") def exit_on_signal(_signumber, _stackframe): - sys.exit(0) + sys.exit(ExitStatus.OK) + + +def _is_enabled() -> bool: + """Check if rhsmcertd is enabled or disabled.""" + cfg: RhsmConfigParser = config.get_config_parser() + if cfg.get("rhsmcertd", "disable") == "1": + return False + return True + +def _is_registered() -> bool: + """Check if the system is registered.""" + identity: Identity = inj.require(inj.IDENTITY) + return identity.is_valid() -def _collect_cloud_info(cloud_list: list, log) -> dict: + +def _create_cp_provider() -> "CPProvider": + """Create a CPProvider with unique correlation ID.""" + provider: CPProvider = inj.require(inj.CP_PROVIDER) + provider.set_correlation_id(correlation_id=subscription_manager.utils.generate_correlation_id()) + log.debug(f"X-Correlation-ID: 
{provider.correlation_id}") + return provider + + +def _collect_cloud_info(cloud_list: List[str]) -> dict: """ Try to collect cloud information: metadata and signature provided by cloud provider. :param cloud_list: The list of detected cloud providers. In most cases the list contains only one item. - :param log: logging object - :return: The dictionary with metadata and signature (when signature is provided by cloud provider). - Metadata and signature are base64 encoded. Empty dictionary is returned, when it wasn't - possible to collect any metadata + :return: + Dictionary with 'metadata' and 'signature' (if it is provided by cloud provider), + both encoded in base64. + Empty dictionary is returned if metadata cannot be collected. """ + log.debug(f"Collecting metadata from cloud provider(s): {cloud_list}") # Create dispatcher dictionary from the list of supported cloud providers - cloud_providers = { - provider_cls.CLOUD_PROVIDER_ID: provider_cls for provider_cls in CLOUD_PROVIDERS - } + cloud_providers = {provider_cls.CLOUD_PROVIDER_ID: provider_cls for provider_cls in CLOUD_PROVIDERS} result = {} # Go through the list of detected cloud providers and try to collect @@ -98,82 +158,186 @@ def _collect_cloud_info(cloud_list: list, log) -> dict: log.info(f'Metadata and signature gathered for cloud provider: {cloud_provider_id}') - # Encode metadata and signature using base64 encoding. Because base64.b64encode - # returns values as bytes, then we decode it to string using ASCII encoding. 
- b64_metadata: str = base64.b64encode(bytes(metadata, 'utf-8')).decode('ascii') - b64_signature: str = base64.b64encode(bytes(signature, 'utf-8')).decode('ascii') - result = { - 'cloud_id': cloud_provider_id, - 'metadata': b64_metadata, - 'signature': b64_signature + "cloud_id": cloud_provider_id, + "metadata": base64.b64encode(bytes(metadata, "utf-8")).decode("ascii"), + "signature": base64.b64encode(bytes(signature, "utf-8")).decode("ascii"), } break return result -def _auto_register(cp_provider, log): - """ - Try to perform auto-registration - :param cp_provider: provider of connection to candlepin server - :param log: logging object - :return: None +def _auto_register_wait() -> None: + """Delay during the automatic registration. + + Wait for an amount of time during automatic registration, looking at the + configured splay and autoregistration interval. """ - log.debug("Trying to do auto-registration of this system") + cfg = config.get_config_parser() + if cfg.get("rhsmcertd", "splay") == "0": + log.debug("Trying to obtain the identity immediately, splay is disabled.") + else: + registration_interval = int(cfg.get("rhsmcertd", "auto_registration_interval")) + splay_interval: int = random.randint(60, max(60, registration_interval * 60))  # clamp: randint raises ValueError if upper bound < 60 + log.debug( + f"Waiting a period of {splay_interval} seconds " + f"(about {splay_interval // 60} minutes) before attempting to obtain the identity." + ) + time.sleep(splay_interval) - identity = inj.require(inj.IDENTITY) - if identity.is_valid() is True: - log.debug('System already registered. Skipping auto-registration') - return - log.debug('Trying to detect cloud provider') +def _auto_register(cp_provider: "CPProvider") -> ExitStatus: + """Try to perform automatic registration. + + :param cp_provider: Provider of connection to Candlepin. + :returns: ExitStatus describing the result of the registration attempt. + """ + log.info("Starting automatic registration.") - # Try to detect cloud provider first. 
Use lower threshold in this case, - # because we want to have more sensitive detection in this case - # (automatic registration is more important than reporting of facts) + log.debug("Detecting cloud provider.") + # Try to detect cloud provider first. We use lower threshold in this case, not the + # default, because we want to have more sensitive detection in this case; + # automatic registration is more important than reporting of facts. cloud_list = detect_cloud_provider(threshold=0.3) if len(cloud_list) == 0: - log.warning('This system does not run on any supported cloud provider. Skipping auto-registration') - sys.exit(-1) + log.warning( + "This system does not run on any supported cloud provider. " + "Automatic registration cannot be performed." + ) + return ExitStatus.NO_CLOUD_PROVIDER - # When some cloud provider(s) was detected, then try to collect metadata - # and signature - cloud_info = _collect_cloud_info(cloud_list, log) + # When some cloud provider(s) were detected, then try to collect metadata and signature + cloud_info = _collect_cloud_info(cloud_list) if len(cloud_info) == 0: - log.warning('It was not possible to collect any cloud metadata. Unable to perform auto-registration') - sys.exit(-1) + log.warning("Cloud metadata could not be collected. 
Unable to perform automatic registration.") + return ExitStatus.NO_CLOUD_METADATA # Get connection not using any authentication - cp = cp_provider.get_no_auth_cp() + uep: UEPConnection = cp_provider.get_no_auth_cp() - # Try to get JWT token from candlepin (cloud registration adapter) + # Obtain automatic registration token try: - jwt_token = cp.getJWToken( - cloud_id=cloud_info['cloud_id'], - metadata=cloud_info['metadata'], - signature=cloud_info['signature'] + token: Dict[str, str] = cache.CloudTokenCache.get( + uep=uep, + cloud_id=cloud_info["cloud_id"], + metadata=cloud_info["metadata"], + signature=cloud_info["signature"], ) - except Exception as err: - log.error('Unable to get JWT token: {err}'.format(err=str(err))) - log.warning('Canceling auto-registration') - sys.exit(-1) - - # Try to register using JWT token - register_service = RegisterService(cp=cp) - # Organization ID is set to None, because organization ID is - # included in JWT token - try: - register_service.register(org=None, jwt_token=jwt_token) - except Exception as err: - log.error("Unable to auto-register: {err}".format(err=err)) - sys.exit(-1) - else: - log.debug("Auto-registration performed successfully") - sys.exit(0) + except Exception: + log.exception("Cloud token could not be obtained. 
Unable to perform automatic registration.") + return ExitStatus.NO_REGISTRATION_TOKEN + + if token["tokenType"] == "CP-Cloud-Registration": + try: + _auto_register_standard(uep=uep, token=token) + except Exception: + log.exception("Standard automatic registration failed.") + return ExitStatus.REGISTRATION_FAILED + else: + log.info("Standard automatic registration was successful.") + return ExitStatus.OK + + if token["tokenType"] == "CP-Anonymous-Cloud-Registration": + try: + _auto_register_anonymous(uep=uep, token=token) + cache.CloudTokenCache.delete_cache() + except Exception: + log.exception("Anonymous automatic registration failed.") + return ExitStatus.REGISTRATION_FAILED + else: + log.info("Anonymous automatic registration was successful.") + return ExitStatus.OK + log.error(f"Unsupported token type for automatic registration: {token['tokenType']}.") + return ExitStatus.BAD_TOKEN_TYPE + + +def _auto_register_standard(uep: "UEPConnection", token: Dict[str, str]) -> None: + """Perform standard automatic registration. + + The service will download identity certificate and entitlement certificates. + + :raises Exception: The system could not be registered. + """ + log.debug("Registering the system through standard automatic registration.") + + _auto_register_wait() + + service = RegisterService(cp=uep) + service.register(org=None, jwt_token=token["token"]) + + +def _auto_register_anonymous(uep: "UEPConnection", token: Dict[str, str]) -> None: + """Perform anonymous automatic registration. + + First we download the anonymous entitlement certificates and install them. + + Then we wait the 'splay' period. This makes sure we give the cloud backend + enough time to create all the various objects in the Candlepin database. + + Then we perform the registration to obtain identity certificate and proper + entitlement certificates. + + :raises TimeoutError: JWT expired, the system could not be registered. + :raises Exception: The system could not be registered. 
+ """ + log.debug("Registering the system through anonymous automatic registration.") + + # Step 1: Get the anonymous entitlement certificates + manager = entcertlib.AnonymousCertificateManager(uep=uep) + manager.install_temporary_certificates(uuid=token["anonymousConsumerUuid"], jwt=token["token"]) + + # Step 2: Wait + _auto_register_wait() + + # Step 3: Obtain the identity certificate + log.debug("Obtaining system identity") + + service = RegisterService(cp=uep) + while cache.CloudTokenCache.is_valid(): + # While the server prepares the identity, it keeps sending status code 429 + # and a Retry-After header. + try: + service.register(org=None, jwt_token=token["token"]) + cache.CloudTokenCache.delete_cache() + # The anonymous organization will have different entitlement certificates, + # we need to refresh them. + log.debug( + "Replacing anonymous entitlement certificates " + "with entitlement certificates linked to an anonymous organization." + ) + report = entcertlib.EntCertUpdateAction().perform() + log.debug(report) + return + except connection.RateLimitExceededException as exc: + if exc.retry_after is None: + log.warning( + "Server did not include Retry-After header in rate-limited response. " + f"headers={exc.headers}" + ) + raise + delay = exc.retry_after + log.debug( + f"Got response with status code {exc.code} and Retry-After header, " + f"will try again in {delay} seconds." + ) + time.sleep(delay) + except Exception as exc: + log.warning(f"Anonymous registration failed, server returned {exc}.") + raise + + # In theory, this should not happen, it means that something has gone wrong server-side. 
+ raise TimeoutError("The Candlepin JWT expired before we were able to register the system.") + + +def _main(args: "argparse.Namespace"): + if not _is_enabled() and not args.force: + log.info("The rhsmcertd process has been disabled by configuration.") + sys.exit(ExitStatus.RHSMCERTD_DISABLED) + + log.debug("Running rhsmcertd worker.") -def _main(options, log): # Set default mainloop dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) @@ -182,37 +346,37 @@ def _main(options, log): # without finally statements, we get confusing behavior (ex. see bz#1431659) signal.signal(signal.SIGTERM, exit_on_signal) - cp_provider = inj.require(inj.CP_PROVIDER) - correlation_id = generate_correlation_id() - log.debug('X-Correlation-ID: %s', correlation_id) - cp_provider.set_correlation_id(correlation_id) - cfg = config.get_config_parser() - - log.debug('check for rhsmcertd disable') - if '1' == cfg.get('rhsmcertd', 'disable') and not options.force: - log.warning('The rhsmcertd process has been disabled by configuration.') - sys.exit(-1) + cp_provider: CPProvider = _create_cp_provider() - # Was script executed with --auto-register option - if options.auto_register is True: - _auto_register(cp_provider, log) + if args.auto_register is True: + if _is_registered(): + print(_("This system is already registered, ignoring request to automatically register.")) + log.debug("This system is already registered, skipping automatic registration.") + else: + print(_("Registering the system")) + status: ExitStatus = _auto_register(cp_provider) + sys.exit(status.value) if not ConsumerIdentity.existsAndValid(): - log.error('Either the consumer is not registered or the certificates' + - ' are corrupted. Certificate update using daemon failed.') - sys.exit(-1) - print(_('Updating entitlement certificates & repositories')) + log.error( + "Either the consumer is not registered or the certificates" + + " are corrupted. Certificate update using daemon failed." 
+ ) + sys.exit(ExitStatus.LOCAL_CORRUPTION) - cp = cp_provider.get_consumer_auth_cp() - cp.supports_resource(None) # pre-load supported resources; serves as a way of failing before locking the repos + print(_("Updating entitlement certificates & repositories.")) + + uep: UEPConnection = cp_provider.get_consumer_auth_cp() + # preload supported resources; serves as a way of failing before locking the repos + uep.supports_resource(None) try: - if options.autoheal: + if args.autoheal: action_client = HealingActionClient() else: action_client = ActionClient() - action_client.update(options.autoheal) + action_client.update() for update_report in action_client.update_reports: # FIXME: make sure we don't get None reports @@ -220,68 +384,61 @@ def _main(options, log): print(update_report) except connection.ExpiredIdentityCertException as e: - log.critical(_("Your identity certificate has expired")) + log.critical("System's identity certificate has expired.") raise e except connection.GoneException as ge: uuid = ConsumerIdentity.read().getConsumerId() - - # This code is to prevent an errant 410 response causing consumer cert deletion. + # The GoneException carries information about a consumer deleted on the server. # - # If a server responds with a 410, we want to very that it's not just a 410 http status, but - # also that the response is from candlepin, and include the right info about the consumer. - # - # A connection to the entitlement server could get an unintentional 410 response. A common - # cause for that kind of error would be a bug or crash or misconfiguration of a reverse proxy - # in front of candlepin. Most error codes we treat as temporary and transient, and they don't - # cause any action to be taken (aside from error handling). But since consumer deletion is tied - # to the 410 status code, and that is difficult to recover from, we try to be a little bit - # more paranoid about that case. 
- # - # So we look for both the 410 status, and the expected response body. If we get those - # then python-rhsm will create a GoneException that includes the deleted_id. If we get - # A GoneException and the deleted_id matches, then we actually delete the consumer. - # - # However... If we get a GoneException and it's deleted_id does not match the current - # consumer uuid, we do not delete the consumer. That would require using a valid consumer - # cert, but making a request for a different consumer uuid, so unlikely. Could register - # with --consumerid get there? + # If this exception is raised and the `deleted_id` matches the current UUID, + # we clean up the system. In theory, we could use a valid consumer certificate + # to make a request for a different consumer UUID. if ge.deleted_id == uuid: - log.critical("Consumer profile \"%s\" has been deleted from the server. Its local certificates will now be archived", uuid) + log.info( + f"Consumer profile '{uuid}' has been deleted from the server. " + "Its local certificates will be archived to '/etc/pki/consumer.old/'." + ) managerlib.clean_all_data() - log.critical("Certificates archived to '/etc/pki/consumer.old'. Contact your system administrator if you need more information.") raise ge def main(): logutil.init_logger() - log = logging.getLogger('rhsm-app.' 
+ __name__) parser = ArgumentParser(usage=USAGE) - parser.add_argument("--autoheal", dest="autoheal", action="store_true", - default=False, help="perform an autoheal check") - parser.add_argument("--force", dest="force", action="store_true", - default=False, help=SUPPRESS) parser.add_argument( - "--auto-register", dest="auto_register", action="store_true", - default=False, help="perform auto-registration" + "--autoheal", + dest="autoheal", + action="store_true", + default=False, + help="perform an autoheal check", + ) + parser.add_argument("--force", dest="force", action="store_true", default=False, help=SUPPRESS) + parser.add_argument( + "--auto-register", + dest="auto_register", + action="store_true", + default=False, + help="perform auto-registration", ) + options: argparse.Namespace + args: List[str] (options, args) = parser.parse_known_args() try: - _main(options, log) + _main(options) except SystemExit as se: # sys.exit triggers an exception in older Python versions, which # in this case we can safely ignore as we do not want to log the # stack trace. We need to check the code, since we want to signal # exit with failure to the caller. 
Otherwise, we will exit with 0 if se.code: - sys.exit(-1) - except Exception as e: - log.error("Error while updating certificates using daemon") - print(_('Unable to update entitlement certificates and repositories')) - log.exception(e) - sys.exit(-1) + sys.exit(ExitStatus.UNKNOWN_ERROR) + except Exception: + log.exception("Error while updating certificates using daemon") + print(_("Unable to update entitlement certificates and repositories")) + sys.exit(ExitStatus.UNKNOWN_ERROR) if __name__ == '__main__': diff --git a/test/rhsm/unit/test_connection.py b/test/rhsm/unit/test_connection.py index a304525ca5..e195e91e86 100644 --- a/test/rhsm/unit/test_connection.py +++ b/test/rhsm/unit/test_connection.py @@ -740,6 +740,18 @@ def test_429_body(self): else: self.fail("Should have raised a RateLimitExceededException") + def test_429_weird_case(self): + content = '{"errors": ["TooFast"]}' + headers = {"RETry-aFteR": 20} + try: + self.vr("429", content, headers) + except RateLimitExceededException as e: + self.assertEqual(20, e.retry_after) + self.assertEqual("TooFast, retry access after: 20 seconds.", e.msg) + self.assertEqual("429", e.code) + else: + self.fail("Should have raised a RateLimitExceededException") + def test_500_empty(self): try: self.vr("500", "") diff --git a/test/test_auto_registration.py b/test/test_auto_registration.py index 5a585845a8..610e568be9 100644 --- a/test/test_auto_registration.py +++ b/test/test_auto_registration.py @@ -135,7 +135,7 @@ def test_collect_cloud_info_one_cloud_provider_detected(self): aws.AWSCloudProvider._instance._session = mock_session cloud_list = ['aws'] - cloud_info = _collect_cloud_info(cloud_list, Mock()) + cloud_info = _collect_cloud_info(cloud_list) self.assertIsNotNone(cloud_info) self.assertTrue(len(cloud_info) > 0) @@ -168,7 +168,7 @@ def test_collect_cloud_info_more_cloud_providers_detected(self): # More cloud providers detected cloud_list = ['azure', 'aws'] - cloud_info = _collect_cloud_info(cloud_list, Mock()) + 
cloud_info = _collect_cloud_info(cloud_list) self.assertIsNotNone(cloud_info) self.assertTrue(len(cloud_info) > 0) diff --git a/test/test_cache.py b/test/test_cache.py index bc9c594760..d2cc97cde6 100644 --- a/test/test_cache.py +++ b/test/test_cache.py @@ -14,10 +14,8 @@ # have received a copy of GPLv2 along with this software; if not, see # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. # -try: - import unittest2 as unittest -except ImportError: - import unittest +import base64 +import unittest import os import logging @@ -26,7 +24,7 @@ import socket import tempfile import time -from mock import Mock, patch, mock_open +from unittest.mock import Mock, patch, mock_open # used to get a user readable cfg class for test cases from .stubs import StubProduct, StubProductCertificate, StubCertificateDirectory, \ @@ -42,8 +40,12 @@ from rhsm.profile import Package, RPMProfile, EnabledReposProfile, ModulesProfile -from rhsm.connection import RestlibException, UnauthorizedException, \ - RateLimitExceededException +from rhsm.connection import ( + UEPConnection, + RestlibException, + UnauthorizedException, + RateLimitExceededException, +) from subscription_manager import injection as inj @@ -1307,3 +1309,142 @@ def test_max_timeout(self): uep.conn.smoothed_rt = 20.0 timeout = self.cache.timeout() self.assertEqual(timeout, self.cache.UBOUND) + + +class TestCloudTokenCache(SubManFixture): + original_path = None + + @classmethod + def setUpClass(cls): + cls.original_path = cache.CloudTokenCache.CACHE_FILE + + @classmethod + def tearDownClass(cls): + # Ensure we have cleaned up properly + assert ( + cls.original_path == cache.CloudTokenCache.CACHE_FILE + ), f"{cls.__name__} did not clean up after itself" + + def setUp(self): + self.actual_path: str = cache.CloudTokenCache.CACHE_FILE + self.test_path = tempfile.NamedTemporaryFile(prefix="cloud_token_cache.", suffix=".json") + cache.CloudTokenCache.CACHE_FILE = self.test_path.name + + def tearDown(self): + 
cache.CloudTokenCache.CACHE_FILE = self.actual_path + del self.actual_path + del self.test_path + + @staticmethod + def _get_uep(jwt_response: dict) -> Mock: + """Create mock UEPConnection object with custom `getCloudJWT()`.""" + uep = Mock(spec=UEPConnection) + uep.getCloudJWT.return_value = jwt_response + return uep + + @staticmethod + def _create_jwt(expired: bool = False): + """Create fake JWT. + + :param expired: When True, the token will have the expiration date in the past. + """ + now = int(time.time()) + expiration: int = (now + 60) if not expired else (now - 60) + + jwt_header: str = base64.b64encode(b"{}").decode("utf-8") + jwt_body: str = base64.b64encode(json.dumps({"exp": expiration}).encode("utf-8")).decode("utf-8") + jwt_signature: str = base64.b64encode(b"{}").decode("utf-8") + + return { + "anonymousConsumerUuid": "fake-uuid", + "token": f"{jwt_header}.{jwt_body}.{jwt_signature}".replace("=", ""), + "tokenType": "Fake-CP-Cloud-Registration", + "__is_expired": expired, + } + + def test_no_cache(self): + """Call the server if we don't have any cache file. 
Cache the result.""" + expected = self._create_jwt(expired=False) + uep = self._get_uep(jwt_response=expected) + jwt = cache.CloudTokenCache.get( + uep=uep, cloud_id="fake-cloud", metadata="fake metadata", signature="fake signature" + ) + + # We have called the server + self.assertTrue(uep.getCloudJWT.called) + uep.getCloudJWT.assert_called_once_with("fake-cloud", "fake metadata", "fake signature") + # The cache returned the response + self.assertIsNotNone(jwt) + # The response was cached into a file + self.assertTrue(os.path.isfile(cache.CloudTokenCache.CACHE_FILE)) + with open(cache.CloudTokenCache.CACHE_FILE, "r") as f: + self.assertEqual(json.load(f), expected) + + def test_is_valid__when_valid(self): + """Check that cache is reported as valid if the expiration date is in the future.""" + cache.CloudTokenCache._save_to_file(token=self._create_jwt(expired=False)) + self.assertTrue(cache.CloudTokenCache.is_valid()) + + def test_is_valid__when_invalid(self): + """Check that cache is reported as invalid if the expiration date is in the past.""" + cache.CloudTokenCache._save_to_file(token=self._create_jwt(expired=True)) + self.assertFalse(cache.CloudTokenCache.is_valid()) + + def test_expired_cache(self): + """Call the server if we have a cache file, but its content is expired.""" + # Simulate old expired cache + cache.CloudTokenCache._save_to_file(token=self._create_jwt(expired=True)) + + expected = self._create_jwt(expired=False) + uep = self._get_uep(jwt_response=expected) + jwt = cache.CloudTokenCache.get( + uep=uep, cloud_id="fake-cloud", metadata="fake metadata", signature="fake signature" + ) + + # We have called the server + self.assertTrue(uep.getCloudJWT.called) + uep.getCloudJWT.assert_called_once_with("fake-cloud", "fake metadata", "fake signature") + # The cache returned the response + self.assertIsNotNone(jwt) + # The response was cached into a file + self.assertTrue(os.path.isfile(cache.CloudTokenCache.CACHE_FILE)) + with 
open(cache.CloudTokenCache.CACHE_FILE, "r") as f: + self.assertEqual(json.load(f), expected) + + def test_valid_cache(self): + """Return the cached value.""" + expected = self._create_jwt(expired=False) + # Simulate existing cache + cache.CloudTokenCache._save_to_file(token=expected) + + uep = self._get_uep(jwt_response={}) + jwt = cache.CloudTokenCache.get( + uep=uep, cloud_id="fake-cloud", metadata="fake metadata", signature="fake signature" + ) + + # We have not called the server + self.assertFalse(uep.getCloudJWT.called) + # The cache returned the response + self.assertIsNotNone(jwt) + # The cache file still exists + self.assertTrue(os.path.isfile(cache.CloudTokenCache.CACHE_FILE)) + with open(cache.CloudTokenCache.CACHE_FILE, "r") as f: + self.assertEqual(json.load(f), expected) + + def test_delete(self): + """Delete existing cache file.""" + # Simulate existing cache + cache.CloudTokenCache._save_to_file(token=self._create_jwt(expired=False)) + + cache.CloudTokenCache.delete_cache() + + # The cache file does not exist + self.assertFalse(os.path.isfile(cache.CloudTokenCache.CACHE_FILE)) + + def test_delete_no_cache(self): + """Try to delete cache file that does not exist.""" + # Delete the empty file + os.remove(cache.CloudTokenCache.CACHE_FILE) + + cache.CloudTokenCache.delete_cache() + self.assertFalse(os.path.isfile(cache.CloudTokenCache.CACHE_FILE))