diff --git a/src/subscription_manager/scripts/rhsmcertd_worker.py b/src/subscription_manager/scripts/rhsmcertd_worker.py index 5bcc5fa48a..1ed05d4430 100644 --- a/src/subscription_manager/scripts/rhsmcertd_worker.py +++ b/src/subscription_manager/scripts/rhsmcertd_worker.py @@ -10,6 +10,7 @@ # Red Hat trademarks are not licensed under GPLv2. No permission is # granted to use or replicate Red Hat trademarks that are incorporated # in this software or its documentation. +import enum import logging import random import signal @@ -38,6 +39,39 @@ from subscription_manager.utils import generate_correlation_id +class ExitStatus(enum.IntEnum): + """Well-known exit codes. + + WARNING: THIS IS NOT A PUBLIC API. + Values of these errors should not be used by any external applications, they are subject + to change at any time without warning. + External applications should only differentiate between zero and non-zero exit codes. + """ + + OK = 0 + + RHSMCERTD_DISABLED = 5 + """rhsmcertd has been disabled through the config file.""" + LOCAL_CORRUPTION = 6 + """Local data have been corrupted.""" + + NO_CLOUD_PROVIDER = 10 + """No public cloud provider has been detected.""" + NO_CLOUD_METADATA = 11 + """Public cloud provider has been detected, but metadata could not be obtained.""" + + NO_REGISTRATION_TOKEN = 20 + """Registration token could not be obtained: server or cache are unavailable or broken.""" + BAD_TOKEN_TYPE = 21 + """Registration token was received, but is not recognized.""" + + REGISTRATION_FAILED = 30 + """The system registration was not successful.""" + + UNKNOWN_ERROR = -1 + """An unknown error occured.""" + + if TYPE_CHECKING: import argparse from rhsm.config import RhsmConfigParser @@ -52,7 +86,7 @@ def exit_on_signal(_signumber, _stackframe): - sys.exit(0) + sys.exit(ExitStatus.OK) def _is_registered() -> bool: @@ -130,13 +164,13 @@ def _auto_register(cp_provider: "CPProvider") -> None: "This system does not run on any supported cloud provider. " "Automatic registration cannot be performed." ) - sys.exit(1) + sys.exit(ExitStatus.NO_CLOUD_PROVIDER) # When some cloud provider(s) were detected, then try to collect metadata and signature cloud_info = _collect_cloud_info(cloud_list) if len(cloud_info) == 0: log.warning("Cloud metadata could not be collected. Unable to perform automatic registration.") - sys.exit(1) + sys.exit(ExitStatus.NO_CLOUD_METADATA) # Get connection not using any authentication uep: "UEPConnection" = cp_provider.get_no_auth_cp() @@ -151,17 +185,17 @@ def _auto_register(cp_provider: "CPProvider") -> None: ) except Exception: log.exception("Cloud token could not be obtained. Unable to perform automatic registration.") - sys.exit(1) + sys.exit(ExitStatus.NO_REGISTRATION_TOKEN) if token["tokenType"] == "CP-Cloud-Registration": try: _auto_register_standard(uep=uep, token=token) except Exception: log.exception("Standard automatic registration failed.") - sys.exit(1) + sys.exit(ExitStatus.REGISTRATION_FAILED) else: log.info("Standard automatic registration was successful.") - sys.exit(0) + sys.exit(ExitStatus.OK) if token["tokenType"] == "CP-Anonymous-Cloud-Registration": try: @@ -169,13 +203,13 @@ def _auto_register(cp_provider: "CPProvider") -> None: cache.CloudTokenCache.delete_cache() except Exception: log.exception("Anonymous automatic registration failed.") - sys.exit(1) + sys.exit(ExitStatus.REGISTRATION_FAILED) else: log.info("Anonymous automatic registration was successful.") - sys.exit(0) + sys.exit(ExitStatus.OK) log.error(f"Unsupported token type for automatic registration: {token['tokenType']}.") - sys.exit(1) + sys.exit(ExitStatus.BAD_TOKEN_TYPE) def _auto_register_standard(uep: "UEPConnection", token: Dict[str, str]) -> None: @@ -279,7 +313,7 @@ def _main(options: "argparse.Namespace"): log.debug("check for rhsmcertd disable") if cfg.get("rhsmcertd", "disable") == "1" and not options.force: log.warning("The rhsmcertd process has been disabled by configuration.") - sys.exit(-1) + sys.exit(ExitStatus.RHSMCERTD_DISABLED) # Was script executed with --auto-register option if options.auto_register is True: @@ -290,7 +324,7 @@ def _main(options: "argparse.Namespace"): "Either the consumer is not registered or the certificates" + " are corrupted. Certificate update using daemon failed." ) - sys.exit(-1) + sys.exit(ExitStatus.LOCAL_CORRUPTION) print(_("Updating entitlement certificates & repositories")) cp: UEPConnection = cp_provider.get_consumer_auth_cp() @@ -384,11 +418,11 @@ def main(): # stack trace. We need to check the code, since we want to signal # exit with failure to the caller. Otherwise, we will exit with 0 if se.code: - sys.exit(-1) + sys.exit(ExitStatus.UNKNOWN_ERROR) except Exception: log.exception("Error while updating certificates using daemon") print(_("Unable to update entitlement certificates and repositories")) - sys.exit(-1) + sys.exit(ExitStatus.UNKNOWN_ERROR) if __name__ == "__main__":