diff --git a/docs/apps/proxy/examples/reference.yml b/docs/apps/proxy/examples/reference.yml index 0b47a8be..a892920a 100644 --- a/docs/apps/proxy/examples/reference.yml +++ b/docs/apps/proxy/examples/reference.yml @@ -1,6 +1,7 @@ proxy: endpoints: - - description: null + - controller: null + description: null name: my-endpoint port: 4000 remote: diff --git a/docs/apps/proxy/schema.json b/docs/apps/proxy/schema.json index 0c8dec9d..99b7c2d4 100644 --- a/docs/apps/proxy/schema.json +++ b/docs/apps/proxy/schema.json @@ -8,6 +8,10 @@ "description": { "anyOf": [{ "type": "string" }, { "type": "null" }], "default": null + }, + "controller": { + "anyOf": [{ "type": "string" }, { "type": "null" }], + "default": null } }, "required": ["name", "port"], @@ -108,6 +112,10 @@ "anyOf": [{ "type": "string" }, { "type": "null" }], "default": null }, + "controller": { + "anyOf": [{ "type": "string" }, { "type": "null" }], + "default": null + }, "remote": { "anyOf": [{ "$ref": "#/$defs/RemoteSettings" }, { "type": "null" }], "default": null diff --git a/harp/config/tests/__snapshots__/test_all_applications_settings.ambr b/harp/config/tests/__snapshots__/test_all_applications_settings.ambr index 9aa9591e..afdb7f1e 100644 --- a/harp/config/tests/__snapshots__/test_all_applications_settings.ambr +++ b/harp/config/tests/__snapshots__/test_all_applications_settings.ambr @@ -80,6 +80,7 @@ ''', 'harp_apps.proxy.settings.endpoint.Endpoint': ''' settings: + controller: null description: null name: api port: 4000 diff --git a/harp/controllers/resolvers.py b/harp/controllers/resolvers.py index 36d682ac..10b26349 100644 --- a/harp/controllers/resolvers.py +++ b/harp/controllers/resolvers.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Type from httpx import AsyncClient from whistle import IAsyncEventDispatcher @@ -48,6 +48,7 @@ def add( *, http_client: AsyncClient, dispatcher: Optional[IAsyncEventDispatcher] = None, + ControllerType: Optional[Type[HttpProxyController]] = None, ): if endpoint.settings.name in self._endpoints: raise RuntimeError(f"Endpoint «{endpoint.settings.name}» already exists.") @@ -55,8 +56,12 @@ def add( if endpoint.settings.port in self._ports: raise RuntimeError(f"Port «{endpoint.settings.port}» already in use.") + ControllerType = ControllerType or HttpProxyController + if not issubclass(ControllerType, HttpProxyController): + raise RuntimeError(f"Controller «{ControllerType}» must be a subclass of HttpProxyController.") + self._endpoints[endpoint.settings.name] = endpoint - controller = HttpProxyController( + controller = ControllerType( endpoint.remote, dispatcher=dispatcher, http_client=http_client, diff --git a/harp_apps/dashboard/frontend/types/harp_apps.proxy.d.ts b/harp_apps/dashboard/frontend/types/harp_apps.proxy.d.ts index 66a4a1ee..75f91ac1 100644 --- a/harp_apps/dashboard/frontend/types/harp_apps.proxy.d.ts +++ b/harp_apps/dashboard/frontend/types/harp_apps.proxy.d.ts @@ -49,6 +49,7 @@ declare namespace Apps.Proxy { name: string; port: number; description?: string | null; + controller?: string | null; remote?: RemoteSettings | null; } /** @@ -142,6 +143,7 @@ declare namespace Apps.Proxy { name: string; port: number; description?: string | null; + controller?: string | null; } export interface Endpoint { settings: EndpointSettings; diff --git a/harp_apps/dashboard/tests/controllers/test_system_proxy.py b/harp_apps/dashboard/tests/controllers/test_system_proxy.py index 9f9997ab..90c92273 100644 --- a/harp_apps/dashboard/tests/controllers/test_system_proxy.py +++ b/harp_apps/dashboard/tests/controllers/test_system_proxy.py @@ -52,7 +52,12 @@ async def test_get_proxy(self, controller: SystemController): "min_pool_size": 1, }, }, - "settings": {"description": None, "name": "api", "port": 4000}, + "settings": { + "name": "api", + "description": None, + "port": 4000, + "controller": None, + }, } ] } diff --git a/harp_apps/proxy/__app__.py b/harp_apps/proxy/__app__.py index 0706ea03..81c91dc5 100644 --- a/harp_apps/proxy/__app__.py +++ b/harp_apps/proxy/__app__.py @@ -11,6 +11,7 @@ from harp.config import Application from harp.config.events import OnBindEvent, OnBoundEvent, OnShutdownEvent +from harp.utils.packages import import_string from harp.utils.services import factory from .settings import Proxy, ProxySettings @@ -41,7 +42,12 @@ async def on_bound(event: OnBoundEvent): http_client: AsyncClient = event.provider.get(AsyncClient) for endpoint in proxy.endpoints: - event.resolver.add(endpoint, dispatcher=event.dispatcher, http_client=http_client) + ControllerType = None + if endpoint.settings.controller is not None: + ControllerType = import_string(endpoint.settings.controller) + event.resolver.add( + endpoint, dispatcher=event.dispatcher, http_client=http_client, ControllerType=ControllerType + ) event.provider.set( PROXY_HEALTHCHECKS_TASK, diff --git a/harp_apps/proxy/controllers.py b/harp_apps/proxy/controllers.py index c4c48274..9af4d3ac 100644 --- a/harp_apps/proxy/controllers.py +++ b/harp_apps/proxy/controllers.py @@ -100,51 +100,17 @@ def __init__( except AttributeError: self.user_agent = "harp" - async def adispatch(self, event_id, event=None): - """ - Shortcut method to dispatch an event using the controller's dispatcher, if there is one. - - :return: :class:`IEvent ` or None - """ - if self._dispatcher: - return await self._dispatcher.adispatch(event_id, event) - - def debug(self, message, *args, **kwargs): - if not self._logging: - return - transaction: Transaction | None = kwargs.pop("transaction", None) - if transaction: - kwargs["transaction"] = transaction.id - kwargs.update(transaction.extras) - logger.debug(message, *args, **kwargs) - - def info(self, message, *args, **kwargs): - if not self._logging: - return - transaction: Transaction | None = kwargs.pop("transaction", None) - if transaction: - kwargs["transaction"] = transaction.id - kwargs.update(transaction.extras) - logger.info(message, *args, **kwargs) - - def warning(self, message, *args, **kwargs): - if not self._logging: - return - transaction: Transaction | None = kwargs.pop("transaction", None) - if transaction: - kwargs["transaction"] = transaction.id - kwargs.update(transaction.extras) - logger.warning(message, *args, **kwargs) + def __repr__(self): + return f"{type(self).__name__}({self.remote!r}, name={self.name!r})" async def __call__(self, request: HttpRequest) -> HttpResponse: """Handle an incoming request and proxy it to the configured URL.""" labels = {"name": self.name or "-", "method": request.method} - with performances_observer("harp_proxy", labels=labels): + with performances_observer("transaction", labels=labels): # create an envelope to override things, without touching the original request context = ProxyFilterEvent(self.name, request=request) - await self.adispatch(EVENT_FILTER_PROXY_REQUEST, context) remote_err = None @@ -180,74 +146,81 @@ async def __call__(self, request: HttpRequest) -> HttpResponse: ) await context.request.aread() - url = urljoin(remote_url, context.request.path) + ( + full_remote_url = urljoin(remote_url, context.request.path) + ( f"?{urlencode(context.request.query)}" if context.request.query else "" ) - with performances_observer("harp_http", labels=labels): - if not context.response: - # PROXY REQUEST - remote_request: httpx.Request = self.http_client.build_request( - context.request.method, - url, - headers=list(context.request.headers.items()), - content=context.request.body, - extensions={"harp": {"endpoint": self.name}}, - ) - context.request.extensions["remote_method"] = remote_request.method - context.request.extensions["remote_url"] = remote_request.url - - self.debug( - f"▶▶ {context.request.method} {url}", - transaction=transaction, - extensions=remote_request.extensions, - ) - - # PROXY RESPONSE - try: - remote_response: httpx.Response = await self.http_client.send(remote_request) - except Exception as exc: - return await self.end_transaction(remote_url, transaction, exc) - - self.remote.notify_url_status(remote_url, remote_response.status_code) - - await remote_response.aread() - await remote_response.aclose() - - if self.remote[remote_url].status == CHECKING and 200 <= remote_response.status_code < 400: - self.remote.set_up(remote_url) - - try: - _elapsed = f"{remote_response.elapsed.total_seconds()}s" - except RuntimeError: - _elapsed = "n/a" - self.debug( - f"◀◀ {remote_response.status_code} {remote_response.reason_phrase} ({_elapsed}{' cached' if remote_response.extensions.get('from_cache') else ''})", - transaction=transaction, - ) - - response_headers = { - k: v - for k, v in remote_response.headers.multi_items() - if k.lower() not in ("server", "date", "content-encoding", "content-length") - } - # XXX for now, we use transaction "extras" to store searchable data for later - transaction.extras["status_class"] = f"{remote_response.status_code // 100}xx" - - if remote_response.extensions.get("from_cache"): - transaction.extras["cached"] = remote_response.extensions.get("cache_metadata", {}).get( - "cache_key", True - ) - - context.response = HttpResponse( - remote_response.content, status=remote_response.status_code, headers=response_headers - ) + # If nothing prepared a ready to send response, it's time to proxy. + if not context.response: + context.response = await self.handle_remote_transaction( + context, remote_url, full_remote_url, labels=labels, transaction=transaction + ) + await self.adispatch(EVENT_FILTER_PROXY_RESPONSE, context) await context.response.aread() return await self.end_transaction(remote_url, transaction, context.response) + async def handle_remote_transaction( + self, context: ProxyFilterEvent, base_remote_url, full_remote_url, *, labels, transaction + ) -> BaseHttpMessage: + with performances_observer("remote_transaction", labels=labels): + # PROXY REQUEST + remote_request: httpx.Request = self.http_client.build_request( + context.request.method, + full_remote_url, + headers=list(context.request.headers.items()), + content=context.request.body, + extensions={"harp": {"endpoint": self.name}}, + ) + context.request.extensions["remote_method"] = remote_request.method + context.request.extensions["remote_url"] = remote_request.url + + self.debug( + f"▶▶ {context.request.method} {full_remote_url}", + transaction=transaction, + extensions=remote_request.extensions, + ) + + # PROXY RESPONSE + try: + remote_response: httpx.Response = await self.http_client.send(remote_request) + except Exception as exc: + return await self.end_transaction(base_remote_url, transaction, exc) + + self.remote.notify_url_status(base_remote_url, remote_response.status_code) + + await remote_response.aread() + await remote_response.aclose() + + if self.remote[base_remote_url].status == CHECKING and 200 <= remote_response.status_code < 400: + self.remote.set_up(base_remote_url) + + try: + _elapsed = f"{remote_response.elapsed.total_seconds()}s" + except RuntimeError: + _elapsed = "n/a" + self.debug( + f"◀◀ {remote_response.status_code} {remote_response.reason_phrase} ({_elapsed}{' cached' if remote_response.extensions.get('from_cache') else ''})", + transaction=transaction, + ) + + response_headers = { + k: v + for k, v in remote_response.headers.multi_items() + if k.lower() not in ("server", "date", "content-encoding", "content-length") + } + # XXX for now, we use transaction "extras" to store searchable data for later + transaction.extras["status_class"] = f"{remote_response.status_code // 100}xx" + + if remote_response.extensions.get("from_cache"): + transaction.extras["cached"] = remote_response.extensions.get("cache_metadata", {}).get( + "cache_key", True + ) + + return HttpResponse(remote_response.content, status=remote_response.status_code, headers=response_headers) + async def end_transaction( self, remote_url: Optional[str], @@ -317,7 +290,17 @@ async def end_transaction( return cast(HttpResponse, response) - async def _create_transaction_from_request(self, request: HttpRequest, *, tags=None): + async def _create_transaction_from_request(self, request: HttpRequest, *, tags=None) -> Transaction: + """ + Create a new transaction from the incoming request, generating a new (random, but orderable according to the + instant it happens) transaction ID. + + Once created, it dispatches the EVENT_TRANSACTION_STARTED event to allow storage applications (or anything + else) to react to this creation, then it dispatches the EVENT_TRANSACTION_MESSAGE event to allow to react to + the fact this transaction contained a request. + + :return: Transaction + """ transaction = Transaction( id=generate_transaction_id_ksuid(), type="http", @@ -327,6 +310,7 @@ async def _create_transaction_from_request(self, request: HttpRequest, *, tags=N ) request.extensions["transaction"] = transaction + # If the request cache control asked for cache to be disabled, mark it in transaction. request_cache_control = request.headers.get("cache-control") if request_cache_control: request_cache_control = parse_cache_control([request_cache_control]) @@ -347,8 +331,32 @@ async def _create_transaction_from_request(self, request: HttpRequest, *, tags=N return transaction - def __repr__(self): - return f"{type(self).__name__}({self.remote!r}, name={self.name!r})" + async def adispatch(self, event_id, event=None): + """ + Shortcut method to dispatch an event using the controller's dispatcher, if there is one. + + :return: :class:`IEvent ` or None + """ + if self._dispatcher: + return await self._dispatcher.adispatch(event_id, event) + + def debug(self, message, *args, **kwargs): + self._log("debug", message, *args, **kwargs) + + def info(self, message, *args, **kwargs): + self._log("info", message, *args, **kwargs) + + def warning(self, message, *args, **kwargs): + self._log("warning", message, *args, **kwargs) + + def _log(self, level, message, *args, **kwargs): + if not self._logging: + return + transaction: Transaction | None = kwargs.pop("transaction", None) + if transaction: + kwargs["transaction"] = transaction.id + kwargs.update(transaction.extras) + getattr(logger, level)(message, *args, **kwargs) @lru_cache diff --git a/harp_apps/proxy/settings/endpoint.py b/harp_apps/proxy/settings/endpoint.py index cfad359f..ac9f50bc 100644 --- a/harp_apps/proxy/settings/endpoint.py +++ b/harp_apps/proxy/settings/endpoint.py @@ -7,10 +7,18 @@ class BaseEndpointSettings(Configurable): + #: endpoint name, used as an unique identifier name: str + + #: port to listen on port: int + + #: description, informative only description: Optional[str] = None + #: custom controller + controller: Optional[str] = None + class EndpointSettings(BaseEndpointSettings): """ @@ -37,7 +45,7 @@ class EndpointSettings(BaseEndpointSettings): """ - # resilience-compatible remote definition, with url pools, probes, etc. + #: remote definition, with url pools, probes, etc. remote: Optional[RemoteSettings] = Field(None, repr=False) @model_validator(mode="before") diff --git a/harp_apps/proxy/tests/__snapshots__/test_settings_endpoint.ambr b/harp_apps/proxy/tests/__snapshots__/test_settings_endpoint.ambr index bc9dd47a..8718423b 100644 --- a/harp_apps/proxy/tests/__snapshots__/test_settings_endpoint.ambr +++ b/harp_apps/proxy/tests/__snapshots__/test_settings_endpoint.ambr @@ -305,6 +305,18 @@ }), 'additionalProperties': False, 'properties': dict({ + 'controller': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Controller', + }), 'description': dict({ 'anyOf': list([ dict({ @@ -651,6 +663,18 @@ }), 'additionalProperties': False, 'properties': dict({ + 'controller': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Controller', + }), 'description': dict({ 'anyOf': list([ dict({ @@ -997,6 +1021,18 @@ }), 'additionalProperties': False, 'properties': dict({ + 'controller': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Controller', + }), 'description': dict({ 'anyOf': list([ dict({ @@ -1343,6 +1379,18 @@ }), 'additionalProperties': False, 'properties': dict({ + 'controller': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Controller', + }), 'description': dict({ 'anyOf': list([ dict({ @@ -1411,6 +1459,18 @@ url: http://my-endpoint:8080 ''', 'properties': dict({ + 'controller': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Controller', + }), 'description': dict({ 'anyOf': list([ dict({ @@ -1887,6 +1947,18 @@ url: http://my-endpoint:8080 ''', 'properties': dict({ + 'controller': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Controller', + }), 'description': dict({ 'anyOf': list([ dict({ @@ -2382,6 +2454,18 @@ url: http://my-endpoint:8080 ''', 'properties': dict({ + 'controller': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Controller', + }), 'description': dict({ 'anyOf': list([ dict({ @@ -2858,6 +2942,18 @@ url: http://my-endpoint:8080 ''', 'properties': dict({ + 'controller': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Controller', + }), 'description': dict({ 'anyOf': list([ dict({ diff --git a/harp_apps/proxy/tests/test_settings_endpoint.py b/harp_apps/proxy/tests/test_settings_endpoint.py index 89e83837..08baa526 100644 --- a/harp_apps/proxy/tests/test_settings_endpoint.py +++ b/harp_apps/proxy/tests/test_settings_endpoint.py @@ -14,7 +14,7 @@ class TestEndpointSettings(BaseConfigurableTest): type = EndpointSettings initial = {**base_settings} expected = {**initial} - expected_verbose = {**expected, "description": None, "remote": None} + expected_verbose = {**expected, "description": None, "remote": None, "controller": None} def test_old_url_syntax(self): obj = self.create(url="http://my-endpoint:8080") @@ -48,6 +48,7 @@ class TestEndpointSettingsWithRemote(BaseConfigurableTest): "probe": None, "liveness": {"type": "inherit"}, }, + "controller": None, } @@ -60,6 +61,7 @@ class TestEndpointStateful(BaseConfigurableTest): "settings": { **TestEndpointSettings.expected, "description": None, + "controller": None, }, } expected_verbose = { @@ -95,7 +97,11 @@ class TestEndpointStatefulWithRemote(BaseConfigurableTest): "min_pool_size": 1, }, }, - "settings": {**TestEndpointSettings.expected, "description": None}, + "settings": { + **TestEndpointSettings.expected, + "description": None, + "controller": None, + }, } expected_verbose = {