-
Notifications
You must be signed in to change notification settings - Fork 16.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core[patch]: Replace memory stream implementation used by LogStreamCallbackHandler #17185
Merged
Merged
Changes from 4 commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
"""Module implements a memory stream for communication between two co-routines. | ||
|
||
This module provides a way to communicate between two co-routines using a memory | ||
channel. The writer and reader can be in the same event loop or in different event | ||
loops. When they're in different event loops, they will also be in different | ||
threads. | ||
|
||
This is useful in situations when there's a mix of synchronous and asynchronous | ||
used in the code. | ||
""" | ||
import asyncio | ||
from asyncio import AbstractEventLoop, Queue | ||
from typing import AsyncIterator, Generic, TypeVar | ||
|
||
|
||
class ClosedResourceError(Exception): | ||
"""Raised when trying to use a resource that has been closed.""" | ||
|
||
|
||
T = TypeVar("T") | ||
|
||
|
||
class _SendStream(Generic[T]): | ||
def __init__( | ||
self, reader_loop: AbstractEventLoop, queue: Queue, done: object | ||
) -> None: | ||
"""Create a writer for the queue and done object. | ||
|
||
Args: | ||
reader_loop: The event loop to use for the writer. This loop will be used | ||
to schedule the writes to the queue. | ||
queue: The queue to write to. This is an asyncio queue. | ||
done: Special sentinel object to indicate that the writer is done. | ||
""" | ||
self._reader_loop = reader_loop | ||
self._queue = queue | ||
self._done = done | ||
|
||
async def send(self, item: T) -> None: | ||
"""Schedule the item to be written to the queue using the original loop.""" | ||
return self.send_nowait(item) | ||
|
||
def send_nowait(self, item: T) -> None: | ||
"""Schedule the item to be written to the queue using the original loop.""" | ||
self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item) | ||
|
||
async def aclose(self) -> None: | ||
"""Schedule the done object write the queue using the original loop.""" | ||
return self.close() | ||
|
||
def close(self) -> None: | ||
"""Schedule the done object write the queue using the original loop.""" | ||
self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, self._done) | ||
|
||
|
||
class _ReceiveStream(Generic[T]): | ||
def __init__(self, queue: Queue, done: object) -> None: | ||
"""Create a reader for the queue and done object. | ||
|
||
This reader should be used in the same loop as the loop that was passed | ||
to the channel. | ||
""" | ||
self._queue = queue | ||
self._done = done | ||
self._is_closed = False | ||
|
||
async def __aiter__(self) -> AsyncIterator[T]: | ||
while True: | ||
if self._is_closed: | ||
raise ClosedResourceError("Reader is closed. Cannot read from it.") | ||
eyurtsev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
item = await self._queue.get() | ||
if item is self._done: | ||
self._is_closed = True | ||
break | ||
yield item | ||
|
||
|
||
class _MemoryStream(Generic[T]): | ||
"""Stream data from a writer to a reader even if they are in different threads. | ||
|
||
Uses asyncio queues to communicate between two co-routines. This implementation | ||
should work even if the writer and reader co-routines belong to two different | ||
event loops (e.g. one running from an event loop in the main thread | ||
and the other running in an event loop in a background thread). | ||
|
||
This implementation is meant to be used with a single writer and a single reader. | ||
|
||
This is an internal implementation to LangChain please do not use it directly. | ||
""" | ||
|
||
def __init__(self, loop: AbstractEventLoop) -> None: | ||
"""Create a channel for the given loop. | ||
|
||
Args: | ||
loop: The event loop to use for the channel. The reader is assumed | ||
to be running in the same loop as the one passed to this constructor. | ||
This will NOT be validated at run time. | ||
""" | ||
self._loop = loop | ||
self._queue: asyncio.Queue = asyncio.Queue(maxsize=0) | ||
self._done = object() | ||
|
||
def get_send_stream(self) -> _SendStream[T]: | ||
"""Get a writer for the channel.""" | ||
return _SendStream[T]( | ||
reader_loop=self._loop, queue=self._queue, done=self._done | ||
) | ||
|
||
def get_receive_stream(self) -> _ReceiveStream[T]: | ||
"""Get a reader for the channel.""" | ||
return _ReceiveStream[T](queue=self._queue, done=self._done) |
132 changes: 132 additions & 0 deletions
132
libs/core/tests/unit_tests/tracers/test_memory_stream.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
import asyncio | ||
import math | ||
import time | ||
from concurrent.futures import ThreadPoolExecutor | ||
from typing import AsyncIterator | ||
|
||
import pytest | ||
|
||
from langchain_core.tracers.memory_stream import ClosedResourceError, _MemoryStream | ||
|
||
|
||
async def test_same_event_loop() -> None: | ||
"""Test that the memory stream works when the same event loop is used. | ||
|
||
This is the easy case. | ||
""" | ||
reader_loop = asyncio.get_event_loop() | ||
channel = _MemoryStream[dict](reader_loop) | ||
writer = channel.get_send_stream() | ||
reader = channel.get_receive_stream() | ||
|
||
async def producer() -> None: | ||
"""Produce items with slight delay.""" | ||
tic = time.time() | ||
for i in range(3): | ||
await asyncio.sleep(0.10) | ||
toc = time.time() | ||
await writer.send( | ||
{ | ||
"item": i, | ||
"produce_time": toc - tic, | ||
} | ||
) | ||
await writer.aclose() | ||
|
||
async def consumer() -> AsyncIterator[dict]: | ||
tic = time.time() | ||
async for item in reader: | ||
toc = time.time() | ||
yield { | ||
"receive_time": toc - tic, | ||
**item, | ||
} | ||
|
||
asyncio.create_task(producer()) | ||
|
||
items = [item async for item in consumer()] | ||
|
||
for item in items: | ||
delta_time = item["receive_time"] - item["produce_time"] | ||
# Allow a generous 10ms of delay | ||
# The test is meant to verify that the producer and consumer are running in | ||
# parallel despite the fact that the producer is running from another thread. | ||
# abs_tol is used to allow for some delay in the producer and consumer | ||
# due to overhead. | ||
# To verify that the producer and consumer are running in parallel, we | ||
# expect the delta_time to be smaller than the sleep delay in the producer | ||
# * # of items = 30 ms | ||
assert ( | ||
math.isclose(delta_time, 0, abs_tol=0.010) is True | ||
), f"delta_time: {delta_time}" | ||
|
||
|
||
async def test_queue_for_streaming_via_sync_call() -> None: | ||
"""Test via async -> sync -> async path.""" | ||
reader_loop = asyncio.get_event_loop() | ||
channel = _MemoryStream[dict](reader_loop) | ||
writer = channel.get_send_stream() | ||
reader = channel.get_receive_stream() | ||
|
||
async def producer() -> None: | ||
"""Produce items with slight delay.""" | ||
tic = time.time() | ||
for i in range(3): | ||
await asyncio.sleep(0.10) | ||
toc = time.time() | ||
await writer.send( | ||
{ | ||
"item": i, | ||
"produce_time": toc - tic, | ||
} | ||
) | ||
await writer.aclose() | ||
|
||
def sync_call() -> None: | ||
"""Blocking sync call.""" | ||
asyncio.run(producer()) | ||
|
||
async def consumer() -> AsyncIterator[dict]: | ||
tic = time.time() | ||
async for item in reader: | ||
toc = time.time() | ||
yield { | ||
"receive_time": toc - tic, | ||
**item, | ||
} | ||
|
||
with ThreadPoolExecutor() as executor: | ||
executor.submit(sync_call) | ||
items = [item async for item in consumer()] | ||
|
||
for item in items: | ||
delta_time = item["receive_time"] - item["produce_time"] | ||
# Allow a generous 10ms of delay | ||
# The test is meant to verify that the producer and consumer are running in | ||
# parallel despite the fact that the producer is running from another thread. | ||
# abs_tol is used to allow for some delay in the producer and consumer | ||
# due to overhead. | ||
# To verify that the producer and consumer are running in parallel, we | ||
# expect the delta_time to be smaller than the sleep delay in the producer | ||
# * # of items = 30 ms | ||
assert ( | ||
math.isclose(delta_time, 0, abs_tol=0.010) is True | ||
), f"delta_time: {delta_time}" | ||
|
||
|
||
async def test_closed_resource_exception() -> None: | ||
reader_loop = asyncio.get_event_loop() | ||
channel = _MemoryStream[str](reader_loop) | ||
writer = channel.get_send_stream() | ||
reader = channel.get_receive_stream() | ||
await writer.aclose() | ||
# OK to use once since resource is available at this point | ||
# once we start iterating over the reader, we will get the `done` item | ||
# and mark the stream as closed | ||
async for _ in reader: | ||
pass | ||
|
||
# Now stream is closed, so the second time should raise an exception | ||
with pytest.raises(ClosedResourceError): | ||
async for _ in reader: | ||
pass |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this syntax with square brackets doesn't exist in older py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm i think it's there for classes? I think the issue originally was that the anyio was using it with a function rather than a class.
Evidence is that all unit tests pass