Skip to content

Commit

Permalink
CCT-67: Use automatic registration v2
Browse files Browse the repository at this point in the history
* Card ID: CCT-67

This patch implements the standard and anonymous flows for automatic
cloud registration.
  • Loading branch information
m-horky committed Jan 19, 2024
1 parent a931108 commit bcbc7fe
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 64 deletions.
197 changes: 146 additions & 51 deletions src/subscription_manager/scripts/rhsmcertd_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
import base64
import logging
import random
import signal
import sys
import time
from argparse import SUPPRESS
from typing import List, Union, TYPE_CHECKING
from typing import Dict, List, Union, TYPE_CHECKING

import dbus.mainloop.glib

Expand All @@ -27,6 +28,8 @@
import subscription_manager.injection as inj
from subscription_manager.injectioninit import init_dep_injection

from subscription_manager import cache
from subscription_manager import entcertlib
from subscription_manager import managerlib
from subscription_manager.action_client import HealingActionClient, ActionClient
from subscription_manager.identity import ConsumerIdentity
Expand All @@ -45,20 +48,25 @@

init_dep_injection()

log = logging.getLogger(__name__)
log = logging.getLogger(f"rhsm-app.{__name__}")


def exit_on_signal(_signumber, _stackframe):
sys.exit(0)


def _collect_cloud_info(cloud_list: list) -> dict:
def _is_registered() -> bool:
identity: Identity = inj.require(inj.IDENTITY)
return identity.is_valid()


def _collect_cloud_info(cloud_list: List[str]) -> dict:
"""
Try to collect cloud information: metadata and signature provided by cloud provider.
:param cloud_list: The list of detected cloud providers. In most cases the list contains only one item.
:return: The dictionary with metadata and signature (when signature is provided by cloud provider).
Metadata and signature are base64 encoded. Empty dictionary is returned, when it wasn't
possible to collect any metadata
:return:
Dictionary with 'metadata' and 'signature' (if it is provided by cloud provider).
Empty dictionary is returned if metadata cannot be collected.
"""

# Create dispatcher dictionary from the list of supported cloud providers
Expand Down Expand Up @@ -91,78 +99,163 @@ def _collect_cloud_info(cloud_list: list) -> dict:

log.info(f"Metadata and signature gathered for cloud provider: {cloud_provider_id}")

# Encode metadata and signature using base64 encoding. Because base64.b64encode
# returns values as bytes, then we decode it to string using ASCII encoding.
b64_metadata: str = base64.b64encode(bytes(metadata, "utf-8")).decode("ascii")
b64_signature: str = base64.b64encode(bytes(signature, "utf-8")).decode("ascii")

result = {
"cloud_id": cloud_provider_id,
"metadata": b64_metadata,
"signature": b64_signature,
"metadata": metadata,
"signature": signature,
}
break

return result


