Skip to content

Commit

Permalink
refactor: remove wrapped requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ArthurD1 committed Jul 29, 2024
1 parent 7b0c4ac commit e72a045
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 166 deletions.
3 changes: 1 addition & 2 deletions harp/http/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .errors import HttpError
from .requests import HttpRequest, WrappedHttpRequest
from .requests import HttpRequest
from .responses import AlreadyHandledHttpResponse, HttpResponse, JsonHttpResponse
from .serializers import HttpRequestSerializer, get_serializer_for
from .typing import BaseHttpMessage, BaseMessage, HttpRequestBridge, HttpResponseBridge
Expand All @@ -17,6 +17,5 @@
"HttpResponse",
"HttpResponseBridge",
"JsonHttpResponse",
"WrappedHttpRequest",
"get_serializer_for",
]
95 changes: 46 additions & 49 deletions harp/http/requests.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import binascii
from base64 import b64decode
from functools import cached_property
from typing import TYPE_CHECKING, MutableMapping, cast

from multidict import CIMultiDict, CIMultiDictProxy, MultiDictProxy
from httpx import AsyncByteStream
from multidict import CIMultiDict, MultiDictProxy

from ..utils.collections import MultiChainMap
from .streams import AsyncStreamFromAsgiBridge, ByteStream
from .typing import BaseHttpMessage, HttpRequestBridge
from .utils.cookies import parse_cookie
Expand All @@ -18,48 +17,67 @@ def __init__(self, impl: HttpRequestBridge):
super().__init__()
self._impl = impl
self._closed = False
self.stream = AsyncStreamFromAsgiBridge(self._impl)
self._stream: AsyncByteStream = AsyncStreamFromAsgiBridge(self._impl)

@cached_property
# Initialize properties from the implementation bridge
self._method = self._impl.get_method()
self._path = self._impl.get_path()
self._query = MultiDictProxy(self._impl.get_query())
self._server_ipaddr = self._impl.get_server_ipaddr()
self._server_port = self._impl.get_server_port()
self._headers = CIMultiDict(self._impl.get_headers())

@property
def stream(self):
return self._stream

@stream.setter
def stream(self, stream):
self._stream = stream
if hasattr(self, "_body"):
delattr(self, "_body")

@property
def server_ipaddr(self) -> str:
"""Get the server IP address from the implementation bridge."""
return self._impl.get_server_ipaddr()
"""The server IP address."""
return self._server_ipaddr

@cached_property
def server_port(self) -> int:
"""Get the server port from the implementation bridge."""
return self._impl.get_server_port()
"""The server port."""
return self._server_port

@cached_property
@property
def method(self) -> str:
"""Get the HTTP method from the implementation bridge."""
return self._impl.get_method()
"""The HTTP method."""
return self._method

@cached_property
@property
def path(self) -> str:
"""Get the HTTP path from the implementation bridge."""
return self._impl.get_path()
"""The HTTP path."""
return self._path

@cached_property
@property
def query(self) -> MultiDictProxy:
"""Get the query string from the implementation bridge, as a read only proxy."""
return MultiDictProxy(self._impl.get_query())
"""The query string."""
return self._query

@cached_property
def headers(self) -> CIMultiDictProxy:
return CIMultiDictProxy(self._impl.get_headers())
@property
def headers(self) -> CIMultiDict:
return self._headers

# body, content, raw_body
@headers.setter
def headers(self, headers):
self._headers = CIMultiDict(headers)

@cached_property
@property
def cookies(self) -> dict:
"""Parse and returns cookies from headers."""
cookies = {}
for header in self.headers.getall("cookie", []):
cookies.update(parse_cookie(header))
return cookies

@cached_property
@property
def basic_auth(self) -> tuple[str, str] | None:
"""Parse and returns basic auth from headers."""
for header in self.headers.getall("authorization", []):
Expand Down Expand Up @@ -89,31 +107,10 @@ async def aread(self):
"""Read all chunks from request. We may want to be able to read partial body later, but for now it's all or
nothing. This method does nothing if the body has already been read."""
if not hasattr(self, "_body"):
self._body = b"".join([part async for part in self.stream])
if not isinstance(self.stream, ByteStream):
self.stream = ByteStream(self._body)
self._body = b"".join([part async for part in self._stream])
if not isinstance(self._stream, ByteStream):
self._stream = ByteStream(self._body)
return self.body

def __str__(self):
return f"{self.method} {self.path}"


class WrappedHttpRequest(HttpRequest):
def __init__(self, wrapped: HttpRequest, /):
# we do not need to call super, but we don't want linters to complain
if TYPE_CHECKING:
super().__init__(wrapped._impl)

