From e6424de4368e684cb5560002f565493b14363760 Mon Sep 17 00:00:00 2001
From: Oleksii Babych <61119565+olbychos@users.noreply.github.com>
Date: Tue, 19 Nov 2024 15:20:25 +0100
Subject: [PATCH] feat: update streaming for steps and tokens (#53)

---
 dynamiq/nodes/agents/base.py                  | 56 +++++++++++----
 dynamiq/nodes/agents/react.py                 | 70 ++++++++++++-------
 dynamiq/nodes/agents/reflection.py            | 12 +++-
 dynamiq/types/streaming.py                    |  4 +-
 examples/agents/use_streaming.py              |  2 +-
 .../orchestrator/app.py                       |  2 +-
 .../orchestrator/backend.py                   |  2 +-
 .../use_case_streaming_agents/react/app.py    |  5 +-
 .../react/backend.py                          |  6 +-
 .../reflection/app.py                         |  2 +-
 .../reflection/backend.py                     |  2 +-
 .../use_case_streaming_agents/simple/app.py   |  6 +-
 .../simple/backend.py                         | 13 ++--
 13 files changed, 125 insertions(+), 57 deletions(-)

diff --git a/dynamiq/nodes/agents/base.py b/dynamiq/nodes/agents/base.py
index f2ca3d2b..68f96361 100644
--- a/dynamiq/nodes/agents/base.py
+++ b/dynamiq/nodes/agents/base.py
@@ -27,7 +27,8 @@ class StreamChunkChoiceDelta(BaseModel):
     """Delta model for content chunks."""
 
     content: str | dict
-    type: str
+    source: str
+    step: str
 
 
 class StreamChunkChoice(BaseModel):
@@ -255,29 +256,58 @@ def _run_llm(self, prompt: str, config: RunnableConfig | None = None, **kwargs)
             logger.error(f"Agent {self.name} - {self.id}: LLM execution failed: {str(e)}")
             raise
 
-    def stream_chunk(self, input_chunk: str, config: RunnableConfig | None = None, **kwargs):
-        """Streams the input chunk to the callbacks."""
+    def stream_content(self, content: str, source: str, step: str, config: RunnableConfig | None = None, **kwargs):
+        if self.streaming.by_tokens:
+            return self.stream_by_tokens(content=content, source=source, step=step, config=config, **kwargs)
+        return self.stream_response(content=content, source=source, step=step, config=config, **kwargs)
+
+    def stream_by_tokens(self, content: str, source: str, step: str, config: RunnableConfig | None = None, **kwargs):
+        """Streams the input content to the callbacks."""
+        tokens = content.split(" ")
         final_response = []
-        for chunk in input_chunk.split(" "):
-            final_response.append(chunk)
-            chunk_for_stream = StreamChunk(
-                choices=[StreamChunkChoice(delta=StreamChunkChoiceDelta(content=chunk, type=self.name))]
+        for token in tokens:
+            final_response.append(token)
+            token_with_prefix = " " + token
+            token_for_stream = StreamChunk(
+                choices=[
+                    StreamChunkChoice(delta=StreamChunkChoiceDelta(content=token_with_prefix, source=source, step=step))
+                ]
             )
-            logger.debug(f"Agent {self.name} - {self.id}: Streaming chunk: {chunk_for_stream}")
+            logger.debug(f"Agent {self.name} - {self.id}: Streaming token: {token_for_stream}")
             self.run_on_node_execute_stream(
                 callbacks=config.callbacks,
-                chunk=chunk_for_stream.model_dump(),
+                chunk=token_for_stream.model_dump(),
                 **kwargs,
             )
         return " ".join(final_response)
 
+    def stream_response(self, content: str, source: str, step: str, config: RunnableConfig | None = None, **kwargs):
+        response_for_stream = StreamChunk(
+            choices=[StreamChunkChoice(delta=StreamChunkChoiceDelta(content=content, source=source, step=step))]
+        )
+        logger.debug(f"Agent {self.name} - {self.id}: Streaming response: {response_for_stream}")
+
+        self.run_on_node_execute_stream(
+            callbacks=config.callbacks,
+            chunk=response_for_stream.model_dump(),
+            **kwargs,
+        )
+        return content
+
     def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str:
         """Runs the agent with the generated prompt and handles exceptions."""
        formatted_prompt = self.generate_prompt()
         try:
logger.info(f"Streaming config {self.streaming}") llm_result = self._run_llm(formatted_prompt, config=config, **kwargs) if self.streaming.enabled: - return self.stream_chunk(llm_result, config=config, **kwargs) + return self.stream_content( + content=llm_result, + source=self.name, + step="answer", + config=config, + **kwargs, + ) return llm_result except Exception as e: @@ -477,7 +507,7 @@ def _plan(self, config: RunnableConfig, **kwargs) -> str: prompt = self.generate_prompt(block_names=["plan"]) llm_result = self._run_llm(prompt, config, **kwargs) if self.streaming.enabled and self.streaming.mode == StreamingMode.ALL: - return self.stream_chunk(input_chunk=llm_result, config=config, **kwargs) + return self.stream_content(content=llm_result, step="reasoning", source=self.name, config=config, **kwargs) return llm_result def _assign(self, config: RunnableConfig, **kwargs) -> str: @@ -485,7 +515,7 @@ def _assign(self, config: RunnableConfig, **kwargs) -> str: prompt = self.generate_prompt(block_names=["assign"]) llm_result = self._run_llm(prompt, config, **kwargs) if self.streaming.enabled and self.streaming.mode == StreamingMode.ALL: - return self.stream_chunk(input_chunk=llm_result, config=config, **kwargs) + return self.stream_content(content=llm_result, step="reasoning", source=self.name, config=config, **kwargs) return llm_result def _final(self, config: RunnableConfig, **kwargs) -> str: @@ -493,5 +523,5 @@ def _final(self, config: RunnableConfig, **kwargs) -> str: prompt = self.generate_prompt(block_names=["final"]) llm_result = self._run_llm(prompt, config, **kwargs) if self.streaming.enabled: - return self.stream_chunk(input_chunk=llm_result, config=config, **kwargs) + return self.stream_content(content=llm_result, step="answer", source=self.name, config=config, **kwargs) return llm_result diff --git a/dynamiq/nodes/agents/react.py b/dynamiq/nodes/agents/react.py index f3fd9cd5..6caf1554 100644 --- a/dynamiq/nodes/agents/react.py +++ b/dynamiq/nodes/agents/react.py @@ -351,17 +351,21 @@ def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str: self.tracing_intermediate(loop_num, formatted_prompt, llm_generated_output) if self.streaming.enabled and self.streaming.mode == StreamingMode.ALL: - self.stream_chunk( - input_chunk=llm_generated_output, + self.stream_content( + content=llm_generated_output, + source=self.name, + step=f"reasoning_{loop_num + 1}", config=config, **kwargs, ) if "Answer:" in llm_generated_output: final_answer = self._extract_final_answer(llm_generated_output) self.tracing_final(loop_num, final_answer, config, kwargs) - if self.streaming.enabled and self.streaming.mode == StreamingMode.FINAL: - self.stream_chunk( - input_chunk=final_answer, + if self.streaming.enabled: + self.stream_content( + content=final_answer, + source=self.name, + step="answer", config=config, **kwargs, ) @@ -377,8 +381,10 @@ def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str: llm_generated_output = json.dumps(llm_generated_output_json) self.tracing_intermediate(loop_num, formatted_prompt, llm_generated_output) if self.streaming.enabled and self.streaming.mode == StreamingMode.ALL: - self.stream_chunk( - input_chunk=llm_generated_output, + self.stream_content( + content=llm_generated_output, + source=self.name, + step=f"reasoning_{loop_num + 1}", config=config, **kwargs, ) @@ -386,9 +392,11 @@ def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str: if action == "provide_final_answer": final_answer = 
llm_generated_output_json["answer"] self.tracing_final(loop_num, final_answer, config, kwargs) - if self.streaming.enabled and self.streaming.mode == StreamingMode.FINAL: - self.stream_chunk( - input_chunk=final_answer, + if self.streaming.enabled: + self.stream_content( + content=final_answer, + source=self.name, + step="answer", config=config, **kwargs, ) @@ -405,8 +413,10 @@ def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str: self.tracing_intermediate(loop_num, formatted_prompt, llm_generated_output) if self.streaming.enabled and self.streaming.mode == StreamingMode.ALL: - self.stream_chunk( - input_chunk=llm_generated_output, + self.stream_content( + content=llm_generated_output, + source=self.name, + step=f"reasoning_{loop_num + 1}", config=config, **kwargs, ) @@ -414,9 +424,11 @@ def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str: if action == "finish": final_answer = llm_generated_output_json["action_input"] self.tracing_final(loop_num, final_answer, config, kwargs) - if self.streaming.enabled and self.streaming.mode == StreamingMode.FINAL: - self.stream_chunk( - input_chunk=final_answer, + if self.streaming.enabled: + self.stream_content( + content=final_answer, + source=self.name, + step="answer", config=config, **kwargs, ) @@ -429,17 +441,21 @@ def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str: llm_generated_output = llm_result.output["content"] self.tracing_intermediate(loop_num, formatted_prompt, llm_generated_output) if self.streaming.enabled and self.streaming.mode == StreamingMode.ALL: - self.stream_chunk( - input_chunk=llm_generated_output, + self.stream_content( + content=llm_generated_output, + source=self.name, + step=f"reasoning_{loop_num + 1}", config=config, **kwargs, ) if "" in llm_generated_output: final_answer = self._extract_final_answer_xml(llm_generated_output) self.tracing_final(loop_num, final_answer, config, kwargs) - if self.streaming.enabled and self.streaming.mode == StreamingMode.FINAL: - self.stream_chunk( - input_chunk=final_answer, + if self.streaming.enabled: + self.stream_content( + content=final_answer, + source=self.name, + step="answer", config=config, **kwargs, ) @@ -465,8 +481,10 @@ def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str: observation = f"\nObservation: {tool_result}\n" llm_generated_output += observation if self.streaming.enabled and self.streaming.mode == StreamingMode.ALL: - self.stream_chunk( - input_chunk=observation, + self.stream_content( + content=observation, + source=tool.name, + step=f"tool_{loop_num}", config=config, **kwargs, ) @@ -494,10 +512,12 @@ def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str: ) raise MaxLoopsExceededException(message=error_message) else: - max_loop_final_answer = self._handle_max_loops_exceeded(previous_responses, config, kwargs) + max_loop_final_answer = self._handle_max_loops_exceeded(previous_responses, config, **kwargs) if self.streaming.enabled: - self.stream_chunk( - input_chunk=max_loop_final_answer, + self.stream_content( + content=max_loop_final_answer, + source=self.name, + step="answer", config=config, **kwargs, ) diff --git a/dynamiq/nodes/agents/reflection.py b/dynamiq/nodes/agents/reflection.py index ff977eb3..f88717d7 100644 --- a/dynamiq/nodes/agents/reflection.py +++ b/dynamiq/nodes/agents/reflection.py @@ -84,10 +84,18 @@ def _run_agent(self, config: RunnableConfig | None = None, **kwargs) -> str: if not output_content: logger.warning("No output content 
extracted.") return "" - return self.stream_chunk(output_content[-1], config=config, **kwargs) + return self.stream_content( + content=output_content[-1], + step="answer", + source=self.name, + config=config, + **kwargs, + ) elif self.streaming.mode == StreamingMode.ALL: logger.debug("Streaming mode set to ALL. Returning all output.") - return self.stream_chunk(result, config=config, **kwargs) + return self.stream_content( + content=result, step="reasoning", source=self.name, config=config, **kwargs + ) if not output_content: logger.warning("No output content extracted.") diff --git a/dynamiq/types/streaming.py b/dynamiq/types/streaming.py index 1fa43230..d99726db 100644 --- a/dynamiq/types/streaming.py +++ b/dynamiq/types/streaming.py @@ -74,7 +74,8 @@ class StreamingConfig(BaseModel): timeout (float | None): Timeout for streaming. Defaults to None. input_queue (Queue | None): Input queue for streaming. Defaults to None. input_queue_done_event (Event | None): Event to signal input queue completion. Defaults to None. - mode (StreamingMode): Streaming mode. Defaults to StreamingMode.FINAL. + mode (StreamingMode): Streaming mode. Defaults to StreamingMode.ANSWER. + by_tokens (bool): Whether to stream by tokens. Defaults to False. """ enabled: bool = False event: str = STREAMING_EVENT @@ -82,6 +83,7 @@ class StreamingConfig(BaseModel): input_queue: Queue | None = None input_queue_done_event: Event | None = None mode: StreamingMode = StreamingMode.FINAL + by_tokens: bool = True model_config = ConfigDict(arbitrary_types_allowed=True) diff --git a/examples/agents/use_streaming.py b/examples/agents/use_streaming.py index 43b464e5..7b7c4101 100644 --- a/examples/agents/use_streaming.py +++ b/examples/agents/use_streaming.py @@ -27,7 +27,7 @@ def run_agent(event: str = "data") -> str: id="agent", llm=llm, tools=[e2b_tool], - streaming=StreamingConfig(enabled=True, event=event, mode=StreamingMode.ALL), + streaming=StreamingConfig(enabled=True, event=event, mode=StreamingMode.ALL, by_tokens=True), ) streaming_handler = StreamingIteratorCallbackHandler() diff --git a/examples/use_case_streaming_agents/orchestrator/app.py b/examples/use_case_streaming_agents/orchestrator/app.py index 3edcb90a..a290c3c9 100644 --- a/examples/use_case_streaming_agents/orchestrator/app.py +++ b/examples/use_case_streaming_agents/orchestrator/app.py @@ -7,7 +7,7 @@ st.sidebar.title("Agent Configuration") streaming_enabled = st.sidebar.checkbox("Enable Streaming", value=False) -streaming_mode = st.sidebar.radio("Streaming Mode", options=["Final", "All"], index=0) +streaming_mode = st.sidebar.radio("Streaming Mode", options=["Steps", "Answer"], index=0) if "agent" not in st.session_state or st.sidebar.button("Apply Changes"): st.session_state.agent = setup_agent(streaming_enabled, streaming_mode) st.session_state.messages = [] diff --git a/examples/use_case_streaming_agents/orchestrator/backend.py b/examples/use_case_streaming_agents/orchestrator/backend.py index 44ebcb9a..cf6deffe 100644 --- a/examples/use_case_streaming_agents/orchestrator/backend.py +++ b/examples/use_case_streaming_agents/orchestrator/backend.py @@ -33,7 +33,7 @@ def setup_agent(streaming_enabled: bool, streaming_mode: str) -> ReActAgent: """ llm = setup_llm() memory = Memory(backend=InMemory()) - mode_mapping = {"Final": StreamingMode.FINAL, "All": StreamingMode.ALL} + mode_mapping = {"Answer": StreamingMode.FINAL, "Steps": StreamingMode.ALL} mode = mode_mapping.get(streaming_mode, StreamingMode.FINAL) streaming_config = 
StreamingConfig(enabled=streaming_enabled, mode=mode) tool_code = E2BInterpreterTool(connection=E2B()) diff --git a/examples/use_case_streaming_agents/react/app.py b/examples/use_case_streaming_agents/react/app.py index 52cb4d0d..ff519725 100644 --- a/examples/use_case_streaming_agents/react/app.py +++ b/examples/use_case_streaming_agents/react/app.py @@ -6,11 +6,12 @@ st.sidebar.title("Agent Configuration") agent_role = st.sidebar.text_input("Agent Role", "helpful assistant") streaming_enabled = st.sidebar.checkbox("Enable Streaming", value=False) +streaming_tokens = st.sidebar.checkbox("Enable Streaming Tokens", value=False) -streaming_mode = st.sidebar.radio("Streaming Mode", options=["Final", "All"], index=0) # Default to "Final" +streaming_mode = st.sidebar.radio("Streaming Mode", options=["Steps", "Answer"], index=0) # Default to "Answer" if "agent" not in st.session_state or st.sidebar.button("Apply Changes"): - st.session_state.agent = setup_agent(agent_role, streaming_enabled, streaming_mode) + st.session_state.agent = setup_agent(agent_role, streaming_enabled, streaming_mode, streaming_tokens) st.session_state.messages = [] st.title("React Agent Chat") diff --git a/examples/use_case_streaming_agents/react/backend.py b/examples/use_case_streaming_agents/react/backend.py index be691f76..4405ff80 100644 --- a/examples/use_case_streaming_agents/react/backend.py +++ b/examples/use_case_streaming_agents/react/backend.py @@ -10,15 +10,15 @@ from examples.llm_setup import setup_llm -def setup_agent(agent_role: str, streaming_enabled: bool, streaming_mode: str) -> ReActAgent: +def setup_agent(agent_role: str, streaming_enabled: bool, streaming_mode: str, streaming_tokens: bool) -> ReActAgent: """ Initializes an AI agent with a specified role and streaming configuration. 
""" llm = setup_llm() memory = Memory(backend=InMemory()) - mode_mapping = {"Final": StreamingMode.FINAL, "All": StreamingMode.ALL} + mode_mapping = {"Answer": StreamingMode.FINAL, "Steps": StreamingMode.ALL} mode = mode_mapping.get(streaming_mode, StreamingMode.FINAL) - streaming_config = StreamingConfig(enabled=streaming_enabled, mode=mode) + streaming_config = StreamingConfig(enabled=streaming_enabled, mode=mode, by_tokens=streaming_tokens) tool_search = ScaleSerpTool(connection=ScaleSerp()) tool_code = E2BInterpreterTool(connection=E2B()) agent = ReActAgent( @@ -44,6 +44,7 @@ def generate_agent_response(agent: ReActAgent, user_input: str): response_text = "" for chunk in streaming_handler: + print(chunk) content = chunk.data.get("choices", [{}])[0].get("delta", {}).get("content", "") if content: response_text += " " + content diff --git a/examples/use_case_streaming_agents/reflection/app.py b/examples/use_case_streaming_agents/reflection/app.py index 5827537f..700039c3 100644 --- a/examples/use_case_streaming_agents/reflection/app.py +++ b/examples/use_case_streaming_agents/reflection/app.py @@ -7,7 +7,7 @@ agent_role = st.sidebar.text_input("Agent Role", "helpful assistant") streaming_enabled = st.sidebar.checkbox("Enable Streaming", value=False) -streaming_mode = st.sidebar.radio("Streaming Mode", options=["Final", "All"], index=0) # Default to "Final" +streaming_mode = st.sidebar.radio("Streaming Mode", options=["Answer", "Steps"], index=0) # Default to "Final" if "agent" not in st.session_state or st.sidebar.button("Apply Changes"): st.session_state.agent = setup_agent(agent_role, streaming_enabled, streaming_mode) diff --git a/examples/use_case_streaming_agents/reflection/backend.py b/examples/use_case_streaming_agents/reflection/backend.py index aac6b041..6589ae38 100644 --- a/examples/use_case_streaming_agents/reflection/backend.py +++ b/examples/use_case_streaming_agents/reflection/backend.py @@ -17,7 +17,7 @@ def setup_agent(agent_role: str, streaming_enabled: bool, streaming_mode: str) - llm = setup_llm() memory = Memory(backend=InMemory()) - mode_mapping = {"Final": StreamingMode.FINAL, "All": StreamingMode.ALL} + mode_mapping = {"Answer": StreamingMode.FINAL, "Steps": StreamingMode.ALL} mode = mode_mapping.get(streaming_mode, StreamingMode.FINAL) streaming_config = StreamingConfig(enabled=streaming_enabled, mode=mode) diff --git a/examples/use_case_streaming_agents/simple/app.py b/examples/use_case_streaming_agents/simple/app.py index f0b6954b..de0f8be2 100644 --- a/examples/use_case_streaming_agents/simple/app.py +++ b/examples/use_case_streaming_agents/simple/app.py @@ -6,15 +6,17 @@ st.sidebar.title("Agent Configuration") agent_role = st.sidebar.text_input("Agent Role", "helpful assistant") streaming_enabled = st.sidebar.checkbox("Enable Streaming", value=False) +streaming_tokens = st.sidebar.checkbox("Enable Streaming Tokens", value=False) + +streaming_mode = st.sidebar.radio("Streaming Mode", options=["Answer", "Steps"], index=0) # Default to "Final" if "agent" not in st.session_state or st.sidebar.button("Apply Changes"): - st.session_state.agent = setup_agent(agent_role, streaming_enabled) + st.session_state.agent = setup_agent(agent_role, streaming_enabled, streaming_mode, streaming_tokens) st.session_state.messages = [] st.title("Simple Agent Chat") st.write("Ask questions and get responses from an AI assistant.") -# Display chat history from session state for message in st.session_state.messages: with st.chat_message(message["role"]): 
         st.markdown(message["content"])
diff --git a/examples/use_case_streaming_agents/simple/backend.py b/examples/use_case_streaming_agents/simple/backend.py
index 8de97e57..33d35709 100644
--- a/examples/use_case_streaming_agents/simple/backend.py
+++ b/examples/use_case_streaming_agents/simple/backend.py
@@ -3,17 +3,21 @@
 from dynamiq.memory.backend.in_memory import InMemory
 from dynamiq.nodes.agents.simple import SimpleAgent
 from dynamiq.runnables import RunnableConfig
-from dynamiq.types.streaming import StreamingConfig
+from dynamiq.types.streaming import StreamingConfig, StreamingMode
 
 from examples.llm_setup import setup_llm
 
 
-def setup_agent(agent_role: str, streaming_enabled: bool) -> SimpleAgent:
+def setup_agent(agent_role: str, streaming_enabled: bool, streaming_mode: str, streaming_tokens: bool) -> SimpleAgent:
     """
     Initializes an AI agent with a specified role and streaming configuration.
     """
+
     llm = setup_llm()
     memory = Memory(backend=InMemory())
-    streaming_config = StreamingConfig(enabled=streaming_enabled)
+
+    mode_mapping = {"Answer": StreamingMode.FINAL, "Steps": StreamingMode.ALL}
+    mode = mode_mapping.get(streaming_mode, StreamingMode.FINAL)
+    streaming_config = StreamingConfig(enabled=streaming_enabled, mode=mode, by_tokens=streaming_tokens)
     agent = SimpleAgent(
         name="Agent",
@@ -29,6 +33,7 @@ def setup_agent(agent_role: str, streaming_enabled: bool, streaming_mode: str,
 def generate_agent_response(agent: SimpleAgent, user_input: str):
     """
     Processes the user input using the agent. Supports both streaming and non-streaming responses.
+    Extracts and yields only the content within tags.
     """
     response_text = ""
     if agent.streaming.enabled:
@@ -39,7 +44,7 @@ def generate_agent_response(agent: SimpleAgent, user_input: str):
             content = chunk.data.get("choices", [{}])[0].get("delta", {}).get("content", "")
             if content:
                 response_text += " " + content
-                yield " " + content
+                yield content
     else:
         result = agent.run({"input": user_input})
         response_text = result.output.get("content", "")
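
Usage sketch (illustrative, not part of the patch): a minimal consumer of the new source/step chunk fields introduced above, modeled on the updated example backends. The import paths, the SimpleAgent constructor arguments, and the run() keyword names (input_data, config) are assumed from the existing examples rather than confirmed here; the chunk layout follows StreamChunkChoiceDelta (content/source/step).

    import threading

    from dynamiq.callbacks.streaming import StreamingIteratorCallbackHandler
    from dynamiq.nodes.agents.simple import SimpleAgent
    from dynamiq.runnables import RunnableConfig
    from dynamiq.types.streaming import StreamingConfig, StreamingMode
    from examples.llm_setup import setup_llm

    # Stream intermediate steps and the final answer, token by token.
    agent = SimpleAgent(
        name="Agent",
        llm=setup_llm(),
        role="helpful assistant",
        streaming=StreamingConfig(enabled=True, mode=StreamingMode.ALL, by_tokens=True),
    )

    handler = StreamingIteratorCallbackHandler()

    # Run the agent in a background thread so chunks can be consumed as they arrive.
    thread = threading.Thread(
        target=agent.run,
        kwargs={"input_data": {"input": "What is streaming?"}, "config": RunnableConfig(callbacks=[handler])},
    )
    thread.start()

    for chunk in handler:
        delta = chunk.data.get("choices", [{}])[0].get("delta", {})
        # For a SimpleAgent the step is "answer"; a ReActAgent also emits
        # "reasoning_<n>" steps (source = agent name) and "tool_<n>" steps
        # (source = tool name), per the react.py changes above.
        print(delta.get("step"), delta.get("source"), delta.get("content"), sep=" | ")

    thread.join()

With by_tokens=False the same chunks arrive once per step instead of once per token, so the source/step pair is what lets a UI route reasoning, tool output, and the final answer into separate panels.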