-
To signal to LangChain that a tool has already fulfilled its role and it can stop processing, you can use the AgentFinish class. When the tool's response is sent via streaming through a WebSocket callback and does not return anything, you can yield an AgentFinish instance to indicate that the processing should stop.
Here is an example of how you can yield an AgentFinish instance:

# Module-level imports used by the method below.
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union

from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.callbacks import AsyncCallbackManagerForChainRun
from langchain_core.exceptions import OutputParserException
from langchain_core.tools import BaseTool

async def _aiter_next_step(
    self,
    name_to_tool_map: Dict[str, BaseTool],
    color_mapping: Dict[str, str],
    inputs: Dict[str, str],
    intermediate_steps: List[Tuple[AgentAction, str]],
    run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> AsyncIterator[Union[AgentFinish, AgentAction, AgentStep]]:
    """Take a single step in the thought-action-observation loop.

    Override this to take control of how the agent makes and acts on choices.
    """
    try:
        intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
        # Call the LLM to see what to do.
        output = await self.agent.aplan(
            intermediate_steps,
            callbacks=run_manager.get_child() if run_manager else None,
            **inputs,
        )
    except OutputParserException as e:
        # Handle parsing errors
        # ...
        return
    # If the tool chosen is the finishing tool, then we end and return.
    if isinstance(output, AgentFinish):
        yield output
        return
    actions: List[AgentAction]
    if isinstance(output, AgentAction):
        actions = [output]
    else:
        actions = output
    for agent_action in actions:
        yield agent_action
    # Use asyncio.gather to run multiple tool.arun() calls concurrently.
    result = await asyncio.gather(
        *[
            self._aperform_agent_action(
                name_to_tool_map, color_mapping, agent_action, run_manager
            )
            for agent_action in actions
        ],
    )
    # Yield each result once all the concurrent calls have completed.
    for chunk in result:
        yield chunk

In this example, if the output is an instance of AgentFinish, it is yielded to indicate that the processing should stop. This approach ensures that LangChain knows the tool has completed its task and can stop further processing.
Additionally, in the StreamlitCallbackHandler class, the on_agent_finish method is used to handle the completion of an agent's task. This method can be used to signal the end of processing:

def on_agent_finish(
    self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any
) -> None:
    if self._current_thought is not None:
        self._current_thought.complete(
            self._thought_labeler.get_final_agent_thought_label()
        )
        self._current_thought = None

This method updates the current thought to indicate completion and ensures that LangChain stops further processing [1] [2].

[1] https://github.com/langchain-ai/langchain/blob/master/libs/langchain/langchain/agents/agent.py
[2] https://github.com/langchain-ai/langchain/blob/master/libs/community/langchain_community/callbacks/streamlit/streamlit_callback_handler.py
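For reference, the AgentFinish instance yielded above is built from a return_values dict and a log string; a minimal construction sketch (the key name and messages here are illustrative, not prescribed):

from langchain_core.agents import AgentFinish

finish = AgentFinish(
    return_values={"output": "Streaming finished; nothing more to do."},
    log="Tool already streamed its answer to the client.",
)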
-
Hello @dosu! Thank you very much for your quick response! Unfortunately, I was not able to understand the answer. I am not using a Streamlit application. I am running an application on a FastAPI server. My application streams to a client via WSS. In this case, how do I implement this solution? Thank you once again.
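For the FastAPI/WSS case, one plausible shape (a sketch, not a confirmed answer from this thread) is an async callback handler that holds the accepted WebSocket and implements on_agent_finish; the class name and message format below are invented for illustration:

from typing import Any
from fastapi import WebSocket
from langchain_core.agents import AgentFinish
from langchain_core.callbacks import AsyncCallbackHandler

class WebSocketStreamHandler(AsyncCallbackHandler):
    """Forwards agent events over an already-accepted FastAPI WebSocket."""

    def __init__(self, websocket: WebSocket) -> None:
        self.websocket = websocket

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Stream each generated token to the client.
        await self.websocket.send_json({"type": "token", "data": token})

    async def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None:
        # Tell the client the run is over so it can stop reading.
        await self.websocket.send_json({"type": "finish", "data": finish.return_values})

Such a handler would then be passed to the executor via callbacks=[WebSocketStreamHandler(websocket)].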
-
Hello @dosu! Thank you very much! Could you please explain how I can make my agent handle the return from the AgentFinish instance and stop execution? I create my agent like this: self.llm_with_tools = self.llm.bind_tools(self.tools)
agent = create_tool_calling_agent(llm=self.llm_with_tools, tools=self.tools, prompt=myprompt)
resultado = await agent_executor.ainvoke(context, return_only_outputs=True)
agent_executor = AgentExecutor(
agent=agent,
tools=self.tools,
memory=memory,
max_iterations=35,
max_execution_time=90,
early_stopping_method='force',
verbose=True,
handle_parsing_errors=True,
return_intermediate_steps=False,
callbacks=chat.callbacks
).with_config({"run_name": "Agent"}) Because by doing just this, it keeps calling the tool in a loop: @tool
async def tool_responde_perguntas(pergunta_completa: str):
"""
This tool is useful for answering questions posed by the user.
"""
#chat_agent = get_custom_data("chat_agent")
websocket = chat_agent.websocket
chat = ChatRespondePergunta(chat_agent.app_keys, chat_agent.app_params, websocket)
answer = await chat.chat_responde_pergunta(pergunta_completa)
if websocket is None:
return answer
else:
return AgentFinish(
return_values={"status": "success"},
log="Tool execution completed."
) Thank you very much! |
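A minimal sketch of an alternative that LangChain supports out of the box: marking the tool with return_direct=True makes the AgentExecutor stop and use the tool's string output as the final answer, rather than returning an AgentFinish from inside the tool. The names below mirror the snippet above, with the body reduced to a stub:

from langchain_core.tools import tool

@tool(return_direct=True)
async def tool_responde_perguntas(pergunta_completa: str) -> str:
    """This tool is useful for answering questions posed by the user."""
    # ... stream the answer over the WebSocket here, as in the snippet above ...
    # With return_direct=True, the AgentExecutor returns this string as the
    # run's final output instead of handing it back to the LLM for another step.
    return "success"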
-
Hi @dosu! Thank you again! I don't know if I understood correctly, but will I have to change the source code of the LangChain files? How do I do this? Isn't there a way to do this through a callback? Thank you!
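For reference, overriding a method like _aiter_next_step does not require editing the installed LangChain sources; the usual pattern is to subclass AgentExecutor in application code. A minimal sketch (the subclass name is arbitrary):

from langchain.agents import AgentExecutor

class MyAgentExecutor(AgentExecutor):
    # Defined in application code; the installed LangChain package is untouched.
    async def _aiter_next_step(self, *args, **kwargs):
        # Custom logic (e.g. the override shown earlier) goes here;
        # this stub simply delegates to the stock implementation.
        async for step in super()._aiter_next_step(*args, **kwargs):
            yield step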
-
Hey @dosu, this didn't work. The tool continues to be called indefinitely, and on_agent_finish is never called. Thank you!
-
Description
I am developing a multi-agent ChatBot using create_tool_calling_agent.
My application has dozens of tools, and one of them has a special behavior that I am having difficulty dealing with, which is the reason for my question: it returns its response via streaming, through a regular WebSocket callback.
Since the tool's response is sent via streaming, the tool itself returns nothing, so LangChain concludes that the tool needs to be called again, and it ends up being executed repeatedly until the max_iterations limit is reached.
My question is, how can I signal to LangChain that the tool has already fulfilled its role, everything went well, and it can consider the mission accomplished and stop processing?
As this is a conceptual question, I have not included any example code.
Thank you!
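Purely to illustrate the behavior described (no code accompanied the question), such a tool might look like this; every name below is hypothetical:

from typing import Optional
from langchain_core.tools import tool

@tool
async def streaming_tool(question: str) -> Optional[str]:
    """Hypothetical tool that streams its answer over a WebSocket."""
    # ... send the answer to the client in chunks here ...
    # Nothing is returned, so the agent records an empty observation;
    # the LLM then tends to call the tool again, looping until the
    # max_iterations limit is reached.
    return None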
System Info
As this is a conceptual question, I have not included anything here.