diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0b86174a..e794b639 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,20 +8,21 @@ jobs: strategy: max-parallel: 8 matrix: - os: [ubuntu-20.04, ubuntu-22.04, macos-12] - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] + os: [ubuntu-20.04, ubuntu-24.04, macos-12] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: ${{matrix.python-version}} - run: | - if [[ $(uname) == Linux ]]; then sudo apt-get install --no-install-recommends python3-openssl python3-lxml python3-certifi; fi + if [[ $(uname) == Linux ]]; then sudo apt-get install --no-install-recommends python3-lxml python3-certifi; fi - run: make install - if: ${{matrix.python-version == '3.12'}} run: make lint - run: make test - uses: codecov/codecov-action@v3 + if: ${{matrix.python-version == '3.12' && matrix.os == 'ubuntu-24.04'}} black: runs-on: ubuntu-22.04 steps: diff --git a/Makefile b/Makefile index a5005d86..7dd1743b 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ SHELL=/bin/bash lint: - ruff $$(dirname */__init__.py) + ruff check $$(dirname */__init__.py) mypy --install-types --non-interactive --check-untyped-defs $$(dirname */__init__.py) test: diff --git a/README.rst b/README.rst index 1ce517b2..c7cf3447 100644 --- a/README.rst +++ b/README.rst @@ -17,8 +17,8 @@ standard, and most recommended ones. Its features are: `_, required to verify signatures generated by some SAML implementations) * Modern Python compatibility (3.7-3.11+ and PyPy) -* Well-supported, portable, reliable dependencies: `lxml `_, - `cryptography `_, `pyOpenSSL `_ +* Well-supported, portable, reliable dependencies: `lxml `_ and + `cryptography `_ * Comprehensive testing (including the XMLDSig interoperability suite) and `continuous integration `_ * Simple interface with useful, ergonomic, and secure defaults (no network calls, XSLT or XPath transforms) @@ -30,22 +30,6 @@ Installation pip install signxml -Note: SignXML depends on `lxml `_ and `cryptography -`_, which in turn depend on `OpenSSL `_, `LibXML -`_, and Python tools to interface with them. You can install those as follows: - -+--------------+----------------------------------------------------------------------------------------------------------------------+ -| OS | Command | -+==============+======================================================================================================================+ -| Ubuntu | ``apt-get install --no-install-recommends python3-pip python3-wheel python3-setuptools python3-openssl python3-lxml``| -+--------------+----------------------------------------------------------------------------------------------------------------------+ -| Red Hat, | ``yum install python3-pip python3-pyOpenSSL python3-lxml`` | -| Amazon Linux,| | -| CentOS | | -+--------------+----------------------------------------------------------------------------------------------------------------------+ -| Mac OS | Install `Homebrew `_, then run ``brew install python``. | -+--------------+----------------------------------------------------------------------------------------------------------------------+ - Synopsis -------- SignXML uses the `lxml ElementTree API `_ to work with XML data. @@ -66,9 +50,11 @@ To make this example self-sufficient for test purposes: - Generate a test certificate and key using ``openssl req -x509 -nodes -subj "/CN=test" -days 1 -newkey rsa -keyout privkey.pem -out cert.pem`` - (run ``yum install openssl`` on Red Hat). + (run ``apt-get install openssl``, ``yum install openssl``, or ``brew install openssl`` if the ``openssl`` executable + is not found). - Pass the ``x509_cert=cert`` keyword argument to ``XMLVerifier.verify()``. (In production, ensure this is replaced with - the correct configuration for the trusted CA or certificate - this determines which signatures your application trusts.) + the correct configuration for the trusted CA or certificate - this determines which signatures your application + trusts.) .. _verifying-saml-assertions: diff --git a/docs/conf.py b/docs/conf.py index d35dd30b..9aabf094 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -24,7 +24,6 @@ "python": ("https://docs.python.org/3", None), "lxml": ("https://lxml.de/apidoc", "https://lxml.de/apidoc/objects.inv"), "Cryptography": ("https://cryptography.io/en/latest", "https://cryptography.io/en/latest/objects.inv"), - "pyOpenSSL": ("https://www.pyopenssl.org/en/stable", "https://www.pyopenssl.org/en/stable/objects.inv"), } templates_path = [""] ogp_site_url = "https://xml-security.github.io/" + project diff --git a/pyproject.toml b/pyproject.toml index 9f91f8c6..24df85dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,4 +9,6 @@ skip_gitignore = true [tool.ruff] line-length = 120 + +[tool.ruff.lint] per-file-ignores = {"signxml/__init__.py" = ["F401"], "signxml/xades/__init__.py" = ["F401"], "signxml/verifier.py" = ["E721"]} diff --git a/setup.py b/setup.py index e67ac303..d8c7d8dd 100755 --- a/setup.py +++ b/setup.py @@ -13,12 +13,9 @@ long_description=open("README.rst").read(), python_requires=">=3.7", install_requires=[ - # Dependencies are restricted by major version range according to semver. - # By default, version minimums are set to be compatible with the oldest supported Ubuntu LTS (currently 20.04). - "lxml >= 4.5.0, < 6", - "cryptography >= 3.4.8", # Set to the version in Ubuntu 22.04 due to features we need from cryptography 3.1 - "pyOpenSSL >= 19.0.0", - "certifi >= 2019.11.28", + "lxml >= 5.2.1, < 6", # Ubuntu 24.04 LTS + "cryptography >= 43", # Required to support client certificate validation + "certifi >= 2023.11.17", # Ubuntu 24.04 LTS # "tsp-client >= 0.1.3", ], extras_require={ diff --git a/signxml/algorithms.py b/signxml/algorithms.py index 2e7ec22c..5169c2e7 100644 --- a/signxml/algorithms.py +++ b/signxml/algorithms.py @@ -37,7 +37,7 @@ class SignatureConstructionMethod(Enum): class FragmentLookupMixin: @classmethod def from_fragment(cls, fragment): - for i in cls: # type: ignore + for i in cls: # type: ignore[attr-defined] if i.value.endswith("#" + fragment): return i else: @@ -50,7 +50,7 @@ def _missing_(cls, value): raise InvalidInput(f"Unrecognized {cls.__name__}: {value}") def __repr__(self): - return f"{self.__class__.__name__}.{self.name}" # type: ignore + return f"{self.__class__.__name__}.{self.name}" # type: ignore[attr-defined] class DigestAlgorithm(FragmentLookupMixin, InvalidInputErrorMixin, Enum): diff --git a/signxml/processor.py b/signxml/processor.py index 011d5889..fca886cd 100644 --- a/signxml/processor.py +++ b/signxml/processor.py @@ -76,7 +76,7 @@ class XMLSignatureProcessor(XMLProcessor): "urn:oid:1.3.132.0.37": ec.SECT409R1, "urn:oid:1.3.132.0.38": ec.SECT571K1, } - known_ecdsa_curve_oids = {ec().name: oid for oid, ec in known_ecdsa_curves.items()} # type: ignore + known_ecdsa_curve_oids = {ec().name: oid for oid, ec in known_ecdsa_curves.items()} # type: ignore[abstract] excise_empty_xmlns_declarations = False diff --git a/signxml/signer.py b/signxml/signer.py index 67b4f6f0..ac39002e 100644 --- a/signxml/signer.py +++ b/signxml/signer.py @@ -2,12 +2,12 @@ from dataclasses import dataclass, replace from typing import List, Optional, Union +from cryptography import x509 from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa, utils from cryptography.hazmat.primitives.asymmetric.padding import MGF1, PSS, PKCS1v15 from cryptography.hazmat.primitives.hmac import HMAC -from cryptography.hazmat.primitives.serialization import load_pem_private_key +from cryptography.hazmat.primitives.serialization import Encoding, load_pem_private_key from lxml.etree import Element, SubElement, _Element -from OpenSSL.crypto import FILETYPE_PEM, X509, dump_certificate from .algorithms import ( CanonicalizationMethod, @@ -128,7 +128,7 @@ def sign( *, key: Optional[Union[str, bytes, rsa.RSAPrivateKey, dsa.DSAPrivateKey, ec.EllipticCurvePrivateKey]] = None, passphrase: Optional[bytes] = None, - cert: Optional[Union[str, List[str], List[X509]]] = None, + cert: Optional[Union[str, List[str], List[x509.Certificate]]] = None, reference_uri: Optional[Union[str, List[str], List[SignatureReference]]] = None, key_name: Optional[str] = None, key_info: Optional[_Element] = None, @@ -151,8 +151,8 @@ def sign( :param passphrase: Passphrase to use to decrypt the key, if any. :param cert: X.509 certificate to use for signing. This should be a string containing a PEM-formatted certificate, or an - array of strings or :class:`OpenSSL.crypto.X509` objects containing the certificate and a chain of - intermediate certificates. + array of strings or :class:`cryptography.x509.Certificate` objects containing the certificate and a chain + of intermediate certificates. :param reference_uri: Custom reference URI or list of reference URIs to incorporate into the signature. When ``method`` is set to ``detached`` or ``enveloped``, reference URIs are set to this value and only the referenced elements are @@ -201,7 +201,7 @@ def sign( if len(cert_chain) == 0: raise InvalidInput("No PEM-encoded certificates found in string cert input data") else: - cert_chain = cert # type: ignore + cert_chain = cert # type:ignore[assignment] input_references = self._preprocess_reference_uri(reference_uri) @@ -244,7 +244,7 @@ def sign( signed_info_node, algorithm=self.c14n_alg, inclusive_ns_prefixes=inclusive_ns_prefixes ) if self.sign_alg.name.startswith("HMAC_"): - signer = HMAC(key=key, algorithm=digest_algorithm_implementations[self.sign_alg]()) # type: ignore + signer = HMAC(key=key, algorithm=digest_algorithm_implementations[self.sign_alg]()) # type:ignore[arg-type] signer.update(signed_info_c14n) signature_value_node.text = b64encode(signer.finalize()).decode() sig_root.append(signature_value_node) @@ -313,7 +313,7 @@ def _add_key_info(self, sig_root, signing_settings: SigningSettings): if isinstance(cert, (str, bytes)): x509_certificate.text = strip_pem_header(cert) else: - x509_certificate.text = strip_pem_header(dump_certificate(FILETYPE_PEM, cert)) + x509_certificate.text = strip_pem_header(cert.public_bytes(Encoding.PEM)) else: sig_root.append(signing_settings.key_info) @@ -378,12 +378,15 @@ def _unpack(self, data, references: List[SignatureReference]): return sig_root, doc_root, c14n_inputs, references def _build_transforms_for_reference(self, *, transforms_node: _Element, reference: SignatureReference): + assert reference.c14n_method is not None if self.construction_method == SignatureConstructionMethod.enveloped: SubElement(transforms_node, ds_tag("Transform"), Algorithm=SignatureConstructionMethod.enveloped.value) - SubElement(transforms_node, ds_tag("Transform"), Algorithm=reference.c14n_method.value) # type: ignore + SubElement(transforms_node, ds_tag("Transform"), Algorithm=reference.c14n_method.value) else: c14n_xform = SubElement( - transforms_node, ds_tag("Transform"), Algorithm=reference.c14n_method.value # type: ignore + transforms_node, + ds_tag("Transform"), + Algorithm=reference.c14n_method.value, ) if reference.inclusive_ns_prefixes: SubElement( diff --git a/signxml/util/__init__.py b/signxml/util/__init__.py index 31933efe..b073b694 100644 --- a/signxml/util/__init__.py +++ b/signxml/util/__init__.py @@ -12,10 +12,12 @@ from dataclasses import dataclass from typing import Any, List, Optional +import certifi +from cryptography import x509 from cryptography.hazmat.primitives import hashes, hmac from lxml.etree import QName -from ..exceptions import InvalidCertificate, RedundantCert, SignXMLException +from ..exceptions import InvalidCertificate PEM_HEADER = "-----BEGIN CERTIFICATE-----" PEM_FOOTER = "-----END CERTIFICATE-----" @@ -150,17 +152,18 @@ def bits_to_bytes_unit(num_of_bits): def strip_pem_header(cert): - try: - return re.search(pem_regexp, ensure_str(cert)).group(1).replace("\r", "") # type: ignore - except Exception: - return ensure_str(cert).replace("\r", "") + search_res = re.search(pem_regexp, ensure_str(cert)) + if search_res: + return search_res.group(1).replace("\r", "") + return ensure_str(cert).replace("\r", "") def add_pem_header(bare_base64_cert): bare_base64_cert = ensure_str(bare_base64_cert) if bare_base64_cert.startswith(PEM_HEADER): - return bare_base64_cert - return PEM_HEADER + "\n" + textwrap.fill(bare_base64_cert, 64) + "\n" + PEM_FOOTER + return bare_base64_cert.encode() + cert_with_header = PEM_HEADER + "\n" + textwrap.fill(bare_base64_cert, 64) + "\n" + PEM_FOOTER + return cert_with_header.encode() def iterate_pem(certs): @@ -206,24 +209,7 @@ def p_sha1(client_b64_bytes, server_b64_bytes): return b64encode(raw_p_sha1(client_bytes, server_bytes, (len(client_bytes), len(server_bytes)))[0]).decode() -def _add_cert_to_store(store, cert): - from OpenSSL.crypto import Error as OpenSSLCryptoError - from OpenSSL.crypto import X509StoreContext, X509StoreContextError - - try: - X509StoreContext(store, cert).verify_certificate() - except X509StoreContextError as e: - raise InvalidCertificate(e) - try: - store.add_cert(cert) - return cert - except OpenSSLCryptoError as e: - if e.args == ([("x509 certificate routines", "X509_STORE_add_cert", "cert already in hash table")],): - raise RedundantCert(e) - raise - - -def verify_x509_cert_chain(cert_chain, ca_pem_file=None, ca_path=None): +class X509CertChainVerifier: """ Look at certs in the cert chain and add them to the store one by one. Return the cert at the end of the chain. That is the cert to be used by the caller for verifying. @@ -231,36 +217,55 @@ def verify_x509_cert_chain(cert_chain, ca_pem_file=None, ca_path=None): "All certificates appearing in an X509Data element must relate to the validation key by either containing it or being part of a certification chain that terminates in a certificate containing the validation key. No ordering is implied by the above constraints" + + Note: SignXML no longer uses OpenSSL for certificate chain verificaiton. The CApath parameter supported by OpenSSL + is not supported by cryptography. The CApath parameter is used to specify a directory containing CA certificates in + PEM format. The files each contain one CA certificate. The files are looked up by the CA subject name hash value. + See https://docs.openssl.org/master/man3/SSL_CTX_load_verify_locations/#notes. If you need CApath support, please + contact SignXML maintainers. """ - # TODO: migrate to Cryptography (pending cert validation support) or https://github.com/wbond/certvalidator - from OpenSSL import SSL - - context = SSL.Context(SSL.TLSv1_METHOD) - if ca_pem_file is None and ca_path is None: - import certifi - - ca_pem_file = certifi.where() - context.load_verify_locations(ensure_bytes(ca_pem_file, none_ok=True), capath=ca_path) - store = context.get_cert_store() - certs = list(reversed(cert_chain)) - end_of_chain = None - last_error: Exception = SignXMLException("Invalid certificate chain") - while len(certs) > 0: - for cert in certs: + + def __init__(self, ca_pem_file=None, ca_path=None, verification_time=None): + if ca_pem_file is None: + ca_pem_file = certifi.where() + self.ca_pem_file = ca_pem_file + if ca_path is not None: + msg = "CApath is not supported. If you need this feature, please contact SignXML maintainers." + raise NotImplementedError(msg) + + self.verification_time = verification_time + + @property + def store(self): + with open(self.ca_pem_file, "rb") as pems: + certs = x509.load_pem_x509_certificates(pems.read()) + return x509.verification.Store(certs) + + @property + def builder(self): + builder = x509.verification.PolicyBuilder() + builder = builder.store(self.store) + if self.verification_time is not None: + builder = builder.time(self.verification_time) + return builder + + @property + def verifier(self): + return self.builder.build_client_verifier() + + def _do_verify(self, cert_chain): + leaf, intermediates = cert_chain[0], cert_chain[1:] + result = self.verifier.verify(leaf=leaf, intermediates=intermediates) + return result.chain[0] + + def verify(self, cert_chain): + try: + return self._do_verify(cert_chain) + except x509.verification.VerificationError: try: - end_of_chain = _add_cert_to_store(store, cert) - certs.remove(cert) - break - except RedundantCert: - certs.remove(cert) - if end_of_chain is None: - end_of_chain = cert - break - except Exception as e: - last_error = e - else: - raise last_error - return end_of_chain + return self._do_verify(list(reversed(cert_chain))) + except x509.verification.VerificationError as e: + raise InvalidCertificate(e) def _remove_sig(signature, idempotent=False): diff --git a/signxml/verifier.py b/signxml/verifier.py index 41a88f93..78f410ef 100644 --- a/signxml/verifier.py +++ b/signxml/verifier.py @@ -2,15 +2,13 @@ from dataclasses import dataclass, replace from typing import Callable, FrozenSet, List, Optional, Union +import cryptography.exceptions +from cryptography import x509 from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa, utils from cryptography.hazmat.primitives.asymmetric.padding import MGF1, PSS, AsymmetricPadding, PKCS1v15 from cryptography.hazmat.primitives.hmac import HMAC from cryptography.hazmat.primitives.serialization import load_der_public_key from lxml import etree -from OpenSSL.crypto import FILETYPE_PEM, X509 -from OpenSSL.crypto import Error as OpenSSLCryptoError -from OpenSSL.crypto import load_certificate -from OpenSSL.crypto import verify as openssl_verify from .algorithms import ( CanonicalizationMethod, @@ -19,9 +17,10 @@ SignatureMethod, digest_algorithm_implementations, ) -from .exceptions import InvalidCertificate, InvalidDigest, InvalidInput, InvalidSignature +from .exceptions import InvalidCertificate, InvalidDigest, InvalidInput, InvalidSignature, SignXMLException from .processor import XMLSignatureProcessor from .util import ( + X509CertChainVerifier, _remove_sig, add_pem_header, bits_to_bytes_unit, @@ -29,7 +28,6 @@ ds_tag, ensure_bytes, namespaces, - verify_x509_cert_chain, ) @@ -117,14 +115,21 @@ def _get_signature(self, root): def _verify_signature_with_pubkey( self, + *, signed_info_c14n: bytes, raw_signature: bytes, - key_value: etree._Element, - der_encoded_key_value: Optional[etree._Element], signature_alg: SignatureMethod, - ) -> None: + key_value: Optional[etree._Element] = None, + der_encoded_key_value: Optional[etree._Element] = None, + signing_certificate: Optional[x509.Certificate] = None, + ) -> bytes: if der_encoded_key_value is not None: - key = load_der_public_key(b64decode(der_encoded_key_value.text)) # type: ignore + assert der_encoded_key_value.text is not None + key = load_der_public_key(b64decode(der_encoded_key_value.text)) + elif signing_certificate is not None: + key = signing_certificate.public_key() + elif key_value is None: + raise InvalidInput("Expected one of key_value, der_encoded_key_value, or signing_certificate to be set") digest_alg_impl = digest_algorithm_implementations[signature_alg]() if signature_alg.name.startswith("ECDSA_"): @@ -136,12 +141,12 @@ def _verify_signature_with_pubkey( x = bytes_to_long(key_data[: len(key_data) // 2]) y = bytes_to_long(key_data[len(key_data) // 2 :]) curve_class = self.known_ecdsa_curves[named_curve.get("URI")] - ecpn = ec.EllipticCurvePublicNumbers(x=x, y=y, curve=curve_class()) # type: ignore + ecpn = ec.EllipticCurvePublicNumbers(x=x, y=y, curve=curve_class()) # type: ignore[abstract] key = ecpn.public_key() elif not isinstance(key, ec.EllipticCurvePublicKey): raise InvalidInput("DER encoded key value does not match specified signature algorithm") - dss_signature = self._encode_dss_signature(raw_signature, key.key_size) - key.verify(dss_signature, data=signed_info_c14n, signature_algorithm=ec.ECDSA(digest_alg_impl)) + signature_for_ecdsa = self._encode_dss_signature(raw_signature, key.key_size) + key.verify(signature_for_ecdsa, data=signed_info_c14n, signature_algorithm=ec.ECDSA(digest_alg_impl)) elif signature_alg.name.startswith("DSA_"): if key_value is not None: dsa_key_value = self._find(key_value, "DSAKeyValue") @@ -150,7 +155,7 @@ def _verify_signature_with_pubkey( g = self._get_long(dsa_key_value, "G", require=False) y = self._get_long(dsa_key_value, "Y") dsapn = dsa.DSAPublicNumbers(y=y, parameter_numbers=dsa.DSAParameterNumbers(p=p, q=q, g=g)) - key = dsapn.public_key() # type: ignore + key = dsapn.public_key() elif not isinstance(key, dsa.DSAPublicKey): raise InvalidInput("DER encoded key value does not match specified signature algorithm") # TODO: supply meaningful key_size_bits for signature length assertion @@ -170,7 +175,8 @@ def _verify_signature_with_pubkey( padding = PSS(mgf=MGF1(algorithm=digest_alg_impl), salt_length=digest_alg_impl.digest_size) key.verify(raw_signature, data=signed_info_c14n, padding=padding, algorithm=digest_alg_impl) else: - raise NotImplementedError() + raise InvalidInput(f"Unsupported signature algorithm {signature_alg}") + return signed_info_c14n def _encode_dss_signature(self, raw_signature: bytes, key_size_bits: int) -> bytes: want_raw_signature_len = bits_to_bytes_unit(key_size_bits) * 2 @@ -226,11 +232,49 @@ def _apply_transforms(self, payload, *, transforms_node: etree._Element, signatu return payload + def get_cert_chain_verifier(self, ca_pem_file, ca_path): + return X509CertChainVerifier(ca_pem_file=ca_pem_file, ca_path=ca_path) + + def _match_key_values(self, key_value, der_encoded_key_value, signing_cert, signature_alg): + if self.config.ignore_ambiguous_key_info is False: + return + cert_pub_key = signing_cert.public_key() + # If both X509Data and KeyValue are present, match one against the other and raise an error on mismatch + if key_value is not None: + match_result = self._check_key_value_matches_cert_public_key(key_value, cert_pub_key, signature_alg) + if match_result is False: + raise InvalidInput( + "Both X509Data and KeyValue found and they represent different public keys. " + "Use verify(ignore_ambiguous_key_info=True) to ignore KeyValue and validate " + "using X509Data only." + ) + + # If both X509Data and DEREncodedKeyValue are present, match one against the other and raise an error on + # mismatch + if der_encoded_key_value is not None: + match_result = self._check_der_key_value_matches_cert_public_key( + der_encoded_key_value, signing_cert.public_key(), signature_alg + ) + if match_result is False: + raise InvalidInput( + "Both X509Data and DEREncodedKeyValue found and they represent different " + "public keys. Use verify(ignore_ambiguous_key_info=True) to ignore " + "DEREncodedKeyValue and validate using X509Data only." + ) + + def check_digest_alg_expected(self, digest_alg): + if digest_alg not in self.config.digest_algorithms: + raise InvalidInput(f"Digest algorithm {digest_alg.name} forbidden by configuration") + + def check_signature_alg_expected(self, signature_alg): + if signature_alg not in self.config.signature_methods: + raise InvalidInput(f"Signature method {signature_alg.name} forbidden by configuration") + def verify( self, data, *, - x509_cert: Optional[Union[str, X509]] = None, + x509_cert: Optional[Union[str, x509.Certificate]] = None, cert_subject_name: Optional[str] = None, cert_resolver: Optional[Callable] = None, ca_pem_file: Optional[Union[str, bytes]] = None, @@ -276,10 +320,10 @@ def verify( :param data: Signature data to verify :type data: String, file-like object, or XML ElementTree Element API compatible object :param x509_cert: - A trusted external X.509 certificate, given as a PEM-formatted string or OpenSSL.crypto.X509 object, to use - for verification. Overrides any X.509 certificate information supplied by the signature. If left set to - ``None``, requires that the signature supply a valid X.509 certificate chain that validates against the - known certificate authorities. Implies **require_x509=True**. + A trusted external X.509 certificate, given as a PEM-formatted string or cryptography.x509.Certificate + object, to use for verification. Overrides any X.509 certificate information supplied by the signature. If + left set to ``None``, requires that the signature supply a valid X.509 certificate chain that validates + against the known certificate authorities. Implies **require_x509=True**. :param cert_subject_name: Subject Common Name to check the signing X.509 certificate against. Implies **require_x509=True**. :param cert_resolver: @@ -346,8 +390,7 @@ def verify( signature_method = self._find(signed_info, "SignatureMethod") signature_value = self._find(signature, "SignatureValue") signature_alg = SignatureMethod(signature_method.get("Algorithm")) - if signature_alg not in self.config.signature_methods: - raise InvalidInput(f"Signature method {signature_alg.name} forbidden by configuration") + self.check_signature_alg_expected(signature_alg) raw_signature = b64decode(signature_value.text) x509_data = signature.find("ds:KeyInfo/ds:X509Data", namespaces=namespaces) key_value = signature.find("ds:KeyInfo/ds:KeyValue", namespaces=namespaces) @@ -373,81 +416,60 @@ def verify( ) if len(cert_chain) == 0: raise InvalidCertificate("No certificate found for given X509 data") - if not all(isinstance(c, X509) for c in cert_chain): - cert_chain = [load_certificate(FILETYPE_PEM, add_pem_header(cert)) for cert in cert_chain] + if not all(isinstance(c, x509.Certificate) for c in cert_chain): + cert_chain = [x509.load_pem_x509_certificate(add_pem_header(cert)) for cert in cert_chain] else: msg = "Expected to find an X509Certificate element in the signature" msg += " (X509SubjectName, X509SKI are not supported)" raise InvalidInput(msg) else: - cert_chain = [load_certificate(FILETYPE_PEM, add_pem_header(cert)) for cert in certs] - signing_cert = verify_x509_cert_chain(cert_chain, ca_pem_file=ca_pem_file, ca_path=ca_path) - elif isinstance(self.x509_cert, X509): + cert_chain = [x509.load_pem_x509_certificate(add_pem_header(cert)) for cert in certs] + + cert_verifier = self.get_cert_chain_verifier(ca_pem_file=ca_pem_file, ca_path=ca_path) + + signing_cert = cert_verifier.verify(cert_chain) + elif isinstance(self.x509_cert, x509.Certificate): signing_cert = self.x509_cert else: - signing_cert = load_certificate(FILETYPE_PEM, add_pem_header(self.x509_cert)) - - if cert_subject_name and signing_cert.get_subject().commonName != cert_subject_name: - raise InvalidSignature("Certificate subject common name mismatch") + signing_cert = x509.load_pem_x509_certificate(add_pem_header(self.x509_cert)) - if signature_alg.name.startswith("ECDSA"): - raw_signature = self._encode_dss_signature(raw_signature, signing_cert.get_pubkey().bits()) + if cert_subject_name is not None: + cn_oid = x509.oid.NameOID.COMMON_NAME + subject_cn_from_signing_cert = signing_cert.subject.get_attributes_for_oid(cn_oid)[0].value + if subject_cn_from_signing_cert != cert_subject_name: + raise InvalidSignature("Certificate subject common name mismatch") try: - digest_alg_name = str(digest_algorithm_implementations[signature_alg].name) - openssl_verify(signing_cert, raw_signature, signed_info_c14n, digest_alg_name) - except OpenSSLCryptoError as e: - try: - lib, func, reason = e.args[0][0] - except Exception: - reason = e - raise InvalidSignature(f"Signature verification failed: {reason}") - - # If both X509Data and KeyValue are present, match one against the other and raise an error on mismatch - if key_value is not None: - if ( - self._check_key_value_matches_cert_public_key(key_value, signing_cert.get_pubkey(), signature_alg) - is False - ): - if self.config.ignore_ambiguous_key_info is False: - raise InvalidInput( - "Both X509Data and KeyValue found and they represent different public keys. " - "Use verify(ignore_ambiguous_key_info=True) to ignore KeyValue and validate " - "using X509Data only." - ) - - # If both X509Data and DEREncodedKeyValue are present, match one against the other and raise an error on - # mismatch - if der_encoded_key_value is not None: - if ( - self._check_der_key_value_matches_cert_public_key( - der_encoded_key_value, signing_cert.get_pubkey(), signature_alg - ) - is False - ): - if self.config.ignore_ambiguous_key_info is False: - raise InvalidInput( - "Both X509Data and DEREncodedKeyValue found and they represent different " - "public keys. Use verify(ignore_ambiguous_key_info=True) to ignore " - "DEREncodedKeyValue and validate using X509Data only." - ) - - # TODO: CN verification goes here - # TODO: require one of the following to be set: either x509_cert or (ca_pem_file or ca_path) or common_name - # Use ssl.match_hostname or code from it to perform match + verified_signed_info_c14n = self._verify_signature_with_pubkey( + signed_info_c14n=signed_info_c14n, + raw_signature=raw_signature, + signing_certificate=signing_cert, + signature_alg=signature_alg, + ) + except cryptography.exceptions.InvalidSignature as e: + raise InvalidSignature(f"Signature verification failed: {e}") + + self._match_key_values( + key_value=key_value, + der_encoded_key_value=der_encoded_key_value, + signing_cert=signing_cert, + signature_alg=signature_alg, + ) elif signature_alg.name.startswith("HMAC_"): if self.hmac_key is None: raise InvalidInput('Parameter "hmac_key" is required when verifying a HMAC signature') signer = HMAC(key=ensure_bytes(self.hmac_key), algorithm=digest_algorithm_implementations[signature_alg]()) signer.update(signed_info_c14n) - if raw_signature != signer.finalize(): + if raw_signature == signer.finalize(): + verified_signed_info_c14n = signed_info_c14n + else: raise InvalidSignature("Signature mismatch (HMAC)") else: if key_value is None and der_encoded_key_value is None: raise InvalidInput("Expected to find either KeyValue or X509Data XML element in KeyInfo") - self._verify_signature_with_pubkey( + verified_signed_info_c14n = self._verify_signature_with_pubkey( signed_info_c14n=signed_info_c14n, raw_signature=raw_signature, key_value=key_value, @@ -455,8 +477,9 @@ def verify( signature_alg=signature_alg, ) + verified_signed_info = self._fromstring(verified_signed_info_c14n) verify_results: List[VerifyResult] = [] - for idx, reference in enumerate(self._findall(signed_info, "Reference")): + for idx, reference in enumerate(self._findall(verified_signed_info, "Reference")): verify_results.append(self._verify_reference(reference, idx, root, uri_resolver, c14n_algorithm, signature)) if type(self.config.expect_references) is int and len(verify_results) != self.config.expect_references: @@ -474,8 +497,7 @@ def _verify_reference(self, reference, index, root, uri_resolver, c14n_algorithm payload = self._resolve_reference(copied_root, reference, uri_resolver=uri_resolver) payload_c14n = self._apply_transforms(payload, transforms_node=transforms, signature=copied_signature_ref) digest_alg = DigestAlgorithm(digest_method_alg_name) - if digest_alg not in self.config.digest_algorithms: - raise InvalidInput(f"Digest algorithm {digest_alg.name} forbidden by configuration") + self.check_digest_alg_expected(digest_alg) if b64decode(digest_value.text) != self._get_digest(payload_c14n, digest_alg): raise InvalidDigest(f"Digest mismatch for reference {index} ({reference.get('URI')})") @@ -495,12 +517,12 @@ def validate_schema(self, signature): return except Exception as e: last_exception = e - raise last_exception # type: ignore + if last_exception is not None: + raise last_exception + raise SignXMLException("Invalid state") def _check_key_value_matches_cert_public_key(self, key_value, public_key, signature_alg: SignatureMethod): - if signature_alg.name.startswith("ECDSA_") and isinstance( - public_key.to_cryptography_key(), ec.EllipticCurvePublicKey - ): + if signature_alg.name.startswith("ECDSA_") and isinstance(public_key, ec.EllipticCurvePublicKey): ec_key_value = self._find(key_value, "dsig11:ECKeyValue") named_curve = self._find(ec_key_value, "dsig11:NamedCurve") pub_key = self._find(ec_key_value, "dsig11:PublicKey") @@ -509,31 +531,31 @@ def _check_key_value_matches_cert_public_key(self, key_value, public_key, signat y = bytes_to_long(key_data[len(key_data) // 2 :]) curve_class = self.known_ecdsa_curves[named_curve.get("URI")] - pubk_curve = public_key.to_cryptography_key().public_numbers().curve - pubk_x = public_key.to_cryptography_key().public_numbers().x - pubk_y = public_key.to_cryptography_key().public_numbers().y + pubk_curve = public_key.public_numbers().curve + pubk_x = public_key.public_numbers().x + pubk_y = public_key.public_numbers().y return curve_class == pubk_curve and x == pubk_x and y == pubk_y - elif signature_alg.name.startswith("DSA_") and isinstance(public_key.to_cryptography_key(), dsa.DSAPublicKey): + elif signature_alg.name.startswith("DSA_") and isinstance(public_key, dsa.DSAPublicKey): dsa_key_value = self._find(key_value, "DSAKeyValue") p = self._get_long(dsa_key_value, "P") q = self._get_long(dsa_key_value, "Q") g = self._get_long(dsa_key_value, "G", require=False) - pubk_p = public_key.to_cryptography_key().public_numbers().p - pubk_q = public_key.to_cryptography_key().public_numbers().q - pubk_g = public_key.to_cryptography_key().public_numbers().g + pubk_p = public_key.public_numbers().parameter_numbers.p + pubk_q = public_key.public_numbers().parameter_numbers.q + pubk_g = public_key.public_numbers().parameter_numbers.g return p == pubk_p and q == pubk_q and g == pubk_g - elif signature_alg.name.startswith("RSA_") and isinstance(public_key.to_cryptography_key(), rsa.RSAPublicKey): + elif signature_alg.name.startswith("RSA_") and isinstance(public_key, rsa.RSAPublicKey): rsa_key_value = self._find(key_value, "RSAKeyValue") n = self._get_long(rsa_key_value, "Modulus") e = self._get_long(rsa_key_value, "Exponent") - pubk_n = public_key.to_cryptography_key().public_numbers().n - pubk_e = public_key.to_cryptography_key().public_numbers().e + pubk_n = public_key.public_numbers().n + pubk_e = public_key.public_numbers().e return n == pubk_n and e == pubk_e @@ -546,43 +568,43 @@ def _check_der_key_value_matches_cert_public_key(self, der_encoded_key_value, pu if ( signature_alg.name.startswith("ECDSA_") and isinstance(der_public_key, ec.EllipticCurvePublicKey) - and isinstance(public_key.to_cryptography_key(), ec.EllipticCurvePublicKey) + and isinstance(public_key, ec.EllipticCurvePublicKey) ): curve_class = der_public_key.public_numbers().curve x = der_public_key.public_numbers().x y = der_public_key.public_numbers().y - pubk_curve = public_key.to_cryptography_key().public_numbers().curve - pubk_x = public_key.to_cryptography_key().public_numbers().x - pubk_y = public_key.to_cryptography_key().public_numbers().y + pubk_curve = public_key.public_numbers().curve + pubk_x = public_key.public_numbers().x + pubk_y = public_key.public_numbers().y return curve_class == pubk_curve and x == pubk_x and y == pubk_y elif ( signature_alg.name.startswith("DSA_") and isinstance(der_public_key, dsa.DSAPublicKey) - and isinstance(public_key.to_cryptography_key(), dsa.DSAPublicKey) + and isinstance(public_key, dsa.DSAPublicKey) ): - p = der_public_key.public_numbers().parameter_numbers().p # type: ignore - q = der_public_key.public_numbers().parameter_numbers().q # type: ignore - g = der_public_key.public_numbers().parameter_numbers().g # type: ignore + p = der_public_key.public_numbers().parameter_numbers.p + q = der_public_key.public_numbers().parameter_numbers.q + g = der_public_key.public_numbers().parameter_numbers.g - pubk_p = public_key.to_cryptography_key().public_numbers().p - pubk_q = public_key.to_cryptography_key().public_numbers().q - pubk_g = public_key.to_cryptography_key().public_numbers().g + pubk_p = public_key.public_numbers().parameter_numbers.p + pubk_q = public_key.public_numbers().parameter_numbers.q + pubk_g = public_key.public_numbers().parameter_numbers.g return p == pubk_p and q == pubk_q and g == pubk_g elif ( signature_alg.name.startswith("RSA_") and isinstance(der_public_key, rsa.RSAPublicKey) - and isinstance(public_key.to_cryptography_key(), rsa.RSAPublicKey) + and isinstance(public_key, rsa.RSAPublicKey) ): n = der_public_key.public_numbers().n e = der_public_key.public_numbers().e - pubk_n = public_key.to_cryptography_key().public_numbers().n - pubk_e = public_key.to_cryptography_key().public_numbers().e + pubk_n = public_key.public_numbers().n + pubk_e = public_key.public_numbers().e return n == pubk_n and e == pubk_e diff --git a/signxml/xades/xades.py b/signxml/xades/xades.py index 1149a61d..1e0f4168 100644 --- a/signxml/xades/xades.py +++ b/signxml/xades/xades.py @@ -27,11 +27,12 @@ from functools import wraps from typing import Dict, List, Optional, Union +from cryptography import x509 +from cryptography.hazmat.primitives.serialization import Encoding from lxml.etree import SubElement, _Element -from OpenSSL.crypto import FILETYPE_ASN1, FILETYPE_PEM, X509, dump_certificate, load_certificate from .. import SignatureConfiguration, VerifyResult, XMLSignatureProcessor, XMLSigner, XMLVerifier -from ..algorithms import DigestAlgorithm +from ..algorithms import DigestAlgorithm, digest_algorithm_implementations from ..exceptions import InvalidDigest, InvalidInput from ..util import SigningSettings, add_pem_header, ds_tag, namespaces, xades_tag @@ -127,7 +128,7 @@ def __init__( self.namespaces.update(xades=namespaces.xades) @wraps(XMLSigner.sign) - def sign(self, data, always_add_key_value: bool = True, **kwargs) -> _Element: # type: ignore + def sign(self, data, always_add_key_value: bool = True, **kwargs) -> _Element: # type: ignore[override] return super().sign(data=data, always_add_key_value=always_add_key_value, **kwargs) def _get_token(self, length=4): @@ -186,7 +187,7 @@ def _add_reference_to_signed_info(self, sig_root, node_to_reference, **attrs): def add_signing_time(self, signed_signature_properties, sig_root, signing_settings: SigningSettings): signing_time = SubElement(signed_signature_properties, xades_tag("SigningTime"), nsmap=self.namespaces) # TODO: make configurable - utc_iso_ts = datetime.datetime.utcnow().isoformat(timespec="seconds") + utc_iso_ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") signing_time.text = f"{utc_iso_ts}+00:00" def add_signing_certificate(self, signed_signature_properties, sig_root, signing_settings: SigningSettings): @@ -194,12 +195,13 @@ def add_signing_certificate(self, signed_signature_properties, sig_root, signing signing_cert_v2 = SubElement( signed_signature_properties, xades_tag("SigningCertificateV2"), nsmap=self.namespaces ) - for cert in signing_settings.cert_chain: # type: ignore - if isinstance(cert, X509): + assert signing_settings.cert_chain is not None + for cert in signing_settings.cert_chain: + if isinstance(cert, x509.Certificate): loaded_cert = cert else: - loaded_cert = load_certificate(FILETYPE_PEM, add_pem_header(cert)) - der_encoded_cert = dump_certificate(FILETYPE_ASN1, loaded_cert) + loaded_cert = x509.load_pem_x509_certificate(add_pem_header(cert)) + der_encoded_cert = loaded_cert.public_bytes(Encoding.DER) cert_digest_bytes = self._get_digest(der_encoded_cert, algorithm=self.digest_alg) cert_node = SubElement(signing_cert_v2, xades_tag("Cert"), nsmap=self.namespaces) cert_digest = SubElement(cert_node, xades_tag("CertDigest"), nsmap=self.namespaces) @@ -278,15 +280,14 @@ def _verify_cert_digest(self, signing_cert_node, expect_cert): digest_alg = DigestAlgorithm(self._find(cert_digest, "DigestMethod").get("Algorithm")) digest_value = self._find(cert_digest, "DigestValue") # check spec for specific method of retrieving cert - der_encoded_cert = dump_certificate(FILETYPE_ASN1, expect_cert) - - if b64decode(digest_value.text) != self._get_digest(der_encoded_cert, algorithm=digest_alg): + digest_alg_impl = digest_algorithm_implementations[digest_alg]() + if b64decode(digest_value.text) != expect_cert.fingerprint(digest_alg_impl): raise InvalidDigest("Digest mismatch for certificate digest") def _verify_cert_digests(self, verify_result: VerifyResult): x509_data = verify_result.signature_xml.find("ds:KeyInfo/ds:X509Data", namespaces=namespaces) - cert_from_key_info = load_certificate( - FILETYPE_PEM, add_pem_header(self._find(x509_data, "X509Certificate").text) + cert_from_key_info = x509.load_pem_x509_certificate( + add_pem_header(self._find(x509_data, "X509Certificate").text) ) signed_signature_props = self._find(verify_result.signed_xml, "xades:SignedSignatureProperties") signing_cert = self._find(signed_signature_props, "xades:SigningCertificate", require=False) @@ -333,7 +334,7 @@ def _verify_signed_properties(self, verify_result): ) return self._find(verify_result.signed_xml, "xades:SignedSignatureProperties") - def verify( # type: ignore + def verify( # type: ignore[override] self, data, *, @@ -367,7 +368,7 @@ def verify( # type: ignore if verify_result.signed_xml is None: continue if verify_result.signed_xml.tag == xades_tag("SignedProperties"): - verify_results[i] = XAdESVerifyResult( # type: ignore + verify_results[i] = XAdESVerifyResult( # type: ignore[misc] *astuple(verify_result), signed_properties=self._verify_signed_properties(verify_result) ) break @@ -375,4 +376,4 @@ def verify( # type: ignore raise InvalidInput("Expected to find a xades:SignedProperties element") # TODO: assert all mandatory signed properties are set - return verify_results # type: ignore + return verify_results # type: ignore[return-value] diff --git a/test/test.py b/test/test.py index 69782d18..63ce129a 100755 --- a/test/test.py +++ b/test/test.py @@ -8,11 +8,13 @@ from base64 import b64decode, b64encode from concurrent.futures import ThreadPoolExecutor from dataclasses import replace +from datetime import datetime from glob import glob from xml.etree import ElementTree as stdlibElementTree import cryptography.exceptions from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa +from cryptography.x509 import load_der_x509_certificate, load_pem_x509_certificate from lxml import etree sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) @@ -64,6 +66,16 @@ def reset_tree(t, method): s.getparent().remove(s) +def get_verifier_for_year(year: int): + class _Verifier(XMLVerifier): + def get_cert_chain_verifier(self, ca_pem_file, ca_path): + verifier = super().get_cert_chain_verifier(ca_pem_file, ca_path) + verifier.verification_time = datetime(year, 1, 1) + return verifier + + return _Verifier() + + class URIResolver(etree.Resolver): def resolve(self, url, id, context): print(f"Resolving URL '{url}'") @@ -221,10 +233,7 @@ def resolver(uri): pass def test_x509_certs(self): - from OpenSSL.crypto import FILETYPE_PEM - from OpenSSL.crypto import Error as OpenSSLCryptoError - from OpenSSL.crypto import load_certificate - + verifier = get_verifier_for_year(2015) tree = etree.parse(self.example_xml_files[0]) ca_pem_file = os.path.join(os.path.dirname(__file__), "example-ca.pem").encode("utf-8") crt, key = self.load_example_keys() @@ -234,36 +243,40 @@ def test_x509_certs(self): signer = XMLSigner(method=method, signature_algorithm=SignatureMethod.RSA_SHA256) signed = signer.sign(data, key=key, cert=crt) signed_data = etree.tostring(signed) - XMLVerifier().verify(signed_data, ca_pem_file=ca_pem_file) - XMLVerifier().verify(signed_data, x509_cert=crt) - XMLVerifier().verify(signed_data, x509_cert=load_certificate(FILETYPE_PEM, crt)) - XMLVerifier().verify(signed_data, x509_cert=crt, cert_subject_name="*.example.com") + verifier.verify(signed_data, x509_cert=crt) + verifier.verify(signed_data, x509_cert=load_pem_x509_certificate(crt)) + verifier.verify(signed_data, x509_cert=crt, cert_subject_name="*.example.com") - with self.assertRaises(OpenSSLCryptoError): - XMLVerifier().verify(signed_data, x509_cert=crt[::-1]) + with self.assertRaises(ValueError): + verifier.verify(signed_data, x509_cert=crt[::-1]) with self.assertRaises(InvalidSignature): - XMLVerifier().verify(signed_data, x509_cert=crt, cert_subject_name="test") + verifier.verify(signed_data, x509_cert=crt, cert_subject_name="test") + + # FIXME: create new test case with reconfigured CA/EKU + with self.assertRaisesRegex(InvalidCertificate, "required EKU not found"): + verifier.verify(signed_data, ca_pem_file=ca_pem_file) + + with self.assertRaisesRegex(InvalidCertificate, "required EKU not found"): + verifier.verify(signed_data) - with self.assertRaisesRegex(InvalidCertificate, "unable to get local issuer certificate"): - XMLVerifier().verify(signed_data) # TODO: negative: verify with wrong cert, wrong CA def test_xmldsig_interop_examples(self): ca_pem_file = os.path.join(os.path.dirname(__file__), "interop", "cacert.pem").encode("utf-8") + + verifier = get_verifier_for_year(2015) for signature_file in glob(os.path.join(os.path.dirname(__file__), "interop", "*.xml")): print("Verifying", signature_file) with open(signature_file, "rb") as fh: - with self.assertRaisesRegex(InvalidCertificate, "certificate has expired"): - XMLVerifier().verify(fh.read(), ca_pem_file=ca_pem_file, expect_config=sha1_ok) + msg = "basicConstraints.cA must not be asserted in an EE certificate" + with self.assertRaisesRegex(InvalidCertificate, msg): + verifier.verify(fh.read(), ca_pem_file=ca_pem_file, expect_config=sha1_ok) def test_xmldsig_interop_TR2012(self): def get_x509_cert(**kwargs): - from cryptography.x509 import load_der_x509_certificate - from OpenSSL.crypto import X509 - with open(os.path.join(interop_dir, "TR2012", "rsa-cert.der"), "rb") as fh: - return [X509.from_cryptography(load_der_x509_certificate(fh.read()))] + return [load_der_x509_certificate(fh.read())] signature_files = glob(os.path.join(interop_dir, "TR2012", "signature*.xml")) for signature_file in signature_files: @@ -336,7 +349,7 @@ def cert_resolver(x509_issuer_name, x509_serial_number, x509_digest): with open(signature_file, "rb") as fh: try: sig = fh.read() - verifier = XMLVerifier() + verifier = get_verifier_for_year(2010 if "phaos" in signature_file else 2014) verifier.excise_empty_xmlns_declarations = True verifier.verify( sig, @@ -377,7 +390,7 @@ def cert_resolver(x509_issuer_name, x509_serial_number, x509_digest): if signature_file.endswith("expired-cert.xml") or signature_file.endswith( "wsfederation_metadata.xml" ): # noqa - with self.assertRaisesRegex(InvalidCertificate, "certificate has expired"): + with self.assertRaisesRegex(InvalidCertificate, "cert is not valid at validation time"): raise elif signature_file.endswith("invalid_enveloped_transform.xml"): self.assertIsInstance(e, InvalidSignature) @@ -412,9 +425,11 @@ def cert_resolver(x509_issuer_name, x509_serial_number, x509_digest): print("Unsupported test case:", type(e), e) elif any(x in signature_file for x in bad_interop_cases) or "Unable to resolve reference" in str(e): print("Bad interop test case:", type(e), e) - elif "certificate has expired" in str(e) and ( - "signature-dsa" in signature_file or "signature-rsa" in signature_file - ): # noqa + elif "Certificate is missing required extension" in str(e): + print("IGNORED:", type(e), e) + elif "certificate must be an X509v3 certificate" in str(e): + print("IGNORED:", type(e), e) + elif "basicConstraints.cA must not be asserted in an EE certificate" in str(e): print("IGNORED:", type(e), e) elif "TR2012" not in signature_file: raise @@ -747,7 +762,7 @@ def test_xades_interop_examples(self): print("Verifying", sig_file) with open(sig_file, "rb") as fh: doc = etree.parse(fh) - cert = doc.find("//{http://www.w3.org/2000/09/xmldsig#}X509Certificate").text + cert = doc.find(".//{http://www.w3.org/2000/09/xmldsig#}X509Certificate").text kwargs = dict( x509_cert=cert, expect_references=self.expect_references.get(os.path.basename(sig_file), 2),