Autoscale background threads for tracer auto batching #382

Merged: 39 commits into main from nc/25jan/tracer-auto-batch-auto-scale, Jan 28, 2024

Conversation

nfcampos (Contributor)

No description provided.

hinthornw and others added 27 commits January 8, 2024 12:47
- controlled by LANGCHAIN_TRACING_SAMPLING_RATE
- if a POST was allowed then the matching PATCH is allowed too
- applied on both single and batch tracing endpoints
- if on, starts a background thread that batches inserts/updates
- only applies to insert/updates w/ trace_id and dotted_order
) -> List[TracingQueueItem]:
    next_batch: List[TracingQueueItem] = []
    try:
        while item := tracing_queue.get(block=True, timeout=0.25):
        while item := tracing_queue.get(block=block, timeout=0.25):
Contributor:

This logic is pretty confusing, so summarizing for my own understanding (roughly the sketch shown below):

  1. We check the queue to see if anything is there. If not: wait up to 250 ms more, and if still nothing is there, return all the accumulated items. If so: move on to step 2.
  2. We pop an item off the queue and add it to the batch.
  3. If we hit the limit on batch size, return.
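For reference, a rough sketch of that loop based on the diff excerpt above (TracingQueueItem and the 0.25 s timeout come from the diff, and the batch-size limit of 100 appears elsewhere in this PR; the function name and details here are illustrative, not the exact implementation):

```python
import queue
from typing import Any, List

def drain_queue(tracing_queue: "queue.Queue", limit: int = 100) -> List[Any]:
    """Accumulate up to `limit` items, waiting ~250 ms between arrivals."""
    next_batch: List[Any] = []
    try:
        # Step 1: wait up to 250 ms for the next item; if nothing arrives,
        # queue.Empty is raised and we fall through to return what we have.
        while item := tracing_queue.get(block=True, timeout=0.25):
            # Step 2: pop the item off the queue and add it to the batch.
            next_batch.append(item)
            # Step 3: stop once the batch-size limit is hit.
            if len(next_batch) >= limit:
                break
    except queue.Empty:
        pass
    return next_batch
```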

Contributor:

Is there a degenerate case where you only get a run added every 200ms that could lead to something like 200ms * 100 = 20s run delays?

How can we make this debuggable?

Contributor Author:

I can make it so only the first call blocks

Contributor Author:

There is a reason why I made it like this: if we special-case the first item as you suggest (roughly the variant sketched below), it ends up creating a number of very small batches whenever requests into the queue are ramping up.
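For context, my reading of the suggested variant (only the first get() blocks) would look roughly like this; while traffic is still ramping up, the non-blocking gets find the queue empty almost immediately, so each pass returns a batch of only one or two items:

```python
import queue
from typing import Any, List

def drain_queue_first_blocks(tracing_queue: "queue.Queue", limit: int = 100) -> List[Any]:
    next_batch: List[Any] = []
    block = True  # only the first get() waits for an item
    try:
        while item := tracing_queue.get(block=block, timeout=0.25):
            block = False  # subsequent gets return immediately if the queue is empty
            next_batch.append(item)
            if len(next_batch) >= limit:
                break
    except queue.Empty:
        pass
    return next_batch
```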

Contributor Author:

Pushed a commit which is a compromise

Contributor Author:

FYI, after some testing, this compromise loses a lot of the effectiveness of the batching behavior, i.e. it ends up creating too many small batches. I think a delay is needed.

Contributor:

makes sense to me

    # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
):
    if next_batch := _tracing_thread_drain_queue(tracing_queue, 100):
        print("im looping", tracing_queue.qsize())
Contributor:

:)

for thread in sub_threads:
    if not thread.is_alive():
        sub_threads.remove(thread)
if tracing_queue.qsize() > 1000:
Contributor:

nit: let's put this in a variable...

Contributor Author:

I'd rather not put something in a var that I really don't want to ever use again

Contributor:

Sorry, I mean in a constant that has a fixed name, like _QUEUE_SIZE_TO_SPAWN_NEW_THREAD or something silly like that.
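i.e., something along these lines (the constant name is the one suggested above and 1000 is the threshold value in the diff; the helper function is purely illustrative):

```python
import queue

# Name from the suggestion above; 1000 is the threshold shown in the diff.
_QUEUE_SIZE_TO_SPAWN_NEW_THREAD = 1000

def should_spawn_new_thread(tracing_queue: queue.Queue) -> bool:
    # Spawn another consumer thread once the backlog passes the threshold.
    return tracing_queue.qsize() > _QUEUE_SIZE_TO_SPAWN_NEW_THREAD
```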

Contributor Author:

done

client = client_ref()
if client is None:
    return
tracing_queue = client.tracing_queue
assert tracing_queue is not None

sub_threads: List[threading.Thread] = []
Contributor:

Can we have a max size here?

Contributor Author:

done

)
sub_threads.append(new_thread)
new_thread.start()
if next_batch := _tracing_thread_drain_queue(tracing_queue):
Contributor:

How does this get assigned to the right thread?

Contributor Author:

Not sure I understand the comment; this function all runs in the same thread, so anything in this function is happening in this one thread.

Contributor:

Ah! So the main thread also drains the queue and handles requests alongside the subthreads, which it spawns to do the same thing?

It's a bit odd that we have a main thread and subthreads doing the same action in different places, but I guess that works.
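Piecing together the diff excerpts in this thread, the control thread's loop is roughly the following sketch. The 100/1000 values and the sub_threads bookkeeping come from the diff; the drain/handle helpers, the stop event, and the _MAX_SUB_THREADS cap are simplified stand-ins, not the actual implementation:

```python
import queue
import threading
from typing import Any, List

_QUEUE_SIZE_TO_SPAWN_NEW_THREAD = 1000  # threshold from the diff
_MAX_SUB_THREADS = 4                    # the "max size" asked for above (value assumed)

def _drain(q: queue.Queue, limit: int = 100) -> List[Any]:
    # Same batching loop as sketched earlier in the thread.
    batch: List[Any] = []
    try:
        while item := q.get(block=True, timeout=0.25):
            batch.append(item)
            if len(batch) >= limit:
                break
    except queue.Empty:
        pass
    return batch

def _handle(batch: List[Any]) -> None:
    # Stand-in for sending the batched POST/PATCH requests.
    print(f"sending batch of {len(batch)} items")

def _sub_thread_func(q: queue.Queue) -> None:
    # Extra consumer: keeps draining the same shared queue until it stays empty.
    while batch := _drain(q):
        _handle(batch)

def control_thread_func(q: queue.Queue, stop: threading.Event) -> None:
    sub_threads: List[threading.Thread] = []
    while not stop.is_set():
        # Forget sub-threads that have finished.
        sub_threads = [t for t in sub_threads if t.is_alive()]
        # Autoscale: spawn another consumer when the backlog is large.
        if (
            q.qsize() > _QUEUE_SIZE_TO_SPAWN_NEW_THREAD
            and len(sub_threads) < _MAX_SUB_THREADS
        ):
            new_thread = threading.Thread(target=_sub_thread_func, args=(q,), daemon=True)
            sub_threads.append(new_thread)
            new_thread.start()
        # The control thread also drains and sends batches itself.
        if batch := _drain(q):
            _handle(batch)
```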

Contributor Author:

Yeah, because I think most uses of this don't need the other threads. Can change it if we think it's better.

Contributor:

So theoretically, could you round-robin dequeue records from all the different threads as they're accumulated? Nothing wrong with that, just thinking through how this works.

Contributor Author:

There is a single queue shared by all consumer threads; increasing the number of threads just increases the rate at which we draw down that one queue.
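A minimal, standalone illustration of that shared-queue behaviour (plain stdlib queue.Queue, not the tracer code): items are not assigned to a particular thread; whichever consumer calls get() next takes the next item, so adding consumers just drains the one queue faster.

```python
import queue
import threading

shared_queue: queue.Queue = queue.Queue()
for i in range(10):
    shared_queue.put(i)

def consume(name: str) -> None:
    # Every consumer pulls from the same thread-safe queue; each item is
    # delivered to exactly one of them, in whatever order they call get().
    try:
        while True:
            item = shared_queue.get(block=True, timeout=0.25)
            print(name, "took", item)
    except queue.Empty:
        pass

threads = [threading.Thread(target=consume, args=(f"consumer-{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```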

hinthornw (Collaborator) left a comment:

I think this makes sense, but I'm not 100% confident that the queue timeout will behave as desired here.

And then there are a number of magic numbers. You do a good job of commenting for most of them, but it could still be nice to make them variables.

) -> List[TracingQueueItem]:
    next_batch: List[TracingQueueItem] = []
    try:
        while item := tracing_queue.get(block=True, timeout=0.25):
        while item := tracing_queue.get(block=block, timeout=0.25):
Collaborator:

Seems we are always setting block=False: is there a reason for it being configurable then? And then it looks like timeout is ignored in this situation? (I'm not 100% sure here) https://docs.python.org/3/library/queue.html#queue.Queue.get

Contributor Author:

block defaults to True both in the Python standard library and in our code
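For reference, my understanding of the stdlib semantics being discussed (consistent with the linked docs): with block=False the timeout argument is ignored and queue.Empty is raised immediately if nothing is available, while block=True waits up to timeout seconds before raising.

```python
import queue

q: queue.Queue = queue.Queue()

# block=False: returns/raises immediately; the timeout argument is ignored.
try:
    q.get(block=False, timeout=0.25)
except queue.Empty:
    print("non-blocking get gave up immediately")

# block=True with a timeout: waits up to ~250 ms for an item before raising.
try:
    q.get(block=True, timeout=0.25)
except queue.Empty:
    print("blocking get waited ~250 ms, then gave up")
```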

Collaborator:

Ah, I missed the other calls. Yup, looks good.

- if on, starts a background thread that batches inserts/updates
- only applies to insert/updates w/ trace_id and dotted_order
- after release, bump sdk version here
langchain-ai/langchain#16305

---------

Co-authored-by: William Fu-Hinthorn <[email protected]>
Base automatically changed from nc/19jan/auto-batch-tracing to wfh/batch_run_create January 28, 2024 01:27
Base automatically changed from wfh/batch_run_create to main January 28, 2024 02:00
hinthornw force-pushed the nc/25jan/tracer-auto-batch-auto-scale branch from 265dff2 to fe02f82 on January 28, 2024 02:02
hinthornw merged commit 377135f into main on Jan 28, 2024
4 checks passed
hinthornw deleted the nc/25jan/tracer-auto-batch-auto-scale branch on January 28, 2024 02:04