diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000..573d854 --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1 @@ +8e52306f85582ea626aa3e745547542d35ddae53 diff --git a/.github/workflows/build-sw-container.yml b/.github/workflows/build-sw-container.yml index de34340..7c7db99 100644 --- a/.github/workflows/build-sw-container.yml +++ b/.github/workflows/build-sw-container.yml @@ -4,7 +4,7 @@ on: push: pull_request: workflow_dispatch: - + jobs: build: runs-on: ubuntu-latest diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml new file mode 100644 index 0000000..2b11178 --- /dev/null +++ b/.github/workflows/pre-commit.yml @@ -0,0 +1,14 @@ +name: pre-commit + +on: + pull_request: + push: + branches: [main] + +jobs: + pre-commit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v3 + - uses: pre-commit/action@v3.0.1 diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 6fc5bff..7cbe67a 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -3,15 +3,15 @@ name: run python unit tests on: push: pull_request: - + jobs: run-unit-tests: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - + - name: install m2crypto run: sudo apt-get install m2crypto - + - name: execute tests run: python3 -m unittest discover -v tests diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..e8107f8 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,25 @@ +repos: + - repo: "https://github.com/pre-commit/pre-commit-hooks" + rev: v4.1.0 + hooks: + - id: check-ast + - id: check-docstring-first + - id: check-merge-conflict + - id: end-of-file-fixer + - id: mixed-line-ending + - id: trailing-whitespace + - id: check-toml + - repo: "https://github.com/pycqa/isort" + rev: 5.11.5 + hooks: + - id: isort + - repo: "https://github.com/psf/black" + rev: 22.3.0 + hooks: + - id: black + - repo: "https://github.com/asottile/pyupgrade" + rev: v3.15.1 + hooks: + - id: pyupgrade + args: + - "--py39-plus" diff --git a/.travis.yml b/.travis.yml index 2453530..d0e3048 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,10 +3,10 @@ env: matrix: - OS_TYPE=centos OS_VERSION=6 - OS_TYPE=centos OS_VERSION=7 - + services: - docker - + before_install: - sudo apt-get update - echo 'DOCKER_OPTS="-H tcp://127.0.0.1:2375 -H unix:///var/run/docker.sock -s devicemapper"' | sudo tee /etc/default/docker > /dev/null @@ -14,7 +14,7 @@ before_install: - sleep 5 - sudo docker pull centos:centos${OS_VERSION} - + script: # Run tests in Container -- tests/setup_tests.sh ${OS_VERSION} \ No newline at end of file +- tests/setup_tests.sh ${OS_VERSION} diff --git a/Dockerfile b/Dockerfile index ad06767..c8d0597 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,7 @@ RUN yum update -y && \ python3 setup.py install --root=/ && \ mkdir -p /etc/osg/pki && \ cp /usr/local/config/ca-issuer.conf /etc/osg/pki/ - + WORKDIR /output ENTRYPOINT ["/usr/local/bin/osg-cert-request"] diff --git a/README.md b/README.md index 6192312..0388d9a 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,6 @@ Usage: osg-incommon-cert-request [--debug] -u username -k pkey -c cert \ osg-incommon-cert-request --version ``` -See osg-incommon-cert-request -h or the +See osg-incommon-cert-request -h or the [man page](https://htmlpreview.github.io/?https://github.com/opensciencegrid/osg-pki-tools/blob/master/man/osgincommoncertrequest.html) for a description of the options. diff --git a/man/osg-incommon-cert-request.1 b/man/osg-incommon-cert-request.1 index 1eea37e..abea396 100644 --- a/man/osg-incommon-cert-request.1 +++ b/man/osg-incommon-cert-request.1 @@ -8,11 +8,11 @@ osg-incommon-cert-request \- retrieves host and service certificates from the In .B osg-incommon-cert-request retrieves host and service certificates from the InCommon IGTF Server CA. It requires an Incommon user account authorized as a Department -Registration Authority to issue InCommon IGTF Server CA certificates +Registration Authority to issue InCommon IGTF Server CA certificates with SSL auto approval. It authenticates the requestor with an InCommon client certificate. .PP -It allows bulk retrieval of certificates and keys. +It allows bulk retrieval of certificates and keys. .SH OPTIONS .PP .TP @@ -37,9 +37,9 @@ Specify requestor's user certificate (PEM format). Specify requestor's private key (PEM format). .TP .BR \-a, \-\-altname -Specify a Subject Alternative Name (SAN) for the requested certificate +Specify a Subject Alternative Name (SAN) for the requested certificate (only works with -.I \-H, \-\-hostname). +.I \-H, \-\-hostname). May be specified more than once for additional SANs. .TP .BR \-d, \-\-directory @@ -71,7 +71,7 @@ To generate a certificate with multiple Subject Alternative Names (SANS) .nf osg-incommon-cert-request -u incommonuser -c certpath.pem \\ -k keypath.pem -H hostname.yourdomain -a hostalt.yourdomain \\ - -a hostalt2.yourdomain + -a hostalt2.yourdomain .fi .RE .PP @@ -83,7 +83,7 @@ osg-incommon-cert-request -u incommonuser -c certpath.pem \\ -k keypath.pem -F hostfilepath.txt .RE .PP -hostfilepath.txt example +hostfilepath.txt example .PP .RS hostname01.yourdomain @@ -93,7 +93,7 @@ hostname04.yourdomain hostname05.yourdomain .fi .RE .PP -To provide alternative organization and department codes for the InCommon Certificate Service. +To provide alternative organization and department codes for the InCommon Certificate Service. .PP .RS .nf @@ -103,4 +103,3 @@ osg-incommon-cert-request -O 4567,8912 -u incommonuser \\ .RE .SH AUTHOR Jeny Teheran - diff --git a/osgpkitools/ExceptionDefinitions.py b/osgpkitools/ExceptionDefinitions.py index b98d21f..c3ea053 100644 --- a/osgpkitools/ExceptionDefinitions.py +++ b/osgpkitools/ExceptionDefinitions.py @@ -18,12 +18,12 @@ def __str__(self): class FileNotFoundException(Exception): - """ Exception raised when a file is not found + """Exception raised when a file is not found Attributes: filename -- Name of the file that is not found message -- message to be printed for this exception - """ + """ def __init__(self, filename, message): self.filename = filename @@ -34,12 +34,12 @@ def __str__(self): class BadPassphraseException(Exception): - """ This Exception occurs when the passphrase entered for the private key file + """This Exception occurs when the passphrase entered for the private key file does not match the stored passphrase of the key file. Attributes: message -- message to be displayed - """ + """ def __init__(self, message): self.message = message diff --git a/osgpkitools/cert_request.py b/osgpkitools/cert_request.py index 4a6ee81..a3171a3 100644 --- a/osgpkitools/cert_request.py +++ b/osgpkitools/cert_request.py @@ -1,53 +1,107 @@ """osg-cert-request tool module, separated from the script for unit testing """ +import argparse import os import re import sys -import argparse from collections import namedtuple -from M2Crypto import RSA, EVP, X509 -from osgpkitools import cert_utils -from osgpkitools import utils + +from M2Crypto import EVP, RSA, X509 + +from osgpkitools import cert_utils, utils def parse_cli(args): """This function parses all the arguments, validates them and then stores them in a dictionary that is used throughout in the script.""" - parser = argparse.ArgumentParser(add_help=False, # disable built-in help to control help message ordering - description='Generate certificate signing requests (CSRs) and private keys.') + parser = argparse.ArgumentParser( + add_help=False, # disable built-in help to control help message ordering + description="Generate certificate signing requests (CSRs) and private keys.", + ) - required = parser.add_argument_group('Required', 'Specify only one of -H/--hostname and -F/--hostfile') + required = parser.add_argument_group("Required", "Specify only one of -H/--hostname and -F/--hostfile") hosts = required.add_mutually_exclusive_group(required=True) - hosts.add_argument('-H', '--hostname', action='store', dest='hostname', - help='The hostname (FQDN) to request') - hosts.add_argument('-F', '--hostfile', action='store', dest='hostfile', - help='File containing list of hostnames (FQDN), one per line, to request. Space separated ' - 'subject alternative names (SANs) may be specified on the same line as each hostname.') - required.add_argument('-C', '--country', action=CountryAction, required=True, dest='country', - help='The 2-letter country code to associate with the generated CSR(s)') - required.add_argument('-S', '--state', action=StateAction, required=True, dest='state', - help='The unabbreviated state/province to associate with the generated CSR(s)') - required.add_argument('-L', '--locality', action='store', required=True, dest='locality', - help='The locality (i.e., city, town) to associate with the generated CSR(s)') - required.add_argument('-O', '--organization', action='store', required=True, dest='organization', - help='The organization to associate with the generated CSR(s)') + hosts.add_argument("-H", "--hostname", action="store", dest="hostname", help="The hostname (FQDN) to request") + hosts.add_argument( + "-F", + "--hostfile", + action="store", + dest="hostfile", + help="File containing list of hostnames (FQDN), one per line, to request. Space separated " + "subject alternative names (SANs) may be specified on the same line as each hostname.", + ) + required.add_argument( + "-C", + "--country", + action=CountryAction, + required=True, + dest="country", + help="The 2-letter country code to associate with the generated CSR(s)", + ) + required.add_argument( + "-S", + "--state", + action=StateAction, + required=True, + dest="state", + help="The unabbreviated state/province to associate with the generated CSR(s)", + ) + required.add_argument( + "-L", + "--locality", + action="store", + required=True, + dest="locality", + help="The locality (i.e., city, town) to associate with the generated CSR(s)", + ) + required.add_argument( + "-O", + "--organization", + action="store", + required=True, + dest="organization", + help="The organization to associate with the generated CSR(s)", + ) optional = parser.add_argument_group("Optional") - optional.add_argument('-h', '--help', action='help', - help='show this help message and exit') - optional.add_argument('-a', '--altname', action='append', dest='altnames', default=[], - help='Specify the SAN for the requested certificate (only works with -H/--hostname). ' - 'May be specified more than once for additional SANs.') - required.add_argument('-U', '--organizational-unit', action='append', dest='organizational_unit', default=[], - help='The organizational unit(s) to associate with the generated CSR(s)') - optional.add_argument('-d', '--directory', action='store', dest='write_directory', default='.', - help="The directory to write the generated CSR(s) and host key(s)") - optional.add_argument('-l', '--key-length', action='store', default=cert_utils.Csr.KEY_LENGTH, - type=int, help='The key size to generate') - optional.add_argument('-V', '--version', action='version', version=utils.VERSION_NUMBER) + optional.add_argument("-h", "--help", action="help", help="show this help message and exit") + optional.add_argument( + "-a", + "--altname", + action="append", + dest="altnames", + default=[], + help="Specify the SAN for the requested certificate (only works with -H/--hostname). " + "May be specified more than once for additional SANs.", + ) + required.add_argument( + "-U", + "--organizational-unit", + action="append", + dest="organizational_unit", + default=[], + help="The organizational unit(s) to associate with the generated CSR(s)", + ) + optional.add_argument( + "-d", + "--directory", + action="store", + dest="write_directory", + default=".", + help="The directory to write the generated CSR(s) and host key(s)", + ) + optional.add_argument( + "-l", + "--key-length", + action="store", + default=cert_utils.Csr.KEY_LENGTH, + type=int, + help="The key size to generate", + ) + optional.add_argument("-V", "--version", action="version", version=utils.VERSION_NUMBER) parsed_args = parser.parse_args(args) @@ -60,12 +114,12 @@ def parse_cli(args): class CountryAction(argparse.Action): - """Action for validating state/province options - """ + """Action for validating state/province options""" + def __init__(self, option_strings, dest, nargs=None, **kwargs): if nargs is not None: raise ValueError("-C/--country only accepts a single argument") - super(CountryAction, self).__init__(option_strings, dest, **kwargs) + super().__init__(option_strings, dest, **kwargs) def __call__(self, parser, namespace, values, option_string=None): if values.isalpha() and len(values) == 2: @@ -76,12 +130,12 @@ def __call__(self, parser, namespace, values, option_string=None): class StateAction(argparse.Action): - """Action for validating state/province options - """ + """Action for validating state/province options""" + def __init__(self, option_strings, dest, nargs=None, **kwargs): if nargs is not None: raise ValueError("-S/--state only accepts a single argument") - super(StateAction, self).__init__(option_strings, dest, **kwargs) + super().__init__(option_strings, dest, **kwargs) def __call__(self, parser, namespace, values, option_string=None): if len(values) > 2: @@ -92,30 +146,31 @@ def __call__(self, parser, namespace, values, option_string=None): def main(): - """The entrypoint for osg-cert-request - """ + """The entrypoint for osg-cert-request""" try: args = parse_cli(sys.argv[1:]) except ValueError as exc: sys.exit(exc) - location = namedtuple('Location', ['country', 'state', 'locality', 'organization', 'organizational_unit']) + location = namedtuple("Location", ["country", "state", "locality", "organization", "organizational_unit"]) loc = location(args.country, args.state, args.locality, args.organization, args.organizational_unit) if args.hostname: fqdns_list = [[args.hostname] + args.altnames] elif args.hostfile: - with open(args.hostfile, 'r') as hfile: + with open(args.hostfile) as hfile: hostfiles = [x.strip() for x in hfile.readlines()] hostfiles = [x for x in hostfiles if x] - fqdns_list = [re.split(r' +', x) for x in hostfiles] + fqdns_list = [re.split(r" +", x) for x in hostfiles] for fqdns in fqdns_list: print(f"Writing CSR for {fqdns[0]}...") - csr_obj = cert_utils.Csr(fqdns[0], - output_dir=os.path.abspath(args.write_directory), - altnames=fqdns[1:], - location=loc, - key_length=args.key_length) + csr_obj = cert_utils.Csr( + fqdns[0], + output_dir=os.path.abspath(args.write_directory), + altnames=fqdns[1:], + location=loc, + key_length=args.key_length, + ) csr_obj.write_pkey() csr_obj.write_csr() diff --git a/osgpkitools/cert_utils.py b/osgpkitools/cert_utils.py index 6b3f33e..b4c638a 100644 --- a/osgpkitools/cert_utils.py +++ b/osgpkitools/cert_utils.py @@ -4,17 +4,18 @@ import sys import tempfile +from M2Crypto import EVP, RSA, X509 + +from osgpkitools import utils + +from .ExceptionDefinitions import * + try: from urllib3.util import create_urllib3_context except ImportError: # EL7 with python36 from urllib3.util.ssl_ import create_urllib3_context -from M2Crypto import RSA, EVP, X509 - -from .ExceptionDefinitions import * -from osgpkitools import utils - # These flags are for the purpose of passing to the M2Crypto calls MBSTRING_FLAG = 0x1000 @@ -23,7 +24,7 @@ def get_ssl_context(usercert, userkey): - """ This function sets the ssl context for urllib3. + """This function sets the ssl context for urllib3. OpenSSL will prompt for password if needed. cert: Filename for user certificate. @@ -36,7 +37,8 @@ def get_ssl_context(usercert, userkey): ssl_context.load_cert_chain(usercert, userkey) return ssl_context -class Csr(object): + +class Csr: KEY_LENGTH = 4096 PUB_EXPONENT = 0x10001 @@ -44,12 +46,12 @@ class Csr(object): def __init__(self, hostname, output_dir=None, altnames=None, location=None, key_length=KEY_LENGTH): """ Create a certificate signing request (CSR - stored in the x509request attribute) and associated keys (stored in keypair attribute). - - The caller should use write_csr to write CSR when ready. + + The caller should use write_csr to write CSR when ready. The caller should use write_pkey to write private key when ready. - + INPUT - - hostname: common name for the CN field + - hostname: common name for the CN field - output_dir (optional): The destination directory to write the request and key - altnames (optional): Additional hostnames to be added to the Subject Alternative Names. - location (optional): A namedtuple containing country (e.g., US), state (e.g., Wisconsin), @@ -60,49 +62,48 @@ def __init__(self, hostname, output_dir=None, altnames=None, location=None, key_ if not output_dir: self.output_dir = os.getcwd() - # Set up CSR and PKEY file paths - self.csrpath = os.path.join(output_dir, hostname + '.req') - self.keypath = os.path.join(output_dir, hostname + '-key.pem') + # Set up CSR and PKEY file paths + self.csrpath = os.path.join(output_dir, hostname + ".req") + self.keypath = os.path.join(output_dir, hostname + "-key.pem") + + self.keypair = RSA.gen_key(key_length, self.PUB_EXPONENT, lambda: None) - self.keypair = RSA.gen_key(key_length, self.PUB_EXPONENT, lambda: None) - # The message digest shouldn't matter here since we don't use # PKey.sign_*() or PKey.verify_*() but there's no harm in keeping it and # it ensures a strong hashing algo (default is sha1) if we do decide to # sign things in the future - self.pkey = EVP.PKey(md='sha256') + self.pkey = EVP.PKey(md="sha256") self.pkey.assign_rsa(self.keypair) self.x509request = X509.Request() x509name = X509.X509_Name() - + # Build entries for x509 name entries = list() if location: - entries.append(('C', location.country)) - entries.append(('ST', location.state)) - entries.append(('L', location.locality)) - entries.append(('O', location.organization)) + entries.append(("C", location.country)) + entries.append(("ST", location.state)) + entries.append(("L", location.locality)) + entries.append(("O", location.organization)) for ou in location.organizational_unit: - entries.append(('OU', ou)) + entries.append(("OU", ou)) + + entries.append(("CN", hostname)) - entries.append(('CN', hostname)) - for key, val in entries: x509name.add_entry_by_txt(field=key, type=MBSTRING_ASC, entry=val, len=-1, loc=-1, set=0) self.x509request.set_subject_name(x509name) - + # Build altnames self.altnames = None if altnames: - str_altnames= ",".join(altnames) + str_altnames = ",".join(altnames) self.altnames = str_altnames extension_stack = X509.X509_Extension_Stack() - extension = X509.new_extension('subjectAltName', - ", ".join(['DNS:%s' % name for name in altnames])) + extension = X509.new_extension("subjectAltName", ", ".join(["DNS:%s" % name for name in altnames])) extension.set_critical(1) extension_stack.push(extension) self.x509request.add_extensions(extension_stack) @@ -110,8 +111,8 @@ def __init__(self, hostname, output_dir=None, altnames=None, location=None, key_ # Set up pubkey and sign CSR with privkey self.x509request.set_pubkey(pkey=self.pkey) self.x509request.set_version(0) - self.x509request.sign(pkey=self.pkey, md='sha256') - + self.x509request.sign(pkey=self.pkey, md="sha256") + def write_csr(self, csrpath=None): """Write the certificate signing request""" if not csrpath: @@ -120,7 +121,7 @@ def write_csr(self, csrpath=None): try: utils.safe_write(csrpath, self.x509request.as_pem()) except: - os.remove(self.keypath) # if we can't write the CSR, remove its associated private key + os.remove(self.keypath) # if we can't write the CSR, remove its associated private key raise def write_pkey(self, keypath=None): @@ -140,17 +141,12 @@ def write_pkey(self, keypath=None): def format_csr(self, csr): """Extract the base64 encoded string from the contents of a CSR""" - return csr.replace('-----BEGIN CERTIFICATE REQUEST-----\n', '')\ - .replace('-----END CERTIFICATE REQUEST-----\n', '')\ - .replace('\n', '') + return ( + csr.replace("-----BEGIN CERTIFICATE REQUEST-----\n", "") + .replace("-----END CERTIFICATE REQUEST-----\n", "") + .replace("\n", "") + ) def base64_csr(self): """Extract the base64 encoded string from the contents of a certificate signing request""" - return self.format_csr(self.x509request.as_pem().decode('utf-8')) - - - - - - - + return self.format_csr(self.x509request.as_pem().decode("utf-8")) diff --git a/osgpkitools/incommon_request.py b/osgpkitools/incommon_request.py index 6f82463..d43b76c 100644 --- a/osgpkitools/incommon_request.py +++ b/osgpkitools/incommon_request.py @@ -1,11 +1,10 @@ #!/usr/bin/python3 -# -*- coding: utf-8 -*- """ This script is used to submit multiple certificate requests to InCommon certificate service. The intended user for the script is the Department Registration Authority Officer (DRAO) with SSL auto-approval and Certificate Auth enabled. -The DRAO must authenticate with a user certificate issued by InCommon. The certificate must be configured for the DRAO in the InCommon Certificate Manager interface > Admins section. +The DRAO must authenticate with a user certificate issued by InCommon. The certificate must be configured for the DRAO in the InCommon Certificate Manager interface > Admins section. This script works in two modes: 1) Requesting single host certificate with -H option @@ -13,89 +12,152 @@ This script retrieves the certificates and output a set of files: hostname.key (private key) and hostname.pem (certificate) """ - - -prog = "osg-incommon-cert-request" -args = None import argparse +import configparser import http.client +import json +import logging +import os import socket import sys -import os import time import traceback -import json -import configparser - -import logging -logger = logging.getLogger('incommon_request') -logging.basicConfig() from io import StringIO from ssl import SSLError -from . import utils -from . import cert_utils +from . import cert_utils, utils from .ExceptionDefinitions import * from .rest_client import InCommonApiClient +prog = "osg-incommon-cert-request" +args = None + +logger = logging.getLogger("incommon_request") +logging.basicConfig() + + MAX_RETRY_RETRIEVAL = 40 -WAIT_RETRIEVAL= 10 +WAIT_RETRIEVAL = 10 WAIT_APPROVAL = 30 def parse_cli(): """This function parses all the arguments, validates them and then stores them in a dictionary that is used throughout the script.""" - - usage = \ - '''%(prog)s [--debug] -u username -k pkey -c cert \\ + + usage = """%(prog)s [--debug] -u username -k pkey -c cert \\ (-H hostname | -F hostfile) [-a altnames] [-d write_directory] \\ [-O org,dept] %(prog)s [--debug] -u username -k pkey -c cert -t %(prog)s -h - %(prog)s --version''' - - parser = argparse.ArgumentParser(add_help=False, usage=usage, - description='Request and retrieve certificates from the InCommon IGTF server CA.') + %(prog)s --version""" - required = parser.add_argument_group('Required', 'Specify only one of -H/--hostname and -F/--hostfile') + parser = argparse.ArgumentParser( + add_help=False, usage=usage, description="Request and retrieve certificates from the InCommon IGTF server CA." + ) + + required = parser.add_argument_group("Required", "Specify only one of -H/--hostname and -F/--hostfile") hosts = required.add_mutually_exclusive_group() - hosts.add_argument('-H', '--hostname', action='store', dest='hostname', - help='The hostname (FQDN) to request. If specified, -F/--hostfile will be ignored') - hosts.add_argument('-F', '--hostfile', action=FilePathAction, dest='hostfile', - help='File containing list of hostnames (FQDN), one per line, to request. Space separated ' - 'subject alternative names (SANs) may be specified on the same line as each hostname.') + hosts.add_argument( + "-H", + "--hostname", + action="store", + dest="hostname", + help="The hostname (FQDN) to request. If specified, -F/--hostfile will be ignored", + ) + hosts.add_argument( + "-F", + "--hostfile", + action=FilePathAction, + dest="hostfile", + help="File containing list of hostnames (FQDN), one per line, to request. Space separated " + "subject alternative names (SANs) may be specified on the same line as each hostname.", + ) + + required.add_argument( + "-u", + "--username", + action="store", + required=True, + dest="login", + help="Specify requestor's InCommon username/login", + ) - required.add_argument('-u', '--username', action='store', required=True, dest='login', - help="Specify requestor's InCommon username/login") + required.add_argument( + "-c", + "--cert", + action=FilePathAction, + required=True, + dest="usercert", + help="Specify requestor's user certificate (PEM Format)", + ) - required.add_argument('-c', '--cert', action=FilePathAction, required=True, dest='usercert', - help="Specify requestor's user certificate (PEM Format)") - - required.add_argument('-k', '--pkey', action=FilePathAction, required=True, dest='userprivkey', - help="Specify requestor's private key (PEM Format)") + required.add_argument( + "-k", + "--pkey", + action=FilePathAction, + required=True, + dest="userprivkey", + help="Specify requestor's private key (PEM Format)", + ) optional = parser.add_argument_group("Optional") - optional.add_argument('-h', '--help', action='help', - help='show this help message and exit') - optional.add_argument('-a', '--altname', action='append', dest='altnames', default=[], - help='Specify the SAN for the requested certificate (only works with -H/--hostname). ' - 'May be specified more than once for additional SANs.') - optional.add_argument('-C', '--config', action='store', dest='config_file', default='/etc/osg/pki/ca-issuer.conf', - help='Path to configuration file') - optional.add_argument('-d', '--directory', action='store', dest='write_directory', default='.', - help="The directory to write the host certificate(s) and key(s)") - optional.add_argument('-O', '--orgcode', action='store', dest='orgcode', metavar='ORG,DEPT', - help='Organization and Department codes for the InCommon Certificate Service. Defaults are Fermilab\'s codes.') - optional.add_argument('-l', '--key-length', action='store', default=cert_utils.Csr.KEY_LENGTH, - type=int, help='The key size to generate') - optional.add_argument('--debug', action='store_true', dest='debug', default=False, - help="Write debug output to stdout") - optional.add_argument('-t', '--test', action='store_true', dest='test', default=False, - help='Testing mode: test connection to InCommon API but does not request certificates. ' - 'Useful to test authentication credentials, optional arguments are ignored.') - optional.add_argument('-v', '--version', action='version', version=utils.VERSION_NUMBER) + optional.add_argument("-h", "--help", action="help", help="show this help message and exit") + optional.add_argument( + "-a", + "--altname", + action="append", + dest="altnames", + default=[], + help="Specify the SAN for the requested certificate (only works with -H/--hostname). " + "May be specified more than once for additional SANs.", + ) + optional.add_argument( + "-C", + "--config", + action="store", + dest="config_file", + default="/etc/osg/pki/ca-issuer.conf", + help="Path to configuration file", + ) + optional.add_argument( + "-d", + "--directory", + action="store", + dest="write_directory", + default=".", + help="The directory to write the host certificate(s) and key(s)", + ) + optional.add_argument( + "-O", + "--orgcode", + action="store", + dest="orgcode", + metavar="ORG,DEPT", + help="Organization and Department codes for the InCommon Certificate Service. Defaults are Fermilab's codes.", + ) + optional.add_argument( + "-l", + "--key-length", + action="store", + default=cert_utils.Csr.KEY_LENGTH, + type=int, + help="The key size to generate", + ) + optional.add_argument( + "--debug", action="store_true", dest="debug", default=False, help="Write debug output to stdout" + ) + optional.add_argument( + "-t", + "--test", + action="store_true", + dest="test", + default=False, + help="Testing mode: test connection to InCommon API but does not request certificates. " + "Useful to test authentication credentials, optional arguments are ignored.", + ) + optional.add_argument("-v", "--version", action="version", version=utils.VERSION_NUMBER) parsed_args = parser.parse_args() @@ -103,49 +165,48 @@ def parse_cli(): if parsed_args.hostfile and parsed_args.altnames: parsed_args.altnames = [] print("-a/--altname option ignored with -F/--hostfile", file=sys.stderr) - + if parsed_args.debug: # this sets the root debug level logging.getLogger().setLevel(logging.DEBUG) - logger.debug('Debug mode enabled') - - # (-H/--hostname | -F/--hostfile) are mutually exclusive but not required so testing mode can be enabled with optional param -t/--test + logger.debug("Debug mode enabled") + + # (-H/--hostname | -F/--hostfile) are mutually exclusive but not required so testing mode can be enabled with optional param -t/--test if not parsed_args.test and not parsed_args.hostname and not parsed_args.hostfile: - parser.error('argument -H/--hostname or -F/--hostfile is required.') + parser.error("argument -H/--hostname or -F/--hostfile is required.") return parsed_args class FilePathAction(argparse.Action): - """Action for validating if the file exists and there are read permissions - """ + """Action for validating if the file exists and there are read permissions""" + def __call__(self, parser, namespace, values, option_string=None): values = os.path.expanduser(values) if not os.path.exists(values): - raise IOError(f"Unable to locate the file at: {values}") - + raise OSError(f"Unable to locate the file at: {values}") + try: - open(values, 'r') + open(values) setattr(namespace, self.dest, values) - except IOError: - raise IOError(f"Unable to read the file at: {values}") - + except OSError: + raise OSError(f"Unable to read the file at: {values}") + def fail(message): - """Immediately fail with the specified message - """ + """Immediately fail with the specified message""" sys.exit(message) def build_headers(config): - """"This function build the headers for the HTTP request. - Returns headers for the HTTP request + """ "This function build the headers for the HTTP request. + Returns headers for the HTTP request """ headers = { - "Content-type": str(config['content_type']), - "login": str(args.login), - "customerUri": str(config['customeruri']) + "Content-type": str(config["content_type"]), + "login": str(args.login), + "customerUri": str(config["customeruri"]), } return headers @@ -153,20 +214,20 @@ def build_headers(config): def test_incommon_connection(config, restclient): """This function tests the connection to InCommon API - and the credentials for authentication: cert and key. - Performs a call to the listing SSL types endpoint. - Successful if response is HTTP 200 OK + and the credentials for authentication: cert and key. + Performs a call to the listing SSL types endpoint. + Successful if response is HTTP 200 OK """ # Build headers. headers = build_headers(config) response = None - - response = restclient.get_request(config['listingurl'], headers) + + response = restclient.get_request(config["listingurl"], headers) response_text = response.data.decode() - logger.debug('response text: ' + str(response_text)) + logger.debug("response text: " + str(response_text)) try: if response.status == 200: - print(prog + ": Connection successful to InCommon API") + print(prog + ": Connection successful to InCommon API") else: # InCommon API HTTP Error codes and messages are not consistent with documentation. print(prog + ": Connection failure to InCommon API") @@ -181,73 +242,77 @@ def test_incommon_connection(config, restclient): def submit_request(config, restclient, hostname, cert_csr, sans=None): """This function submits an enrollment request for a certificate - If successful returns a self-enrollment certificate Id = sslId + If successful returns a self-enrollment certificate Id = sslId """ - # Build and update headers for the restclient + # Build and update headers for the restclient headers = build_headers(config) response = None response_data = None - cert_type = config['igtfservercert'] - + cert_type = config["igtfservercert"] + if sans: - cert_type = config['igtfmultidomain'] - + cert_type = config["igtfmultidomain"] + payload = dict( csr=cert_csr, - orgId=config['department'], + orgId=config["department"], certType=cert_type, numberServers=0, - serverType=config['servertype'], - term=config['term'], - comments="Certificate request for " + hostname + serverType=config["servertype"], + term=config["term"], + comments="Certificate request for " + hostname, ) - + if sans: payload.update(subjAltNames=sans) - + try: - response = restclient.post_request(config['enrollurl'], headers, payload) - + response = restclient.post_request(config["enrollurl"], headers, payload) + if response.status == 200: response_text = response.data.decode() - logger.debug('response text: ' + str(response_text)) + logger.debug("response text: " + str(response_text)) response_data = json.loads(response_text) - response_data = response_data['sslId'] + response_data = response_data["sslId"] elif response.status == 401: - raise AuthenticationFailureException(response.status, "Connection failure to InCommon API. Check your authentication credentials.") + raise AuthenticationFailureException( + response.status, "Connection failure to InCommon API. Check your authentication credentials." + ) else: - print(prog + ": Connection failure to InCommon API. HTTP " + str(response.status) + " " + str(response.reason)) + print( + prog + ": Connection failure to InCommon API. HTTP " + str(response.status) + " " + str(response.reason) + ) raise http.client.HTTPException() except http.client.HTTPException as exc: - raise - + raise + return response_data - + + def retrieve_cert(config, sslcontext, sslId): - """This function retrieves a certificate given a self-enrollment certificate Id = sslId - """ - + """This function retrieves a certificate given a self-enrollment certificate Id = sslId""" + # Build and update headers for the restclient. Headers will be reused for all requests headers = build_headers(config) response = None - response_data = None + response_data = None retry_count = MAX_RETRY_RETRIEVAL - retrieve_url = config['retrieveurl'] + str(sslId) + config['certx509co'] - + retrieve_url = config["retrieveurl"] + str(sslId) + config["certx509co"] + for _ in range(retry_count): try: - restclient = InCommonApiClient(config['apiurl'], sslcontext) + restclient = InCommonApiClient(config["apiurl"], sslcontext) response = restclient.get_request(retrieve_url, headers) # InCommon API responds with HTTP 400 Bad Request when the certificate is still being procesed # "code": 0, "description": "Being processed by Sectigo" # Triggers the BadStatusLine exception avoiding to reusing the connection response_text = response.data.decode() - logger.debug('response text: ' + str(response_text)) - # HTTP 200 OK brings the certificate in the response, connection will be closed before exiting the loop + logger.debug("response text: " + str(response_text)) + # HTTP 200 OK brings the certificate in the response, connection will be closed before exiting the loop if response.status == 200: print(" - Certificate request is approved. Downloading certificate now.") response_data = response_text @@ -258,24 +323,24 @@ def retrieve_cert(config, sslcontext, sslId): pass except http.client.HTTPException as exc: raise - print(" - Certificate request is pending approval...") - print(f" - Waiting for {WAIT_RETRIEVAL} seconds before retrying certificate retrieval" ) + print(" - Certificate request is pending approval...") + print(f" - Waiting for {WAIT_RETRIEVAL} seconds before retrying certificate retrieval") # Closing the connection before going to sleep restclient.close_connection() time.sleep(WAIT_RETRIEVAL) - + return response_data - + + def main(): - """The entrypoint for osg-incommon-cert-request - """ + """The entrypoint for osg-incommon-cert-request""" global args try: args = parse_cli() - + config_parser = configparser.ConfigParser() try: - with open(args.config_file, 'r', encoding='utf-8') as config_file: + with open(args.config_file, encoding="utf-8") as config_file: try: config_parser.read_file(config_file) except configparser.Error as exc: @@ -284,28 +349,28 @@ def main(): fail(exc) try: - CONFIG = dict(config_parser.items('InCommon')) + CONFIG = dict(config_parser.items("InCommon")) except configparser.NoSectionError: - fail(f'Could not find [InCommon] section header in {args.config_file}') + fail(f"Could not find [InCommon] section header in {args.config_file}") if args.orgcode: - codes = [code.strip() for code in args.orgcode.split(',')] - CONFIG['organization'] = codes[0] - CONFIG['department'] = codes[1] - + codes = [code.strip() for code in args.orgcode.split(",")] + CONFIG["organization"] = codes[0] + CONFIG["department"] = codes[1] + print(f"Using organization code of {CONFIG['organization']} and department code of {CONFIG['department']}") utils.check_permissions(args.write_directory) - + if args.test: print("Beginning testing mode: ignoring optional parameters.") - print("="*60) + print("=" * 60) # Creating SSLContext with cert and key provided # usercert and userprivkey are already validated by utils.findusercred ssl_context = cert_utils.get_ssl_context(usercert=args.usercert, userkey=args.userprivkey) - restclient = InCommonApiClient(CONFIG['apiurl'], ssl_context) + restclient = InCommonApiClient(CONFIG["apiurl"], ssl_context) if args.test: test_incommon_connection(CONFIG, restclient) @@ -313,40 +378,40 @@ def main(): sys.exit(0) print("Beginning certificate request") - print("="*60) - - #Create tuple(s) either with a single hostname and altnames or with a set of hostnames and altnames from the hostfile + print("=" * 60) + + # Create tuple(s) either with a single hostname and altnames or with a set of hostnames and altnames from the hostfile if args.hostname: hosts = [tuple([args.hostname.strip()] + args.altnames)] else: with open(args.hostfile) as hosts_file: host_lines = hosts_file.readlines() hosts = [tuple(line.split()) for line in host_lines if line.strip()] - + requests = list() csrs = list() print("The following Common Name (CN) and Subject Alternative Names (SANS) have been specified: ") - # Building the lists with certificates --> utils.Csr(object) + # Building the lists with certificates --> utils.Csr(object) for host in set(hosts): common_name = host[0] sans = host[1:] - + print(f"CN: {common_name}, SANS: {sans}") - csr_obj = cert_utils.Csr(common_name, - output_dir=args.write_directory, - altnames=sans, key_length=args.key_length) - + csr_obj = cert_utils.Csr( + common_name, output_dir=args.write_directory, altnames=sans, key_length=args.key_length + ) + logger.debug(csr_obj.x509request.as_text()) csrs.append(csr_obj) - print("="*60) + print("=" * 60) for csr in csrs: subj = str(csr.x509request.get_subject()) print(f"Requesting certificate for {subj}: ") response_request = submit_request(CONFIG, restclient, subj, csr.base64_csr(), sans=csr.altnames) - + # response_request stores the sslId for the certificate request if response_request: requests.append(tuple([response_request, subj])) @@ -354,18 +419,18 @@ def main(): csr.write_pkey() else: print(f"Request failed for {subj}") - - print("-"*60) - + + print("-" * 60) + # Closing the restclient connection before going idle waiting for approval restclient.close_connection() - + print(f"{len(requests)} certificate(s) was(were) requested successfully") print(f"Waiting {WAIT_APPROVAL} seconds for requests approval...") - time.sleep(WAIT_APPROVAL) - + time.sleep(WAIT_APPROVAL) + print("\nStarting certificate retrieval") - print("="*60) + print("=" * 60) # Certificate retrieval has to retry until it gets the certificate # A restclient (InCommonApiClient) needs to be created for each retrieval attempt for request in requests: @@ -374,7 +439,7 @@ def main(): response_retrieve = retrieve_cert(CONFIG, ssl_context, request[0]) if response_retrieve is not None: - cert_path = os.path.join(args.write_directory, subj.split("=")[1] + '-cert.pem') + cert_path = os.path.join(args.write_directory, subj.split("=")[1] + "-cert.pem") print(f"Retrieval successful. Writing certificate file at: {cert_path}") utils.safe_rename(cert_path) utils.atomic_write(cert_path, response_retrieve.encode()) @@ -383,8 +448,8 @@ def main(): print(f"Retrieval failure for {subj}") print("The certificate can be retrieved directly from the InCommon Cert Manager interface.") print(f"CN {subj}, Self-enrollment Certificate ID (sslId): {request[0]}") - - print("-"*60) + + print("-" * 60) except SystemExit: raise @@ -392,17 +457,26 @@ def main(): sys.exit(exc) except KeyboardInterrupt as exc: print(str(exc)) - sys.exit('''Interrupted by user\n''') + sys.exit("""Interrupted by user\n""") except KeyError as exc: print(prog + ": error: " + f"Key {exc} not found in dictionary") sys.exit(1) except FileNotFoundException as exc: - print(prog + ": error: " + str(exc) + ':' + exc.filename) + print(prog + ": error: " + str(exc) + ":" + exc.filename) sys.exit(1) except SSLError as exc: print(prog + ": " + str(exc)) - sys.exit('Please check for valid certificate.\n') - except (IOError, FileWriteException, BadPassphraseException, AttributeError, EnvironmentError, ValueError, EOFError, SSLError, AuthenticationFailureException) as exc: + sys.exit("Please check for valid certificate.\n") + except ( + OSError, + FileWriteException, + BadPassphraseException, + AttributeError, + ValueError, + EOFError, + SSLError, + AuthenticationFailureException, + ) as exc: print(prog + ": error: " + str(exc)) sys.exit(1) except http.client.HTTPException as exc: diff --git a/osgpkitools/osg-cert-request b/osgpkitools/osg-cert-request index d0e1786..d19bda5 100755 --- a/osgpkitools/osg-cert-request +++ b/osgpkitools/osg-cert-request @@ -1,8 +1,10 @@ #!/usr/bin/env python3 -import os, sys +import os +import sys + if __name__ == "__main__" and __package__ is None: - sys.path.append(os.path.abspath(__file__+"/../..")) + sys.path.append(os.path.abspath(__file__ + "/../..")) from osgpkitools import cert_request diff --git a/osgpkitools/osg-incommon-cert-request b/osgpkitools/osg-incommon-cert-request index 43b71ae..fc31ff3 100755 --- a/osgpkitools/osg-incommon-cert-request +++ b/osgpkitools/osg-incommon-cert-request @@ -1,8 +1,10 @@ #!/usr/bin/env python3 -import os, sys +import os +import sys + if __name__ == "__main__" and __package__ is None: - sys.path.append(os.path.abspath(__file__+"/../..")) + sys.path.append(os.path.abspath(__file__ + "/../..")) from osgpkitools import incommon_request diff --git a/osgpkitools/rest_client.py b/osgpkitools/rest_client.py index 43e19f5..9705c83 100644 --- a/osgpkitools/rest_client.py +++ b/osgpkitools/rest_client.py @@ -2,11 +2,12 @@ import logging import socket import ssl -import urllib3 import urllib.parse from json import dumps +import urllib3 + from . import utils from .ExceptionDefinitions import * @@ -14,24 +15,26 @@ logger = logging.getLogger(__name__) -class InCommonApiClient(): +class InCommonApiClient: def __init__(self, base_url, ssl_context, *args, **kwargs): """base_url and api_timeout Args: - base_url (string): Will be used to connect to the InCommon API (cert-auth) + base_url (string): Will be used to connect to the InCommon API (cert-auth) ssl_context (object): urllib3 SSL context including user credentials """ - + if not base_url.startswith("http"): base_url = "https://" + base_url self.base_url = base_url - self.http = urllib3.PoolManager(ssl_context=ssl_context, - ca_certs=ssl.get_default_verify_paths().cafile, - timeout=urllib3.util.Timeout(connect=2, read=10)) - + self.http = urllib3.PoolManager( + ssl_context=ssl_context, + ca_certs=ssl.get_default_verify_paths().cafile, + timeout=urllib3.util.Timeout(connect=2, read=10), + ) + def close_connection(self): self.http.clear() @@ -42,21 +45,21 @@ def post_request(self, url, headers, data): data (json):body containing information headers (json): additional headers to complete the request """ - + url = urllib.parse.urljoin(self.base_url, url) - - logger.debug('posting to ' + url) - logger.debug('headers ' + str(headers)) - logger.debug('post data ' + str(data)) - + + logger.debug("posting to " + url) + logger.debug("headers " + str(headers)) + logger.debug("post data " + str(data)) + params = urllib.parse.urlencode(data, doseq=True) - + try: post_response = self.http.request("POST", url, body=dumps(data), headers=headers) utils.check_response_500(post_response) - logger.debug('post response status ' + str(post_response.status) + ': ' + str(post_response.reason)) + logger.debug("post response status " + str(post_response.status) + ": " + str(post_response.reason)) except http.client.HTTPException as exc: - print((prog + f": error: post to {url} failed : {exc}")) + print(prog + f": error: post to {url} failed : {exc}") raise return post_response @@ -69,17 +72,17 @@ def get_request(self, url, headers): """ url = urllib.parse.urljoin(self.base_url, url) - logger.debug('requesting to ' + url) - logger.debug('headers ' + str(headers)) - + logger.debug("requesting to " + url) + logger.debug("headers " + str(headers)) + try: get_response = self.http.request("GET", url, None, headers) utils.check_response_500(get_response) - logger.debug('get response status ' + str(get_response.status) + ': ' + str(get_response.reason)) + logger.debug("get response status " + str(get_response.status) + ": " + str(get_response.reason)) except http.client.BadStatusLine as exc: raise except http.client.HTTPException as exc: - print((prog + f": error: request from {url} failed : {exc}")) + print(prog + f": error: request from {url} failed : {exc}") raise - + return get_response diff --git a/osgpkitools/utils.py b/osgpkitools/utils.py index d41d3e4..a409b8f 100644 --- a/osgpkitools/utils.py +++ b/osgpkitools/utils.py @@ -8,12 +8,11 @@ from .ExceptionDefinitions import * VERSION_NUMBER = "3.7.1" -HELP_EMAIL = 'help@opensciencegrid.org' +HELP_EMAIL = "help@opensciencegrid.org" def atomic_write(filename, contents): - """Write to a temporary file then move it to its final location - """ + """Write to a temporary file then move it to its final location""" temp_fd, temp_name = tempfile.mkstemp(dir=os.path.dirname(filename)) os.write(temp_fd, contents) os.close(temp_fd) @@ -21,35 +20,32 @@ def atomic_write(filename, contents): def check_response_500(response): - """ This functions handles the 500 error response from the server""" + """This functions handles the 500 error response from the server""" if response.status == 500: raise Exception_500response(response.status, response.reason) def safe_rename(filename): - """Renames 'filename' to 'filename.old' - """ - old_filename = filename + '.old' + """Renames 'filename' to 'filename.old'""" + old_filename = filename + ".old" try: shutil.move(filename, old_filename) print(f"Renamed existing file from {filename} to {old_filename}") - except IOError as exc: + except OSError as exc: if exc.errno != errno.ENOENT: print(exc) - raise RuntimeError(f'ERROR: Failed to rename {filename} to {old_filename}') + raise RuntimeError(f"ERROR: Failed to rename {filename} to {old_filename}") def safe_write(filename, contents): - """Safely backup the target 'filename' then write 'contents' - """ + """Safely backup the target 'filename' then write 'contents'""" safe_rename(filename) atomic_write(filename, contents) def check_permissions(path): - """The function checks for write permissions for the given path to verify if the user has write permissions - """ + """The function checks for write permissions for the given path to verify if the user has write permissions""" if os.access(path, os.W_OK): return else: diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..1faeea3 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,15 @@ + +[tool.black] +line-length = 120 +target-version = ['py39'] + +[tool.isort] +src_paths = ["osgpkitools", "tests"] +py_version = 39 +profile = "black" +line_length = 120 +combine_star = "True" +order_by_type = "True" +dedup_headings = "True" +lines_between_types = 1 +force_alphabetical_sort_within_sections = "True" diff --git a/rpm/osg-pki-tools.spec b/rpm/osg-pki-tools.spec index 4951fb6..f86e759 100644 --- a/rpm/osg-pki-tools.spec +++ b/rpm/osg-pki-tools.spec @@ -47,7 +47,7 @@ mv %{buildroot}/%{_prefix}/config/ca-issuer.conf %{buildroot}%{_sysconfdir}/osg/ - Set the default ca_certs bundle for osg-incommon-cert-request. * Tue Feb 20 2024 Dave Dykstra - 3.7.0 -- Convert osg-incommon-cert-request to use urllib3 instead of M2crypto +- Convert osg-incommon-cert-request to use urllib3 instead of M2crypto for its https connection, to make it work properly on EL9. * Wed Sep 6 2023 Brian Lin - 3.6.1 diff --git a/setup.py b/setup.py index f8ffc11..37c51ee 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ from distutils.core import setup + from osgpkitools import utils setup( @@ -11,8 +12,6 @@ data_files=[("config", ["config/ca-issuer.conf"])], description="Open Science Grid x509 certificate tools.", long_description="Open Science Grid x509 certificate tools.", - packages=['osgpkitools'], - license='Apache 2.0', + packages=["osgpkitools"], + license="Apache 2.0", ) - - diff --git a/tests/OSGPKIUtilsTests.py b/tests/OSGPKIUtilsTests.py index d3bd7a2..006f947 100644 --- a/tests/OSGPKIUtilsTests.py +++ b/tests/OSGPKIUtilsTests.py @@ -2,54 +2,68 @@ import re import signal -from json import dumps import sys import unittest -from .pkiunittest import DOMAIN, EMAIL, PYPATH +from json import dumps # Allow import of OSGPKIUtilsTests. Hacky. sys.path.insert(1, PYPATH) + from osgpkitools import OSGPKIUtils +from .pkiunittest import DOMAIN, EMAIL, PYPATH + + class OSGPKIUtilsTests(unittest.TestCase): - FQDN = 'test.' + DOMAIN + FQDN = "test." + DOMAIN def test_csr_generation(self): - '''Generate a basic CSR''' + """Generate a basic CSR""" cert = OSGPKIUtils.Cert(self.FQDN, email=EMAIL) - self.assertTrue(cert.x509request, 'missing CSR contents') + self.assertTrue(cert.x509request, "missing CSR contents") def test_alt_name_csr_generation(self): - '''Generate a CSR with multiple SANs''' - alias = 'test-san.' + DOMAIN + """Generate a CSR with multiple SANs""" + alias = "test-san." + DOMAIN cert = OSGPKIUtils.Cert(self.FQDN, altnames=[alias], email=EMAIL) csr_contents = cert.x509request.as_text() - self.assertIsNotNone(re.search(r'X509v3 Subject Alternative Name: critical', csr_contents), - "Subject Alternative Name not marked as 'critical'\n" + csr_contents) - found_names = set(match.group(1) for match in re.finditer(r'DNS:([\w\-\.]+)', csr_contents)) - expected_names = set([alias]) - self.assertEqual(found_names, expected_names, - f"Did not find expected SAN contents {list(expected_names)}:\n{csr_contents}") # printed lists are easier to read` than printed sets + self.assertIsNotNone( + re.search(r"X509v3 Subject Alternative Name: critical", csr_contents), + "Subject Alternative Name not marked as 'critical'\n" + csr_contents, + ) + found_names = {match.group(1) for match in re.finditer(r"DNS:([\w\-\.]+)", csr_contents)} + expected_names = {alias} + self.assertEqual( + found_names, expected_names, f"Did not find expected SAN contents {list(expected_names)}:\n{csr_contents}" + ) # printed lists are easier to read` than printed sets def test_timeout(self): - '''Verify timeout length''' - OSGPKIUtils.start_timeout_clock(1) # 1 minute timeout - alarm_timer = signal.alarm(0) # cancel alarm and get remaining time of previous alarm - self.assertEqual(alarm_timer, 60, f'Expected 1 min timeout, got {alarm_timer}s') + """Verify timeout length""" + OSGPKIUtils.start_timeout_clock(1) # 1 minute timeout + alarm_timer = signal.alarm(0) # cancel alarm and get remaining time of previous alarm + self.assertEqual(alarm_timer, 60, f"Expected 1 min timeout, got {alarm_timer}s") def test_sigalrm_handler(self): - '''Verify exiting handler''' + """Verify exiting handler""" self.assertRaises(SystemExit, OSGPKIUtils.start_timeout_clock, 0) def test_missing_vo_exception(self): - '''Verify helpful error message when user fails to include necessary VO information.''' - response = dumps({'status': 'FAILED', - 'detail': ' -- '.join(["Failed to find GridAdmins for specified CSRs/VO", - "Couldn't find GridAdmin group under specified VO.", - "GOC alert will be sent to GOC infrastructure team about this " + \ - "issue. Meanwhile, feel free to open a GOC ticket at " + \ - "https://ticket.grid.iu.edu"])}) + """Verify helpful error message when user fails to include necessary VO information.""" + response = dumps( + { + "status": "FAILED", + "detail": " -- ".join( + [ + "Failed to find GridAdmins for specified CSRs/VO", + "Couldn't find GridAdmin group under specified VO.", + "GOC alert will be sent to GOC infrastructure team about this " + + "issue. Meanwhile, feel free to open a GOC ticket at " + + "https://ticket.grid.iu.edu", + ] + ), + } + ) try: OSGPKIUtils.print_failure_reason_exit(response) except SystemExit: @@ -58,12 +72,13 @@ def test_missing_vo_exception(self): self.fail("print_failure_reason_exit() did not raise SystemExit") def test_read_config(self): - '''Verify that configuration is read in as a dictionary''' - for section, itb in list({'production': False, 'ITB': True}.items()): - self.assertTrue(OSGPKIUtils.read_config(itb, config_files=['../osgpkitools/pki-clients.ini']), - f'Unable to read the {section} config section') - + """Verify that configuration is read in as a dictionary""" + for section, itb in list({"production": False, "ITB": True}.items()): + self.assertTrue( + OSGPKIUtils.read_config(itb, config_files=["../osgpkitools/pki-clients.ini"]), + f"Unable to read the {section} config section", + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/pkiunittest.py b/tests/pkiunittest.py index 6663192..fb5d319 100644 --- a/tests/pkiunittest.py +++ b/tests/pkiunittest.py @@ -11,7 +11,8 @@ import tempfile from copy import deepcopy -from subprocess import Popen, PIPE +from subprocess import PIPE, Popen + from M2Crypto import RSA, X509 global TEST_PATH @@ -42,10 +43,11 @@ # Scripts import from osgpkitools, and it is up a directory PYPATH = os.path.abspath("..") -TEST_PATH = '' +TEST_PATH = "" # The expected subject for CILogon host/service certs before the common name -HOST_SUBJECT_PREFIX = '/DC=org/DC=opensciencegrid/O=Open Science Grid/OU=Services/CN=' +HOST_SUBJECT_PREFIX = "/DC=org/DC=opensciencegrid/O=Open Science Grid/OU=Services/CN=" + def test_env_setup(): """Create a test dir and environment""" @@ -56,66 +58,74 @@ def test_env_setup(): # Set path and python path for tests orig_env = deepcopy(os.environ) try: - os.environ['PYTHONPATH'] += f':{PYPATH}' + os.environ["PYTHONPATH"] += f":{PYPATH}" except KeyError: - os.environ['PYTHONPATH'] = PYPATH - os.environ['PATH'] += f':{SCRIPTS_PATH}' + os.environ["PYTHONPATH"] = PYPATH + os.environ["PATH"] += f":{SCRIPTS_PATH}" # Create temp dir and place necessary config in the cwd - ini_file = os.path.join(SCRIPTS_PATH, 'pki-clients.ini') + ini_file = os.path.join(SCRIPTS_PATH, "pki-clients.ini") cwd = os.getcwd() shutil.copy2(ini_file, cwd) TEST_PATH = tempfile.mkdtemp(dir=cwd) + def test_env_teardown(): """Blow up the test dir""" - os.environ = deepcopy(orig_env) # restore environment - os.unlink('pki-clients.ini') + os.environ = deepcopy(orig_env) # restore environment + os.unlink("pki-clients.ini") shutil.rmtree(TEST_PATH) + def run_command(cmd, env=None): proc = Popen(cmd, stdout=PIPE, stderr=PIPE, env=env) stdout, stderr = proc.communicate() rc = proc.returncode - diagnostic = f"Command: {' '.join(cmd)}\n" \ - + f"Return code: {rc}\n" \ - + "STDOUT:\n" + stdout \ - + "STDERR:\n" + stderr + diagnostic = f"Command: {' '.join(cmd)}\n" + f"Return code: {rc}\n" + "STDOUT:\n" + stdout + "STDERR:\n" + stderr return rc, stdout, stderr, diagnostic + def run_python(script, *args): - '''Run osg-pki-tools script ''' + """Run osg-pki-tools script""" script_path = os.path.join(SCRIPTS_PATH, script) py_cmd = (sys.executable, script_path) + args return run_command(py_cmd, env=os.environ) -class OIM(object): + +class OIM: """OIM and cert/key pair interface""" + def __init__(self): """Create a cert instance""" - self.reqid = '' + self.reqid = "" self.certs = list() self.keys = list() def request(self, *opts): """Run osg-cert-request""" - rc, stdout, stderr, msg = run_python('osg-cert-request', - '--test', - '--comment', 'osg-pki-tools developer testing', - '--cc', 'test@example.com,test2@example.com', - '--directory', TEST_PATH, - '--vo', TEST_VO, - *opts) + rc, stdout, stderr, msg = run_python( + "osg-cert-request", + "--test", + "--comment", + "osg-pki-tools developer testing", + "--cc", + "test@example.com,test2@example.com", + "--directory", + TEST_PATH, + "--vo", + TEST_VO, + *opts, + ) try: - self.reqid = re.search(r'OIM Request ID: (\d+)', stdout).group(1) + self.reqid = re.search(r"OIM Request ID: (\d+)", stdout).group(1) except AttributeError: - msg = 'Could not parse stdout for key or request ID\n' + msg + msg = "Could not parse stdout for key or request ID\n" + msg try: - key_path = re.search(r'Writing key to ([^\n]+)', stdout).group(1) + key_path = re.search(r"Writing key to ([^\n]+)", stdout).group(1) except AttributeError: - msg = 'Could not parse stdout for key or request ID\n' + msg + msg = "Could not parse stdout for key or request ID\n" + msg else: try: key = OIM.verify_key(key_path) @@ -126,28 +136,34 @@ def request(self, *opts): def gridadmin_request(self, *opts): """Run osg-gridadmin-request""" - rc, stdout, stderr, msg = run_python('osg-gridadmin-cert-request', - '--test', - '--cert', GA_CERT_PATH, - '--pkey', GA_KEY_PATH, - '--directory', TEST_PATH, - '--vo', TEST_VO, - *opts) + rc, stdout, stderr, msg = run_python( + "osg-gridadmin-cert-request", + "--test", + "--cert", + GA_CERT_PATH, + "--pkey", + GA_KEY_PATH, + "--directory", + TEST_PATH, + "--vo", + TEST_VO, + *opts, + ) # Populate instance attr try: - self.reqid = re.search(r'OIM Request ID: (\d+)', stdout).group(1) + self.reqid = re.search(r"OIM Request ID: (\d+)", stdout).group(1) except AttributeError: - msg = 'Could not parse stdout for request ID\n' + msg + msg = "Could not parse stdout for request ID\n" + msg # find all certs and keys in the output dir as sorted lists - certs = sorted([x for x in glob.glob(os.path.join(TEST_PATH, '*.pem')) if '-key.pem' not in x]) - keys = sorted(glob.glob(os.path.join(TEST_PATH, '*-key.pem'))) + certs = sorted([x for x in glob.glob(os.path.join(TEST_PATH, "*.pem")) if "-key.pem" not in x]) + keys = sorted(glob.glob(os.path.join(TEST_PATH, "*-key.pem"))) if len(certs) != len(keys): - raise AssertionError('Mismatched number of issued certs and keys\n' + msg) + raise AssertionError("Mismatched number of issued certs and keys\n" + msg) # Verify permissions of created files, if any for cert_path, key_path in zip(certs, keys): try: - subject = HOST_SUBJECT_PREFIX + os.path.basename(os.path.splitext(cert_path)[0]).replace('_', '/') + subject = HOST_SUBJECT_PREFIX + os.path.basename(os.path.splitext(cert_path)[0]).replace("_", "/") cert = OIM.verify_cert(cert_path, subject) key = OIM.verify_key(key_path) except CertFileError as cert_err: @@ -161,57 +177,60 @@ def gridadmin_request(self, *opts): def retrieve(self, *opts): """Run osg-cert-retrieve""" - if not self.reqid and '--help' not in opts: - raise CertFileError('Could not revoke cert due to missing request ID\n') + if not self.reqid and "--help" not in opts: + raise CertFileError("Could not revoke cert due to missing request ID\n") args = list(opts + (self.reqid,)) - return run_python('osg-cert-retrieve', '--test', - '--directory', TEST_PATH, - *args) + return run_python("osg-cert-retrieve", "--test", "--directory", TEST_PATH, *args) def user_renew(self, *opts): """Run osg-user-cert-renew""" - return run_python('osg-user-cert-renew', '--test', *opts) + return run_python("osg-user-cert-renew", "--test", *opts) def revoke(self, *opts): """Run osg-cert-revoke""" - if not self.reqid and '--help' not in opts: - raise CertFileError('Could not revoke cert due to missing request ID\n') - args = opts + ('--test', - '--cert', GA_CERT_PATH, - '--pkey', GA_KEY_PATH, - self.reqid, 'osg-pki-tools unit test - revoke') - return run_python('osg-cert-revoke', *args) + if not self.reqid and "--help" not in opts: + raise CertFileError("Could not revoke cert due to missing request ID\n") + args = opts + ( + "--test", + "--cert", + GA_CERT_PATH, + "--pkey", + GA_KEY_PATH, + self.reqid, + "osg-pki-tools unit test - revoke", + ) + return run_python("osg-cert-revoke", *args) def user_revoke(self, *opts): """Run osg-user-cert-revoke, which is a bash wrapper around osg-cert-revoke""" - if not self.reqid and '--help' not in opts: - raise CertFileError('Could not revoke cert due to missing request ID\n') - args = opts + ('--test', self.reqid, 'osg-pki-tools unit test - user revoke') - cmd = (os.path.join(SCRIPTS_PATH, 'osg-user-cert-revoke'),) + args + if not self.reqid and "--help" not in opts: + raise CertFileError("Could not revoke cert due to missing request ID\n") + args = opts + ("--test", self.reqid, "osg-pki-tools unit test - user revoke") + cmd = (os.path.join(SCRIPTS_PATH, "osg-user-cert-revoke"),) + args return run_command(cmd) @staticmethod def verify_key(path): """Ensure proper key permission bits and ability to unlock the key""" - fail_prefix = 'VerificationFailure: ' + fail_prefix = "VerificationFailure: " if not os.path.exists(path): raise KeyFileError(fail_prefix + "No associated key file\n") - permissions = os.stat(path).st_mode & 0o777 # Mask non-permission bits + permissions = os.stat(path).st_mode & 0o777 # Mask non-permission bits if permissions != 0o600: raise KeyFileError(fail_prefix + f"Bad permissions ({permissions}) on key '{path}'\n") try: key = RSA.load_key(path, OIM.simple_pass_callback) except RSA.RSAError as exc: - if 'no start line' in exc: + if "no start line" in exc: raise KeyFileError(fail_prefix + f"Could not load key file '{path}'\n") - elif 'bad pass' in exc: + elif "bad pass" in exc: raise KeyFileError(fail_prefix + f"Incorrect pass for key file {path}\n") return key @staticmethod def verify_cert(path, expected_subject): """Ensure proper cert permissions, returns""" - fail_prefix = 'VerificationFailure: ' + fail_prefix = "VerificationFailure: " if not os.path.exists(path): raise CertFileError(fail_prefix + "No associated cert file\n") mode = os.stat(path).st_mode @@ -221,21 +240,21 @@ def verify_cert(path, expected_subject): cert = X509.load_cert(path) cert_subject = str(cert.get_subject()) except X509.X509Error: - raise CertFileError(f'Malformed cert: {path}\n') + raise CertFileError(f"Malformed cert: {path}\n") if cert_subject != expected_subject: - raise CertFileError(f'Incorrect subject. EXPECTED:\n{expected_subject}\nGOT:\n{cert_subject}\n') + raise CertFileError(f"Incorrect subject. EXPECTED:\n{expected_subject}\nGOT:\n{cert_subject}\n") return cert @staticmethod def simple_pass_callback(verify): """Callback for unlocking keys with passwords in plaintext for testing.""" - return '' + return "" def assertNumCerts(self, num_expected_certs, msg): """Verify expected number of certs""" num_found_certs = len(self.certs) if num_found_certs != num_expected_certs: - raise AssertionError(f'Expected {num_found_certs} cert(s), found {num_expected_certs}\n{msg}') + raise AssertionError(f"Expected {num_found_certs} cert(s), found {num_expected_certs}\n{msg}") def assertSans(self, hosts_list, msg): """Verify that the we have the correct number of certs and expected SAN contents for each cert. @@ -246,8 +265,8 @@ def assertSans(self, hosts_list, msg): self.assertNumCerts(len(hosts_list), msg) for cert, expected_names in zip(self.certs, hosts_list): # Verify list of SANs are as expected - san_contents = cert.get_ext('subjectAltName').get_value() - found_names = set(match.group(1) for match in re.finditer(r'DNS:([\w\-\.]+)', san_contents)) + san_contents = cert.get_ext("subjectAltName").get_value() + found_names = {match.group(1) for match in re.finditer(r"DNS:([\w\-\.]+)", san_contents)} if found_names != set(expected_names): raise AssertionError(f"Did not find expected SAN contents {expected_names}:\n{cert.as_text()}\n{msg}") @@ -255,5 +274,6 @@ def assertSans(self, hosts_list, msg): class KeyFileError(AssertionError): pass + class CertFileError(AssertionError): pass diff --git a/tests/setup_tests.sh b/tests/setup_tests.sh index 995c4e3..4d4bbd4 100755 --- a/tests/setup_tests.sh +++ b/tests/setup_tests.sh @@ -31,6 +31,3 @@ elif [ "$el_version" = "7" ]; then docker rm -v $DOCKER_CONTAINER_ID fi - - - diff --git a/tests/test_cert_request.py b/tests/test_cert_request.py index 15af237..d0957b6 100644 --- a/tests/test_cert_request.py +++ b/tests/test_cert_request.py @@ -4,18 +4,20 @@ import sys import unittest -from itertools import permutations from contextlib import contextmanager from io import StringIO +from itertools import permutations from osgpkitools import cert_request -HOST_ARGS = [('--hostname', 'hostname.example.edu')] -HOSTFILE_ARGS = [('--hostfile', 'hosts.txt')] -LOCATION_ARGS = [('--country', 'US'), - ('--state', 'New York'), - ('--locality', 'Stony Brook'), - ('--organization', 'SUNY - Stony Brook')] +HOST_ARGS = [("--hostname", "hostname.example.edu")] +HOSTFILE_ARGS = [("--hostfile", "hosts.txt")] +LOCATION_ARGS = [ + ("--country", "US"), + ("--state", "New York"), + ("--locality", "Stony Brook"), + ("--organization", "SUNY - Stony Brook"), +] # https://stackoverflow.com/questions/18651705/argparse-unit-tests-suppress-the-help-message @@ -55,59 +57,53 @@ def parse_cli_flatten_args(args): class CertRequestTests(unittest.TestCase): - """Tests for CSR generation - """ + """Tests for CSR generation""" def test_conflicting_opts(self): - """Users should not provide both a hostname and hosts file - """ + """Users should not provide both a hostname and hosts file""" with capture_sys_output() as (_, _): self.assertRaises(SystemExit, parse_cli_flatten_args, HOST_ARGS + HOSTFILE_ARGS) def test_required_opts(self): - """Users need to provide location information and hostname or hosts file - """ + """Users need to provide location information and hostname or hosts file""" for host in [[()], HOST_ARGS, HOSTFILE_ARGS]: for location in permutations(LOCATION_ARGS, 3): args = host + list(location) with capture_sys_output() as (_, stderr): self.assertRaises(SystemExit, parse_cli_flatten_args, args) - self.assertIsNotNone(re.search(r'error.*are required.*', stderr.getvalue())) + self.assertIsNotNone(re.search(r"error.*are required.*", stderr.getvalue())) def test_ignored_opts(self): - """-A/--altname should be ignored when specifying -F/--hostfile - """ + """-A/--altname should be ignored when specifying -F/--hostfile""" with capture_sys_output() as (_, _): - args = parse_cli_flatten_args(HOSTFILE_ARGS + LOCATION_ARGS + - [('--altname', 'test-san.opensciencegrid.org')]) - self.assertEqual(args.altnames, [], 'Altname option was not ignored when --hostfile was specified') + args = parse_cli_flatten_args( + HOSTFILE_ARGS + LOCATION_ARGS + [("--altname", "test-san.opensciencegrid.org")] + ) + self.assertEqual(args.altnames, [], "Altname option was not ignored when --hostfile was specified") def test_state_opt(self): - """State values should be unabbreviated - """ + """State values should be unabbreviated""" with capture_sys_output() as (_, _): - self.assertRaises(ValueError, parse_cli_flatten_args, HOST_ARGS + [('--state', 'WI')]) + self.assertRaises(ValueError, parse_cli_flatten_args, HOST_ARGS + [("--state", "WI")]) args = parse_cli_flatten_args(HOST_ARGS + LOCATION_ARGS) - self.assertEqual(args.state, 'New York', f"Unexpected value '{args.state}' for state option:\n{args}") + self.assertEqual(args.state, "New York", f"Unexpected value '{args.state}' for state option:\n{args}") def test_country_opt(self): - """Country values should be the abbreviated, 2-letter country code - """ + """Country values should be the abbreviated, 2-letter country code""" with capture_sys_output() as (_, _): - self.assertRaises(ValueError, parse_cli_flatten_args, HOST_ARGS + [('--country', 'United States')]) + self.assertRaises(ValueError, parse_cli_flatten_args, HOST_ARGS + [("--country", "United States")]) args = parse_cli_flatten_args(HOST_ARGS + LOCATION_ARGS) - self.assertEqual(args.country, 'US', f"Unexpected value '{args.country}' for country option:\n{args}") + self.assertEqual(args.country, "US", f"Unexpected value '{args.country}' for country option:\n{args}") def test_help_opt(self): - """Verify help option - """ - for opt in ['-h', '--help']: + """Verify help option""" + for opt in ["-h", "--help"]: with capture_sys_output() as (_, _): self.assertRaises(SystemExit, parse_cli_flatten_args, [opt]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_inside_docker.sh b/tests/test_inside_docker.sh index 72d00a9..b7e15cf 100755 --- a/tests/test_inside_docker.sh +++ b/tests/test_inside_docker.sh @@ -55,4 +55,3 @@ else python -m unittest discover -v tests fi popd -