Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCT-67: Anonymous registration #3370

Merged
merged 16 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 6 additions & 57 deletions src/daemons/rhsmcertd.c
m-horky marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ typedef enum {
#define N_(x) x
#define CONFIG_KEY_NOT_FOUND (0)

#define MAX_AUTO_REGISTER_ATTEMPTS 3

#if defined(__linux)
# if LINUX_VERSION_CODE >= KERNEL_VERSION(3,17,0)
# ifdef HAVE_LINUX_GETRANDOM
Expand All @@ -93,7 +91,6 @@ static gint arg_reg_interval_minutes = -1;
static gboolean arg_no_splay = FALSE;
static gboolean arg_auto_registration = FALSE;
static int fd_lock = -1;
static int auto_register_attempt = 0;

struct CertCheckData {
int interval_seconds;
Expand Down Expand Up @@ -419,34 +416,19 @@ auto_register(gpointer data)
exit (EXIT_FAILURE);
}
if (pid == 0) {
if (auto_register_attempt < MAX_AUTO_REGISTER_ATTEMPTS) {
debug ("(Auto-registration) executing: %s --auto-register", WORKER);
execl (WORKER, WORKER_NAME, "--auto-register", NULL);
} else {
warn ("(Auto-registration) the number of attempts reached the max limit: %d", MAX_AUTO_REGISTER_ATTEMPTS);
// Return False to not repeat this again
return false;
}
debug ("(Auto-registration) executing: %s --auto-register", WORKER);
execl (WORKER, WORKER_NAME, "--auto-register", NULL);
}

waitpid (pid, &status, 0);
status = WEXITSTATUS (status);

if (status == 0) {
info ("(Auto-registration) performed successfully.");
// No need to repeat this action again
return false;
} else {
auto_register_attempt++;
if (auto_register_attempt < MAX_AUTO_REGISTER_ATTEMPTS) {
warn ("(Auto-registration) failed (%d), retry will occur on next run.", status);
} else {
warn ("(Auto-registration) failed (%d), the number of attempts reached the max limit: %d",
status, MAX_AUTO_REGISTER_ATTEMPTS);
// Return False to not repeat this again
return false;
}
return true;
warn ("(Auto-registration) failed (%d)", status);
return false;
}
}

Expand Down Expand Up @@ -488,30 +470,6 @@ cert_check (gboolean heal)
return TRUE;
}


static gboolean
initial_auto_register (gpointer data)
{
struct CertCheckData *cert_data = data;
gboolean repeat_attempts;

repeat_attempts = auto_register(cert_data);

// When first attempt was not successful, then try to do other
// auto-registration attempts
if (repeat_attempts == true) {
// Add the timeout to begin waiting on interval but offset by the initial
// delay.
g_timeout_add(cert_data->interval_seconds * 1000,
(GSourceFunc) auto_register, cert_data);
// Update timestamp
log_update(cert_data->interval_seconds, cert_data->next_update_file);
}
// Return false so that the timer does
// not run this again.
return false;
}

