Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix broken mtls test #293

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions testsuite/httpx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,27 @@ def __init__(self, retry_codes, response=None, error=None):

def should_backoff(self):
"""True, if the Result can be considered an instability and should be retried"""
return self.has_dns_error() or (not self.has_error() and self.status_code in self.retry_codes)
return self.has_dns_error() or (self.error is None and self.status_code in self.retry_codes)

def has_error(self):
"""True, if the request failed and an error was returned"""
return self.error is not None
def has_error(self, error_msg: str) -> bool:
"""True, if the request failed and an error with message was returned"""
return self.error is not None and len(self.error.args) > 0 and any(error_msg in arg for arg in self.error.args)

def has_dns_error(self):
"""True, if the result failed due to DNS failure"""
return (
self.has_error()
and len(self.error.args) > 0
and any("Name or service not known" in arg for arg in self.error.args)
)

def has_tls_error(self):
"""True, if the result failed due to TLS failure"""
return (
self.has_error()
and len(self.error.args) > 0
and any("SSL: CERTIFICATE_VERIFY_FAILED" in arg for arg in self.error.args)
)
return self.has_error("Name or service not known")

def has_cert_verify_error(self):
"""True, if the result failed due to TLS certificate verification failure"""
return self.has_error("SSL: CERTIFICATE_VERIFY_FAILED")

def has_unknown_ca_error(self):
"""True, if the result failed due to TLS unknown certificate authority failure"""
return self.has_error("SSL: TLSV1_ALERT_UNKNOWN_CA")

def has_cert_required_error(self):
"""True, if the result failed due to TLS certificate absense failure"""
return self.has_error("SSL: TLSV13_ALERT_CERTIFICATE_REQUIRED")

def __getattr__(self, item):
"""For backwards compatibility"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""mTLS authentication tests"""
from typing import Callable
import pytest
from httpx import ReadError, ConnectError

from testsuite.httpx import Result


def test_mtls_success(envoy_authority, valid_cert, hostname):
Expand All @@ -11,24 +13,22 @@ def test_mtls_success(envoy_authority, valid_cert, hostname):


@pytest.mark.parametrize(
"cert_authority, certificate, err, err_match",
"cert_authority, certificate, check_error",
[
pytest.param("envoy_authority", "self_signed_cert", ReadError, "unknown ca", id="Self-Signed Certificate"),
pytest.param("envoy_authority", "invalid_cert", ReadError, "unknown ca", id="Invalid certificate"),
pytest.param("envoy_authority", None, ReadError, "certificate required", id="Without certificate"),
pytest.param(
"invalid_authority", "valid_cert", ConnectError, "certificate verify failed", id="Unknown authority"
),
pytest.param("envoy_authority", "self_signed_cert", Result.has_unknown_ca_error, id="Self-signed cert"),
pytest.param("envoy_authority", "invalid_cert", Result.has_unknown_ca_error, id="Invalid certificate"),
pytest.param("envoy_authority", None, Result.has_cert_required_error, id="Without certificate"),
pytest.param("invalid_authority", "valid_cert", Result.has_cert_verify_error, id="Unknown authority"),
],
)
def test_mtls_fail(request, cert_authority, certificate, err, err_match: str, hostname):
def test_mtls_fail(request, cert_authority, certificate, check_error: Callable, hostname):
"""Test failed mtls verification"""
ca = request.getfixturevalue(cert_authority)
cert = request.getfixturevalue(certificate) if certificate else None

with pytest.raises(err, match=err_match):
with hostname.client(verify=ca, cert=cert) as client:
client.get("/get")
with hostname.client(verify=ca, cert=cert) as client:
result = client.get("/get")
assert check_error(result)


def test_mtls_unmatched_attributes(envoy_authority, custom_cert, hostname):
Expand Down
14 changes: 6 additions & 8 deletions testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""Tests that envoy deployed with TLS security works with Authorino"""
import pytest
from httpx import ReadError


def test_valid_certificate(envoy_authority, valid_cert, auth, hostname):
Expand All @@ -12,13 +10,13 @@ def test_valid_certificate(envoy_authority, valid_cert, auth, hostname):

def test_no_certificate(hostname, envoy_authority):
"""Test that request without certificate will be rejected"""
with pytest.raises(ReadError, match="certificate required"):
with hostname.client(verify=envoy_authority) as client:
client.get("/get")
with hostname.client(verify=envoy_authority) as client:
result = client.get("/get")
result.has_cert_required_error()


def test_invalid_certificate(envoy_authority, invalid_cert, auth, hostname):
"""Tests that certificate with different CA will be rejeceted"""
with pytest.raises(ReadError, match="unknown ca"):
with hostname.client(verify=envoy_authority, cert=invalid_cert) as client:
client.get("/get", auth=auth)
with hostname.client(verify=envoy_authority, cert=invalid_cert) as client:
result = client.get("/get", auth=auth)
result.has_unknown_ca_error()
2 changes: 1 addition & 1 deletion testsuite/tests/mgc/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ def test_smoke(client):

result = client.get("/get")
assert not result.has_dns_error()
assert not result.has_tls_error()
assert not result.has_cert_verify_error()
assert result.status_code == 200