def _auto_register(cp_provider: "CPProvider") -> None:
"""Try to perform auto-registration.
"""Try to perform automatic registration.
:param cp_provider: provider of connection to candlepin server
:return: None
:param cp_provider: Provider of connection to Candlepin.
"""
log.debug("Trying to auto-register this system")

identity: Identity = inj.require(inj.IDENTITY)
if identity.is_valid() is True:
log.debug("System already registered. Skipping auto-registration")
log.debug("Trying to automatically register this system.")
if _is_registered():
log.debug("This system is already registered, skipping automatic registration.")
return

log.debug("Trying to detect cloud provider")
log.debug("Trying to detect cloud provider.")

# Try to detect cloud provider first. Use lower threshold in this case,
# because we want to have more sensitive detection in this case
# (automatic registration is more important than reporting of facts)
# Try to detect cloud provider first. We use lower threshold in this case, not the
# default, because we want to have more sensitive detection in this case;
# automatic registration is more important than reporting of facts.
cloud_list = detect_cloud_provider(threshold=0.3)
if len(cloud_list) == 0:
log.warning("This system does not run on any supported cloud provider. Skipping auto-registration")
sys.exit(-1)
log.warning(
"This system does not run on any supported cloud provider. "
"Automatic registration cannot be performed."
)
sys.exit(1)

# When some cloud provider(s) was detected, then try to collect metadata
# and signature
# When some cloud provider(s) were detected, then try to collect metadata and signature
cloud_info = _collect_cloud_info(cloud_list)
if len(cloud_info) == 0:
log.warning("It was not possible to collect any cloud metadata. Unable to perform auto-registration")
sys.exit(-1)
log.warning("Cloud metadata could not be collected. Unable to perform automatic registration.")
sys.exit(1)

# Get connection not using any authentication
cp = cp_provider.get_no_auth_cp()
uep: "UEPConnection" = cp_provider.get_no_auth_cp()

# Try to get JWT token from candlepin (cloud registration adapter)
# Obtain automatic registration token
try:
jwt_token = cp.getJWToken(
token: Dict[str, str] = cache.CloudTokenCache.get(
uep=uep,
cloud_id=cloud_info["cloud_id"],
metadata=cloud_info["metadata"],
signature=cloud_info["signature"],
)
except Exception as err:
log.error("Unable to get JWT token: {err}".format(err=str(err)))
log.warning("Canceling auto-registration")
sys.exit(-1)
except Exception:
log.exception("Cloud token could not be obtained. Unable to perform automatic registration.")
sys.exit(1)

if token["tokenType"] == "CP-Cloud-Registration":
try:
_auto_register_standard(uep=uep, token=token)
except Exception:
log.exception("Standard automatic registration failed.")
sys.exit(1)
else:
log.info("Standard automatic registration was successful.")
sys.exit(0)

if token["tokenType"] == "CP-Anonymous-Cloud-Registration":
try:
_auto_register_anonymous(uep=uep, token=token)
cache.CloudTokenCache.delete_cache()
except Exception:
log.exception("Anonymous automatic registration failed.")
sys.exit(1)
else:
log.info("Anonymous automatic registration was successful.")
sys.exit(0)

# Try to register using JWT token
register_service = RegisterService(cp=cp)
# Organization ID is set to None, because organization ID is
# included in JWT token
try:
register_service.register(org=None, jwt_token=jwt_token)
except Exception as err:
log.error("Unable to auto-register: {err}".format(err=err))
sys.exit(-1)
log.error(f"Unsupported token type for automatic registration: {token['tokenType']}.")
sys.exit(1)


def _auto_register_standard(uep: "UEPConnection", token: Dict[str, str]) -> None:
"""Perform standard automatic registration.
:raises Exception: The system could not be registered.
"""
log.debug("Registering the system through standard automatic registration.")

service = RegisterService(cp=uep)
service.register(org=None, jwt_token=token["token"])


def _auto_register_anonymous(uep: "UEPConnection", token: Dict[str, str]) -> None:
"""Perform anonymous automatic registration.
First we download the anonymous entitlement certificates and install them.
Then we wait the 'splay' period. This makes sure we give the cloud backend
enough time to create all the various objects in the Candlepin database.
Then we perform the registration to obtain identity certificate and proper
entitlement certificates.
:raises Exception: The system could not be registered.
"""
log.debug("Registering the system through anonymous automatic registration.")

# Step 1: Get the anonymous entitlement certificates
manager = entcertlib.AnonymousCertificateManager(uep=uep)
manager.install_temporary_certificates(uuid=token["anonymousConsumerUuid"], jwt=token["token"])

# Step 2: Wait
cfg = config.get_config_parser()
if cfg.get("rhsmcertd", "splay") == "0":
log.debug("Trying to obtain the identity immediately, splay is disabled.")
else:
log.debug("Auto-registration performed successfully")
sys.exit(0)
registration_interval = int(cfg.get("rhsmcertd", "auto_registration_interval"))
splay_interval: int = random.randint(60, registration_interval * 60)
log.debug(
f"Waiting a period of {splay_interval} seconds "
f"(about {splay_interval // 60} minutes) before attempting to obtain the identity."
)
time.sleep(splay_interval)

# Step 3: Obtain the identity certificate
log.debug("Obtaining system identity")

registration_attempt: int = 0
service = RegisterService(cp=uep)
while True:
# While the server prepares the identity, it keeps sending status code 451
# and a Retry-After header.
# The Retry-After intervals get shorter and shorter, but we don't have any
# actual guarantee about the time the registration wil actually succeed.
#
# Because `while` loops with no definite exit conditions are scary and
# tend to go wrong, we limit this to ten attempts.
registration_attempt += 1
if registration_attempt > 10:
log.error("Registration was attempted too many times with no success.")
raise TimeoutError(
"Anonymous automatic registration could not be transitioned to "
"standard automatic registration, identity certificate could not be obtained."
)

try:
service.register(org=None, jwt_token=token["token"])
break
except connection.RateLimitExceededException as exc:
if exc.headers.get("Retry-After", None) is None:
raise
delay = int(exc.headers["Retry-After"])
log.debug(
f"Got response with status code {exc.code} and Retry-After header, "
f"will try again in {delay} seconds."
)
time.sleep(delay)
except Exception:
raise


def _main(options: "argparse.Namespace"):
Expand All @@ -175,14 +268,16 @@ def _main(options: "argparse.Namespace"):
# without finally statements, we get confusing behavior (ex. see bz#1431659)
signal.signal(signal.SIGTERM, exit_on_signal)

log.debug("Running rhsmcertd worker.")

cp_provider: CPProvider = inj.require(inj.CP_PROVIDER)
correlation_id: str = generate_correlation_id()
log.debug("X-Correlation-ID: %s", correlation_id)
cp_provider.set_correlation_id(correlation_id)

cfg: RhsmConfigParser = config.get_config_parser()
log.debug("check for rhsmcertd disable")
if "1" == cfg.get("rhsmcertd", "disable") and not options.force:
if cfg.get("rhsmcertd", "disable") == "1" and not options.force:
log.warning("The rhsmcertd process has been disabled by configuration.")
sys.exit(-1)

Expand Down
23 changes: 10 additions & 13 deletions test/test_auto_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"""

