diff --git a/testsuite/httpx/__init__.py b/testsuite/httpx/__init__.py index 8d7e2795..6d8810e1 100644 --- a/testsuite/httpx/__init__.py +++ b/testsuite/httpx/__init__.py @@ -30,27 +30,27 @@ def __init__(self, retry_codes, response=None, error=None): def should_backoff(self): """True, if the Result can be considered an instability and should be retried""" - return self.has_dns_error() or (not self.has_error() and self.status_code in self.retry_codes) + return self.has_dns_error() or (self.error is None and self.status_code in self.retry_codes) - def has_error(self): - """True, if the request failed and an error was returned""" - return self.error is not None + def has_error(self, error_msg: str) -> bool: + """True, if the request failed and an error with message was returned""" + return self.error is not None and len(self.error.args) > 0 and any(error_msg in arg for arg in self.error.args) def has_dns_error(self): """True, if the result failed due to DNS failure""" - return ( - self.has_error() - and len(self.error.args) > 0 - and any("Name or service not known" in arg for arg in self.error.args) - ) - - def has_tls_error(self): - """True, if the result failed due to TLS failure""" - return ( - self.has_error() - and len(self.error.args) > 0 - and any("SSL: CERTIFICATE_VERIFY_FAILED" in arg for arg in self.error.args) - ) + return self.has_error("Name or service not known") + + def has_cert_verify_error(self): + """True, if the result failed due to TLS certificate verification failure""" + return self.has_error("SSL: CERTIFICATE_VERIFY_FAILED") + + def has_unknown_ca_error(self): + """True, if the result failed due to TLS unknown certificate authority failure""" + return self.has_error("SSL: TLSV1_ALERT_UNKNOWN_CA") + + def has_cert_required_error(self): + """True, if the result failed due to TLS certificate absense failure""" + return self.has_error("SSL: TLSV13_ALERT_CERTIFICATE_REQUIRED") def __getattr__(self, item): """For backwards compatibility""" diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py index cafb3505..baf08a42 100644 --- a/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py +++ b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py @@ -1,6 +1,8 @@ """mTLS authentication tests""" +from typing import Callable import pytest -from httpx import ReadError, ConnectError + +from testsuite.httpx import Result def test_mtls_success(envoy_authority, valid_cert, hostname): @@ -11,24 +13,22 @@ def test_mtls_success(envoy_authority, valid_cert, hostname): @pytest.mark.parametrize( - "cert_authority, certificate, err, err_match", + "cert_authority, certificate, check_error", [ - pytest.param("envoy_authority", "self_signed_cert", ReadError, "unknown ca", id="Self-Signed Certificate"), - pytest.param("envoy_authority", "invalid_cert", ReadError, "unknown ca", id="Invalid certificate"), - pytest.param("envoy_authority", None, ReadError, "certificate required", id="Without certificate"), - pytest.param( - "invalid_authority", "valid_cert", ConnectError, "certificate verify failed", id="Unknown authority" - ), + pytest.param("envoy_authority", "self_signed_cert", Result.has_unknown_ca_error, id="Self-signed cert"), + pytest.param("envoy_authority", "invalid_cert", Result.has_unknown_ca_error, id="Invalid certificate"), + pytest.param("envoy_authority", None, Result.has_cert_required_error, id="Without certificate"), + pytest.param("invalid_authority", "valid_cert", Result.has_cert_verify_error, id="Unknown authority"), ], ) -def test_mtls_fail(request, cert_authority, certificate, err, err_match: str, hostname): +def test_mtls_fail(request, cert_authority, certificate, check_error: Callable, hostname): """Test failed mtls verification""" ca = request.getfixturevalue(cert_authority) cert = request.getfixturevalue(certificate) if certificate else None - with pytest.raises(err, match=err_match): - with hostname.client(verify=ca, cert=cert) as client: - client.get("/get") + with hostname.client(verify=ca, cert=cert) as client: + result = client.get("/get") + assert check_error(result) def test_mtls_unmatched_attributes(envoy_authority, custom_cert, hostname): diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py b/testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py index 96c7e23a..34a519c7 100644 --- a/testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py +++ b/testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py @@ -1,6 +1,4 @@ """Tests that envoy deployed with TLS security works with Authorino""" -import pytest -from httpx import ReadError def test_valid_certificate(envoy_authority, valid_cert, auth, hostname): @@ -12,13 +10,13 @@ def test_valid_certificate(envoy_authority, valid_cert, auth, hostname): def test_no_certificate(hostname, envoy_authority): """Test that request without certificate will be rejected""" - with pytest.raises(ReadError, match="certificate required"): - with hostname.client(verify=envoy_authority) as client: - client.get("/get") + with hostname.client(verify=envoy_authority) as client: + result = client.get("/get") + result.has_cert_required_error() def test_invalid_certificate(envoy_authority, invalid_cert, auth, hostname): """Tests that certificate with different CA will be rejeceted""" - with pytest.raises(ReadError, match="unknown ca"): - with hostname.client(verify=envoy_authority, cert=invalid_cert) as client: - client.get("/get", auth=auth) + with hostname.client(verify=envoy_authority, cert=invalid_cert) as client: + result = client.get("/get", auth=auth) + result.has_unknown_ca_error() diff --git a/testsuite/tests/mgc/test_basic.py b/testsuite/tests/mgc/test_basic.py index 926d4e05..a168b43f 100644 --- a/testsuite/tests/mgc/test_basic.py +++ b/testsuite/tests/mgc/test_basic.py @@ -25,5 +25,5 @@ def test_smoke(client): result = client.get("/get") assert not result.has_dns_error() - assert not result.has_tls_error() + assert not result.has_cert_verify_error() assert result.status_code == 200