diff --git a/dynamiq/nodes/agents/base.py b/dynamiq/nodes/agents/base.py index db6aae4f..f3f3a3e7 100644 --- a/dynamiq/nodes/agents/base.py +++ b/dynamiq/nodes/agents/base.py @@ -1,3 +1,4 @@ +import io import json import re import textwrap @@ -53,11 +54,12 @@ class Agent(Node): ) DEFAULT_DATE: ClassVar[str] = datetime.now().strftime("%d %B %Y") - llm: Node = Field(..., description="Language Model (LLM) used by the agent.") + llm: Node = Field(..., description="LLM used by the agent.") group: NodeGroup = NodeGroup.AGENTS error_handling: ErrorHandling = ErrorHandling(timeout_seconds=600) streaming: StreamingConfig = StreamingConfig() tools: list[Node] = [] + files: list[io.BytesIO | bytes] | None = None name: str = "AI Agent" role: str | None = None max_loops: int = 1 @@ -77,13 +79,15 @@ def __init__(self, **kwargs): @property def to_dict_exclude_params(self): - return super().to_dict_exclude_params | {"llm": True, "tools": True, "memory": True} + return super().to_dict_exclude_params | {"llm": True, "tools": True, "memory": True, "files": True} def to_dict(self, **kwargs) -> dict: """Converts the instance to a dictionary.""" data = super().to_dict(**kwargs) data["llm"] = self.llm.to_dict(**kwargs) data["tools"] = [tool.to_dict(**kwargs) for tool in self.tools] + if self.files: + data["files"] = [{"name": getattr(f, "name", f"file_{i}")} for i, f in enumerate(self.files)] return data def init_components(self, connection_manager: ConnectionManager = ConnectionManager()): @@ -104,6 +108,7 @@ def _init_prompt_blocks(self): "role": self.role or "", "date": self.DEFAULT_DATE, "tools": "{tool_description}", + "files": "{file_description}", "instructions": "", "output_format": "", "relevant_information": "{relevant_memory}", @@ -112,6 +117,7 @@ def _init_prompt_blocks(self): } self._prompt_variables = { "tool_description": self.tool_description, + "file_description": self.file_description, "user_input": "", "context": "", "relevant_memory": "", @@ -146,6 +152,11 @@ def 
execute( self.memory.add(role=MessageRole.USER, content=input_data.get("input"), metadata=metadata) self._retrieve_memory(input_data) + files = input_data.get("files", []) + if files: + self.files = files + self._prompt_variables["file_description"] = self.file_description + self._prompt_variables.update(input_data) kwargs = kwargs | {"parent_run_id": kwargs.get("run_id")} kwargs.pop("run_depends", None) @@ -273,6 +284,9 @@ def _get_tool(self, action: str) -> Node: def _run_tool(self, tool: Node, tool_input: str, config, **kwargs) -> Any: """Runs a specific tool with the given input.""" logger.debug(f"Agent {self.name} - {self.id}: Running tool '{tool.name}'") + if self.files: + if tool.is_files_allowed is True: + tool_input["files"] = self.files tool_result = tool.run( input_data=tool_input, @@ -300,6 +314,18 @@ def tool_description(self) -> str: else "" ) + @property + def file_description(self) -> str: + """Returns a description of the files available to the agent.""" + if self.files: + file_description = "You can work with the following files:\n" + for file in self.files: + name = getattr(file, "name", "Unnamed file") + description = getattr(file, "description", "No description") + file_description += f": {name} - {description} <\\file>\n" + return file_description + return "" + @property def tool_names(self) -> str: """Returns a comma-separated list of tool names available to the agent.""" @@ -375,7 +401,7 @@ def execute( action = input_data.get("action") if not action or action not in self._actions: raise InvalidActionException( - f"Invalid or missing action: {action}. Please select an action from {self._actions}." # nosec B608 + f"Invalid or missing action: {action}. Please select an action from {self._actions}." 
# nosec: B608 ) self._prompt_variables.update(input_data) diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index 0bd8cb38..f6432c4d 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -195,6 +195,7 @@ class Node(BaseModel, Runnable, ABC): metadata (NodeMetadata | None): Optional metadata for the node. is_postponed_component_init (bool): Whether component initialization is postponed. is_optimized_for_agents (bool): Whether to optimize output for agents. By default is set to False. + is_files_allowed (bool): Whether the node has access to files. By default is set to False. """ id: str = Field(default_factory=generate_uuid) name: str | None = None @@ -211,6 +212,7 @@ class Node(BaseModel, Runnable, ABC): is_postponed_component_init: bool = False is_optimized_for_agents: bool = False + is_files_allowed: bool = False _output_references: NodeOutputReferences = PrivateAttr() diff --git a/dynamiq/nodes/tools/e2b_sandbox.py b/dynamiq/nodes/tools/e2b_sandbox.py index 4e1c7c75..f6bcb110 100644 --- a/dynamiq/nodes/tools/e2b_sandbox.py +++ b/dynamiq/nodes/tools/e2b_sandbox.py @@ -1,6 +1,4 @@ import io -import os -import uuid from hashlib import sha256 from typing import Any, Literal @@ -52,30 +50,64 @@ """ # noqa: E501 +def generate_fallback_filename(file: bytes | io.BytesIO) -> str: + """ + Generate a unique fallback filename for uploaded files. + + Args: + file: File content as bytes or BytesIO object. + + Returns: + str: A unique filename based on the object's id. + """ + return f"uploaded_file_{id(file)}.bin" + + +def generate_file_description(file: bytes | io.BytesIO, length: int = 20) -> str: + """ + Generate a description for a file based on its content. + + Args: + file: File content as bytes or BytesIO object. + length: Maximum number of bytes to include in the description. + + Returns: + str: A description of the file's content or existing description. 
+ """ + if description := getattr(file, "description", None): + return description + + file_content = file.getbuffer()[:length] if isinstance(file, io.BytesIO) else file[:length] + return f"File starting with: {file_content.hex()}" + + class E2BInterpreterTool(ConnectionNode): """ - A tool to interact with an E2B sandbox, allowing for file upload/download, - Python code execution, and shell command execution. + A tool for executing code and managing files in an E2B sandbox environment. + + This tool provides a secure execution environment for running Python code, + shell commands, and managing file operations. Attributes: - group (Literal[NodeGroup.TOOLS]): Node group type. - name (str): Name of the tool. - description (str): Detailed description of the tool's capabilities. - connection (E2BConnection): E2B connection object. - installed_packages (list): List of default packages to install. - files (list): List of tuples (file_data, file_description) for initial files. - persistent_sandbox (bool): Whether to use a persistent sandbox across executions. - _sandbox (Optional[Sandbox]): Persistent sandbox instance (if enabled). + group (Literal[NodeGroup.TOOLS]): The node group identifier. + name (str): The unique name of the tool. + description (str): Detailed usage instructions and capabilities. + connection (E2BConnection): Configuration for E2B connection. + installed_packages (List[str]): Pre-installed packages in the sandbox. + files (Optional[List[Union[io.BytesIO, bytes]]]): Files to be uploaded. + persistent_sandbox (bool): Whether to maintain sandbox between executions. + is_files_allowed (bool): Whether file uploads are permitted. + _sandbox (Optional[Sandbox]): Internal sandbox instance for persistent mode. 
""" - group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS name: str = "code-interpreter_e2b" description: str = DESCRIPTION connection: E2BConnection installed_packages: list = [] - files: list[tuple[str | bytes, str]] | None = None + files: list[io.BytesIO | bytes] | None = None persistent_sandbox: bool = True _sandbox: Sandbox | None = None + is_files_allowed: bool = True def __init__(self, **kwargs): super().__init__(**kwargs) @@ -84,13 +116,38 @@ def __init__(self, **kwargs): else: logger.debug(f"Tool {self.name} - {self.id}: Will initialize sandbox on each execute") + @property + def to_dict_exclude_params(self) -> set: + """ + Get parameters to exclude from dictionary representation. + + Returns: + set: Set of parameters to exclude. + """ + return super().to_dict_exclude_params | {"files": True} + + def to_dict(self, **kwargs) -> dict[str, Any]: + """ + Convert instance to dictionary format. + + Args: + **kwargs: Additional keyword arguments. + + Returns: + Dict[str, Any]: Dictionary representation of the instance. 
+ """ + data = super().to_dict(**kwargs) + if self.files: + data["files"] = [{"name": getattr(f, "name", f"file_{i}")} for i, f in enumerate(self.files)] + return data + def _initialize_persistent_sandbox(self): """Initializes the persistent sandbox, installs packages, and uploads initial files.""" logger.info(f"Tool {self.name} - {self.id}: Initializing Persistent Sandbox") self._sandbox = Sandbox(api_key=self.connection.api_key) self._install_default_packages(self._sandbox) if self.files: - self._upload_initial_files(self._sandbox) + self._upload_files(files=self.files, sandbox=self._sandbox) self._update_description() def _install_default_packages(self, sandbox: Sandbox) -> None: @@ -105,44 +162,67 @@ def _install_packages(self, sandbox: Sandbox, packages: str) -> None: logger.debug(f"Tool {self.name} - {self.id}: Installing packages: {packages}") sandbox.process.start_and_wait(f"pip install -qq {' '.join(packages.split(','))}") - def _upload_initial_files(self, sandbox: Sandbox) -> None: - """Uploads the initial files to the specified sandbox.""" - for file_data, file_description in self.files: - uploaded_path = self._upload_file(file_data, file_description, sandbox) - logger.debug(f"Tool {self.name} - {self.id}: Uploaded initial file to {uploaded_path}") + def _upload_files(self, files: list[bytes | io.BytesIO], sandbox: Sandbox) -> str: + """Uploads multiple files to the sandbox and returns details for each file.""" + upload_details = [] + for file in files: + if isinstance(file, bytes): + file = io.BytesIO(file) - def _update_description(self) -> None: - """Updates the tool description with information about uploaded files.""" - if self.files: - self.description = self.description.strip().replace("", "") - self.description += "\n\n**Available Files:**" - for file_data, file_description in self.files: - filename = os.path.basename(file_data) if isinstance(file_data, str) else "uploaded_file.bin" - self.description += f"\n- **{filename}** ({file_description})" 
- self.description += "\n" + file_name = getattr(file, "name", None) or generate_fallback_filename(file) + file.name = file_name + + description = getattr(file, "description", generate_file_description(file)) + + uploaded_path = self._upload_file(file, file_name, sandbox) + upload_details.append( + { + "original_name": file_name, + "description": description, + "uploaded_path": uploaded_path, + } + ) + logger.debug(f"Tool {self.name} - {self.id}: Uploaded file '{file_name}' to {uploaded_path}") + + self._update_description_with_files(upload_details) + return "\n".join([f"{file['original_name']} -> {file['uploaded_path']}" for file in upload_details]) - def _upload_file(self, file_data: str | bytes, file_description: str = "", sandbox: Sandbox | None = None) -> str: - """Uploads a file to the specified sandbox.""" + def _upload_file(self, file: bytes | io.BytesIO, file_name: str, sandbox: Sandbox | None = None) -> str: + """Uploads a single file to the specified sandbox and returns the uploaded path.""" if not sandbox: raise ValueError("Sandbox instance is required for file upload.") - if isinstance(file_data, str): - if not os.path.exists(file_data): - raise ToolExecutionException(f"Error: Local file not found: {file_data}", recoverable=False) - uploaded_path = sandbox.upload_file(file=open(file_data, "rb")) - elif isinstance(file_data, bytes): - filename = ( - f"{str(uuid.uuid4())}.bin" if not file_description else f"{file_description.replace(' ', '_')}.bin" - ) - file_like_object = io.BytesIO(file_data) - file_like_object.name = filename - uploaded_path = sandbox.upload_file(file=file_like_object) + # Handle the file types (bytes or io.BytesIO) + if isinstance(file, bytes): + file_like_object = io.BytesIO(file) + file_like_object.name = file_name + elif isinstance(file, io.BytesIO): + file.name = file_name + file_like_object = file else: - raise ValueError(f"Invalid file data type: {type(file_data)}") + raise ToolExecutionException( + f"Error: Invalid file data 
type: {type(file)}. Expected bytes or BytesIO.", recoverable=False + ) + # Upload the file to the sandbox + uploaded_path = sandbox.upload_file(file=file_like_object) logger.debug(f"Tool {self.name} - {self.id}: Uploaded file to {uploaded_path}") + return uploaded_path + def _update_description_with_files(self, upload_details: list[dict]) -> None: + """Updates the tool description with detailed information about the uploaded files.""" + if upload_details: + self.description = self.description.strip().replace("", "") + self.description += "\n\n**Uploaded Files Details:**" + for file_info in upload_details: + self.description += ( + f"\n- **Original File Name**: {file_info['original_name']}\n" + f" **Description**: {file_info['description']}\n" + f" **Uploaded Path**: {file_info['uploaded_path']}\n" + ) + self.description += "\n" + def _execute_python_code(self, code: str, sandbox: Sandbox | None = None) -> str: """Executes Python code in the specified sandbox.""" if not sandbox: @@ -185,26 +265,22 @@ def execute(self, input_data: dict[str, Any], config: RunnableConfig | None = No sandbox = Sandbox(api_key=self.connection.api_key) self._install_default_packages(sandbox) if self.files: - self._upload_initial_files(sandbox) + self._upload_files(files=self.files, sandbox=sandbox) self._update_description() - try: - content = {} + if files := input_data.get("files"): + content["files_installation"] = self._upload_files(files=files, sandbox=sandbox) if packages := input_data.get("packages"): self._install_packages(sandbox=sandbox, packages=packages) content["packages_installation"] = f"Installed packages: {input_data['packages']}" - if files := input_data.get("files"): - content["files_installation"] = self._upload_file(file_data=files, sandbox=sandbox) if shell_command := input_data.get("shell_command"): content["shell_command_execution"] = self._execute_shell_command(shell_command, sandbox=sandbox) if python := input_data.get("python"): content["code_execution"] = 
self._execute_python_code(python, sandbox=sandbox) if not (packages or files or shell_command or python): raise ToolExecutionException( - "Error: Invalid input data. Please provide 'files' for file upload (local path or bytes), " - "'python' for Python code execution, or 'shell_command' for shell command execution." - "You can also provide 'packages' to install packages.", + "Error: Invalid input data. Please provide 'files' for file upload (bytes or BytesIO)", recoverable=True, ) @@ -215,10 +291,10 @@ def execute(self, input_data: dict[str, Any], config: RunnableConfig | None = No if self.is_optimized_for_agents: result = "" - if packages_installation := content.get("packages_installation"): - result += "\n" + packages_installation + "\n" if files_installation := content.get("files_installation"): result += "\n" + files_installation + "\n" + if packages_installation := content.get("packages_installation"): + result += "\n" + packages_installation + "\n" if shell_command_execution := content.get("shell_command_execution"): result += "\n" + shell_command_execution + "\n" if code_execution := content.get("code_execution"): diff --git a/examples/agents_use_cases/agent_analyst_with_files.py b/examples/agents_use_cases/agent_analyst_with_files.py index c5dda689..56b34312 100644 --- a/examples/agents_use_cases/agent_analyst_with_files.py +++ b/examples/agents_use_cases/agent_analyst_with_files.py @@ -1,3 +1,5 @@ +import io + from dynamiq.connections import E2B from dynamiq.nodes.agents.react import ReActAgent from dynamiq.nodes.tools.e2b_sandbox import E2BInterpreterTool @@ -7,7 +9,7 @@ AGENT_ROLE = """ Senior Data Scientist and Programmer with ability to write a well written - python code and you have access to python tool + python code and you have access to python tool. You have access to web to search for best solutions for a problem. 
Generally, you follow these rules: - ALWAYS FORMAT YOUR RESPONSE IN MARKDOWN @@ -19,37 +21,54 @@ Write code in Python that fits linear regression model between number of bathrooms and bedrooms) and price of a house from the data. Count loss. Return this code. Set a seed that results would be reproducable. - Provide exect result of MSE + Provide exact result of MSE. """ -# Please use your own csv file path FILE_PATH = "data.csv" -# Please provide your own csv file description FILE_DESCRIPTION = """ - It's `data.csv` file - The CSV file is using , as the delimiter - It has the following columns (examples included): - - bedrooms: number of badrooms + - bedrooms: number of bedrooms - bathrooms: number of bathrooms - price: price of a house """ -def create_agent(file_paths: list[str], files_description: list[str]): +def read_file_as_bytesio(file_path: str, filename: str = None, description: str = None) -> io.BytesIO: """ - Create and configure the agent with necessary tools. + Reads the content of a file and returns it as a BytesIO object with custom attributes for filename and description. Args: - file_paths (List[str]): A list of file paths that have to be uploaded. - files_description (str): Description of files uploaded + file_path (str): The path to the file. + filename (str, optional): Custom filename for the BytesIO object. + description (str, optional): Custom description for the BytesIO object. Returns: - Workflow: A configured Dynamiq workflow ready to run. + io.BytesIO: The file content in a BytesIO object with custom attributes. 
+ """ + with open(file_path, "rb") as f: + file_content = f.read() + + file_io = io.BytesIO(file_content) + + file_io.name = filename if filename else "uploaded_file.csv" + file_io.description = description if description else "No description provided" + + return file_io + + +def create_agent(): """ - tool = E2BInterpreterTool(connection=E2B(), files=list(zip(file_paths, files_description))) + Create and configure the agent with necessary tools. + Returns: + ReActAgent: A configured Dynamiq ReActAgent ready to run. + """ + tool = E2BInterpreterTool(connection=E2B()) llm = setup_llm(model_provider="gpt", model_name="gpt-4o-mini", temperature=0.001) + agent_software = ReActAgent( name="React Agent", llm=llm, @@ -62,35 +81,36 @@ def create_agent(file_paths: list[str], files_description: list[str]): return agent_software -def run_workflow(prompt: str, files_to_upload: list[str], files_description: list[str]) -> tuple[str, dict]: +def run_workflow(prompt: str, files_to_upload: list[io.BytesIO]) -> tuple[str, dict]: """ Main function to set up and run the workflow, handling any exceptions that may occur. - This function loads environment variables, creates the workflow, runs it with the - specified input, and returns the output. Any exceptions are caught and printed. - Args: - prompt (str): Question/task for agent to accomplish. - files_to_upload (List[str]): A list of file paths that have to be uploaded. - files_description (str): Description of files uploaded - """ - if len(files_description) == len(files_to_upload): - raise ValueError("Number of file paths and file descriptions doesn't match") + prompt (str): Question/task for the agent to accomplish. + files_to_upload (List[io.BytesIO]): A list of BytesIO objects representing files to upload. + Returns: + tuple[str, dict]: The content generated by the agent and intermediate steps. 
+ """ try: - agent = create_agent(files_to_upload, files_description) + agent = create_agent() result = agent.run( - input_data={"input": prompt}, + input_data={"input": prompt, "files": files_to_upload}, ) - return result.output["content"] + + return result["content"], result.get("intermediate_steps", {}) except Exception as e: logger.error(f"An error occurred: {e}") return "", {} if __name__ == "__main__": - output = run_workflow(prompt=PROMPT, files_to_upload=[FILE_PATH], files_description=[FILE_DESCRIPTION]) + csv_file_io = read_file_as_bytesio( + file_path=FILE_PATH, filename="custom_house_data.csv", description=FILE_DESCRIPTION + ) + + output, steps = run_workflow(prompt=PROMPT, files_to_upload=[csv_file_io]) logger.info("---------------------------------Result-------------------------------------") logger.info(output) diff --git a/examples/agents_use_cases/agent_financial.py b/examples/agents_use_cases/agent_financial.py index 503c7a5c..7c3ff908 100644 --- a/examples/agents_use_cases/agent_financial.py +++ b/examples/agents_use_cases/agent_financial.py @@ -6,17 +6,18 @@ from examples.llm_setup import setup_llm AGENT_ROLE = ( - "A helpful and general-purpose AI assistant that has strong language skills," - "Python skills, and Linux command line skills.Goal is to provide concise answer to user," - "also try to generate code for solve task, then run it accurately" - "before answering try to create plan for solving task" - "you can search any api, and then use any of free open-source API" - "that dont require authorization" + "A helpful and general-purpose AI assistant with strong language, Python, " + "and Linux command-line skills. The goal is to provide concise answers to the user. " + "Additionally, try to generate code to solve tasks, then run it accurately. " + "Before answering, create a plan for solving the task. You can search for any API, " + "and use any free, open-source API that doesn't require authorization." 
) + if __name__ == "__main__": connection_e2b = E2B() tool_code = E2BInterpreterTool(connection=connection_e2b) + llm = setup_llm(model_provider="gpt", model_name="gpt-4o-mini", temperature=0) agent = ReActAgent( diff --git a/examples/tools/file_reader.py b/examples/tools/file_reader.py index fe3e059d..aafc5e3f 100644 --- a/examples/tools/file_reader.py +++ b/examples/tools/file_reader.py @@ -1,74 +1,114 @@ +import io from typing import Any, Literal from pydantic import ConfigDict from dynamiq.nodes import NodeGroup +from dynamiq.nodes.agents.exceptions import ToolExecutionException from dynamiq.nodes.node import Node, ensure_config from dynamiq.runnables import RunnableConfig from dynamiq.utils.logger import logger -from dynamiq.nodes.agents.exceptions import ToolExecutionException +DESCRIPTION = """ + +This tool reads content from multiple byte streams. +Features: +- Supports both bytes and BytesIO as input +- Handles multiple files in a single operation +Input format: +- List of files with byte content (bytes or BytesIO) with filename and description attributes. + +""" -class FileReadTool(Node): + +class FileReaderTool(Node): """ - A tool to read the content of a file. + A tool to read the content of one or more files from byte streams. Attributes: name (str): The name of the tool. description (str): The description of the tool. - file_path (str): The file path to read. + files (list[bytes | io.BytesIO] | None): List of files to read content from. + is_files_allowed (bool): Indicates if the tool supports file operations. """ group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS - name: str = "Read a file's content" - description: str = "A tool that can be used to read a file's content from local storage." 
- file_path: str + name: str = "file-reader-tool" + description: str = DESCRIPTION + files: list[bytes | io.BytesIO] | None = None + is_files_allowed: bool = True model_config = ConfigDict(arbitrary_types_allowed=True) - def _read_file_content(self, file_path: str) -> Any: + def _load_file_content(self, file: bytes | io.BytesIO) -> str: """ - Read the content of the file at the given path. + Load the content of a file from bytes or a byte stream. Args: - file_path (str): The path to the file. + file (Union[bytes, io.BytesIO]): The byte content or stream of the file. Returns: - Any: The content of the file. + str: The loaded content as string. + + Raises: + ToolExecutionException: If file reading fails or input type is invalid. """ try: - with open(file_path) as file: - return file.read() + file_description = getattr(file, "description", "Unnamed file") + logger.debug(f"Reading file: {file_description}") + + if isinstance(file, bytes): + content = file + elif isinstance(file, io.BytesIO): + file.seek(0) + content = file.read() + else: + raise ToolExecutionException( + f"Invalid input type. Expected bytes or BytesIO, got {type(file)}", recoverable=False + ) + + try: + return content.decode("utf-8", errors="ignore") + except UnicodeDecodeError: + return content.hex() + except Exception as e: - logger.error( - f"Tool {self.name} - {self.id}: failed to to read the file {file_path}. Error: {e}" - ) - raise ToolExecutionException( - f"Failed to to read the file {file_path}. Error: {e}", recoverable=True - ) + logger.error(f"Failed to read file: {file_description}. Error: {e}") + raise ToolExecutionException(f"Failed to read the file: {file_description}. Error: {e}", recoverable=True) - def execute( - self, input_data: dict[str, Any], config: RunnableConfig = None, **kwargs - ) -> dict[str, Any]: + def execute(self, input_data: dict[str, Any], config: RunnableConfig = None, **kwargs) -> dict[str, Any]: """ Execute the tool with the provided input data and configuration. 
Args: - input_data (dict[str, Any]): The input data containing the file path. - config (RunnableConfig, optional): The configuration for the runnable. Defaults to None. + input_data (dict[str, Any]): The input data containing: + - files: List of byte streams to process + config (RunnableConfig, optional): The configuration for the runnable. + **kwargs: Additional keyword arguments. Returns: - dict[str, Any]: The content of the file or an error message if reading fails. + dict[str, Any]: Dictionary containing processed file contents. + + Raises: + ToolExecutionException: If no files are provided or processing fails. """ - logger.debug( - f"Tool {self.name} - {self.id}: started with input data {input_data}" - ) + logger.debug(f"Tool {self.name} - {self.id}: started with input data {input_data}") config = ensure_config(config) self.run_on_node_execute_run(config.callbacks, **kwargs) - file_path = input_data.get("file_path", self.file_path) - result = self._read_file_content(file_path) + files = input_data.get("files", self.files) + + if not files: + raise ToolExecutionException( + "Error: No files provided for reading. 
Please provide 'files' input.", recoverable=False + ) + + results = {} + for idx, file in enumerate(files): + file_description = getattr(file, "description", f"File {idx + 1}") + content = self._load_file_content(file) + results[file_description] = content - logger.debug(f"Tool {self.name} - {self.id}: finished with result {result}") - return {"content": result} + logger.debug(f"Tool {self.name} - {self.id}: finished processing files.") + return {"content": results} diff --git a/examples/use_case_files/use_file_tool_example.py b/examples/use_case_files/use_file_tool_example.py new file mode 100644 index 00000000..737484ef --- /dev/null +++ b/examples/use_case_files/use_file_tool_example.py @@ -0,0 +1,106 @@ +import io +import json +from pathlib import Path + +from dynamiq import Workflow +from dynamiq.callbacks import TracingCallbackHandler +from dynamiq.connections import E2B +from dynamiq.flows import Flow +from dynamiq.nodes.agents.react import ReActAgent +from dynamiq.nodes.tools.e2b_sandbox import E2BInterpreterTool +from dynamiq.runnables import RunnableConfig +from dynamiq.utils import JsonWorkflowEncoder +from examples.llm_setup import setup_llm + +INPUT_PROMPT = "Calculate the mean values for all columns in the CSV" +FILE_PATH = ".data/sample_regression_data.csv" + + +def read_file_as_bytesio(file_path: str, filename: str = None, description: str = None) -> io.BytesIO: + """ + Reads the content of a file and returns it as a BytesIO object with custom attributes for filename and description. + + Args: + file_path (str): The path to the file. + filename (str, optional): Custom filename for the BytesIO object. + description (str, optional): Custom description for the BytesIO object. + + Returns: + io.BytesIO: The file content in a BytesIO object with custom attributes. + + Raises: + FileNotFoundError: If the file does not exist. + IOError: If an I/O error occurs while reading the file. 
+ """ + file_path_obj = Path(file_path).resolve() + if not file_path_obj.exists(): + raise FileNotFoundError(f"The file {file_path} does not exist.") + if not file_path_obj.is_file(): + raise OSError(f"The path {file_path} is not a valid file.") + + with file_path_obj.open("rb") as file: + file_content = file.read() + + file_io = io.BytesIO(file_content) + + file_io.name = filename if filename else file_path_obj.name + if description: + file_io.description = description + return file_io + + +def run_workflow( + agent: ReActAgent, + input_prompt: str, + input_files: list, +) -> tuple[str, dict]: + """ + Execute a workflow using the ReAct agent to process a predefined query. + + Returns: + tuple[str, dict]: The generated content by the agent and the trace logs. + + Raises: + Exception: Captures and prints any errors during workflow execution. + """ + tracing = TracingCallbackHandler() + wf = Workflow(flow=Flow(nodes=[agent])) + + try: + result = wf.run( + input_data={"input": input_prompt, "files": input_files}, + config=RunnableConfig(callbacks=[tracing]), + ) + # Verify that traces can be serialized to JSON + json.dumps( + {"runs": [run.to_dict() for run in tracing.runs.values()]}, + cls=JsonWorkflowEncoder, + ) + + return result.output[agent.id]["output"]["content"], tracing.runs + except Exception as e: + print(f"An error occurred: {e}") + return "", {} + + +csv_bytes_io = read_file_as_bytesio( + FILE_PATH, filename="custom_regression_data.csv", description="Custom CSV file with regression data" +) + +python_tool = E2BInterpreterTool(connection=E2B()) + +llm = setup_llm() + +agent = ReActAgent( + name="Agent", + id="Agent", + llm=llm, + tools=[python_tool], +) + +output, traces = run_workflow( + agent=agent, + input_prompt=INPUT_PROMPT, + input_files=[csv_bytes_io], +) +print("Agent Output:", output)