import unittest
import base64
from unittest.mock import Mock

from subscription_manager.scripts.rhsmcertd_worker import _collect_cloud_info
Expand Down Expand Up @@ -141,14 +140,13 @@ def test_collect_cloud_info_one_cloud_provider_detected(self):
self.assertEqual(cloud_info["cloud_id"], "aws")
# Test metadata
self.assertTrue("metadata" in cloud_info)
b64_metadata = cloud_info["metadata"]
metadata = base64.b64decode(b64_metadata).decode("utf-8")
self.assertEqual(metadata, AWS_METADATA)
self.assertEqual(cloud_info["metadata"], AWS_METADATA)
# Test signature
self.assertTrue("signature" in cloud_info)
b64_signature = cloud_info["signature"]
signature = base64.b64decode(b64_signature).decode("utf-8")
self.assertEqual(signature, "-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----")
self.assertEqual(
cloud_info["signature"],
"-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----",
)

def test_collect_cloud_info_more_cloud_providers_detected(self):
"""
Expand All @@ -174,11 +172,10 @@ def test_collect_cloud_info_more_cloud_providers_detected(self):
self.assertEqual(cloud_info["cloud_id"], "aws")
# Test metadata
self.assertTrue("metadata" in cloud_info)
b64_metadata = cloud_info["metadata"]
metadata = base64.b64decode(b64_metadata).decode("utf-8")
self.assertEqual(metadata, AWS_METADATA)
self.assertEqual(cloud_info["metadata"], AWS_METADATA)
# Test signature
self.assertTrue("signature" in cloud_info)
b64_signature = cloud_info["signature"]
signature = base64.b64decode(b64_signature).decode("utf-8")
self.assertEqual(signature, "-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----")
self.assertEqual(
cloud_info["signature"],
"-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----",
)

0 comments on commit bcbc7fe

Please sign in to comment.