Skip to content

Commit 55d08a6

Browse files
authored
Merge pull request #416 from rollbar/added-thread-pool
2 parents f95076d + e14af00 commit 55d08a6

File tree

4 files changed

+112
-25
lines changed

4 files changed

+112
-25
lines changed

Diff for: .github/workflows/ci.yml

+22-22
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,28 @@ jobs:
205205
- name: Install dependencies
206206
run: pip install setuptools==39.2.0 --force-reinstall
207207

208+
- name: Install Python 2 dependencies
209+
if: ${{ contains(matrix.python-version, '2.7') }}
210+
# certifi dropped support for Python 2 in 2020.4.5.2 but only started
211+
# using Python 3 syntax in 2022.5.18. 2021.10.8 is the last release with
212+
# Python 2 support.
213+
run: pip install certifi==2021.10.8 requests==2.27.1 incremental==21.3.0
214+
215+
- name: Install Python 3.4 dependencies
216+
if: ${{ contains(matrix.python-version, '3.4') }}
217+
# certifi uses the 'typing' from Python 3.5 module starting in 2022.5.18
218+
run: pip install certifi==2021.10.8 "typing-extensions<4" incremental==21.3.0
219+
220+
- name: Install Python 3.5 dependencies
221+
if: ${{ contains(matrix.python-version, '3.5') }}
222+
# typing-extensions dropped support for Python 3.5 in version 4
223+
run: pip install "typing-extensions<4"
224+
225+
- name: Install Python 3.6 dependencies
226+
if: ${{ contains(matrix.python-version, '3.6') }}
227+
# typing-extensions dropped support for Python 3.6 in version 4.2
228+
run: pip install "typing-extensions<4.2"
229+
208230
- name: Set the framework
209231
run: echo ${{ matrix.framework }} >> $GITHUB_ENV
210232

@@ -232,27 +254,5 @@ jobs:
232254
if: ${{ contains(matrix.framework, 'FASTAPI_VERSION') }}
233255
run: pip install fastapi==$FASTAPI_VERSION
234256

235-
- name: Install Python 2 dependencies
236-
if: ${{ contains(matrix.python-version, '2.7') }}
237-
# certifi dropped support for Python 2 in 2020.4.5.2 but only started
238-
# using Python 3 syntax in 2022.5.18. 2021.10.8 is the last release with
239-
# Python 2 support.
240-
run: pip install certifi==2021.10.8 requests==2.27.1
241-
242-
- name: Install Python 3.4 dependencies
243-
if: ${{ contains(matrix.python-version, '3.4') }}
244-
# certifi uses the 'typing' from Python 3.5 module starting in 2022.5.18
245-
run: pip install certifi==2021.10.8 "typing-extensions<4"
246-
247-
- name: Install Python 3.5 dependencies
248-
if: ${{ contains(matrix.python-version, '3.5') }}
249-
# typing-extensions dropped support for Python 3.5 in version 4
250-
run: pip install "typing-extensions<4"
251-
252-
- name: Install Python 3.6 dependencies
253-
if: ${{ contains(matrix.python-version, '3.6') }}
254-
# typing-extensions dropped support for Python 3.6 in version 4.2
255-
run: pip install "typing-extensions<4.2"
256-
257257
- name: Run tests
258258
run: python setup.py test

Diff for: rollbar/__init__.py

+26-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from rollbar.lib import events, filters, dict_merge, parse_qs, text, transport, urljoin, iteritems, defaultJSONEncode
2424

2525

26-
__version__ = '0.16.3'
26+
__version__ = '0.16.4beta'
2727
__log_name__ = 'rollbar'
2828
log = logging.getLogger(__log_name__)
2929

