Skip to content

Commit d1b81ba

Browse files
authored
Merge pull request #156 from pguz/queue_overflow_handler
Add queue overflow handler in asyncsender.
2 parents eb68481 + 478bd02 commit d1b81ba

File tree

2 files changed

+87
-1
lines changed

2 files changed

+87
-1
lines changed

Diff for: fluent/asyncsender.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__(self,
5555
msgpack_kwargs=None,
5656
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
5757
queue_circular=DEFAULT_QUEUE_CIRCULAR,
58+
queue_overflow_handler=None,
5859
**kwargs):
5960
"""
6061
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
@@ -66,6 +67,10 @@ def __init__(self,
6667
**kwargs)
6768
self._queue_maxsize = queue_maxsize
6869
self._queue_circular = queue_circular
70+
if queue_circular and queue_overflow_handler:
71+
self._queue_overflow_handler = queue_overflow_handler
72+
else:
73+
self._queue_overflow_handler = self._queue_overflow_handler_default
6974

7075
self._thread_guard = threading.Event() # This ensures visibility across all variables
7176
self._closed = False
@@ -109,9 +114,11 @@ def _send(self, bytes_):
109114
if self._queue_circular and self._queue.full():
110115
# discard oldest
111116
try:
112-
self._queue.get(block=False)
117+
discarded_bytes = self._queue.get(block=False)
113118
except Empty: # pragma: no cover
114119
pass
120+
else:
121+
self._queue_overflow_handler(discarded_bytes)
115122
try:
116123
self._queue.put(bytes_, block=(not self._queue_circular))
117124
except Full: # pragma: no cover
@@ -132,5 +139,8 @@ def _send_loop(self):
132139
finally:
133140
self._close()
134141

142+
def _queue_overflow_handler_default(self, discarded_bytes):
143+
pass
144+
135145
def __exit__(self, exc_type, exc_val, exc_tb):
136146
self.close()

Diff for: tests/test_asynchandler.py

+76
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@
44
import sys
55
import unittest
66

7+
try:
8+
from unittest import mock
9+
except ImportError:
10+
import mock
11+
try:
12+
from unittest.mock import patch
13+
except ImportError:
14+
from mock import patch
15+
16+
17+
718
import fluent.asynchandler
819
import fluent.handler
920
from tests import mockserver
@@ -309,3 +320,68 @@ def test_simple(self):
309320
eq('userB', el[2]['to'])
310321
self.assertTrue(el[1])
311322
self.assertTrue(isinstance(el[1], int))
323+
324+
325+
class QueueOverflowException(BaseException):
326+
pass
327+
328+
329+
def queue_overflow_handler(discarded_bytes):
330+
raise QueueOverflowException(discarded_bytes)
331+
332+
333+
class TestHandlerWithCircularQueueHandler(unittest.TestCase):
334+
Q_SIZE = 1
335+
336+
def setUp(self):
337+
super(TestHandlerWithCircularQueueHandler, self).setUp()
338+
self._server = mockserver.MockRecvServer('localhost')
339+
self._port = self._server.port
340+
341+
def tearDown(self):
342+
self._server.close()
343+
344+
def get_handler_class(self):
345+
# return fluent.handler.FluentHandler
346+
return fluent.asynchandler.FluentHandler
347+
348+
def test_simple(self):
349+
handler = self.get_handler_class()('app.follow', port=self._port,
350+
queue_maxsize=self.Q_SIZE,
351+
queue_circular=True,
352+
queue_overflow_handler=queue_overflow_handler)
353+
with handler:
354+
def custom_full_queue():
355+
handler.sender._queue.put(b'Mock', block=True)
356+
return True
357+
358+
with patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(side_effect=custom_full_queue)):
359+
self.assertEqual(handler.sender.queue_circular, True)
360+
self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE)
361+
362+
logging.basicConfig(level=logging.INFO)
363+
log = logging.getLogger('fluent.test')
364+
handler.setFormatter(fluent.handler.FluentRecordFormatter())
365+
log.addHandler(handler)
366+
367+
exc_counter = 0
368+
369+
try:
370+
log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'})
371+
except QueueOverflowException:
372+
exc_counter += 1
373+
374+
try:
375+
log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'})
376+
except QueueOverflowException:
377+
exc_counter += 1
378+
379+
try:
380+
log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'})
381+
except QueueOverflowException:
382+
exc_counter += 1
383+
384+
# we can't be sure to have exception in every case due to multithreading,
385+
# so we can test only for a cautelative condition here
386+
print('Exception raised: {} (expected 3)'.format(exc_counter))
387+
assert exc_counter >= 0

0 commit comments

Comments
 (0)