static gboolean
initial_cert_check (gpointer data)
{
Expand Down Expand Up @@ -915,13 +873,11 @@ main (int argc, char *argv[])
// NOTE: We put the initial checks on a timer so that in the case of systemd,
// we can ensure that the network interfaces are all up before the initial
// checks are done.
int auto_reg_initial_delay = 0;
int auto_attach_initial_delay = 0;
int cert_check_initial_delay = 0;
if (run_now) {
info ("Initial checks will be run now!");
} else {
int auto_reg_offset = 0;
int auto_attach_offset = 0;
int cert_check_offset = 0;
if (splay_enabled == true) {
Expand Down Expand Up @@ -960,16 +916,10 @@ main (int argc, char *argv[])
}
#endif
srand((unsigned int) seed);
auto_reg_offset = gen_random(auto_reg_interval_seconds);
auto_attach_offset = gen_random(heal_interval_seconds);
cert_check_offset = gen_random(cert_interval_seconds);
}

if (auto_reg_enabled) {
auto_reg_initial_delay = INITIAL_DELAY_SECONDS + auto_reg_offset;
info ("Waiting %.1f minutes plus %d splay seconds [%d seconds total] before performing first auto-register",
INITIAL_DELAY_SECONDS / 60.0, auto_reg_offset, auto_reg_initial_delay);
}
auto_attach_initial_delay = INITIAL_DELAY_SECONDS + auto_attach_offset;
info ("Waiting %.1f minutes plus %d splay seconds [%d seconds total] before performing first auto-attach.",
INITIAL_DELAY_SECONDS / 60.0, auto_attach_offset, auto_attach_initial_delay);
Expand All @@ -994,8 +944,7 @@ main (int argc, char *argv[])
auto_attach_data.next_update_file = NEXT_AUTO_ATTACH_UPDATE_FILE;

if (auto_reg_enabled) {
g_timeout_add(auto_reg_initial_delay * 1000,
(GSourceFunc) initial_auto_register, (gpointer) &auto_register_data);
auto_register((gpointer) &auto_register_data);
}
g_timeout_add (cert_check_initial_delay * 1000,
(GSourceFunc) initial_cert_check, (gpointer) &cert_check_data);
Expand All @@ -1006,7 +955,7 @@ main (int argc, char *argv[])
// time. This works for most users, since the cert_interval aligns with
// runs of heal_interval (i.e., heal_interval % cert_interval = 0)
if (auto_reg_enabled) {
log_update (auto_reg_initial_delay, NEXT_AUTO_REGISTER_UPDATE_FILE);
log_update (0, NEXT_AUTO_REGISTER_UPDATE_FILE);
}
log_update (cert_check_initial_delay, NEXT_CERT_UPDATE_FILE);
log_update (auto_attach_initial_delay, NEXT_AUTO_ATTACH_UPDATE_FILE);
Expand Down
20 changes: 15 additions & 5 deletions src/plugins/dnf/subscription_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@

from configparser import ConfigParser

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from subscription_manager.certdirectory import EntitlementDirectory
from subscription_manager.identity import Identity


expired_warning = _(
"""
*** WARNING ***
Expand Down Expand Up @@ -137,12 +144,15 @@ def _update(cache_only):
Update entitlement certificates and redhat.repo
:param cache_only: is True, when rhsm.full_refresh_on_yum is set to 0 in rhsm.conf
"""

logger.info(_("Updating Subscription Management repositories."))

identity = inj.require(inj.IDENTITY)

if not identity.is_valid():
identity: Identity = inj.require(inj.IDENTITY)
ent_dir: EntitlementDirectory = inj.require(inj.ENT_DIR)

# During first phase of anonymous cloud registration the system has
# valid entitlement certificates, but does not yet have any identity.
# We have access to the content, so we shouldn't be reporting missing
# identity certificate.
if not identity.is_valid() and len(ent_dir.list_valid()) == 0:
logger.info(_("Unable to read consumer identity"))

if config.in_container():
Expand Down
56 changes: 32 additions & 24 deletions src/rhsm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1442,11 +1442,12 @@ def get_supported_resources(self) -> dict:

return self.resources

def supports_resource(self, resource_name: str) -> bool:
"""
Check if the server we're connecting too supports a particular
resource. For our use cases this is generally the plural form
of the resource.
def supports_resource(self, resource_name: Optional[str]) -> bool:
"""Check if the server supports a particular resource.
:param resource_name:
Resource to be requested.
When `None`, API call 'GET /' is made to cache all supported resources.
"""
if self.resources is None:
self._load_supported_resources()
Expand Down Expand Up @@ -1488,31 +1489,27 @@ def has_capability(self, capability: str) -> bool:
def ping(self, *args, **kwargs) -> Any:
return self.conn.request_get("/status/", description=_("Checking connection status"))

def getJWToken(self, cloud_id: str, metadata: str, signature: str) -> Any:
"""
When automatic registration is enabled in rhsm.conf and it was possible
to gather cloud metadata, then it is possible to try to get JSON Web Token
for automatic registration. When candlepin does not provide automatic
registration, then raise exception.
:param cloud_id: ID of cloud provider, e.g. "aws", "azure", "gcp"
:param metadata: string with base64 encoded metadata
:param signature: string with base64 encoded signature
:return: string with JWT
def getCloudJWT(self, cloud_id: str, metadata: str, signature: str) -> Dict[str, Any]:
"""Obtain cloud JWT.
This method is part of the Cloud registration v2: standard or anonymous flow.
:param cloud_id: Cloud provider, e.g. 'aws', 'azure' or 'gcp'.
:param metadata: Base64 encoded public cloud metadata.
:param signature: Base64 encoded public cloud signature.
"""
params = {
data = {
"type": cloud_id,
"metadata": metadata,
"signature": signature,
}
# "Accept" http header has to be text/plain, because candlepin return
# token as simple text and it is not wrapped in json document
headers = {
"Content-type": "application/json",
"Accept": "text/plain",
"Content-Type": "application/json",
}

return self.conn.request_post(
method="/cloud/authorize",
params=params,
method="/cloud/authorize?version=2",
params=data,
headers=headers,
description=_("Fetching cloud token"),
)
Expand Down Expand Up @@ -1844,18 +1841,29 @@ def unregisterConsumer(self, consumerId: str) -> bool:
method = "/consumers/%s" % self.sanitize(consumerId)
return self.conn.request_delete(method, description=_("Unregistering system")) is None

def getCertificates(self, consumer_uuid: str, serials: Optional[list] = None) -> List[dict]:
def getCertificates(
self,
consumer_uuid: str,
serials: Optional[list] = None,
jwt: Optional[str] = None,
) -> List[dict]:
"""
Fetch all entitlement certificates for this consumer. Specify a list of serial numbers to
filter if desired
:param consumer_uuid: consumer UUID
:param serials: list of entitlement serial numbers
:param jwt: JWT identifying an anonymous system
"""
method = "/consumers/%s/certificates" % (self.sanitize(consumer_uuid))
if serials:
serials_str = ",".join(serials)
method = "%s?serials=%s" % (method, serials_str)
return self.conn.request_get(method, description=_("Fetching certificates"))
headers: Dict[str, str] = {}
if jwt:
headers["Authorization"] = f"Bearer {jwt}"

return self.conn.request_get(method, headers=headers, description=_("Fetching certificates"))

def getCertificateSerials(self, consumerId: str) -> List[dict]:
"""
Expand Down
4 changes: 2 additions & 2 deletions src/rhsmlib/services/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import logging
import socket
from typing import Callable
from typing import Callable, Optional

from rhsm.connection import UEPConnection

Expand All @@ -37,7 +37,7 @@ def __init__(self, cp: UEPConnection) -> None:

def register(
self,
org: str,
org: Optional[str],
activation_keys: list = None,
environments: list = None,
force: bool = False,
Expand Down
96 changes: 96 additions & 0 deletions src/subscription_manager/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
this with the current state, and perform an update on the server if
necessary.
"""
import base64
import datetime
import logging
import os
Expand All @@ -27,6 +28,7 @@

if TYPE_CHECKING:
from rhsm.certificate2 import EntitlementCertificate, Product
from rhsm.connection import UEPConnection
from subscription_manager.certdirectory import ProductDirectory, EntitlementDirectory
from subscription_manager.cp_provider import CPProvider
from subscription_manager.identity import Identity
Expand Down Expand Up @@ -1173,3 +1175,97 @@ def _load_data(self, open_file: TextIO) -> Optional[Dict]:
except ValueError:
# Ignore json file parse error
pass


class CloudTokenCache:
m-horky marked this conversation as resolved.
Show resolved Hide resolved
"""A cache for Candlepin's JWT used during automatic registration.
This is used by rhsmcertd worker.
"""

CACHE_FILE = "/var/lib/rhsm/cache/cloud_token_cache.json"

@classmethod
def get(cls, uep: "UEPConnection", cloud_id: str, metadata: str, signature: str) -> Dict[str, str]:
"""Get a JWT from the Candlepin server.
If cached token already exists and is still valid, it will be used
without contacting the server.
"""
try:
token: Dict[str, str] = cls._get_from_file()
log.debug("JWT cache contains valid token, no need to contact the server.")
return token
except LookupError as exc:
log.debug(f"JWT cache doesn't contain valid token, contacting the server ({exc}).")
except Exception as exc:
log.debug(f"JWT cache couldn't be read (got {type(exc).__name__}), contacting the server.")
m-horky marked this conversation as resolved.
Show resolved Hide resolved

token: Dict[str, str] = cls._get_from_server(uep, cloud_id, metadata, signature)
m-horky marked this conversation as resolved.
Show resolved Hide resolved
cls._save_to_file(token)
return token

@classmethod
def is_valid(cls) -> bool:
"""Check if the cached JWT is valid.
:returns:
`True` if the locally cached JWT is valid,
`False` if the locally cached JWT is expired or does not exist.
"""
# 'exp' key: https://www.rfc-editor.org/rfc/rfc7519#section-4.1.4
expiration: int = cls._get_payload()["exp"]

now = int(time.time())
return expiration > now

@classmethod
def _get_payload(cls) -> Dict:
"""Get the body of the JWT.
:returns: The body of the JWT as a dictionary.
:raises Exception: The file is missing or malformed.
"""
with open(cls.CACHE_FILE, "r") as f:
content: Dict[str, str] = json.load(f)

payload: str = content["token"].split(".")[1]
# JWT does not use the padding, base64.b64decode requires it.
payload = f"{payload}==="
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have almost the same code here: https://github.com/candlepin/subscription-manager/blob/main/src/cloud_what/providers/gcp.py#L275 . I would do some refactoring to remove duplicated code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it is necessary? This implementation is much smaller and quite self-contained. I am not sure it is worth extracting both into a shared implementation, since we're only doing this on two places.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK


return json.loads(base64.b64decode(payload).decode("utf-8"))

@classmethod
def _get_from_file(cls) -> Dict[str, str]:
"""Get a JWT from a cache file.
:raises LookupError: The token is expired.
:raises Exception: The file is missing or malformed.
"""
if not cls.is_valid():
cls.delete_cache()
raise LookupError("Candlepin JWT is expired.")

with open(cls.CACHE_FILE, "r") as f:
return json.load(f)

@classmethod
def _get_from_server(
m-horky marked this conversation as resolved.
Show resolved Hide resolved
cls, uep: "UEPConnection", cloud_id: str, metadata: str, signature: str
) -> Dict[str, str]:
"""Get a JWT from the Candlepin server."""
log.debug("Obtaining Candlepin JWT.")
result: Dict[str, str] = uep.getCloudJWT(cloud_id, metadata, signature)
return result

@classmethod
def delete_cache(cls):
if os.path.exists(cls.CACHE_FILE):
os.remove(cls.CACHE_FILE)
log.debug(f"Candlepin JWT cache file ({cls.CACHE_FILE}) was deleted.")

@classmethod
def _save_to_file(cls, token: Dict[str, str]) -> None:
with open(cls.CACHE_FILE, "w") as f:
json.dump(token, f)
log.debug(f"Candlepin JWT was saved to a cache file ({cls.CACHE_FILE}).")
Loading
Loading