@@ -124,7 +124,7 @@ def wrap(*args, **kwargs):
124124
from twisted.internet.ssl import CertificateOptions
125125
from twisted.internet import task, defer, ssl, reactor
126126
from zope.interface import implementer
127-
127+
128128
@implementer(IPolicyForHTTPS)
129129
class VerifyHTTPS(object):
130130
def __init__(self):
@@ -275,7 +275,12 @@ def _get_fastapi_request():
275275
'root': None, # root path to your code
276276
'branch': None, # git branch name
277277
'code_version': None,
278-
'handler': 'default', # 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted' or 'httpx'
278+
# 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted', 'httpx' or 'thread_pool'
279+
# 'async' requires Python 3.4 or higher.
280+
# 'httpx' requires Python 3.7 or higher.
281+
# 'thread_pool' requires Python 3.2 or higher.
282+
'handler': 'default',
283+
'thread_pool_workers': None,
279284
'endpoint': DEFAULT_ENDPOINT,
280285
'timeout': DEFAULT_TIMEOUT,
281286
'agent.log_file': 'log.rollbar',
@@ -383,6 +388,9 @@ def init(access_token, environment='production', scrub_fields=None, url_fields=N
383388

384389
if SETTINGS.get('handler') == 'agent':
385390
agent_log = _create_agent_log()
391+
elif SETTINGS.get('handler') == 'thread_pool':
392+
from rollbar.lib.thread_pool import init_pool
393+
init_pool(SETTINGS.get('thread_pool_workers', None))
386394

387395
if not SETTINGS['locals']['safelisted_types'] and SETTINGS['locals']['whitelisted_types']:
388396
warnings.warn('whitelisted_types deprecated use safelisted_types instead', DeprecationWarning)
@@ -523,6 +531,7 @@ def send_payload(payload, access_token):
523531
- 'gae': calls _send_payload_appengine() (which makes a blocking call to Google App Engine)
524532
- 'twisted': calls _send_payload_twisted() (which makes an async HTTP request using Twisted and Treq)
525533
- 'httpx': calls _send_payload_httpx() (which makes an async HTTP request using HTTPX)
534+
- 'thread_pool': uses a pool of worker threads to make HTTP requests off the main thread. Returns immediately.
526535
"""
527536
payload = events.on_payload(payload)
528537
if payload is False:
@@ -569,6 +578,8 @@ def send_payload(payload, access_token):
569578
_send_payload_async(payload_str, access_token)
570579
elif handler == 'thread':
571580
_send_payload_thread(payload_str, access_token)
581+
elif handler == 'thread_pool':
582+
_send_payload_thread_pool(payload_str, access_token)
572583
else:
573584
# default to 'thread'
574585
_send_payload_thread(payload_str, access_token)
@@ -1510,6 +1521,18 @@ def _send_payload_thread(payload_str, access_token):
15101521
thread.start()
15111522

15121523

1524+
def _send_payload_pool(payload_str, access_token):
1525+
try:
1526+
_post_api('item/', payload_str, access_token=access_token)
1527+
except Exception as e:
1528+
log.exception('Exception while posting item %r', e)
1529+
1530+
1531+
def _send_payload_thread_pool(payload_str, access_token):
1532+
from rollbar.lib.thread_pool import submit
1533+
submit(_send_payload_pool, payload_str, access_token)
1534+
1535+
15131536
def _send_payload_appengine(payload_str, access_token):
15141537
try:
15151538
_post_api_appengine('item/', payload_str, access_token=access_token)

Diff for: rollbar/lib/thread_pool.py

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import logging
2+
import os
3+
import sys
4+
from concurrent.futures import ThreadPoolExecutor
5+
6+
_pool = None # type: ThreadPoolExecutor|None
7+
8+
log = logging.getLogger(__name__)
9+
10+
11+
def init_pool(max_workers):
12+
"""
13+
Creates the thread pool with the max workers.
14+
15+
:type max_workers: int|None
16+
:param max_workers: If max_workers is None it will use the logic from the standard library to calculate the number
17+
of threads. However, we ported the logic from Python 3.5 to earlier versions.
18+
"""
19+
if max_workers is None and sys.version_info < (3, 5):
20+
max_workers = (os.cpu_count() or 1) * 5
21+
22+
global _pool
23+
_pool = ThreadPoolExecutor(max_workers)
24+
25+
26+
def submit(worker, payload_str, access_token):
27+
"""
28+
Submit a new task to the thread pool.
29+
30+
:type worker: function
31+
:type payload_str: str
32+
:type access_token: str
33+
"""
34+
global _pool
35+
if _pool is None:
36+
log.warning('pyrollbar: Thead pool not initialized. Please ensure init_pool() is called prior to submit().')
37+
return
38+
_pool.submit(worker, payload_str, access_token)

Diff for: rollbar/test/test_rollbar.py

+26
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,32 @@ def _raise():
10351035

10361036
send_payload_httpx.assert_called_once()
10371037

1038+
@unittest.skipUnless(sys.version_info >= (3, 6), 'assert_called_once support requires Python3.6+')
1039+
@mock.patch('rollbar._send_payload_thread_pool')
1040+
def test_thread_pool_handler(self, send_payload_thread_pool):
1041+
def _raise():
1042+
try:
1043+
raise Exception('foo')
1044+
except:
1045+
rollbar.report_exc_info()
1046+
rollbar.SETTINGS['handler'] = 'thread_pool'
1047+
_raise()
1048+
1049+
send_payload_thread_pool.assert_called_once()
1050+
1051+
@unittest.skipUnless(sys.version_info >= (3, 2), 'concurrent.futures support requires Python3.2+')
1052+
def test_thread_pool_submit(self):
1053+
from rollbar.lib.thread_pool import init_pool, submit
1054+
init_pool(1)
1055+
ran = {'nope': True} # dict used so it is not shadowed in run
1056+
1057+
def run(payload_str, access_token):
1058+
ran['nope'] = False
1059+
1060+
submit(run, 'foo', 'bar')
1061+
self.assertFalse(ran['nope'])
1062+
1063+
10381064
@mock.patch('rollbar.send_payload')
10391065
def test_args_constructor(self, send_payload):
10401066

0 commit comments

Comments
 (0)