-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Autoscale background threads for tracer auto batching #382
Changes from all commits
c3f82b8
7e96bf2
f9c17ad
0ccc957
fae1b65
82ba901
19d608d
93489b3
099162a
511bcac
787a9a7
763b7d8
fc34146
1f83125
2780421
439e0e4
1f98fbe
3faa5eb
3bd9e6f
64febeb
4f5c793
8658098
a675719
482c13c
5bed311
836648c
8d00547
ddadf9a
08f6735
4538ac9
1645fcd
e9de9f9
7916eec
70ce886
8732a81
79f1aca
0a5adfe
96c1afb
fe02f82
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -348,7 +348,7 @@ def __init__( | |
self.tracing_queue: Optional[PriorityQueue] = PriorityQueue() | ||
|
||
threading.Thread( | ||
target=_tracing_thread_func, | ||
target=_tracing_control_thread_func, | ||
# arg must be a weakref to self to avoid the Thread object | ||
# preventing garbage collection of the Client object | ||
args=(weakref.ref(self),), | ||
|
@@ -3147,11 +3147,18 @@ def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> | |
|
||
|
||
def _tracing_thread_drain_queue( | ||
tracing_queue: Queue, limit: Optional[int] = None | ||
tracing_queue: Queue, limit: int = 100, block: bool = True | ||
) -> List[TracingQueueItem]: | ||
next_batch: List[TracingQueueItem] = [] | ||
try: | ||
while item := tracing_queue.get(block=True, timeout=0.25): | ||
# wait 250ms for the first item, then | ||
# - drain the queue with a 50ms block timeout | ||
# - stop draining if we hit the limit | ||
# shorter drain timeout is used instead of non-blocking calls to | ||
# avoid creating too many small batches | ||
if item := tracing_queue.get(block=block, timeout=0.25): | ||
next_batch.append(item) | ||
while item := tracing_queue.get(block=block, timeout=0.05): | ||
next_batch.append(item) | ||
if limit and len(next_batch) >= limit: | ||
break | ||
|
@@ -3172,24 +3179,70 @@ def _tracing_thread_handle_batch( | |
tracing_queue.task_done() | ||
|
||
|
||
def _tracing_thread_func(client_ref: weakref.ref[Client]) -> None: | ||
_AUTO_SCALE_UP_QSIZE_TRIGGER = 1000 | ||
_AUTO_SCALE_UP_NTHREADS_LIMIT = 16 | ||
_AUTO_SCALE_DOWN_NEMPTY_TRIGGER = 4 | ||
|
||
|
||
def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: | ||
client = client_ref() | ||
if client is None: | ||
return | ||
tracing_queue = client.tracing_queue | ||
assert tracing_queue is not None | ||
|
||
sub_threads: List[threading.Thread] = [] | ||
|
||
# loop until | ||
while ( | ||
# the main thread dies | ||
threading.main_thread().is_alive() | ||
# or we're the only remaining reference to the client | ||
and sys.getrefcount(client) > 3 | ||
and sys.getrefcount(client) > 3 + len(sub_threads) | ||
# 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached | ||
): | ||
if next_batch := _tracing_thread_drain_queue(tracing_queue, 100): | ||
for thread in sub_threads: | ||
if not thread.is_alive(): | ||
sub_threads.remove(thread) | ||
if ( | ||
len(sub_threads) < _AUTO_SCALE_UP_NTHREADS_LIMIT | ||
and tracing_queue.qsize() > _AUTO_SCALE_UP_QSIZE_TRIGGER | ||
): | ||
new_thread = threading.Thread( | ||
target=_tracing_sub_thread_func, args=(weakref.ref(client),) | ||
) | ||
sub_threads.append(new_thread) | ||
new_thread.start() | ||
if next_batch := _tracing_thread_drain_queue(tracing_queue): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How does this get assigned to the right thread? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. not sure I understand the comment, this function runs all in the same thread, so anything in this function is happening in this one thread There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ah! So the main thread also drains the queue and handles requests alongside the subthreads, which it spawns to do the same thing? It's weird we have a main thread and a subthread doing the same action, but in different places, but I guess that works There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yea because I think most uses of this don't need the other threads, can change if we think it's better There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. so theoretically could you round-robin dequeue records from all the different threads as they're accumulated? Nothing wrong with that, just thinking through how this works There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. there is a single queue shared by all consumer threads. increasing the number of threads just increases the rate at which we draw down the single queue |
||
_tracing_thread_handle_batch(client, tracing_queue, next_batch) | ||
|
||
# drain the queue on exit | ||
while next_batch := _tracing_thread_drain_queue(tracing_queue, block=False): | ||
_tracing_thread_handle_batch(client, tracing_queue, next_batch) | ||
|
||
|
||
def _tracing_sub_thread_func(client_ref: weakref.ref[Client]) -> None: | ||
client = client_ref() | ||
if client is None: | ||
return | ||
tracing_queue = client.tracing_queue | ||
assert tracing_queue is not None | ||
|
||
seen_successive_empty_queues = 0 | ||
|
||
# loop until | ||
while ( | ||
# the main thread dies | ||
threading.main_thread().is_alive() | ||
# or we've seen the queue empty 4 times in a row | ||
and seen_successive_empty_queues <= _AUTO_SCALE_DOWN_NEMPTY_TRIGGER | ||
): | ||
if next_batch := _tracing_thread_drain_queue(tracing_queue): | ||
seen_successive_empty_queues = 0 | ||
_tracing_thread_handle_batch(client, tracing_queue, next_batch) | ||
else: | ||
seen_successive_empty_queues += 1 | ||
|
||
# drain the queue on exit | ||
while next_batch := _tracing_thread_drain_queue(tracing_queue, 100): | ||
while next_batch := _tracing_thread_drain_queue(tracing_queue, block=False): | ||
_tracing_thread_handle_batch(client, tracing_queue, next_batch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a max size here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done