diff --git a/cheroot/connections.py b/cheroot/connections.py index 55658d9170..0007d5ea27 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -1,22 +1,17 @@ """Utilities to manage open connections.""" -from __future__ import absolute_import, division, print_function -__metaclass__ = type - import io import os import socket import threading import time +import selectors +from contextlib import suppress from . import errors -from ._compat import selectors -from ._compat import suppress from ._compat import IS_WINDOWS from .makefile import MakeFile -import six - try: import fcntl except ImportError: @@ -280,8 +275,7 @@ def _remove_invalid_sockets(self): # One of the reason on why a socket could cause an error # is that the socket is already closed, ignore the # socket error if we try to close it at this point. - # This is equivalent to OSError in Py3 - with suppress(socket.error): + with suppress(OSError): conn.close() def _from_server_socket(self, server_socket): # noqa: C901 # FIXME @@ -299,7 +293,20 @@ def _from_server_socket(self, server_socket): # noqa: C901 # FIXME if self.server.ssl_adapter is not None: try: s, ssl_env = self.server.ssl_adapter.wrap(s) - except errors.NoSSLError: + except errors.FatalSSLAlert as tls_connection_drop_error: + self.server.error_log( + f'Client {addr !s} lost — peer dropped the TLS ' + 'connection suddenly, during handshake: ' + f'{tls_connection_drop_error !s}', + ) + return + except errors.NoSSLError as http_over_https_err: + self.server.error_log( + f'Client {addr !s} attempted to speak plain HTTP into ' + 'a TCP connection configured for TLS-only traffic — ' + 'trying to send back a plain HTTP error response: ' + f'{http_over_https_err !s}', + ) msg = ( 'The client sent a plain HTTP request, but ' 'this server only speaks HTTPS on this port.' @@ -311,11 +318,10 @@ def _from_server_socket(self, server_socket): # noqa: C901 # FIXME msg, ] - sock_to_make = s if not six.PY2 else s._sock - wfile = mf(sock_to_make, 'wb', io.DEFAULT_BUFFER_SIZE) + wfile = mf(s, 'wb', io.DEFAULT_BUFFER_SIZE) try: wfile.write(''.join(buf).encode('ISO-8859-1')) - except socket.error as ex: + except OSError as ex: if ex.args[0] not in errors.socket_errors_to_ignore: raise return @@ -328,10 +334,7 @@ def _from_server_socket(self, server_socket): # noqa: C901 # FIXME conn = self.server.ConnectionClass(self.server, s, mf) - if not isinstance( - self.server.bind_addr, - (six.text_type, six.binary_type), - ): + if not isinstance(self.server.bind_addr, (str, bytes)): # optional values # Until we do DNS lookups, omit REMOTE_HOST if addr is None: # sometimes this can happen @@ -354,7 +357,7 @@ def _from_server_socket(self, server_socket): # noqa: C901 # FIXME # accept() by default self.server.sstat('fss-timeout') return - except socket.error as ex: + except OSError as ex: if self.server.stats['Enabled']: self.server.stats['Socket Errors'] += 1 if ex.args[0] in errors.socket_error_eintr: diff --git a/cheroot/ssl/builtin.py b/cheroot/ssl/builtin.py index ff987a7102..e28e5df188 100644 --- a/cheroot/ssl/builtin.py +++ b/cheroot/ssl/builtin.py @@ -7,12 +7,10 @@ ``BuiltinSSLAdapter``. """ -from __future__ import absolute_import, division, print_function -__metaclass__ = type - import socket import sys import threading +from contextlib import suppress try: import ssl @@ -27,19 +25,11 @@ except ImportError: DEFAULT_BUFFER_SIZE = -1 -import six - from . import Adapter from .. import errors -from .._compat import IS_ABOVE_OPENSSL10, suppress from ..makefile import StreamReader, StreamWriter from ..server import HTTPServer -if six.PY2: - generic_socket_error = socket.error -else: - generic_socket_error = OSError - def _assert_ssl_exc_contains(exc, *msgs): """Check whether SSL exception contains either of messages provided.""" @@ -272,62 +262,35 @@ def bind(self, sock): def wrap(self, sock): """Wrap and return the given socket, plus WSGI environ entries.""" - EMPTY_RESULT = None, {} try: s = self.context.wrap_socket( sock, do_handshake_on_connect=True, server_side=True, ) - except ssl.SSLError as ex: - if ex.errno == ssl.SSL_ERROR_EOF: - # This is almost certainly due to the cherrypy engine - # 'pinging' the socket to assert it's connectable; - # the 'ping' isn't SSL. - return EMPTY_RESULT - elif ex.errno == ssl.SSL_ERROR_SSL: - if _assert_ssl_exc_contains(ex, 'http request'): - # The client is speaking HTTP to an HTTPS server. - raise errors.NoSSLError - - # Check if it's one of the known errors - # Errors that are caught by PyOpenSSL, but thrown by - # built-in ssl - _block_errors = ( - 'unknown protocol', 'unknown ca', 'unknown_ca', - 'unknown error', - 'https proxy request', 'inappropriate fallback', - 'wrong version number', - 'no shared cipher', 'certificate unknown', - 'ccs received early', - 'certificate verify failed', # client cert w/o trusted CA - 'version too low', # caused by SSL3 connections - 'unsupported protocol', # caused by TLS1 connections - ) - if _assert_ssl_exc_contains(ex, *_block_errors): - # Accepted error, let's pass - return EMPTY_RESULT - elif _assert_ssl_exc_contains(ex, 'handshake operation timed out'): - # This error is thrown by builtin SSL after a timeout - # when client is speaking HTTP to an HTTPS server. - # The connection can safely be dropped. - return EMPTY_RESULT - raise - except generic_socket_error as exc: - """It is unclear why exactly this happens. - - It's reproducible only with openssl>1.0 and stdlib - :py:mod:`ssl` wrapper. - In CherryPy it's triggered by Checker plugin, which connects - to the app listening to the socket port in TLS mode via plain - HTTP during startup (from the same process). - - - Ref: https://github.com/cherrypy/cherrypy/issues/1618 - """ - is_error0 = exc.args == (0, 'Error') - - if is_error0 and IS_ABOVE_OPENSSL10: - return EMPTY_RESULT - raise + except ( + ssl.SSLEOFError, + ssl.SSLZeroReturnError, + ) as tls_connection_drop_error: + raise errors.FatalSSLAlert( + *tls_connection_drop_error.args, + ) from tls_connection_drop_error + except ssl.SSLError as generic_tls_error: + peer_speaks_plain_http_over_https = ( + generic_tls_error.errno == ssl.SSL_ERROR_SSL and + _assert_ssl_exc_contains(generic_tls_error, 'http request') + ) + if peer_speaks_plain_http_over_https: + reraised_connection_drop_exc_cls = errors.NoSSLError + else: + reraised_connection_drop_exc_cls = errors.FatalSSLAlert + + raise reraised_connection_drop_exc_cls( + *generic_tls_error.args, + ) from generic_tls_error + except OSError as tcp_connection_drop_error: + raise errors.FatalSSLAlert( + *tcp_connection_drop_error.args, + ) from tcp_connection_drop_error + return s, self.get_environ(s) def get_environ(self, sock): diff --git a/cheroot/workers/threadpool.py b/cheroot/workers/threadpool.py index c615482ae9..a2f362e56d 100644 --- a/cheroot/workers/threadpool.py +++ b/cheroot/workers/threadpool.py @@ -5,17 +5,13 @@ joinable """ -from __future__ import absolute_import, division, print_function -__metaclass__ = type - - import collections +import logging import threading import time import socket import warnings - -from six.moves import queue +import queue from jaraco.functools import pass_none @@ -35,7 +31,7 @@ def __radd__(self, other): trueyzero = TrueyZero() -_SHUTDOWNREQUEST = None +_SHUTDOWNREQUEST = object() class WorkerThread(threading.Thread): @@ -104,47 +100,136 @@ def __init__(self, server): threading.Thread.__init__(self) def run(self): - """Process incoming HTTP connections. + """Set up incoming HTTP connection processing loop. + + This is the thread's entry-point. It performs lop-layer + exception handling and interrupt processing. + :exc:`KeyboardInterrupt` and :exc:`SystemExit` bubbling up + from the inner-layer code constitute a global server interrupt + request. When they happen, the worker thread exits. - Retrieves incoming connections from thread pool. + :raises BaseException: when an unexpected non-interrupt + exception leaks from the inner layers + + # noqa: DAR401 KeyboardInterrupt SystemExit """ self.server.stats['Worker Threads'][self.name] = self.stats + self.ready = True try: - self.ready = True - while True: - obj = self.server.requests.get() - if obj is _SHUTDOWNREQUEST: - return + self._process_connections_until_interrupted() + except (KeyboardInterrupt, SystemExit) as interrupt_exc: + interrupt_cause = interrupt_exc.__cause__ or interrupt_exc + self.server.error_log( + f'Setting the server interrupt flag to {interrupt_cause !r}', + level=logging.DEBUG, + ) + self.server.interrupt = interrupt_cause + except BaseException as underlying_exc: # noqa: WPS424 + # NOTE: This is the last resort logging with the last dying breath + # NOTE: of the worker. It is only reachable when exceptions happen + # NOTE: in the `finally` branch of the internal try/except block. + self.server.error_log( + 'A fatal exception happened. Setting the server interrupt flag' + f' to {underlying_exc !r} and giving up.' + '\N{NEW LINE}\N{NEW LINE}' + 'Please, report this on the Cheroot tracker at ' + ', ' + 'providing a full reproducer with as much context and details as possible.', + level=logging.CRITICAL, + traceback=True, + ) + self.server.interrupt = underlying_exc + raise + finally: + self.ready = False + + def _process_connections_until_interrupted(self): + """Process incoming HTTP connections in an infinite loop. - conn, put_time = obj - self.server.sstat('work-start-conn', time.time() - put_time) + Retrieves incoming connections from thread pool, processing + them one by one. - start = time.time() + :raises SystemExit: on the internal requests to stop the + server instance + """ + while True: + obj = self.server.requests.get() + if obj is _SHUTDOWNREQUEST: + return + + conn, put_time = obj + self.server.sstat('work-start-conn', time.time() - put_time) + + start = time.time() + + self.conn = conn - self.conn = conn - is_stats_enabled = self.server.stats['Enabled'] + is_stats_enabled = self.server.stats['Enabled'] + if is_stats_enabled: + self.start_time = time.time() + keep_conn_open = False + + try: + keep_conn_open = conn.communicate() + except ConnectionError as connection_error: + keep_conn_open = False # Drop the connection cleanly + self.server.error_log( + 'Got a connection error while handling a ' + f'connection from {conn.remote_addr !s}:' + f'{conn.remote_port !s} ({connection_error !s})', + level=logging.INFO, + ) + continue + except (KeyboardInterrupt, SystemExit) as shutdown_request: + # Shutdown request + keep_conn_open = False # Drop the connection cleanly + self.server.error_log( + 'Got a server shutdown request while handling a ' + f'connection from {conn.remote_addr !s}:' + f'{conn.remote_port !s} ({shutdown_request !s})', + level=logging.DEBUG, + ) + raise SystemExit( + str(shutdown_request), + ) from shutdown_request + except BaseException as unhandled_error: # noqa: WPS424 + # NOTE: Only a shutdown request should bubble up to the + # NOTE: external cleanup code. Otherwise, this thread dies. + # NOTE: If this were to happen, the threadpool would still + # NOTE: list a dead thread without knowing its state. And + # NOTE: the calling code would fail to schedule processing + # NOTE: of new requests. + self.server.error_log( + 'Unhandled error while processing an incoming ' + f'connection {unhandled_error !r}', + level=logging.ERROR, + traceback=True, + ) + continue # Prevent the thread from dying + finally: + # NOTE: Any exceptions coming from within `finally` may + # NOTE: kill the thread, causing the threadpool to only + # NOTE: contain references to dead threads rendering the + # NOTE: server defunct, effectively meaning a DoS. + # NOTE: Ideally, things called here should process + # NOTE: everything recoverable internally. Any unhandled + # NOTE: errors will bubble up into the outer try/except + # NOTE: block. They will be treated as fatal and turned + # NOTE: into server shutdown requests and then reraised + # NOTE: unconditionally. + if keep_conn_open: + self.server.put_conn(conn) + else: + conn.close() if is_stats_enabled: - self.start_time = time.time() - keep_conn_open = False - try: - keep_conn_open = conn.communicate() - finally: - if keep_conn_open: - self.server.put_conn(conn) - else: - conn.close() - if is_stats_enabled: - self.requests_seen += self.conn.requests_seen - self.bytes_read += self.conn.rfile.bytes_read - self.bytes_written += self.conn.wfile.bytes_written - self.work_time += time.time() - self.start_time - self.start_time = None - self.conn = None - - self.server.sstat('work-conn', time.time() - start) - - except (KeyboardInterrupt, SystemExit) as ex: - self.server.interrupt = ex + self.requests_seen += conn.requests_seen + self.bytes_read += conn.rfile.bytes_read + self.bytes_written += conn.wfile.bytes_written + self.work_time += time.time() - self.start_time + self.start_time = None + self.conn = None + + self.server.sstat('work-conn', time.time() - start) class ThreadPool: @@ -164,12 +249,33 @@ def __init__( server (cheroot.server.HTTPServer): web server object receiving this request min (int): minimum number of worker threads - max (int): maximum number of worker threads + max (int): maximum number of worker threads (-1/inf for no max) accepted_queue_size (int): maximum number of active requests in queue accepted_queue_timeout (int): timeout for putting request into queue + + :raises ValueError: if the min/max values are invalid + :raises TypeError: if the max is not an integer or inf """ + if min < 1: + raise ValueError(f'min={min!s} must be > 0') + + if max == float('inf'): + pass + elif not isinstance(max, int) or max == 0: + raise TypeError( + 'Expected an integer or the infinity value for the `max` ' + f'argument but got {max!r}.', + ) + elif max < 0: + max = float('inf') + + if max < min: + raise ValueError( + f'max={max!s} must be > min={min!s} (or infinity for no max)', + ) + self.server = server self.min = min self.max = max @@ -180,18 +286,13 @@ def __init__( self._pending_shutdowns = collections.deque() def start(self): - """Start the pool of threads.""" - for _ in range(self.min): - self._threads.append(WorkerThread(self.server)) - for worker in self._threads: - worker.name = ( - 'CP Server {worker_name!s}'. - format(worker_name=worker.name), - ) - worker.start() - for worker in self._threads: - while not worker.ready: - time.sleep(.1) + """Start the pool of threads. + + :raises RuntimeError: if the pool is already started + """ + if self._threads: + raise RuntimeError('Threadpools can only be started once.') + self.grow(self.min) @property def idle(self): # noqa: D401; irrelevant for properties @@ -219,24 +320,20 @@ def _clear_dead_threads(self): def grow(self, amount): """Spawn new worker threads (not above self.max).""" - if self.max > 0: - budget = max(self.max - len(self._threads), 0) - else: - # self.max <= 0 indicates no maximum - budget = float('inf') - + budget = max(self.max - len(self._threads), 0) n_new = min(amount, budget) workers = [self._spawn_worker() for i in range(n_new)] - while not all(worker.ready for worker in workers): - time.sleep(.1) + for worker in workers: + while not worker.ready: + time.sleep(.1) self._threads.extend(workers) def _spawn_worker(self): worker = WorkerThread(self.server) worker.name = ( 'CP Server {worker_name!s}'. - format(worker_name=worker.name), + format(worker_name=worker.name) ) worker.start() return worker