Skip to content

Commit

Permalink
Code formatting using black
Browse files Browse the repository at this point in the history
  • Loading branch information
koenvo committed Oct 8, 2022
1 parent 262e4e7 commit 65e3a67
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 115 deletions.
2 changes: 1 addition & 1 deletion pyodide_http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
except ImportError:
_SHOULD_PATCH = False

__version__ = '0.1.0'
__version__ = "0.1.0"


def patch_requests(continue_on_import_error: bool = False):
Expand Down
45 changes: 25 additions & 20 deletions pyodide_http/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,32 @@
from dataclasses import dataclass, field
from typing import Optional, Dict
from email.parser import Parser

# need to import streaming here so that the web-worker is setup
from ._streaming import send_streaming_request


class _RequestError(Exception):
def __init__(self,message=None,*,request=None,response=None):
self.request=request
self.response=response
self.message=message
def __init__(self, message=None, *, request=None, response=None):
self.request = request
self.response = response
self.message = message
super().__init__(self.message)


class _StreamingError(_RequestError):
pass


class _StreamingTimeout(_StreamingError):
pass



@dataclass
class Request:
method: str
url: str
params: Optional[Dict[str,str]]=None
params: Optional[Dict[str, str]] = None
body: Optional[bytes] = None
headers: Dict[str, str] = field(default_factory=dict)
timeout: int = 0
Expand All @@ -38,7 +40,7 @@ def set_body(self, body: bytes):

def set_json(self, body: dict):
self.set_header("Content-Type", "application/json; charset=utf-8")
self.set_body(json.dumps(body).encode('utf-8'))
self.set_body(json.dumps(body).encode("utf-8"))


@dataclass
Expand All @@ -48,27 +50,34 @@ class Response:
body: bytes


_SHOWN_WARNING=False
_SHOWN_WARNING = False


def show_streaming_warning():
global _SHOWN_WARNING
if not _SHOWN_WARNING:
_SHOWN_WARNING=True
_SHOWN_WARNING = True
from js import console
console.warn("requests can't stream data in the main thread, using non-streaming fallback")

console.warn(
"requests can't stream data in the main thread, using non-streaming fallback"
)


def send(request: Request, stream: bool = False) -> Response:
if request.params:
from js import URLSearchParams

params = URLSearchParams.new()
for k, v in request.params.items():
params.append(k, v)
request.url += "?"+params.toString()
request.url += "?" + params.toString()

from js import XMLHttpRequest

try:
from js import importScripts

_IN_WORKER = True
except ImportError:
_IN_WORKER = False
Expand All @@ -88,13 +97,13 @@ def send(request: Request, stream: bool = False) -> Response:
# set timeout only if pyodide is in a worker, because
# there is a warning not to set timeout on synchronous main thread
# XMLHttpRequest https://developer.mozilla.org/en-US/docs/Web/API/XMLHttpRequest/timeout
if _IN_WORKER and request.timeout!=0:
xhr.timeout=int(request.timeout*1000)
if _IN_WORKER and request.timeout != 0:
xhr.timeout = int(request.timeout * 1000)

if _IN_WORKER:
xhr.responseType = "arraybuffer"
else:
xhr.overrideMimeType('text/plain; charset=ISO-8859-15')
xhr.overrideMimeType("text/plain; charset=ISO-8859-15")

xhr.open(request.method, request.url, False)
for name, value in request.headers.items():
Expand All @@ -107,10 +116,6 @@ def send(request: Request, stream: bool = False) -> Response:
if _IN_WORKER:
body = xhr.response.to_py().tobytes()
else:
body = xhr.response.encode('ISO-8859-15')
body = xhr.response.encode("ISO-8859-15")

return Response(
status_code=xhr.status,
headers=headers,
body=body
)
return Response(status_code=xhr.status, headers=headers, body=body)
38 changes: 19 additions & 19 deletions pyodide_http/_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from requests.utils import get_encoding_from_headers, CaseInsensitiveDict

from ._core import Request, send
from ._core import _StreamingError,_StreamingTimeout
from ._core import _StreamingError, _StreamingTimeout

_IS_PATCHED = False

Expand All @@ -15,9 +15,7 @@ class PyodideHTTPAdapter(BaseAdapter):
def __init__(self):
super().__init__()

