diff --git a/testsuite/objects/__init__.py b/testsuite/objects/__init__.py index 063e5f68..f62647a3 100644 --- a/testsuite/objects/__init__.py +++ b/testsuite/objects/__init__.py @@ -121,6 +121,81 @@ class ValueFrom(ABCValue): selector: str +@dataclass +class ABCResponse(abc.ABC): + """Abstract dataclass specifying a Response section item.""" + + +@dataclass +class ResponseJson(ABCResponse): + """Response item as JSON injection.""" + + properties: dict[str, ABCValue] + + def asdict(self): + """Custom asdict due to nested structure.""" + asdict_properties = {} + for key, value in self.properties.items(): + asdict_properties[key] = asdict(value) + return {"json": {"properties": asdict_properties}} + + +@dataclass +class ResponsePlain(ABCResponse): + """Response item as plain text value.""" + + plain: ABCValue + + +@dataclass +class WristbandSigningKeyRef: + """Name of Kubernetes secret and corresponding signing algorithm.""" + + name: str + algorithm: str = "RS256" + + +@dataclass(kw_only=True) +class ResponseWristband(ABCResponse): + """ + Response item as Festival Wristband Token. + + :param issuer: Endpoint to the Authorino service that issues the wristband + :param signingKeyRefs: List of Kubernetes secrets of dataclass `WristbandSigningKeyRef` + :param customClaims: Custom claims added to the wristband token. + :param tokenDuration: Time span of the wristband token, in seconds. + """ + + issuer: str + signingKeyRefs: list[WristbandSigningKeyRef] + customClaims: Optional[list[dict[str, ABCValue]]] = None + tokenDuration: Optional[int] = None + + def asdict(self): + """Custom asdict due to nested structure.""" + + asdict_key_refs = [asdict(i) for i in self.signingKeyRefs] + asdict_custom_claims = [asdict(i) for i in self.customClaims] if self.customClaims else None + return { + "wristband": { + "issuer": self.issuer, + "signingKeyRefs": asdict_key_refs, + "customClaims": asdict_custom_claims, + "tokenDuration": self.tokenDuration, + } + } + + +@dataclass(kw_only=True) +class DenyResponse: + """Dataclass for custom responses deny reason.""" + + code: Optional[int] = None + message: Optional[ABCValue] = None + headers: Optional[dict[str, ABCValue]] = None + body: Optional[ABCValue] = None + + @dataclass class Cache: """Dataclass for specifying Cache in Authorization""" diff --git a/testsuite/openshift/objects/auth_config/sections.py b/testsuite/openshift/objects/auth_config/sections.py index 855c08ab..f54409c0 100644 --- a/testsuite/openshift/objects/auth_config/sections.py +++ b/testsuite/openshift/objects/auth_config/sections.py @@ -9,6 +9,9 @@ Selector, Credentials, ValueFrom, + ABCResponse, + ResponseJson, + DenyResponse, ) from testsuite.openshift.objects import modify @@ -209,90 +212,42 @@ def success_dynamic_metadata(self): """Nested dict for items wrapped as Envoy Dynamic Metadata.""" return self.section.setdefault("success", {}).setdefault("dynamicMetadata", {}) - def _add( - self, - name: str, - value: dict, - wrapper: Literal["headers", "dynamicMetadata"] = "headers", - **common_features, - ): + def add_simple(self, auth_json: str, name="simple", key="data", **common_features): """ - Add response to AuthConfig. + Add simple response to AuthConfig, used for configuring response for debugging purposes, + which can be easily read back using extract_response + """ + self.add_success_header(name, ResponseJson({key: ValueFrom(auth_json)}), **common_features) - :param wrapper: This variable configures if the response should be wrapped as HTTP headers or - as Envoy Dynamic Metadata. Default is "headers" + def add_success_header(self, name: str, value: ABCResponse, **common_features): + """ + Add item to responses.success.headers section. + This section is for items wrapped as HTTP headers. """ - add_common_features(value, **common_features) - if wrapper == "headers": - self.success_headers.update({name: value}) - if wrapper == "dynamicMetadata": - self.success_dynamic_metadata.update({name: value}) - def add_simple(self, auth_json: str, name="simple", key="data", **common_features): + asdict_value = asdict(value) + add_common_features(asdict_value, **common_features) + self.success_headers.update({name: asdict_value}) + + def add_success_dynamic(self, name: str, value: ABCResponse, **common_features): """ - Add simple response to AuthConfig, used for configuring response for debugging purposes, - which can be easily read back using extract_response + Add item to responses.success.dynamicMetadata section. + This section is for items wrapped as Envoy Dynamic Metadata. """ - self.add_json(name, {key: ValueFrom(auth_json)}, **common_features) - @modify - def add_json(self, name: str, properties: dict[str, ABCValue], **common_features): - """Adds json response to AuthConfig""" - asdict_properties = {} - for key, value in properties.items(): - asdict_properties[key] = asdict(value) - self._add(name, {"json": {"properties": asdict_properties}}, **common_features) + asdict_value = asdict(value) + add_common_features(asdict_value, **common_features) + self.success_dynamic_metadata.update({name: asdict_value}) - @modify - def add_plain(self, name: str, value: ABCValue, **common_features): - """Adds plain response to AuthConfig""" - self._add(name, {"plain": asdict(value)}, **common_features) + def set_unauthenticated(self, deny_response: DenyResponse): + """Set custom deny response for unauthenticated error.""" - @modify - def add_wristband(self, name: str, issuer: str, secret_name: str, algorithm: str = "RS256", **common_features): - """Adds wristband response to AuthConfig""" - self._add( - name, - { - "wristband": { - "issuer": issuer, - "signingKeyRefs": [ - { - "name": secret_name, - "algorithm": algorithm, - } - ], - }, - }, - **common_features, - ) + self.add_item("unauthenticated", asdict(deny_response)) - @modify - def set_deny_with( - self, - category: Literal["unauthenticated", "unauthorized"], - code: int = None, - message: ABCValue = None, - headers: dict[str, ABCValue] = None, - body: ABCValue = None, - ): - """Set default deny code, message, headers, and body for 'unauthenticated' and 'unauthorized' error.""" - asdict_message = asdict(message) if message else None - asdict_body = asdict(body) if body else None - asdict_headers = None - if headers: - asdict_headers = {} - for key, value in headers.items(): - asdict_headers[key] = asdict(value) - self.add_item( - category, - { - "code": code, - "message": asdict_message, - "headers": asdict_headers, - "body": asdict_body, - }, - ) + def set_unauthorized(self, deny_response: DenyResponse): + """Set custom deny response for unauthorized error.""" + + self.add_item("unauthorized", asdict(deny_response)) class AuthorizationSection(Section):