From 889d469ae73af5b4a520f11df658e038c5b2e3a9 Mon Sep 17 00:00:00 2001 From: S Anand Date: Mon, 8 Apr 2024 12:02:01 +0530 Subject: [PATCH] ENH: ProxyHandler supports streaming --- gramex/handlers/proxyhandler.py | 86 ++++++++++++++++++++++++--------- 1 file changed, 63 insertions(+), 23 deletions(-) diff --git a/gramex/handlers/proxyhandler.py b/gramex/handlers/proxyhandler.py index b5c97451..049fca87 100644 --- a/gramex/handlers/proxyhandler.py +++ b/gramex/handlers/proxyhandler.py @@ -1,7 +1,7 @@ import tornado.web import tornado.gen from urllib.parse import urlsplit, urlunsplit, parse_qs, urlencode -from tornado.httputil import HTTPHeaders +from tornado.httputil import HTTPHeaders, parse_response_start_line, HTTPInputError from tornado.httpclient import AsyncHTTPClient, HTTPRequest from typing import Callable from gramex.transforms import build_transform @@ -11,6 +11,15 @@ from gramex.handlers import WebSocketHandler +skip_response_headers = { + 'Connection', + 'Content-Encoding', + 'Content-Length', + 'Host', + 'Transfer-Encoding', +} + + class ProxyHandler(BaseHandler, BaseWebSocketHandler): @classmethod def setup( @@ -52,6 +61,7 @@ def setup( `response` in-place connect_timeout: Timeout for initial connection in seconds (default: 20) request_timeout: Timeout for entire request in seconds (default: 20) + stream: If True, the response is streamed (default: false) The response has the same HTTP headers and body as the proxied request, but: @@ -79,7 +89,7 @@ def setup( cls.url, cls.request_headers, cls.default = url, request_headers, default cls.headers = headers cls.connect_timeout, cls.request_timeout = connect_timeout, request_timeout - cls.info = {} + cls.info = {'stream': kwargs.get('stream', False)} for key, fn in (('prepare', prepare), ('modify', modify)): if fn: cls.info[key] = build_transform( @@ -145,6 +155,9 @@ def get(self, *path_args): connect_timeout=self.connect_timeout, request_timeout=self.request_timeout, ) + if 'stream' in self.info: + request.header_callback = self._header_callback + request.streaming_callback = self._write_and_flush if 'prepare' in self.info: self.info['prepare'](handler=self, request=request, response=None) @@ -152,28 +165,55 @@ def get(self, *path_args): app_log.debug(f'{self.name}: proxying {url}') response = yield self.browser().fetch(request, raise_error=False) - if response.code in (MOVED_PERMANENTLY, FOUND): - location = response.headers.get('Location', '') - # TODO: check if Location: header MATCHES the url, not startswith - # url: example.org/?x should match Location: example.org/?a=1&x - # even though location does not start with url. - if location.startswith(url): - response.headers['Location'] = location.replace('url', self.conf.pattern) - + # Set response headers only if not streaming + if not self.info['stream']: + if response.code in (MOVED_PERMANENTLY, FOUND): + location = response.headers.get('Location', '') + # TODO: check if Location: header MATCHES the url, not startswith + # url: example.org/?x should match Location: example.org/?a=1&x + # even though location does not start with url. + if location.startswith(url): + response.headers['Location'] = location.replace('url', self.conf.pattern) + # Pass on the headers as-is, but override with the handler HTTP headers + self.set_header('X-Proxy-Url', response.effective_url) + for header_name, header_value in response.headers.items(): + if header_name not in {'Connection', 'Transfer-Encoding', 'Content-Length'}: + self.set_header(header_name, header_value) + # Proxies may send the wrong Content-Length. Correct it, else Tornado raises an error + if response.body is not None: + self.set_header('Content-Length', len(response.body)) + for header_name, header_value in self.headers.items(): + self.set_header(header_name, header_value) + # Pass on HTTP status code and response body as-is + self.set_status(response.code, reason=response.reason) if 'modify' in self.info: self.info['modify'](handler=self, request=request, response=response) - # Pass on the headers as-is, but override with the handler HTTP headers - self.set_header('X-Proxy-Url', response.effective_url) - for header_name, header_value in response.headers.items(): - if header_name not in {'Connection', 'Transfer-Encoding', 'Content-Length'}: - self.set_header(header_name, header_value) - # Proxies may send the wrong Content-Length. Correct it, else Tornado raises an error - if response.body is not None: - self.set_header('Content-Length', len(response.body)) - for header_name, header_value in self.headers.items(): - self.set_header(header_name, header_value) - # Pass on HTTP status code and response body as-is - self.set_status(response.code, reason=response.reason) if response.body is not None: - self.write(response.body) + self._write_and_flush(response.body) + + def _write_and_flush(self, data: bytes): + '''Write data to response and flush it''' + self.write(data) + self.flush() + + def _header_callback(self, header_line: str): + '''Proxy request headers as they appear''' + # If we don't use a _header_callback, streaming writes will write default headers + # before we get a chance to set the headers. So set them as soon as they appear + # From tornado.curl_httpclient.CurlAsyncHTTPClient._curl_header_callback + header_line = header_line.rstrip() + if header_line.startswith('HTTP/'): + try: + (__, code, reason) = parse_response_start_line(header_line) + self.set_status(code, reason) + except HTTPInputError: + return + elif header_line: + # from tornado.httputil.HTTPHeaders.parse_line. Ignore multi-line headers + try: + name, value = header_line.split(':', 1) + if name not in skip_response_headers: + self.set_header(name, value) + except ValueError as err: + raise HTTPInputError('no colon in header line') from err