diff --git a/signxml/__init__.py b/signxml/__init__.py index 6661816..6d343e7 100644 --- a/signxml/__init__.py +++ b/signxml/__init__.py @@ -566,7 +566,7 @@ def _verify_signature_with_pubkey(self, signed_info_c14n, raw_signature, key_val if der_encoded_key_value is not None: key = load_der_public_key(b64decode(der_encoded_key_value.text), backend=default_backend()) if "ecdsa-" in signature_alg: - if key_value: + if key_value is not None: ec_key_value = self._find(key_value, "ECKeyValue", namespace="dsig11") named_curve = self._find(ec_key_value, "NamedCurve", namespace="dsig11") public_key = self._find(ec_key_value, "PublicKey", namespace="dsig11") @@ -586,7 +586,7 @@ def _verify_signature_with_pubkey(self, signed_info_c14n, raw_signature, key_val ), ) elif "dsa-" in signature_alg: - if key_value: + if key_value is not None: dsa_key_value = self._find(key_value, "DSAKeyValue") p = self._get_long(dsa_key_value, "P") q = self._get_long(dsa_key_value, "Q") @@ -602,7 +602,7 @@ def _verify_signature_with_pubkey(self, signed_info_c14n, raw_signature, key_val data=signed_info_c14n, algorithm=self._get_signature_digest_method(signature_alg)) elif "rsa-" in signature_alg: - if key_value: + if key_value is not None: rsa_key_value = self._find(key_value, "RSAKeyValue") modulus = self._get_long(rsa_key_value, "Modulus") exponent = self._get_long(rsa_key_value, "Exponent") @@ -660,8 +660,8 @@ def _apply_transforms(self, payload, transforms_node, signature, c14n_algorithm) return payload - def verify(self, data, require_x509=True, x509_cert=None, cert_subject_name=None, ca_pem_file=None, ca_path=None, - hmac_key=None, validate_schema=True, parser=None, uri_resolver=None, cert_resolver=None, + def verify(self, data, require_x509=True, x509_cert=None, cert_subject_name=None, cert_resolver=None, + ca_pem_file=None, ca_path=None, hmac_key=None, validate_schema=True, parser=None, uri_resolver=None, id_attribute=None, expect_references=1, ignore_ambiguous_key_info=False): """ Verify the XML signature supplied in the data and return the XML node signed by the signature, or raise an @@ -704,6 +704,16 @@ def verify(self, data, require_x509=True, x509_cert=None, cert_subject_name=None ``None``, requires that the signature supply a valid X.509 certificate chain that validates against the known certificate authorities. Implies **require_x509=True**. :type x509_cert: string or OpenSSL.crypto.X509 + :param cert_subject_name: + Subject Common Name to check the signing X.509 certificate against. Implies **require_x509=True**. + :type cert_subject_name: string + :param cert_resolver: + Function to use to resolve trusted X.509 certificates when X509IssuerSerial and X509Digest references are + found in the signature. The function is called with the keyword arguments ``x509_issuer_name``, + ``x509_serial_number`` and ``x509_digest``, and is expected to return an iterable of one or more + strings containing a PEM-formatted certificate and a chain of intermediate certificates, if needed. + Implies **require_x509=True**. + :type cert_resolver: callable :param ca_pem_file: Filename of a PEM file containing certificate authority information to use when verifying certificate-based signatures. @@ -713,9 +723,6 @@ def verify(self, data, require_x509=True, x509_cert=None, cert_subject_name=None certificate-based signatures. If neither **ca_pem_file** nor **ca_path** is given, the Mozilla CA bundle provided by :py:mod:`certifi` will be loaded. :type ca_path: string - :param cert_subject_name: - Subject Common Name to check the signing X.509 certificate against. Implies **require_x509=True**. - :type cert_subject_name: string :param hmac_key: If using HMAC, a string containing the shared secret. :type hmac_key: string :param validate_schema: Whether to validate **data** against the XML Signature schema. @@ -728,12 +735,6 @@ def verify(self, data, require_x509=True, x509_cert=None, cert_subject_name=None Function to use to resolve reference URIs that don't start with "#". The function is called with a single string argument containing the URI to be resolved, and is expected to return a lxml.etree node or string. :type uri_resolver: callable - :param cert_resolver: - Function to use to resolve X.509 certificates when X509IssuerSerial and X509Digest references are found in - the signature. The function is called with the keyword arguments ``x509_issuer_name``, - ``x509_serial_number`` and ``x509_digest``, and is expected to return an iterable of one or more - strings containing PEM-formatted certificates. - :type cert_resolver: callable :param id_attribute: Name of the attribute whose value ``URI`` refers to. By default, SignXML will search for "Id", then "ID". :type id_attribute: string @@ -761,7 +762,7 @@ def verify(self, data, require_x509=True, x509_cert=None, cert_subject_name=None self.x509_cert = x509_cert self._parser = parser - if x509_cert: + if x509_cert or cert_resolver: self.require_x509 = True if id_attribute is not None: @@ -803,15 +804,20 @@ def verify(self, data, require_x509=True, x509_cert=None, cert_subject_name=None x509_iss = x509_data.find("ds:X509IssuerSerial/ds:X509IssuerName", namespaces=namespaces) x509_sn = x509_data.find("ds:X509IssuerSerial/ds:X509SerialNumber", namespaces=namespaces) x509_digest = x509_data.find("dsig11:X509Digest", namespaces=namespaces) - if cert_resolver is not None and (x509_iss or x509_sn or x509_digest): - certs = cert_resolver(x509_issuer_name=x509_iss.text if x509_iss is not None else None, - x509_serial_number=x509_sn.text if x509_sn is not None else None, - x509_digest=x509_digest.text if x509_digest is not None else None) + if cert_resolver and any(i is not None for i in (x509_iss, x509_sn, x509_digest)): + cert_chain = cert_resolver(x509_issuer_name=x509_iss.text if x509_iss is not None else None, + x509_serial_number=x509_sn.text if x509_sn is not None else None, + x509_digest=x509_digest.text if x509_digest is not None else None) + if len(cert_chain) == 0: + raise InvalidCertificate("No certificate found for given X509 data") + if not all(isinstance(c, X509) for c in cert_chain): + cert_chain = [load_certificate(FILETYPE_PEM, add_pem_header(cert)) for cert in cert_chain] else: msg = "Expected to find an X509Certificate element in the signature" msg += " (X509SubjectName, X509SKI are not supported)" raise InvalidInput(msg) - cert_chain = [load_certificate(FILETYPE_PEM, add_pem_header(cert)) for cert in certs] + else: + cert_chain = [load_certificate(FILETYPE_PEM, add_pem_header(cert)) for cert in certs] signing_cert = verify_x509_cert_chain(cert_chain, ca_pem_file=ca_pem_file, ca_path=ca_path) elif isinstance(self.x509_cert, X509): signing_cert = self.x509_cert diff --git a/test/interop/TR2012/rsa-cert.der b/test/interop/TR2012/rsa-cert.der new file mode 100644 index 0000000..a6c6e5f Binary files /dev/null and b/test/interop/TR2012/rsa-cert.der differ diff --git a/test/test.py b/test/test.py index 3e29558..4f8461d 100755 --- a/test/test.py +++ b/test/test.py @@ -14,7 +14,7 @@ from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec from eight import str, open -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # noqa +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from signxml import (XMLSigner, XMLVerifier, XMLSignatureProcessor, methods, namespaces, InvalidInput, InvalidSignature, InvalidCertificate, InvalidDigest) @@ -186,9 +186,11 @@ def test_xmldsig_interop_examples(self): XMLVerifier().verify(fh.read(), ca_pem_file=ca_pem_file) def test_xmldsig_interop_TR2012(self): - def get_x509_cert(signature_file): - with open(os.path.join(os.path.dirname(__file__), "keys", "p256-cert.der"), "rb") as fh: - return fh.read() + def get_x509_cert(**kwargs): + from cryptography.x509 import load_der_x509_certificate + from OpenSSL.crypto import X509 + with open(os.path.join(interop_dir, "TR2012", "rsa-cert.der"), "rb") as fh: + return [X509.from_cryptography(load_der_x509_certificate(fh.read(), backend=default_backend()))] signature_files = glob(os.path.join(interop_dir, "TR2012", "signature*.xml")) for signature_file in signature_files: @@ -196,11 +198,14 @@ def get_x509_cert(signature_file): with open(signature_file, "rb") as fh: try: sig = fh.read() - XMLVerifier().verify(sig, require_x509=False, hmac_key="testkey", validate_schema=True) + XMLVerifier().verify(sig, require_x509=False, hmac_key="testkey", validate_schema=True, + cert_resolver=get_x509_cert if "x509digest" in signature_file else None) decoded_sig = sig.decode("utf-8") except Exception as e: if "keyinforeference" in signature_file: print("Unsupported test case:", type(e), e) + elif "x509digest" in signature_file: + assert isinstance(e, InvalidCertificate) else: raise @@ -239,6 +244,10 @@ def get_ca_pem_file(signature_file): return None return ca_pem_file.encode("utf-8") + def cert_resolver(x509_issuer_name, x509_serial_number, x509_digest): + with open(os.path.join(interop_dir, "phaos-xmldsig-three", "certs", "rsa-cert.pem")) as fh: + return [fh.read()] + signature_files = glob(os.path.join(interop_dir, "*", "signature*.xml")) signature_files += glob(os.path.join(interop_dir, "aleksey*", "*.xml")) signature_files += glob(os.path.join(interop_dir, "xml-crypto", "*.xml")) @@ -254,6 +263,7 @@ def get_ca_pem_file(signature_file): validate_schema=True, uri_resolver=resolver, x509_cert=get_x509_cert(signature_file), + cert_resolver=cert_resolver if "issuer-serial" in signature_file else None, ca_pem_file=get_ca_pem_file(signature_file)) decoded_sig = sig.decode("utf-8") if "HMACOutputLength" in decoded_sig or "bad" in signature_file or "expired" in signature_file: