Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

πŸ› Handle stray exceptions in worker threads #649

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ per-file-ignores =
cheroot/test/test_dispatch.py: DAR101, DAR201, S101, WPS111, WPS121, WPS302, WPS422, WPS430
cheroot/test/test_ssl.py: C818, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, S101, S309, S404, S603, WPS100, WPS110, WPS111, WPS114, WPS121, WPS130, WPS201, WPS202, WPS204, WPS210, WPS211, WPS218, WPS219, WPS222, WPS226, WPS231, WPS300, WPS301, WPS317, WPS318, WPS324, WPS326, WPS335, WPS336, WPS337, WPS352, WPS408, WPS420, WPS421, WPS422, WPS432, WPS436, WPS440, WPS441, WPS442, WPS450, WPS509, WPS510, WPS608
cheroot/test/test_server.py: DAR101, DAR201, DAR301, I001, I003, I004, I005, S101, WPS110, WPS111, WPS118, WPS121, WPS122, WPS130, WPS201, WPS202, WPS210, WPS218, WPS226, WPS229, WPS300, WPS317, WPS318, WPS324, WPS326, WPS421, WPS422, WPS430, WPS432, WPS433, WPS436, WPS437, WPS442, WPS507, WPS509, WPS608
cheroot/test/test_conn.py: B007, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, N802, N805, RST304, S101, S310, WPS100, WPS110, WPS111, WPS114, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS218, WPS219, WPS226, WPS231, WPS301, WPS306, WPS317, WPS318, WPS323, WPS326, WPS361, WPS420, WPS421, WPS422, WPS425, WPS429, WPS430, WPS432, WPS435, WPS436, WPS437, WPS440, WPS442, WPS447, WPS462, WPS508, WPS509, WPS510, WPS526
cheroot/test/test_conn.py: B007, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, N802, N805, RST304, S101, S310, WPS100, WPS110, WPS111, WPS114, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS218, WPS219, WPS226, WPS231, WPS301, WPS306, WPS317, WPS318, WPS323, WPS326, WPS361, WPS402, WPS420, WPS421, WPS422, WPS425, WPS429, WPS430, WPS432, WPS435, WPS436, WPS437, WPS440, WPS442, WPS447, WPS462, WPS508, WPS509, WPS510, WPS526
cheroot/test/webtest.py: B007, DAR101, DAR201, DAR401, I001, I003, I004, N802, RST303, RST304, S101, S104, WPS100, WPS110, WPS111, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS220, WPS221, WPS223, WPS229, WPS230, WPS231, WPS236, WPS301, WPS306, WPS317, WPS323, WPS326, WPS338, WPS361, WPS414, WPS420, WPS421, WPS422, WPS430, WPS432, WPS433, WPS437, WPS440, WPS501, WPS503, WPS505, WPS601
cheroot/testing.py: B014, C815, DAR101, DAR201, DAR301, I001, I003, S104, WPS100, WPS202, WPS211, WPS229, WPS301, WPS306, WPS317, WPS414, WPS420, WPS422, WPS430, WPS503, WPS526
cheroot/workers/threadpool.py: B007, DAR101, DAR201, E800, I001, I003, I004, RST201, RST203, RST301, WPS100, WPS110, WPS111, WPS121, WPS125, WPS211, WPS214, WPS220, WPS229, WPS230, WPS231, WPS304, WPS306, WPS317, WPS318, WPS322, WPS326, WPS335, WPS338, WPS362, WPS410, WPS414, WPS420, WPS422, WPS428, WPS432, WPS440, WPS462, WPS501, WPS505, WPS601, WPS602, WPS609
Expand Down
258 changes: 258 additions & 0 deletions cheroot/test/test_conn.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tests for TCP connection handling, including proper and timely close."""

import errno
from re import match as _matches_pattern
import socket
import time
import logging
Expand Down Expand Up @@ -700,6 +701,263 @@ def _close_kernel_socket(self):
assert _close_kernel_socket.exception_leaked is exception_leaks


def test_broken_connection_during_http_communication_fallback(  # noqa: WPS118
    monkeypatch,
    test_client,
    testing_server,
    wsgi_server_thread,
):
    """Check that an unhandled internal error cascades into shutdown.

    The request line reader is swapped for one that raises
    ``ConnectionResetError(666)`` right after rigging ``rfile.close()``
    and ``wfile.write()`` of the same connection to raise it as well.
    The server thread is then expected to terminate, leaving exactly
    three log entries behind: a warning, a connection error notice and
    a critical fatal-exception report.
    """
    def _fail_with_connection_reset(*_args, **_kwargs):
        raise ConnectionResetError(666)

    def _broken_read_request_line(self):
        # NOTE: Sabotage the connection streams first so that any later
        # NOTE: attempt to close/write on this connection fails too.
        monkeypatch.setattr(
            self.conn.rfile, 'close', _fail_with_connection_reset,
        )
        monkeypatch.setattr(
            self.conn.wfile, 'write', _fail_with_connection_reset,
        )
        _fail_with_connection_reset()

    request_handler_cls = (
        test_client.server_instance.ConnectionClass.RequestHandlerClass
    )
    monkeypatch.setattr(
        request_handler_cls,
        'read_request_line',
        _broken_read_request_line,
    )

    test_client.get_connection().send(b'GET / HTTP/1.1')
    wsgi_server_thread.join()  # no extra logs upon server termination

    # NOTE: Snapshot the recorded calls and wipe the original storage
    # NOTE: to prevent post-test assertions from seeing them.
    actual_log_entries = testing_server.error_log.calls[:]
    testing_server.error_log.calls.clear()

    expected_log_entries = (
        (logging.WARNING, r'^socket\.error 666$'),
        (
            logging.INFO,
            '^Got a connection error while handling a connection '
            r'from .*:\d{1,5} \(666\)',
        ),
        (
            logging.CRITICAL,
            r'A fatal exception happened\. Setting the server interrupt flag '
            r'to ConnectionResetError\(666\) and giving up\.\n\nPlease, '
            'report this on the Cheroot tracker at '
            r'<https://github\.com/cherrypy/cheroot/issues/new/choose>, '
            'providing a full reproducer with as much context and details '
            r'as possible\.$',
        ),
    )

    assert len(actual_log_entries) == len(expected_log_entries)

    log_entry_pairs = zip(expected_log_entries, actual_log_entries)
    for expected_entry, actual_entry in log_entry_pairs:
        wanted_level, wanted_msg_regex = expected_entry
        logged_msg, logged_level, _tb = actual_entry
        assert wanted_level == logged_level
        assert _matches_pattern(wanted_msg_regex, logged_msg) is not None, (
            f'{logged_msg !r} does not match {wanted_msg_regex !r}'
        )


def test_kb_int_from_http_handler(
    test_client,
    testing_server,
    wsgi_server_thread,
):
    """Check that a keyboard interrupt from HTTP handler causes shutdown.

    A handler raising :class:`KeyboardInterrupt` is registered under
    ``/kb_intr`` and then requested. The server thread must finish on
    its own, having logged exactly three entries: the shutdown request
    notice, the interrupt flag notice and the final shutdown message.
    """
    def _interrupting_handler(_req, _resp):
        raise KeyboardInterrupt('simulated test handler keyboard interrupt')

    testing_server.wsgi_app.handlers['/kb_intr'] = _interrupting_handler

    client_conn = test_client.get_connection()
    client_conn.putrequest('GET', '/kb_intr', skip_host=True)
    client_conn.putheader('Host', client_conn.host)
    client_conn.endheaders()
    wsgi_server_thread.join()  # no extra logs upon server termination

    # NOTE: Snapshot the recorded calls and wipe the original storage
    # NOTE: to prevent post-test assertions from seeing them.
    actual_log_entries = testing_server.error_log.calls[:]
    testing_server.error_log.calls.clear()

    expected_log_entries = (
        (
            logging.DEBUG,
            '^Got a server shutdown request while handling a connection '
            r'from .*:\d{1,5} \(simulated test handler keyboard interrupt\)$',
        ),
        (
            logging.DEBUG,
            '^Setting the server interrupt flag to KeyboardInterrupt'
            r"\('simulated test handler keyboard interrupt'\)$",
        ),
        (
            logging.INFO,
            '^Keyboard Interrupt: shutting down$',
        ),
    )

    assert len(actual_log_entries) == len(expected_log_entries)

    log_entry_pairs = zip(expected_log_entries, actual_log_entries)
    for expected_entry, actual_entry in log_entry_pairs:
        wanted_level, wanted_msg_regex = expected_entry
        logged_msg, logged_level, _tb = actual_entry
        assert wanted_level == logged_level
        assert _matches_pattern(wanted_msg_regex, logged_msg) is not None, (
            f'{logged_msg !r} does not match {wanted_msg_regex !r}'
        )


def test_unhandled_exception_in_request_handler(
    mocker,
    monkeypatch,
    test_client,
    testing_server,
    wsgi_server_thread,
):
    """Ensure worker threads are resilient to in-handler exceptions.

    A handler raising a custom :class:`BaseException` subclass is
    registered under ``/scary_exc`` and requested. The test then waits
    for the server-side connection to be closed and for all 10 workers
    to become idle again, checks that the pool still holds 10 threads,
    requests a shutdown and finally verifies the recorded log entries.
    """

    class SillyMistake(BaseException):  # noqa: WPS418, WPS431
        """A simulated crash within an HTTP handler."""

    def _trigger_scary_exc(_req, _resp):
        raise SillyMistake('simulated unhandled exception 💣 in test handler')

    testing_server.wsgi_app.handlers['/scary_exc'] = _trigger_scary_exc

    server_connection_close_spy = mocker.spy(
        test_client.server_instance.ConnectionClass,
        'close',
    )

    http_conn = test_client.get_connection()
    http_conn.putrequest('GET', '/scary_exc', skip_host=True)
    http_conn.putheader('Host', http_conn.host)
    http_conn.endheaders()

    # NOTE: This spy ensures the log entries get recorded before we test
    # NOTE: them and before server shutdown, preserving their order and
    # NOTE: making the log entry presence non-flaky.
    while not server_connection_close_spy.called:  # noqa: WPS328
        pass

    assert len(testing_server.requests._threads) == 10
    # NOTE: Wait until every worker is back to waiting for connections.
    while testing_server.requests.idle < 10:  # noqa: WPS328
        pass
    assert len(testing_server.requests._threads) == 10
    testing_server.interrupt = SystemExit('test requesting shutdown')
    assert not testing_server.requests._threads
    wsgi_server_thread.join()  # no extra logs upon server termination

    actual_log_entries = testing_server.error_log.calls[:]
    testing_server.error_log.calls.clear()  # prevent post-test assertions

    expected_log_entries = (
        (
            logging.ERROR,
            '^Unhandled error while processing an incoming connection '
            'SillyMistake'
            r"\('simulated unhandled exception 💣 in test handler'\)$",
        ),
        (
            logging.INFO,
            '^SystemExit raised: shutting down$',
        ),
    )

    assert len(actual_log_entries) == len(expected_log_entries)

    for (  # noqa: WPS352
            (expected_log_level, expected_msg_regex),
            (actual_msg, actual_log_level, _tb),
    ) in zip(expected_log_entries, actual_log_entries):
        assert expected_log_level == actual_log_level
        assert _matches_pattern(expected_msg_regex, actual_msg) is not None, (
            f'{actual_msg !r} does not match {expected_msg_regex !r}'
        )


def test_remains_alive_post_unhandled_exception(
    mocker,
    monkeypatch,
    test_client,
    testing_server,
    wsgi_server_thread,
):
    """Ensure worker threads are resilient to unhandled exceptions.

    The request line reader is wrapped so that it raises a custom
    :class:`BaseException` subclass after the original reader returns.
    Once the server-side connection is closed and all 10 workers are
    idle again, every pool thread must still be alive. A shutdown is
    then requested and the recorded log entries are verified.
    """

    class ScaryCrash(BaseException):  # noqa: WPS418, WPS431
        """A simulated crash during HTTP parsing."""

    request_handler_cls = (
        test_client.server_instance.
        ConnectionClass.RequestHandlerClass
    )
    _pristine_read_request_line = request_handler_cls.read_request_line

    def _crashing_read_request_line(self):
        _pristine_read_request_line(self)
        raise ScaryCrash(666)

    monkeypatch.setattr(
        request_handler_cls,
        'read_request_line',
        _crashing_read_request_line,
    )

    server_connection_close_spy = mocker.spy(
        test_client.server_instance.ConnectionClass,
        'close',
    )

    # NOTE: The initial worker thread count is 10.
    assert len(testing_server.requests._threads) == 10

    test_client.get_connection().send(b'GET / HTTP/1.1')

    # NOTE: This spy ensures the log entries get recorded before we test
    # NOTE: them and before server shutdown, preserving their order and
    # NOTE: making the log entry presence non-flaky.
    while not server_connection_close_spy.called:  # noqa: WPS328
        pass

    # NOTE: This checks for whether there's any crashed threads
    while testing_server.requests.idle < 10:  # noqa: WPS328
        pass
    assert len(testing_server.requests._threads) == 10
    dead_worker_threads = [
        worker_thread
        for worker_thread in testing_server.requests._threads
        if not worker_thread.is_alive()
    ]
    assert not dead_worker_threads
    testing_server.interrupt = SystemExit('test requesting shutdown')
    assert not testing_server.requests._threads
    wsgi_server_thread.join()  # no extra logs upon server termination

    # NOTE: Snapshot the recorded calls and wipe the original storage
    # NOTE: to prevent post-test assertions from seeing them.
    actual_log_entries = testing_server.error_log.calls[:]
    testing_server.error_log.calls.clear()

    expected_log_entries = (
        (
            logging.ERROR,
            '^Unhandled error while processing an incoming connection '
            r'ScaryCrash\(666\)$',
        ),
        (
            logging.INFO,
            '^SystemExit raised: shutting down$',
        ),
    )

    assert len(actual_log_entries) == len(expected_log_entries)

    log_entry_pairs = zip(expected_log_entries, actual_log_entries)
    for expected_entry, actual_entry in log_entry_pairs:
        wanted_level, wanted_msg_regex = expected_entry
        logged_msg, logged_level, _tb = actual_entry
        assert wanted_level == logged_level
        assert _matches_pattern(wanted_msg_regex, logged_msg) is not None, (
            f'{logged_msg !r} does not match {wanted_msg_regex !r}'
        )


@pytest.mark.parametrize(
'timeout_before_headers',
(
Expand Down
77 changes: 75 additions & 2 deletions cheroot/workers/threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import collections
import logging
import threading
import time
import socket
Expand Down Expand Up @@ -107,14 +108,38 @@ def run(self):
from the inner-layer code constitute a global server interrupt
request. When they happen, the worker thread exits.

:raises BaseException: when an unexpected non-interrupt
exception leaks from the inner layers

# noqa: DAR401 KeyboardInterrupt SystemExit
"""
self.server.stats['Worker Threads'][self.name] = self.stats
self.ready = True
try:
self._process_connections_until_interrupted()
except (KeyboardInterrupt, SystemExit) as ex:
self.server.interrupt = ex
except (KeyboardInterrupt, SystemExit) as interrupt_exc:
interrupt_cause = interrupt_exc.__cause__ or interrupt_exc
self.server.error_log(
f'Setting the server interrupt flag to {interrupt_cause !r}',
level=logging.DEBUG,
)
self.server.interrupt = interrupt_cause
except BaseException as underlying_exc: # noqa: WPS424
# NOTE: This is the last resort logging with the last dying breath
# NOTE: of the worker. It is only reachable when exceptions happen
# NOTE: in the `finally` branch of the internal try/except block.
self.server.error_log(
'A fatal exception happened. Setting the server interrupt flag'
f' to {underlying_exc !r} and giving up.'
'\N{NEW LINE}\N{NEW LINE}'
'Please, report this on the Cheroot tracker at '
'<https://github.com/cherrypy/cheroot/issues/new/choose>, '
'providing a full reproducer with as much context and details as possible.',
level=logging.CRITICAL,
traceback=True,
)
self.server.interrupt = underlying_exc
raise
finally:
self.ready = False

Expand All @@ -123,6 +148,9 @@ def _process_connections_until_interrupted(self):

Retrieves incoming connections from thread pool, processing
them one by one.

:raises SystemExit: on the internal requests to stop the
server instance
"""
while True:
conn = self.server.requests.get()
Expand All @@ -136,7 +164,52 @@ def _process_connections_until_interrupted(self):
keep_conn_open = False
try:
keep_conn_open = conn.communicate()
except ConnectionError as connection_error:
keep_conn_open = False # Drop the connection cleanly
self.server.error_log(
'Got a connection error while handling a '
f'connection from {conn.remote_addr !s}:'
f'{conn.remote_port !s} ({connection_error !s})',
level=logging.INFO,
)
continue
except (KeyboardInterrupt, SystemExit) as shutdown_request:
# Shutdown request
keep_conn_open = False # Drop the connection cleanly
self.server.error_log(
'Got a server shutdown request while handling a '
f'connection from {conn.remote_addr !s}:'
f'{conn.remote_port !s} ({shutdown_request !s})',
level=logging.DEBUG,
)
raise SystemExit(
str(shutdown_request),
) from shutdown_request
except BaseException as unhandled_error: # noqa: WPS424
# NOTE: Only a shutdown request should bubble up to the
# NOTE: external cleanup code. Otherwise, this thread dies.
# NOTE: If this were to happen, the threadpool would still
# NOTE: list a dead thread without knowing its state. And
# NOTE: the calling code would fail to schedule processing
# NOTE: of new requests.
self.server.error_log(
'Unhandled error while processing an incoming '
f'connection {unhandled_error !r}',
level=logging.ERROR,
traceback=True,
)
continue # Prevent the thread from dying
finally:
# NOTE: Any exceptions coming from within `finally` may
# NOTE: kill the thread, causing the threadpool to only
# NOTE: contain references to dead threads rendering the
# NOTE: server defunct, effectively meaning a DoS.
# NOTE: Ideally, things called here should process
# NOTE: everything recoverable internally. Any unhandled
# NOTE: errors will bubble up into the outer try/except
# NOTE: block. They will be treated as fatal and turned
# NOTE: into server shutdown requests and then reraised
# NOTE: unconditionally.
if keep_conn_open:
self.server.put_conn(conn)
else:
Expand Down
Loading