Autoscale background threads for tracer auto batching #382
Conversation
- controlled by LANGCHAIN_TRACING_SAMPLING_RATE
- if a POST was allowed then the matching PATCH is allowed too
- applied on both single and batch tracing endpoints
Co-authored-by: William FH <[email protected]>
- if on, starts a background thread that batches inserts/updates
- only applies to insert/updates w/ trace_id and dotted_order
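For context, the gating described above might look roughly like this on the client side. This is a minimal sketch under assumptions: enqueue_run, the dict-based run, and TracingQueueItem's fields are illustrative, not the actual SDK internals.

    # Minimal sketch only -- names and structure are assumptions,
    # not the actual langsmith SDK internals.
    import queue
    from dataclasses import dataclass
    from typing import Any, Dict, Optional

    @dataclass
    class TracingQueueItem:
        priority: str         # dotted_order doubles as a sortable key
        action: str           # "create" or "update"
        item: Dict[str, Any]

    def enqueue_run(
        tracing_queue: Optional[queue.Queue], run: Dict[str, Any], action: str
    ) -> bool:
        """Route a run to the background batch queue when possible.

        Only runs carrying both trace_id and dotted_order can be batched;
        everything else falls back to the single-run endpoint.
        """
        if tracing_queue is not None and run.get("trace_id") and run.get("dotted_order"):
            tracing_queue.put(TracingQueueItem(run["dotted_order"], action, run))
            return True
        return False  # caller should send this run synchronously instead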
python/langsmith/client.py
Outdated
    ) -> List[TracingQueueItem]:
        next_batch: List[TracingQueueItem] = []
        try:
-           while item := tracing_queue.get(block=True, timeout=0.25):
+           while item := tracing_queue.get(block=block, timeout=0.25):
This logic is pretty confusing, summarizing for my own understanding:
- We check the queue to see if anything is there. If not: wait 250 ms more, and if still nothing is there, return all the accumulated items. If so: move to step two.
- We pop an item off the queue and add it to the batch.
- If we hit the limit on batch size, return.
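Put together, that summary corresponds to a drain helper along these lines. This is a reconstruction from the diff above, not the merged code; the exact signature, the limit handling, and the Any item type are assumptions.

    import queue
    from typing import Any, List, Optional

    def _tracing_thread_drain_queue(
        tracing_queue: queue.Queue, limit: Optional[int] = None, block: bool = True
    ) -> List[Any]:
        """Accumulate items until the queue stays empty for 250 ms or limit is hit."""
        next_batch: List[Any] = []
        try:
            # Wait up to 250 ms for each item; queue.Empty ends the accumulation.
            while item := tracing_queue.get(block=block, timeout=0.25):
                next_batch.append(item)
                if limit and len(next_batch) >= limit:
                    break  # full batch, send it now
        except queue.Empty:
            pass  # nothing arrived within the timeout; return what we have
        return next_batch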
Is there a degenerate case where you only get a run added every 200ms that could lead to something like 200ms * 100 = 20s run delays?
How can we make this debuggable?
I can make it so only the first call blocks
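i.e. something like this sketch of that suggestion (an illustration, not the committed code):

    import queue
    from typing import Any, List

    def drain_first_call_blocks(tracing_queue: queue.Queue) -> List[Any]:
        """Block up to 250 ms for the first item only, then take whatever is
        already queued without waiting any further."""
        next_batch: List[Any] = []
        try:
            next_batch.append(tracing_queue.get(block=True, timeout=0.25))
            while item := tracing_queue.get(block=False):  # no further waiting
                next_batch.append(item)
        except queue.Empty:
            pass
        return next_batch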
There is a reason why I made it like this: if we special-case the first item as you suggest, it ends up creating a number of very small batches whenever requests into the queue are ramping up
Pushed a commit which is a compromise
FYI, after some testing this compromise loses a lot of the effectiveness of the batching behavior, i.e. it ends up creating too many small batches. I think a delay is needed
makes sense to me
python/langsmith/client.py
Outdated
        # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
    ):
        if next_batch := _tracing_thread_drain_queue(tracing_queue, 100):
            print("im looping", tracing_queue.qsize())
:)
python/langsmith/client.py
Outdated
    for thread in sub_threads:
        if not thread.is_alive():
            sub_threads.remove(thread)
    if tracing_queue.qsize() > 1000:
nit: let's put this in a variable...
I'd rather not put in a var something I really don't want to ever use again
sorry - in a constant that has a fixed name like _QUEUE_SIZE_TO_SPAWN_NEW_THREAD or something silly like that
done
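For illustration, the resolved check presumably looks something like the sketch below, using the constant name floated above. The thread cap _MAX_SUB_THREADS is a hypothetical stand-in for the max size requested further down; the merged names and values may differ.

    import queue
    import threading
    from typing import Callable, List

    _QUEUE_SIZE_TO_SPAWN_NEW_THREAD = 1000  # scale up once the backlog exceeds this
    _MAX_SUB_THREADS = 4                    # hypothetical cap on consumer threads

    def _maybe_spawn_consumer(
        tracing_queue: queue.Queue,
        sub_threads: List[threading.Thread],
        target: Callable[[], None],
    ) -> None:
        # Prune finished threads (iterate over a copy so removal is safe),
        # then add one if the queue is backing up and we are under the cap.
        for thread in list(sub_threads):
            if not thread.is_alive():
                sub_threads.remove(thread)
        if (
            tracing_queue.qsize() > _QUEUE_SIZE_TO_SPAWN_NEW_THREAD
            and len(sub_threads) < _MAX_SUB_THREADS
        ):
            new_thread = threading.Thread(target=target, daemon=True)
            sub_threads.append(new_thread)
            new_thread.start()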
    client = client_ref()
    if client is None:
        return
    tracing_queue = client.tracing_queue
    assert tracing_queue is not None

    sub_threads: List[threading.Thread] = []
Can we have a max size here?
done
            )
            sub_threads.append(new_thread)
            new_thread.start()
        if next_batch := _tracing_thread_drain_queue(tracing_queue):
How does this get assigned to the right thread?
Not sure I understand the comment. This function runs all in the same thread, so anything in this function is happening in this one thread
ah! So the main thread also drains the queue and handles requests alongside the subthreads, which it spawns to do the same thing?
It's weird that we have a main thread and subthreads doing the same action in different places, but I guess that works
yea, because I think most uses of this don't need the other threads; can change if we think it's better
So theoretically, could you round-robin dequeue records across all the different threads as they're accumulated? Nothing wrong with that, just thinking through how this works
There is a single queue shared by all consumer threads; increasing the number of threads just increases the rate at which we draw down the single queue
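In other words, this is the standard shared-queue fan-out: queue.Queue is thread-safe, so each item is delivered to exactly one of the consumers calling get on it. A self-contained toy (not SDK code):

    import queue
    import threading

    q: queue.Queue = queue.Queue()

    def consumer(name: str) -> None:
        # All consumers drain the same shared queue; each item is
        # handled by exactly one thread.
        while True:
            try:
                item = q.get(block=True, timeout=0.25)
            except queue.Empty:
                return  # queue stayed empty for 250 ms; shut down
            print(f"{name} handled {item}")

    for i in range(200):
        q.put(i)

    threads = [threading.Thread(target=consumer, args=(f"t{n}",)) for n in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()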
I think this makes sense, but I'm not 100% confident that the queue timeout will behave as desired here.
And then there's a number of magic numbers. You do a good job of commenting for most of them, but it still could be nice to make them variables
python/langsmith/client.py
Outdated
    ) -> List[TracingQueueItem]:
        next_batch: List[TracingQueueItem] = []
        try:
-           while item := tracing_queue.get(block=True, timeout=0.25):
+           while item := tracing_queue.get(block=block, timeout=0.25):
Seems we are always setting block=False: is there a reason for it being configurable then? And then it looks like timeout is ignored in this situation? (I'm not 100% sure here) https://docs.python.org/3/library/queue.html#queue.Queue.get
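For reference, the linked docs confirm this: when block=False, get either returns an item immediately or raises queue.Empty, and timeout is ignored. A quick check:

    import queue

    q: queue.Queue = queue.Queue()
    q.put("a")

    print(q.get(block=False))          # "a" -- returned immediately
    try:
        q.get(block=False, timeout=5)  # timeout is ignored when block=False
    except queue.Empty:
        print("raised immediately; did not wait 5 seconds")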
block defaults to True both in the Python lib and in our code
Ah, I missed the other calls - yup, looks good
- if on, starts a background thread that batches inserts/updates
- only applies to insert/updates w/ trace_id and dotted_order
- after release, bump sdk version here langchain-ai/langchain#16305

Co-authored-by: William Fu-Hinthorn <[email protected]>
Force-pushed from 265dff2 to fe02f82