self._wrapped = wrapped
self._headers = CIMultiDict()

@property
def _impl(self):
return self._wrapped._impl

@property
def headers(self) -> MutableMapping:
return MultiChainMap(self._headers, cast(MutableMapping, self._wrapped.headers))

def __getattr__(self, item):
return getattr(self._wrapped, item)
12 changes: 7 additions & 5 deletions harp/http/responses.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import orjson
from httpx import AsyncByteStream
from multidict import CIMultiDict

from harp.utils.bytes import ensure_bytes
Expand All @@ -16,7 +17,7 @@ def __init__(self, body: bytes | str, /, *, status: int = 200, headers: dict = N
self._body = ensure_bytes(body)
self._status = int(status)
self._headers = CIMultiDict(headers or {})
self._stream = ByteStream(self._body)
self._stream: AsyncByteStream = ByteStream(self._body)

if content_type:
self._headers["content-type"] = content_type
Expand All @@ -28,7 +29,8 @@ def stream(self):
@stream.setter
def stream(self, stream):
self._stream = stream
delattr(self, "_body")
if hasattr(self, "_body"):
delattr(self, "_body")

@property
def body(self) -> bytes:
Expand All @@ -50,9 +52,9 @@ def content_type(self) -> str:

async def aread(self):
if not hasattr(self, "_body"):
self._body = b"".join([part async for part in self.stream])
if not isinstance(self.stream, ByteStream):
self.stream = ByteStream(self._body)
self._body = b"".join([part async for part in self._stream])
if not isinstance(self._stream, ByteStream):
self._stream = ByteStream(self._body)
return self.body


Expand Down
23 changes: 2 additions & 21 deletions harp/http/streams.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,8 @@
from typing import AsyncIterator, Iterator

from .typing import HttpRequestBridge


class AsyncByteStream:
async def __aiter__(self) -> AsyncIterator[bytes]:
raise NotImplementedError("The '__aiter__' method must be implemented.") # pragma: no cover
yield b"" # pragma: no cover
from httpx import AsyncByteStream

async def aclose(self) -> None:
pass


class SyncByteStream:
def __iter__(self) -> Iterator[bytes]:
raise NotImplementedError("The '__iter__' method must be implemented.") # pragma: no cover
yield b"" # pragma: no cover

def close(self) -> None:
"""
Subclasses can override this method to release any network resources
after a request/response cycle is complete.
"""
from .typing import HttpRequestBridge


class ByteStream(AsyncByteStream):
Expand Down
83 changes: 0 additions & 83 deletions harp/http/tests/test_requests_wrapped.py

This file was deleted.

20 changes: 14 additions & 6 deletions harp_apps/proxy/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from harp import __parsed_version__, get_logger
from harp.asgi.events import HttpMessageEvent, TransactionEvent
from harp.http import BaseHttpMessage, HttpError, HttpRequest, HttpResponse, WrappedHttpRequest
from harp.http import BaseHttpMessage, HttpError, HttpRequest, HttpResponse
from harp.http.utils import parse_cache_control
from harp.models import Transaction
from harp.settings import USE_PROMETHEUS
Expand Down Expand Up @@ -132,7 +132,7 @@ async def __call__(self, request: HttpRequest):

with performances_observer("proxy", labels=labels):
# create an envelope to override things, without touching the original request
context = ProxyFilterEvent(self.name, request=WrappedHttpRequest(request))
context = ProxyFilterEvent(self.name, request=request)
await self.adispatch(EVENT_FILTER_PROXY_REQUEST, context)

# override a few required headers. That may be done in the httpx request instead of here.
Expand Down Expand Up @@ -279,16 +279,24 @@ async def _create_transaction_from_request(self, request: HttpRequest, *, tags=N

return transaction

def _extract_tags_from_request(self, request: WrappedHttpRequest):
def _extract_tags_from_request(self, request: HttpRequest):
"""
Convert special request headers (x-harp-*) into tags (key-value pairs) that we'll attach to the
transaction. Headers are "consumed", meaning they are removed from the request headers.
"""

tags = {}
headers_to_remove = []

for header in request.headers:
header = header.lower()
if header.startswith("x-harp-"):
tags[header[7:]] = request.headers.pop(header)
lower_header = header.lower()
if lower_header.startswith("x-harp-"):
tags[lower_header[7:]] = request.headers[header]
headers_to_remove.append(header)

for header in headers_to_remove:
request.headers.pop(header)

return tags

def __repr__(self):
Expand Down

0 comments on commit e72a045

Please sign in to comment.