Skip to content

Commit

Permalink
Merge pull request #350 from averevki/test-k8s-tokenreview
Browse files Browse the repository at this point in the history
Add kubernetes token-review identity tests
  • Loading branch information
pehala authored Feb 21, 2024
2 parents c498080 + 74ae472 commit c17bb62
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 4 deletions.
3 changes: 1 addition & 2 deletions testsuite/httpx/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from httpx import Auth, Request, URL, Response

from testsuite.oidc.rhsso import User
from testsuite.openshift.api_key import APIKey
from testsuite.oidc import Token

TokenType = Union[Token, Callable[[], Token]]
Expand Down Expand Up @@ -57,7 +56,7 @@ def auth_flow(self, request: Request) -> Generator[Request, Response, None]:
class HeaderApiKeyAuth(Auth):
"""Auth class for authentication with API key"""

def __init__(self, api_key: APIKey, prefix: str = "APIKEY") -> None:
def __init__(self, api_key, prefix: str = "APIKEY") -> None:
super().__init__()
self.api_key = str(api_key)
self.prefix = prefix
Expand Down
31 changes: 31 additions & 0 deletions testsuite/openshift/service_account.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Service Account object for OpenShift"""

from testsuite.openshift import OpenShiftObject
from testsuite.openshift.client import OpenShiftClient


class ServiceAccount(OpenShiftObject):
"""Service account object for OpenShift"""

def __init__(self, openshift: OpenShiftClient, model: dict):
self.openshift = openshift
super().__init__(model, context=openshift.context)

@classmethod
def create_instance(cls, openshift: OpenShiftClient, name: str, labels: dict[str, str] = None):
"""Creates new instance of service account"""
model = {
"kind": "ServiceAccount",
"apiVersion": "v1",
"metadata": {
"name": name,
"labels": labels,
},
}

return cls(openshift, model)

def get_auth_token(self, audiences: list[str] = None) -> str:
"""Requests and returns bound token for service account"""
audiences_args = [f"--audience={a}" for a in audiences or []]
return self.openshift.do_action("create", "token", self.name(), *audiences_args).out().strip()
4 changes: 2 additions & 2 deletions testsuite/policy/authorization/sections.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ def add_mtls(self, name: str, selector: Selector, **common_features):
self.add_item(name, {"x509": {"selector": asdict(selector)}, **common_features})

@modify
def add_kubernetes(self, name: str, audiences: list[str], **common_features):
def add_kubernetes(self, name: str, audiences: list[str] = None, **common_features):
"""Adds Kubernetes identity
Args:
:param name: name of the identity
:param audiences: token audiences
"""
self.add_item(name, {"kubernetesTokenReview": {"audiences": audiences}}, **common_features)
self.add_item(name, {"kubernetesTokenReview": {"audiences": audiences} if audiences else {}}, **common_features)

@modify
def add_oidc(self, name, endpoint, *, ttl: int = 0, credentials: Credentials = None, **common_features):
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Conftest for kubernetes token-review tests"""

import pytest

from testsuite.httpx.auth import HeaderApiKeyAuth
from testsuite.openshift.service_account import ServiceAccount


@pytest.fixture(scope="module")
def create_service_account(request, openshift, blame, module_label):
"""Creates and returns service account"""

def _create_service_account(name):
service_account = ServiceAccount.create_instance(openshift, blame(name), labels={"app": module_label})
request.addfinalizer(service_account.delete)
service_account.commit()
return service_account

return _create_service_account


@pytest.fixture(scope="module")
def service_account_token(create_service_account, audience):
"""Create service account and request its bound token with the hostname as audience"""
service_account = create_service_account("tkn-rev")
return service_account.get_auth_token(audience)


@pytest.fixture(scope="module")
def auth(service_account_token):
"""Create request auth with service account token as API key"""
return HeaderApiKeyAuth(service_account_token, "Bearer")
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Test kubernetes token-review authorization with bound sa token that should contain all specified audiences"""

import pytest

pytestmark = [pytest.mark.authorino]


TEST_AUDIENCES = ["test-aud1", "test-aud2", "test-aud3"]


@pytest.fixture(scope="module")
def authorization(authorization):
"""Add kubernetes token-review identity with custom audiences specified"""
authorization.identity.add_kubernetes("token-review-aud", TEST_AUDIENCES)
return authorization


@pytest.fixture(scope="module")
def audience():
"""Return custom audiences for the service account bound token"""
return TEST_AUDIENCES


def test_custom_audience(client, auth):
"""Test kubernetes token-review by adding custom audiences to the sa token and using it for the request"""
response = client.get("/get")
assert response.status_code == 401

response = client.get("/get", auth=auth)
assert response.status_code == 200
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Test kubernetes token-review authorization with bound sa token that should contain host as audience by default"""

import pytest

pytestmark = [pytest.mark.authorino]


@pytest.fixture(scope="module")
def authorization(authorization):
"""Add kubernetes token-review identity without any audiences"""
authorization.identity.add_kubernetes("token-review-host")
return authorization


@pytest.fixture(scope="module")
def audience(hostname):
"""Return hostname as only audience for the service account bound token"""
return [hostname.hostname]


def test_host_audience(client, auth):
"""Test kubernetes token-review by adding hostname audience to the sa token and using it for the request"""
response = client.get("/get")
assert response.status_code == 401

response = client.get("/get", auth=auth)
assert response.status_code == 200

0 comments on commit c17bb62

Please sign in to comment.