How can an agent run step by step with streaming? #24245
Replies: 1 comment
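To ensure that tools with dependencies are executed in the correct order, here is an example of how you can achieve this:

from typing import List

from langchain_core.documents import Document
from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


async def test_event_stream_with_simple_function_tool() -> None:
    """Test the event stream with a function and tool"""

    def foo(x: int) -> dict:
        """Foo"""
        return {"x": 5}

    @tool
    def get_docs(x: int) -> List[Document]:
        """Hello Doc"""
        return [Document(page_content="hello")]

    # Piping foo into get_docs builds a sequence: get_docs receives foo's output.
    chain = RunnableLambda(foo) | get_docs
    # _collect_events is not defined in this snippet; it is assumed to be a test
    # helper that gathers the async event stream into a list.
    events = await _collect_events(chain.astream_events({}, version="v2"))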
    assert events == [
        {
            "event": "on_chain_start",
            "run_id": "",
            "parent_ids": [],
            "name": "RunnableSequence",
            "tags": [],
            "metadata": {},
            "data": {"input": {}},
        },
        {
            "event": "on_chain_start",
            "name": "foo",
            "run_id": "",
            "parent_ids": [],
            "tags": ["seq:step:1"],
            "metadata": {},
            "data": {},
        },
        {
            "event": "on_chain_stream",
            "name": "foo",
            "run_id": "",
            "parent_ids": [],
            "tags": ["seq:step:1"],
            "metadata": {},
            "data": {"chunk": {"x": 5}},
        },
        {
            "event": "on_chain_end",
            "name": "foo",
            "run_id": "",
            "parent_ids": [],
            "tags": ["seq:step:1"],
            "metadata": {},
            "data": {"input": {}, "output": {"x": 5}},
        },
        {
            "event": "on_tool_start",
            "name": "get_docs",
            "run_id": "",
            "parent_ids": [],
            "tags": ["seq:step:2"],
            "metadata": {},
            "data": {"input": {"x": 5}},
        },
        {
            "event": "on_tool_end",
            "name": "get_docs",
            "run_id": "",
            "parent_ids": [],
            "tags": ["seq:step:2"],
            "metadata": {},
            "data": {"input": {"x": 5}, "output": [Document(page_content="hello")]},
        },
        {
            "event": "on_chain_stream",
            "run_id": "",
            "parent_ids": [],
            "tags": [],
            "metadata": {},
            "name": "RunnableSequence",
            "data": {"chunk": [Document(page_content="hello")]},
        },
        {
            "event": "on_chain_end",
            "name": "RunnableSequence",
            "run_id": "",
            "parent_ids": [],
            "tags": [],
            "metadata": {},
            "data": {"output": [Document(page_content="hello")]},
        },
    ]

In this example, the |
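To read the event list above as step-by-step progress, one option is to keep only the start and end events and print them in arrival order. The following is a minimal sketch, not part of the reply: summarize_events is a hypothetical helper, and the sample list is a trimmed copy of the events shown above (run_id, tags and metadata omitted, Document replaced by a plain string). It uses plain dictionaries only, so it runs without LangChain installed.

```python
from typing import Any, Dict, Iterable, List


def summarize_events(events: Iterable[Dict[str, Any]]) -> List[str]:
    """Turn start/end events into short progress lines (hypothetical helper)."""
    lines: List[str] = []
    for event in events:
        kind = event["event"]
        if kind.endswith("_start"):
            lines.append(f"start {event['name']}")
        elif kind.endswith("_end"):
            lines.append(f"end {event['name']}")
        # Stream events (on_chain_stream) are skipped in this summary.
    return lines


# Trimmed copy of the events from the example above (assumption: only the
# "event", "name" and "data" keys matter for this illustration).
sample = [
    {"event": "on_chain_start", "name": "foo", "data": {}},
    {"event": "on_chain_stream", "name": "foo", "data": {"chunk": {"x": 5}}},
    {"event": "on_chain_end", "name": "foo", "data": {"output": {"x": 5}}},
    {"event": "on_tool_start", "name": "get_docs", "data": {"input": {"x": 5}}},
    {"event": "on_tool_end", "name": "get_docs", "data": {"output": ["hello"]}},
]

progress = summarize_events(sample)
for line in progress:
    print(line)
```

In the events above, the end event for foo arrives before the start event for get_docs, which is the ordering a dependent tool needs.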
Description
Some of my tools have dependencies. For example, tool3 needs to wait for tool2 to complete before it can be executed. Is there a way to achieve this? With astream_events, I noticed that different tools are executed almost concurrently, in an asynchronous manner. I also tried the .stream method. Although it executes the tools step by step, the task progress is not output in a streaming manner, which is not what I want.
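The requirement described here (tool3 starts only after tool2 finishes, with progress still streamed) can be illustrated outside LangChain with a plain asyncio async generator. This is only a sketch of the idea, not LangChain's API: the names tool2 and tool3 come from the question, but their bodies, run_steps, collect, and the event dictionaries are all hypothetical. The point is that awaiting tool2 before calling tool3 enforces the order, while each yield delivers an event to the consumer as soon as it happens.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def tool2(query: str) -> str:
    """Hypothetical first tool: pretends to fetch documents."""
    await asyncio.sleep(0)  # stand-in for real async work
    return f"docs for {query}"


async def tool3(docs: str) -> str:
    """Hypothetical second tool: depends on tool2's output."""
    await asyncio.sleep(0)
    return docs.upper()


async def run_steps(query: str) -> AsyncIterator[Dict[str, Any]]:
    """Run tool2, then tool3, yielding an event at each step boundary."""
    yield {"event": "on_tool_start", "name": "tool2", "data": {"input": query}}
    docs = await tool2(query)  # tool3 cannot start until this completes
    yield {"event": "on_tool_end", "name": "tool2", "data": {"output": docs}}

    yield {"event": "on_tool_start", "name": "tool3", "data": {"input": docs}}
    result = await tool3(docs)
    yield {"event": "on_tool_end", "name": "tool3", "data": {"output": result}}


async def collect(query: str) -> List[Dict[str, Any]]:
    """Consume the stream; a real consumer would print each event on arrival."""
    return [event async for event in run_steps(query)]


events = asyncio.run(collect("langchain"))
for event in events:
    print(event["event"], event["name"])
```

Tools that do not depend on each other could still be run concurrently; only the dependent pair needs to be sequenced this way.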