Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ThreadPoolExecutor to serve simultaneous sync requests #3416

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions tornado/test/web_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,19 @@ def get(self):
def get_app_kwargs(self):
return dict(template_path="FinalReturnTest")

def test_finish_method_return_future(self):
def test_finish_method_return_none(self):
response = self.fetch(self.get_url("/finish"))
self.assertEqual(response.code, 200)
self.assertIsInstance(self.final_return, Future)
self.assertTrue(self.final_return.done())
self.assertTrue(self.final_return is None)

response = self.fetch(self.get_url("/finish"), method="POST", body=b"")
self.assertEqual(response.code, 200)
self.assertIsInstance(self.final_return, Future)
self.assertTrue(self.final_return.done())
self.assertTrue(self.final_return is None)

def test_render_method_return_future(self):
def test_render_method_return_none(self):
response = self.fetch(self.get_url("/render"))
self.assertEqual(response.code, 200)
self.assertIsInstance(self.final_return, Future)
self.assertTrue(self.final_return is None)


class CookieTest(WebTestCase):
Expand Down Expand Up @@ -2531,17 +2529,23 @@ def get_handlers(self):
class TooHigh(RequestHandler):
def get(self):
self.set_header("Content-Length", "42")
self.finish("ok")

def _real_finish(self) -> "Future[None]":
try:
self.finish("ok")
return super()._real_finish()
except Exception as e:
test.server_error = e
raise

class TooLow(RequestHandler):
def get(self):
self.set_header("Content-Length", "2")
self.finish("hello")

def _real_finish(self) -> "Future[None]":
try:
self.finish("hello")
return super()._real_finish()
except Exception as e:
test.server_error = e
raise
Expand All @@ -2555,8 +2559,7 @@ def test_content_length_too_high(self):
with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
with ExpectLog(
gen_log,
"(Cannot send error response after headers written"
"|Failed to flush partial response)",
"Failed to flush response",
):
with self.assertRaises(HTTPClientError):
self.fetch("/high", raise_error=True)
Expand All @@ -2571,8 +2574,7 @@ def test_content_length_too_low(self):
with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
with ExpectLog(
gen_log,
"(Cannot send error response after headers written"
"|Failed to flush partial response)",
"Failed to flush response",
):
with self.assertRaises(HTTPClientError):
self.fetch("/low", raise_error=True)
Expand Down
112 changes: 70 additions & 42 deletions tornado/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ async def main():

"""

from asyncio import iscoroutine
import base64
import binascii
import datetime
Expand All @@ -70,6 +71,7 @@ async def main():
import hmac
import http.cookies
from inspect import isclass
from inspect import iscoroutinefunction
from io import BytesIO
import mimetypes
import numbers
Expand All @@ -96,6 +98,7 @@ async def main():
from tornado.log import access_log, app_log, gen_log
from tornado import template
from tornado.escape import utf8, _unicode
from tornado.ioloop import IOLoop
from tornado.routing import (
AnyMatches,
DefaultHostMatches,
Expand Down Expand Up @@ -216,8 +219,10 @@ def __init__(
self.application = application
self.request = request
self._headers_written = False
# When this flag is True, avoid further request writting, but finish() will be called later
self._finished = False
self._auto_finish = True
self._skip_finish_fn = False
self._finish_called = False
self._prepared_future = None
self.ui = ObjectDict(
(n, self._ui_method(m)) for n, m in application.ui_methods.items()
Expand Down Expand Up @@ -892,7 +897,7 @@ def redirect(
assert isinstance(status, int) and 300 <= status <= 399
self.set_status(status)
self.set_header("Location", utf8(url))
self.finish()
self._finished = True

def write(self, chunk: Union[str, bytes, dict]) -> None:
"""Writes the given chunk to the output buffer.
Expand Down Expand Up @@ -999,7 +1004,8 @@ def render(self, template_name: str, **kwargs: Any) -> "Future[None]":
if html_bodies:
hloc = html.index(b"</body>")
html = html[:hloc] + b"".join(html_bodies) + b"\n" + html[hloc:]
return self.finish(html)
self.write(html)
self._finished = True

def render_linked_js(self, js_files: Iterable[str]) -> str:
"""Default method used to render the final js links for the
Expand Down Expand Up @@ -1186,7 +1192,12 @@ def flush(self, include_footers: bool = False) -> "Future[None]":
future.set_result(None)
return future

def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[None]":
def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> None:
if chunk is not None:
self.write(chunk)
self._finished = True

def _real_finish(self) -> "Future[None]":
"""Finishes this response, ending the HTTP request.

