Skip to content

Commit

Permalink
Responses section refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
azgabur committed Nov 10, 2023
1 parent fba5d9c commit 4d98963
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 74 deletions.
75 changes: 75 additions & 0 deletions testsuite/objects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,81 @@ class ValueFrom(ABCValue):
selector: str


@dataclass
class ABCResponse(abc.ABC):
"""Abstract dataclass specifying a Response section item."""


@dataclass
class ResponseJson(ABCResponse):
"""Response item as JSON injection."""

properties: dict[str, ABCValue]

def asdict(self):
"""Custom asdict due to nested structure."""
asdict_properties = {}
for key, value in self.properties.items():
asdict_properties[key] = asdict(value)
return {"json": {"properties": asdict_properties}}


@dataclass
class ResponsePlain(ABCResponse):
"""Response item as plain text value."""

plain: ABCValue


@dataclass
class WristbandSigningKeyRef:
"""Name of Kubernetes secret and corresponding signing algorithm."""

name: str
algorithm: str = "RS256"


@dataclass(kw_only=True)
class ResponseWristband(ABCResponse):
"""
Response item as Festival Wristband Token.
:param issuer: Endpoint to the Authorino service that issues the wristband
:param signingKeyRefs: List of Kubernetes secrets of dataclass `WristbandSigningKeyRef`
:param customClaims: Custom claims added to the wristband token.
:param tokenDuration: Time span of the wristband token, in seconds.
"""

issuer: str
signingKeyRefs: list[WristbandSigningKeyRef]
customClaims: Optional[list[dict[str, ABCValue]]] = None
tokenDuration: Optional[int] = None

def asdict(self):
"""Custom asdict due to nested structure."""

asdict_key_refs = [asdict(i) for i in self.signingKeyRefs]
asdict_custom_claims = [asdict(i) for i in self.customClaims] if self.customClaims else None
return {
"wristband": {
"issuer": self.issuer,
"signingKeyRefs": asdict_key_refs,
"customClaims": asdict_custom_claims,
"tokenDuration": self.tokenDuration,
}
}


@dataclass(kw_only=True)
class DenyResponse:
"""Dataclass for custom responses deny reason."""

code: Optional[int] = None
message: Optional[ABCValue] = None
headers: Optional[dict[str, ABCValue]] = None
body: Optional[ABCValue] = None


@dataclass
class Cache:
"""Dataclass for specifying Cache in Authorization"""
Expand Down
103 changes: 29 additions & 74 deletions testsuite/openshift/objects/auth_config/sections.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
Selector,
Credentials,
ValueFrom,
ABCResponse,
ResponseJson,
DenyResponse,
)
from testsuite.openshift.objects import modify

Expand Down Expand Up @@ -209,90 +212,42 @@ def success_dynamic_metadata(self):
"""Nested dict for items wrapped as Envoy Dynamic Metadata."""
return self.section.setdefault("success", {}).setdefault("dynamicMetadata", {})

def _add(
self,
name: str,
value: dict,
wrapper: Literal["headers", "dynamicMetadata"] = "headers",
**common_features,
):
def add_simple(self, auth_json: str, name="simple", key="data", **common_features):
"""
Add response to AuthConfig.
Add simple response to AuthConfig, used for configuring response for debugging purposes,
which can be easily read back using extract_response
"""
self.add_success_header(name, ResponseJson({key: ValueFrom(auth_json)}), **common_features)

:param wrapper: This variable configures if the response should be wrapped as HTTP headers or
as Envoy Dynamic Metadata. Default is "headers"
def add_success_header(self, name: str, value: ABCResponse, **common_features):
"""
Add item to responses.success.headers section.
This section is for items wrapped as HTTP headers.
"""
add_common_features(value, **common_features)
if wrapper == "headers":
self.success_headers.update({name: value})
if wrapper == "dynamicMetadata":
self.success_dynamic_metadata.update({name: value})

def add_simple(self, auth_json: str, name="simple", key="data", **common_features):
asdict_value = asdict(value)
add_common_features(asdict_value, **common_features)
self.success_headers.update({name: asdict_value})

def add_success_dynamic(self, name: str, value: ABCResponse, **common_features):
"""
Add simple response to AuthConfig, used for configuring response for debugging purposes,
which can be easily read back using extract_response
Add item to responses.success.dynamicMetadata section.
This section is for items wrapped as Envoy Dynamic Metadata.
"""
self.add_json(name, {key: ValueFrom(auth_json)}, **common_features)

@modify
def add_json(self, name: str, properties: dict[str, ABCValue], **common_features):
"""Adds json response to AuthConfig"""
asdict_properties = {}
for key, value in properties.items():
asdict_properties[key] = asdict(value)
self._add(name, {"json": {"properties": asdict_properties}}, **common_features)
asdict_value = asdict(value)
add_common_features(asdict_value, **common_features)
self.success_dynamic_metadata.update({name: asdict_value})

@modify
def add_plain(self, name: str, value: ABCValue, **common_features):
"""Adds plain response to AuthConfig"""
self._add(name, {"plain": asdict(value)}, **common_features)
def set_unauthenticated(self, deny_response: DenyResponse):
"""Set custom deny response for unauthenticated error."""

@modify
def add_wristband(self, name: str, issuer: str, secret_name: str, algorithm: str = "RS256", **common_features):
"""Adds wristband response to AuthConfig"""
self._add(
name,
{
"wristband": {
"issuer": issuer,
"signingKeyRefs": [
{
"name": secret_name,
"algorithm": algorithm,
}
],
},
},
**common_features,
)
self.add_item("unauthenticated", asdict(deny_response))

@modify
def set_deny_with(
self,
category: Literal["unauthenticated", "unauthorized"],
code: int = None,
message: ABCValue = None,
headers: dict[str, ABCValue] = None,
body: ABCValue = None,
):
"""Set default deny code, message, headers, and body for 'unauthenticated' and 'unauthorized' error."""
asdict_message = asdict(message) if message else None
asdict_body = asdict(body) if body else None
asdict_headers = None
if headers:
asdict_headers = {}
for key, value in headers.items():
asdict_headers[key] = asdict(value)
self.add_item(
category,
{
"code": code,
"message": asdict_message,
"headers": asdict_headers,
"body": asdict_body,
},
)
def set_unauthorized(self, deny_response: DenyResponse):
"""Set custom deny response for unauthorized error."""

self.add_item("unauthorized", asdict(deny_response))


class AuthorizationSection(Section):
Expand Down

0 comments on commit 4d98963

Please sign in to comment.