Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: ProxyHandler supports streaming #784

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 63 additions & 23 deletions gramex/handlers/proxyhandler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import tornado.web
import tornado.gen
from urllib.parse import urlsplit, urlunsplit, parse_qs, urlencode
from tornado.httputil import HTTPHeaders
from tornado.httputil import HTTPHeaders, parse_response_start_line, HTTPInputError
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from typing import Callable
from gramex.transforms import build_transform
Expand All @@ -11,6 +11,15 @@
from gramex.handlers import WebSocketHandler


skip_response_headers = {
'Connection',
'Content-Encoding',
'Content-Length',
'Host',
'Transfer-Encoding',
}


class ProxyHandler(BaseHandler, BaseWebSocketHandler):
@classmethod
def setup(
Expand Down Expand Up @@ -52,6 +61,7 @@ def setup(
`response` in-place
connect_timeout: Timeout for initial connection in seconds (default: 20)
request_timeout: Timeout for entire request in seconds (default: 20)
stream: If True, the response is streamed (default: false)

The response has the same HTTP headers and body as the proxied request, but:

Expand Down Expand Up @@ -79,7 +89,7 @@ def setup(
cls.url, cls.request_headers, cls.default = url, request_headers, default
cls.headers = headers
cls.connect_timeout, cls.request_timeout = connect_timeout, request_timeout
cls.info = {}
cls.info = {'stream': kwargs.get('stream', False)}
for key, fn in (('prepare', prepare), ('modify', modify)):
if fn:
cls.info[key] = build_transform(
Expand Down Expand Up @@ -145,35 +155,65 @@ def get(self, *path_args):
connect_timeout=self.connect_timeout,
request_timeout=self.request_timeout,
)
if 'stream' in self.info:
request.header_callback = self._header_callback
request.streaming_callback = self._write_and_flush

if 'prepare' in self.info:
self.info['prepare'](handler=self, request=request, response=None)

app_log.debug(f'{self.name}: proxying {url}')
response = yield self.browser().fetch(request, raise_error=False)

if response.code in (MOVED_PERMANENTLY, FOUND):
location = response.headers.get('Location', '')
# TODO: check if Location: header MATCHES the url, not startswith
# url: example.org/?x should match Location: example.org/?a=1&x
# even though location does not start with url.
if location.startswith(url):
response.headers['Location'] = location.replace('url', self.conf.pattern)

# Set response headers only if not streaming
if not self.info['stream']:
if response.code in (MOVED_PERMANENTLY, FOUND):
location = response.headers.get('Location', '')
# TODO: check if Location: header MATCHES the url, not startswith
# url: example.org/?x should match Location: example.org/?a=1&x
# even though location does not start with url.
if location.startswith(url):
response.headers['Location'] = location.replace('url', self.conf.pattern)
# Pass on the headers as-is, but override with the handler HTTP headers
self.set_header('X-Proxy-Url', response.effective_url)
for header_name, header_value in response.headers.items():
if header_name not in {'Connection', 'Transfer-Encoding', 'Content-Length'}:
self.set_header(header_name, header_value)
# Proxies may send the wrong Content-Length. Correct it, else Tornado raises an error
if response.body is not None:
self.set_header('Content-Length', len(response.body))
for header_name, header_value in self.headers.items():
self.set_header(header_name, header_value)
# Pass on HTTP status code and response body as-is
self.set_status(response.code, reason=response.reason)
if 'modify' in self.info:
self.info['modify'](handler=self, request=request, response=response)

# Pass on the headers as-is, but override with the handler HTTP headers
self.set_header('X-Proxy-Url', response.effective_url)
for header_name, header_value in response.headers.items():
if header_name not in {'Connection', 'Transfer-Encoding', 'Content-Length'}:
self.set_header(header_name, header_value)
# Proxies may send the wrong Content-Length. Correct it, else Tornado raises an error
if response.body is not None:
self.set_header('Content-Length', len(response.body))
for header_name, header_value in self.headers.items():
self.set_header(header_name, header_value)
# Pass on HTTP status code and response body as-is
self.set_status(response.code, reason=response.reason)
if response.body is not None:
self.write(response.body)
self._write_and_flush(response.body)

def _write_and_flush(self, data: bytes):
'''Write data to response and flush it'''
self.write(data)
self.flush()

def _header_callback(self, header_line: str):
'''Proxy request headers as they appear'''
# If we don't use a _header_callback, streaming writes will write default headers
# before we get a chance to set the headers. So set them as soon as they appear
# From tornado.curl_httpclient.CurlAsyncHTTPClient._curl_header_callback
header_line = header_line.rstrip()
if header_line.startswith('HTTP/'):
try:
(__, code, reason) = parse_response_start_line(header_line)
self.set_status(code, reason)
except HTTPInputError:
return
elif header_line:
# from tornado.httputil.HTTPHeaders.parse_line. Ignore multi-line headers
try:
name, value = header_line.split(':', 1)
if name not in skip_response_headers:
self.set_header(name, value)
except ValueError as err:
raise HTTPInputError('no colon in header line') from err
Loading