Passing a ``chunk`` to ``finish()`` is equivalent to passing that
Expand All @@ -1201,12 +1212,11 @@ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[Non

Now returns a `.Future` instead of ``None``.
"""
if self._finished:
if self._skip_finish_fn:
return
if self._finish_called:
raise RuntimeError("finish() called twice")

if chunk is not None:
self.write(chunk)

# Automatically support ETags and add the Content-Length header if
# we have not flushed any content yet.
if not self._headers_written:
Expand Down Expand Up @@ -1238,6 +1248,7 @@ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[Non
future = self.flush(include_footers=True)
self.request.connection.finish()
self._log()
self._finish_called = True
self._finished = True
self.on_finish()
self._break_cycles()
Expand All @@ -1255,6 +1266,7 @@ def detach(self) -> iostream.IOStream:
.. versionadded:: 5.1
"""
self._finished = True
self._skip_finish_fn = True
# TODO: add detach to HTTPConnection?
return self.request.connection.detach() # type: ignore

Expand All @@ -1276,15 +1288,7 @@ def send_error(self, status_code: int = 500, **kwargs: Any) -> None:
"""
if self._headers_written:
gen_log.error("Cannot send error response after headers written")
if not self._finished:
# If we get an error between writing headers and finishing,
# we are unlikely to be able to finish due to a
# Content-Length mismatch. Try anyway to release the
# socket.
try:
self.finish()
except Exception:
gen_log.error("Failed to flush partial response", exc_info=True)
self._finished = True
return
self.clear()

Expand All @@ -1298,8 +1302,7 @@ def send_error(self, status_code: int = 500, **kwargs: Any) -> None:
self.write_error(status_code, **kwargs)
except Exception:
app_log.error("Uncaught exception in write_error", exc_info=True)
if not self._finished:
self.finish()
self._finished = True

def write_error(self, status_code: int, **kwargs: Any) -> None:
"""Override to implement custom error pages.
Expand All @@ -1318,13 +1321,13 @@ def write_error(self, status_code: int, **kwargs: Any) -> None:
self.set_header("Content-Type", "text/plain")
for line in traceback.format_exception(*kwargs["exc_info"]):
self.write(line)
self.finish()
else:
self.finish(
self.write(
"<html><title>%(code)d: %(message)s</title>"
"<body>%(code)d: %(message)s</body></html>"
% {"code": status_code, "message": self._reason}
)
self._finished = True

@property
def locale(self) -> tornado.locale.Locale:
Expand Down Expand Up @@ -1743,12 +1746,8 @@ def val(x: bytes) -> bytes:
break
return match

async def _execute(
self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes
) -> None:
"""Executes this request with the given output transforms."""
self._transforms = transforms
try:
async def _execute_no_err(self, *args: bytes, **kwargs: bytes):
if True:
if self.request.method not in self.SUPPORTED_METHODS:
raise HTTPError(405)
self.path_args = [self.decode_argument(arg) for arg in args]
Expand Down Expand Up @@ -1782,27 +1781,55 @@ async def _execute(
try:
await self.request._body_future
except iostream.StreamClosedError:
return
raise FinishExecute()

tornado_workers_executor = self.application.settings.get('tornado_workers_executor')
method = getattr(self, self.request.method.lower())
result = method(*self.path_args, **self.path_kwargs)
if iscoroutinefunction(method) or getattr(method, '__tornado_coroutine__', False):
result = await method(*self.path_args, **self.path_kwargs)
elif tornado_workers_executor:
result = await IOLoop.current().run_in_executor(
tornado_workers_executor,
functools.partial(method, *self.path_args, **self.path_kwargs))
else:
result = method(*self.path_args, **self.path_kwargs)
if result is not None:
result = await result
if self._auto_finish and not self._finished:
self.finish()
if iscoroutine(result):
app_log.warn(f'{method} returned a coroutine, you should await your own coroutines')
await result
else:
app_log.warn(f'{method} returned {result}, it was ignored')

async def _execute(
self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes
) -> None:
"""Executes this request with the given output transforms."""
self._transforms = transforms
try:
await self._execute_no_err(*args, **kwargs)
except FinishExecute:
return
except Finish as e:
if e.args:
self.write(*e.args)
self._finished = True
except Exception as e:
try:
self._handle_request_exception(e)
except Exception:
app_log.error("Exception in exception handler", exc_info=True)
finally:
# Unset result to avoid circular references
result = None
if self._prepared_future is not None and not self._prepared_future.done():
# In case we failed before setting _prepared_future, do it
# now (to unblock the HTTP server). Note that this is not
# in a finally block to avoid GC issues prior to Python 3.4.
self._prepared_future.set_result(None)
finally:
if not self._finish_called:
try:
self._real_finish()
except Exception:
self.log_exception(*sys.exc_info())
gen_log.error("Failed to flush response", exc_info=True)

def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
"""Implement this method to handle streamed request data.
Expand Down Expand Up @@ -1830,11 +1857,6 @@ def _request_summary(self) -> str:
)

def _handle_request_exception(self, e: BaseException) -> None:
if isinstance(e, Finish):
# Not an error; just finish the request without logging.
if not self._finished:
self.finish(*e.args)
return
try:
self.log_exception(*sys.exc_info())
except Exception:
Expand Down Expand Up @@ -2518,6 +2540,11 @@ class Finish(Exception):
pass


class FinishExecute(Exception):
"""A convenience exception to just finish _execute() without calling finish()"""
pass


class MissingArgumentError(HTTPError):
"""Exception raised by `RequestHandler.get_argument`.

Expand Down Expand Up @@ -2677,8 +2704,8 @@ def reset(cls) -> None:
with cls._lock:
cls._static_hashes = {}

def head(self, path: str) -> Awaitable[None]:
return self.get(path, include_body=False)
async def head(self, path: str) -> Awaitable[None]:
return await self.get(path, include_body=False)

async def get(self, path: str, include_body: bool = True) -> None:
# Set up our path instance variables.
Expand Down Expand Up @@ -3148,6 +3175,7 @@ def initialize(
def prepare(self) -> None:
self.fallback(self.request)
self._finished = True
self._skip_finish_fn = True
self.on_finish()


Expand Down
1 change: 1 addition & 0 deletions tornado/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ async def _accept_connection(self, handler: WebSocketHandler) -> None:
handler.set_header("Connection", "Upgrade")
handler.set_header("Sec-WebSocket-Accept", self._challenge_response(handler))
handler.finish()
handler._real_finish()

self.stream = handler._detach_stream()

Expand Down