def send(
self, request, **kwargs
):
def send(self, request, **kwargs):
"""Sends PreparedRequest object. Returns Response object.
:param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
:param stream: (optional) Whether to stream the request content.
Expand All @@ -31,24 +29,27 @@ def send(
:param cert: (optional) Any user-provided SSL certificate to be trusted.
:param proxies: (optional) The proxies dictionary to apply to the request.
"""
stream = kwargs.get('stream', False)
stream = kwargs.get("stream", False)
pyodide_request = Request(request.method, request.url)
pyodide_request.timeout=kwargs.get('timeout',0)
pyodide_request.timeout = kwargs.get("timeout", 0)
if not pyodide_request.timeout:
pyodide_request.timeout=0
pyodide_request.params=None # this is done in preparing request now
pyodide_request.timeout = 0
pyodide_request.params = None # this is done in preparing request now
pyodide_request.headers = dict(request.headers)
if request.body:
pyodide_request.set_body(request.body)
try:
resp = send(pyodide_request, stream)
except _StreamingTimeout:
from requests import ConnectTimeout

raise ConnectTimeout(request=pyodide_request)
except _StreamingError:
from requests import ConnectionError

raise ConnectionError(request=pyodide_request)
import requests

response = requests.Response()
# Fallback to None if there's no status_code, for whatever reason.
response.status_code = getattr(resp, "status_code", None)
Expand All @@ -63,20 +64,17 @@ def send(
# non-streaming response, make it look like a stream
response.raw = BytesIO(resp.body)

def new_read(self,amt=None,decode_content=False,cache_content=False):
def new_read(self, amt=None, decode_content=False, cache_content=False):
return self.old_read(amt)

# make the response stream look like a urllib3 stream
response.raw.old_read=response.raw.read
response.raw.read=new_read.__get__(response.raw,type(response.raw))

response.raw.old_read = response.raw.read
response.raw.read = new_read.__get__(response.raw, type(response.raw))

response.reason = ''
response.reason = ""
response.url = request.url
return response



def close(self):
"""Cleans up adapter specific items."""
pass
Expand All @@ -94,13 +92,15 @@ def patch():
return

import requests
requests.sessions.Session._old_init=requests.sessions.Session.__init__

requests.sessions.Session._old_init = requests.sessions.Session.__init__

def new_init(self):
self._old_init()
self.mount("https://", PyodideHTTPAdapter())
self.mount("http://", PyodideHTTPAdapter())
self.mount("http://", PyodideHTTPAdapter())

requests.sessions.Session._old_init=requests.sessions.Session.__init__
requests.sessions.Session.__init__=new_init
requests.sessions.Session._old_init = requests.sessions.Session.__init__
requests.sessions.Session.__init__ = new_init

_IS_PATCHED = True
78 changes: 58 additions & 20 deletions pyodide_http/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from js import crossOriginIsolated
from pyodide.ffi import to_js
from urllib.request import Request

SUCCESS_HEADER = -1
SUCCESS_EOF = -2
ERROR_TIMEOUT = -3
Expand Down Expand Up @@ -137,14 +138,14 @@ def _obj_from_dict(dict_val: dict) -> any:


class _ReadStream(io.RawIOBase):
def __init__(self, int_buffer, byte_buffer,timeout,worker,connection_id):
def __init__(self, int_buffer, byte_buffer, timeout, worker, connection_id):
self.int_buffer = int_buffer
self.byte_buffer = byte_buffer
self.read_pos = 0
self.read_len = 0
self.connection_id=connection_id
self.worker=worker
self.timeout=int(1000*timeout) if timeout>0 else None
self.connection_id = connection_id
self.worker = worker
self.timeout = int(1000 * timeout) if timeout > 0 else None

def __del__(self):
self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
Expand All @@ -165,15 +166,17 @@ def readinto(self, byte_obj) -> bool:
# wait for the worker to send something
js.Atomics.store(self.int_buffer, 0, 0)
self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
if js.Atomics.wait(self.int_buffer, 0, 0, self.timeout)=='timed-out':
if js.Atomics.wait(self.int_buffer, 0, 0, self.timeout) == "timed-out":
from ._core import _StreamingTimeout
raise _StreamingTimeout

raise _StreamingTimeout
data_len = self.int_buffer[0]
if data_len > 0:
self.read_len = data_len
self.read_pos = 0
elif data_len==ERROR_EXCEPTION:
elif data_len == ERROR_EXCEPTION:
from ._core import _StreamingError

raise _StreamingError
else:
# EOF, free the buffers and return zero
Expand All @@ -184,8 +187,9 @@ def readinto(self, byte_obj) -> bool:
return 0
# copy from int32array to python bytes
ret_length = min(self.read_len, len(byte_obj))
self.byte_buffer.subarray(
self.read_pos, self.read_pos+ret_length).assign_to(byte_obj[0:ret_length])
self.byte_buffer.subarray(self.read_pos, self.read_pos + ret_length).assign_to(
byte_obj[0:ret_length]
)
self.read_len -= ret_length
self.read_pos += ret_length
return ret_length
Expand All @@ -194,34 +198,55 @@ def readinto(self, byte_obj) -> bool:
class _StreamingFetcher:
def __init__(self):
# make web-worker and data buffer on startup
dataBlob = js.Blob.new([_STREAMING_WORKER_CODE], _obj_from_dict(
{"type": 'application/javascript'}))
dataBlob = js.Blob.new(
[_STREAMING_WORKER_CODE], _obj_from_dict({"type": "application/javascript"})
)
dataURL = js.URL.createObjectURL(dataBlob)
self._worker = js.Worker.new(dataURL)

def send(self, request):
from ._core import Response

headers = request.headers
body = request.body
fetch_data = {"headers": headers, "body": body, "method": request.method}
# start the request off in the worker
timeout=int(1000*request.timeout) if request.timeout>0 else None
timeout = int(1000 * request.timeout) if request.timeout > 0 else None
shared_buffer = js.SharedArrayBuffer.new(1048576)
int_buffer = js.Int32Array.new(shared_buffer)
byte_buffer = js.Uint8Array.new(shared_buffer, 8)

js.Atomics.store(int_buffer, 0, 0)
js.Atomics.notify(int_buffer, 0)
absolute_url = js.URL.new(request.url, js.location).href
js.console.log(_obj_from_dict(
{"buffer": shared_buffer, "url": absolute_url, "fetchParams": fetch_data}))
self._worker.postMessage(_obj_from_dict(
{"buffer": shared_buffer, "url": absolute_url, "fetchParams": fetch_data}))
js.console.log(
_obj_from_dict(
{
"buffer": shared_buffer,
"url": absolute_url,
"fetchParams": fetch_data,
}
)
)
self._worker.postMessage(
_obj_from_dict(
{
"buffer": shared_buffer,
"url": absolute_url,
"fetchParams": fetch_data,
}
)
)
# wait for the worker to send something
js.Atomics.wait(int_buffer, 0, 0, timeout)
if int_buffer[0] == 0:
from ._core import _StreamingTimeout
raise _StreamingTimeout("Timeout connecting to streaming request",request=request, response=None)

raise _StreamingTimeout(
"Timeout connecting to streaming request",
request=request,
response=None,
)
if int_buffer[0] == SUCCESS_HEADER:
# got response
# header length is in second int of intBuffer
Expand All @@ -236,27 +261,40 @@ def send(self, request):
return Response(
status_code=response_obj["status"],
headers=response_obj["headers"],
body=io.BufferedReader(_ReadStream(
int_buffer, byte_buffer,request.timeout,self._worker,response_obj["connectionID"]), buffer_size=1048576)
body=io.BufferedReader(
_ReadStream(
int_buffer,
byte_buffer,
request.timeout,
self._worker,
response_obj["connectionID"],
),
buffer_size=1048576,
),
)
if int_buffer[0] == ERROR_EXCEPTION:
string_len = int_buffer[1]
# decode the error string
decoder = js.TextDecoder.new()
json_str = decoder.decode(byte_buffer.slice(0, string_len))
from ._core import _StreamingError
raise _StreamingError(f"Exception thrown in fetch: {json_str}",request=request, response=None)

raise _StreamingError(
f"Exception thrown in fetch: {json_str}", request=request, response=None
)


if crossOriginIsolated:
_fetcher = _StreamingFetcher()
else:
_fetcher = None


def send_streaming_request(request: Request):
if _fetcher:
return _fetcher.send(request)
else:
from ._core import show_streaming_warning

show_streaming_warning()
return False
Loading

0 comments on commit 65e3a67

Please sign in to comment.