Skip to content

Commit

Permalink
feat: implement the ability to use custom controllers, refactoring pr…
Browse files Browse the repository at this point in the history
…oxycontroller
  • Loading branch information
hartym committed Oct 27, 2024
1 parent d6ca744 commit 59ca627
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 107 deletions.
3 changes: 2 additions & 1 deletion docs/apps/proxy/examples/reference.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
proxy:
endpoints:
- description: null
- controller: null
description: null
name: my-endpoint
port: 4000
remote:
Expand Down
8 changes: 8 additions & 0 deletions docs/apps/proxy/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
"description": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"default": null
},
"controller": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"default": null
}
},
"required": ["name", "port"],
Expand Down Expand Up @@ -108,6 +112,10 @@
"anyOf": [{ "type": "string" }, { "type": "null" }],
"default": null
},
"controller": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"default": null
},
"remote": {
"anyOf": [{ "$ref": "#/$defs/RemoteSettings" }, { "type": "null" }],
"default": null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
''',
'harp_apps.proxy.settings.endpoint.Endpoint': '''
settings:
controller: null
description: null
name: api
port: 4000
Expand Down
9 changes: 7 additions & 2 deletions harp/controllers/resolvers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Type

from httpx import AsyncClient
from whistle import IAsyncEventDispatcher
Expand Down Expand Up @@ -48,15 +48,20 @@ def add(
*,
http_client: AsyncClient,
dispatcher: Optional[IAsyncEventDispatcher] = None,
ControllerType: Optional[Type[HttpProxyController]] = None,
):
if endpoint.settings.name in self._endpoints:
raise RuntimeError(f"Endpoint «{endpoint.settings.name}» already exists.")

if endpoint.settings.port in self._ports:
raise RuntimeError(f"Port «{endpoint.settings.port}» already in use.")

ControllerType = ControllerType or HttpProxyController
if not issubclass(ControllerType, HttpProxyController):
raise RuntimeError(f"Controller «{ControllerType}» must be a subclass of HttpProxyController.")

self._endpoints[endpoint.settings.name] = endpoint
controller = HttpProxyController(
controller = ControllerType(
endpoint.remote,
dispatcher=dispatcher,
http_client=http_client,
Expand Down
2 changes: 2 additions & 0 deletions harp_apps/dashboard/frontend/types/harp_apps.proxy.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ declare namespace Apps.Proxy {
name: string;
port: number;
description?: string | null;
controller?: string | null;
remote?: RemoteSettings | null;
}
/**
Expand Down Expand Up @@ -142,6 +143,7 @@ declare namespace Apps.Proxy {
name: string;
port: number;
description?: string | null;
controller?: string | null;
}
export interface Endpoint {
settings: EndpointSettings;
Expand Down
7 changes: 6 additions & 1 deletion harp_apps/dashboard/tests/controllers/test_system_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ async def test_get_proxy(self, controller: SystemController):
"min_pool_size": 1,
},
},
"settings": {"description": None, "name": "api", "port": 4000},
"settings": {
"name": "api",
"description": None,
"port": 4000,
"controller": None,
},
}
]
}
Expand Down
8 changes: 7 additions & 1 deletion harp_apps/proxy/__app__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from harp.config import Application
from harp.config.events import OnBindEvent, OnBoundEvent, OnShutdownEvent
from harp.utils.packages import import_string
from harp.utils.services import factory

from .settings import Proxy, ProxySettings
Expand Down Expand Up @@ -41,7 +42,12 @@ async def on_bound(event: OnBoundEvent):
http_client: AsyncClient = event.provider.get(AsyncClient)

for endpoint in proxy.endpoints:
event.resolver.add(endpoint, dispatcher=event.dispatcher, http_client=http_client)
ControllerType = None
if endpoint.settings.controller is not None:
ControllerType = import_string(endpoint.settings.controller)
event.resolver.add(
endpoint, dispatcher=event.dispatcher, http_client=http_client, ControllerType=ControllerType
)

event.provider.set(
PROXY_HEALTHCHECKS_TASK,
Expand Down
206 changes: 107 additions & 99 deletions harp_apps/proxy/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,51 +100,17 @@ def __init__(
except AttributeError:
self.user_agent = "harp"

async def adispatch(self, event_id, event=None):
"""
Shortcut method to dispatch an event using the controller's dispatcher, if there is one.
:return: :class:`IEvent <whistle.IEvent>` or None
"""
if self._dispatcher:
return await self._dispatcher.adispatch(event_id, event)

def debug(self, message, *args, **kwargs):
if not self._logging:
return
transaction: Transaction | None = kwargs.pop("transaction", None)
if transaction:
kwargs["transaction"] = transaction.id
kwargs.update(transaction.extras)
logger.debug(message, *args, **kwargs)

def info(self, message, *args, **kwargs):
if not self._logging:
return
transaction: Transaction | None = kwargs.pop("transaction", None)
if transaction:
kwargs["transaction"] = transaction.id
kwargs.update(transaction.extras)
logger.info(message, *args, **kwargs)

def warning(self, message, *args, **kwargs):
if not self._logging:
return
transaction: Transaction | None = kwargs.pop("transaction", None)
if transaction:
kwargs["transaction"] = transaction.id
kwargs.update(transaction.extras)
logger.warning(message, *args, **kwargs)
def __repr__(self):
return f"{type(self).__name__}({self.remote!r}, name={self.name!r})"

async def __call__(self, request: HttpRequest) -> HttpResponse:
"""Handle an incoming request and proxy it to the configured URL."""

labels = {"name": self.name or "-", "method": request.method}

with performances_observer("harp_proxy", labels=labels):
with performances_observer("transaction", labels=labels):
# create an envelope to override things, without touching the original request
context = ProxyFilterEvent(self.name, request=request)

await self.adispatch(EVENT_FILTER_PROXY_REQUEST, context)

remote_err = None
Expand Down Expand Up @@ -180,74 +146,81 @@ async def __call__(self, request: HttpRequest) -> HttpResponse:
)

await context.request.aread()
url = urljoin(remote_url, context.request.path) + (
full_remote_url = urljoin(remote_url, context.request.path) + (
f"?{urlencode(context.request.query)}" if context.request.query else ""
)

with performances_observer("harp_http", labels=labels):
if not context.response:
# PROXY REQUEST
remote_request: httpx.Request = self.http_client.build_request(
context.request.method,
url,
headers=list(context.request.headers.items()),
content=context.request.body,
extensions={"harp": {"endpoint": self.name}},
)
context.request.extensions["remote_method"] = remote_request.method
context.request.extensions["remote_url"] = remote_request.url

self.debug(
f"▶▶ {context.request.method} {url}",
transaction=transaction,
extensions=remote_request.extensions,
)

# PROXY RESPONSE
try:
remote_response: httpx.Response = await self.http_client.send(remote_request)
except Exception as exc:
return await self.end_transaction(remote_url, transaction, exc)

self.remote.notify_url_status(remote_url, remote_response.status_code)

await remote_response.aread()
await remote_response.aclose()

if self.remote[remote_url].status == CHECKING and 200 <= remote_response.status_code < 400:
self.remote.set_up(remote_url)

try:
_elapsed = f"{remote_response.elapsed.total_seconds()}s"
except RuntimeError:
_elapsed = "n/a"
self.debug(
f"◀◀ {remote_response.status_code} {remote_response.reason_phrase} ({_elapsed}{' cached' if remote_response.extensions.get('from_cache') else ''})",
transaction=transaction,
)

response_headers = {
k: v
for k, v in remote_response.headers.multi_items()
if k.lower() not in ("server", "date", "content-encoding", "content-length")
}
# XXX for now, we use transaction "extras" to store searchable data for later
transaction.extras["status_class"] = f"{remote_response.status_code // 100}xx"

if remote_response.extensions.get("from_cache"):
transaction.extras["cached"] = remote_response.extensions.get("cache_metadata", {}).get(
"cache_key", True
)

context.response = HttpResponse(
remote_response.content, status=remote_response.status_code, headers=response_headers
)
# If nothing prepared a ready to send response, it's time to proxy.
if not context.response:
context.response = await self.handle_remote_transaction(
context, remote_url, full_remote_url, labels=labels, transaction=transaction
)

await self.adispatch(EVENT_FILTER_PROXY_RESPONSE, context)

await context.response.aread()

return await self.end_transaction(remote_url, transaction, context.response)

async def handle_remote_transaction(
self, context: ProxyFilterEvent, base_remote_url, full_remote_url, *, labels, transaction
) -> BaseHttpMessage:
with performances_observer("remote_transaction", labels=labels):
# PROXY REQUEST
remote_request: httpx.Request = self.http_client.build_request(
context.request.method,
full_remote_url,
headers=list(context.request.headers.items()),
content=context.request.body,
extensions={"harp": {"endpoint": self.name}},
)
context.request.extensions["remote_method"] = remote_request.method
context.request.extensions["remote_url"] = remote_request.url

self.debug(
f"▶▶ {context.request.method} {full_remote_url}",
transaction=transaction,
extensions=remote_request.extensions,
)

# PROXY RESPONSE
try:
remote_response: httpx.Response = await self.http_client.send(remote_request)
except Exception as exc:
return await self.end_transaction(base_remote_url, transaction, exc)

self.remote.notify_url_status(base_remote_url, remote_response.status_code)

await remote_response.aread()
await remote_response.aclose()

if self.remote[base_remote_url].status == CHECKING and 200 <= remote_response.status_code < 400:
self.remote.set_up(base_remote_url)

try:
_elapsed = f"{remote_response.elapsed.total_seconds()}s"
except RuntimeError:
_elapsed = "n/a"
self.debug(
f"◀◀ {remote_response.status_code} {remote_response.reason_phrase} ({_elapsed}{' cached' if remote_response.extensions.get('from_cache') else ''})",
transaction=transaction,
)

response_headers = {
k: v
for k, v in remote_response.headers.multi_items()
if k.lower() not in ("server", "date", "content-encoding", "content-length")
}
# XXX for now, we use transaction "extras" to store searchable data for later
transaction.extras["status_class"] = f"{remote_response.status_code // 100}xx"

if remote_response.extensions.get("from_cache"):
transaction.extras["cached"] = remote_response.extensions.get("cache_metadata", {}).get(
"cache_key", True
)

return HttpResponse(remote_response.content, status=remote_response.status_code, headers=response_headers)

async def end_transaction(
self,
remote_url: Optional[str],
Expand Down Expand Up @@ -317,7 +290,17 @@ async def end_transaction(

return cast(HttpResponse, response)

async def _create_transaction_from_request(self, request: HttpRequest, *, tags=None):
async def _create_transaction_from_request(self, request: HttpRequest, *, tags=None) -> Transaction:
"""
Create a new transaction from the incoming request, generating a new (random, but orderable according to the
instant it happens) transaction ID.
Once created, it dispatches the EVENT_TRANSACTION_STARTED event to allow storage applications (or anything
else) to react to this creation, then it dispatches the EVENT_TRANSACTION_MESSAGE event to allow to react to
the fact this transaction contained a request.
:return: Transaction
"""
transaction = Transaction(
id=generate_transaction_id_ksuid(),
type="http",
Expand All @@ -327,6 +310,7 @@ async def _create_transaction_from_request(self, request: HttpRequest, *, tags=N
)
request.extensions["transaction"] = transaction

# If the request cache control asked for cache to be disabled, mark it in transaction.
request_cache_control = request.headers.get("cache-control")
if request_cache_control:
request_cache_control = parse_cache_control([request_cache_control])
Expand All @@ -347,8 +331,32 @@ async def _create_transaction_from_request(self, request: HttpRequest, *, tags=N

return transaction

def __repr__(self):
return f"{type(self).__name__}({self.remote!r}, name={self.name!r})"
async def adispatch(self, event_id, event=None):
"""
Shortcut method to dispatch an event using the controller's dispatcher, if there is one.
:return: :class:`IEvent <whistle.IEvent>` or None
"""
if self._dispatcher:
return await self._dispatcher.adispatch(event_id, event)

def debug(self, message, *args, **kwargs):
self._log("debug", message, *args, **kwargs)

def info(self, message, *args, **kwargs):
self._log("info", message, *args, **kwargs)

def warning(self, message, *args, **kwargs):
self._log("warning", message, *args, **kwargs)

def _log(self, level, message, *args, **kwargs):
if not self._logging:
return
transaction: Transaction | None = kwargs.pop("transaction", None)
if transaction:
kwargs["transaction"] = transaction.id
kwargs.update(transaction.extras)
getattr(logger, level)(message, *args, **kwargs)


@lru_cache
Expand Down
Loading

0 comments on commit 59ca627

Please sign in to comment.