Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add files support under agent class #15

Merged
merged 24 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
141b808
feat: add baseline support for agent managers files
olbychos Oct 11, 2024
2dcee66
Merge branch 'main' into feat/add_files_support_agent
olbychos Oct 16, 2024
5e6b146
feat: update files usage within agents
olbychos Oct 17, 2024
889ae19
fix: update file path under examples
olbychos Oct 17, 2024
e1a67a5
Merge branch 'main' into feat/add_files_support_agent
olbychos Oct 17, 2024
2c8ff09
fix: update variables naming
olbychos Oct 17, 2024
5f776e3
Merge remote-tracking branch 'origin/feat/add_files_support_agent' in…
olbychos Oct 17, 2024
e006002
fix: correct grammar issues
olbychos Oct 18, 2024
2503125
feat: keep only bytes and bytesio
olbychos Oct 18, 2024
cbbe30a
Merge branch 'main' into feat/add_files_support_agent
olbychos Oct 18, 2024
b102672
fix: provide fixes on paths, docstrings
olbychos Oct 19, 2024
1477e07
Merge branch 'main' into feat/add_files_support_agent
olbychos Oct 21, 2024
4fcec75
Merge branch 'main' into feat/add_files_support_agent
olbychos Oct 22, 2024
a400f34
feat: add workflow usage
olbychos Oct 22, 2024
9cc0790
fix: lint
olbychos Oct 22, 2024
c835228
feat: update files, remove filedata structure
olbychos Oct 22, 2024
9861adc
feat: remove unique filename, remove obligatory description
olbychos Oct 23, 2024
f47ceaf
Merge branch 'main' into feat/add_files_support_agent
olbychos Oct 23, 2024
a3b7310
fix: remove redundant filedata, simplify filereader tool
olbychos Oct 23, 2024
3d56aa9
fix: test init
olbychos Oct 23, 2024
3f3e9c4
Merge branch 'main' into feat/add_files_support_agent
olbychos Oct 23, 2024
dd5b741
feat: update example with tracing
olbychos Oct 24, 2024
8938b9e
feat: update example with tracing
olbychos Oct 24, 2024
cae685a
fix: update traces
olbychos Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions dynamiq/nodes/agents/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import json
import re
import textwrap
Expand Down Expand Up @@ -53,11 +54,12 @@ class Agent(Node):
)
DEFAULT_DATE: ClassVar[str] = datetime.now().strftime("%d %B %Y")

llm: Node = Field(..., description="Language Model (LLM) used by the agent.")
llm: Node = Field(..., description="LLM used by the agent.")
group: NodeGroup = NodeGroup.AGENTS
error_handling: ErrorHandling = ErrorHandling(timeout_seconds=600)
streaming: StreamingConfig = StreamingConfig()
tools: list[Node] = []
files: list[io.BytesIO | bytes] | None = None
name: str = "AI Agent"
role: str | None = None
max_loops: int = 1
Expand All @@ -77,13 +79,15 @@ def __init__(self, **kwargs):

@property
def to_dict_exclude_params(self):
return super().to_dict_exclude_params | {"llm": True, "tools": True, "memory": True}
return super().to_dict_exclude_params | {"llm": True, "tools": True, "memory": True, "files": True}

def to_dict(self, **kwargs) -> dict:
"""Converts the instance to a dictionary."""
data = super().to_dict(**kwargs)
data["llm"] = self.llm.to_dict(**kwargs)
data["tools"] = [tool.to_dict(**kwargs) for tool in self.tools]
if self.files:
data["files"] = [{"name": getattr(f, "name", f"file_{i}")} for i, f in enumerate(self.files)]
return data

def init_components(self, connection_manager: ConnectionManager = ConnectionManager()):
Expand All @@ -104,6 +108,7 @@ def _init_prompt_blocks(self):
"role": self.role or "",
"date": self.DEFAULT_DATE,
"tools": "{tool_description}",
"files": "{file_description}",
"instructions": "",
"output_format": "",
"relevant_information": "{relevant_memory}",
Expand All @@ -112,6 +117,7 @@ def _init_prompt_blocks(self):
}
self._prompt_variables = {
"tool_description": self.tool_description,
"file_description": self.file_description,
"user_input": "",
"context": "",
"relevant_memory": "",
Expand Down Expand Up @@ -146,6 +152,11 @@ def execute(
self.memory.add(role=MessageRole.USER, content=input_data.get("input"), metadata=metadata)
self._retrieve_memory(input_data)

files = input_data.get("files", [])
if files:
self.files = files
self._prompt_variables["file_description"] = self.file_description

self._prompt_variables.update(input_data)
kwargs = kwargs | {"parent_run_id": kwargs.get("run_id")}
kwargs.pop("run_depends", None)
Expand Down Expand Up @@ -273,6 +284,9 @@ def _get_tool(self, action: str) -> Node:
def _run_tool(self, tool: Node, tool_input: str, config, **kwargs) -> Any:
"""Runs a specific tool with the given input."""
logger.debug(f"Agent {self.name} - {self.id}: Running tool '{tool.name}'")
if self.files:
if tool.is_files_allowed is True:
tool_input["files"] = self.files

tool_result = tool.run(
input_data=tool_input,
Expand Down Expand Up @@ -300,6 +314,18 @@ def tool_description(self) -> str:
else ""
)

@property
def file_description(self) -> str:
"""Returns a description of the files available to the agent."""
if self.files:
file_description = "You can work with the following files:\n"
for file in self.files:
name = getattr(file, "name", "Unnamed file")
description = getattr(file, "description", "No description")
file_description += f"<file>: {name} - {description} <\\file>\n"
return file_description
return ""

@property
def tool_names(self) -> str:
"""Returns a comma-separated list of tool names available to the agent."""
Expand Down Expand Up @@ -375,7 +401,7 @@ def execute(
action = input_data.get("action")
if not action or action not in self._actions:
raise InvalidActionException(
f"Invalid or missing action: {action}. Please select an action from {self._actions}." # nosec B608
f"Invalid or missing action: {action}. Please select an action from {self._actions}." # nosec: B608
)

self._prompt_variables.update(input_data)
Expand Down
2 changes: 2 additions & 0 deletions dynamiq/nodes/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class Node(BaseModel, Runnable, ABC):
metadata (NodeMetadata | None): Optional metadata for the node.
is_postponed_component_init (bool): Whether component initialization is postponed.
is_optimized_for_agents (bool): Whether to optimize output for agents. By default is set to False.
supports_files (bool): Whether the node has access to files. By default is set to False.
"""
id: str = Field(default_factory=generate_uuid)
name: str | None = None
Expand All @@ -211,6 +212,7 @@ class Node(BaseModel, Runnable, ABC):

is_postponed_component_init: bool = False
is_optimized_for_agents: bool = False
is_files_allowed: bool = False

_output_references: NodeOutputReferences = PrivateAttr()

Expand Down
182 changes: 129 additions & 53 deletions dynamiq/nodes/tools/e2b_sandbox.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import io
import os
import uuid
from hashlib import sha256
from typing import Any, Literal

Expand Down Expand Up @@ -52,30 +50,64 @@
""" # noqa: E501


def generate_fallback_filename(file: bytes | io.BytesIO) -> str:
"""
Generate a unique fallback filename for uploaded files.

Args:
file: File content as bytes or BytesIO object.

Returns:
str: A unique filename based on the object's id.
"""
return f"uploaded_file_{id(file)}.bin"


def generate_file_description(file: bytes | io.BytesIO, length: int = 20) -> str:
"""
Generate a description for a file based on its content.

Args:
file: File content as bytes or BytesIO object.
length: Maximum number of bytes to include in the description.

Returns:
str: A description of the file's content or existing description.
"""
if description := getattr(file, "description", None):
return description

file_content = file.getbuffer()[:length] if isinstance(file, io.BytesIO) else file[:length]
return f"File starting with: {file_content.hex()}"


class E2BInterpreterTool(ConnectionNode):
"""
A tool to interact with an E2B sandbox, allowing for file upload/download,
Python code execution, and shell command execution.
A tool for executing code and managing files in an E2B sandbox environment.

This tool provides a secure execution environment for running Python code,
shell commands, and managing file operations.

Attributes:
group (Literal[NodeGroup.TOOLS]): Node group type.
name (str): Name of the tool.
description (str): Detailed description of the tool's capabilities.
connection (E2BConnection): E2B connection object.
installed_packages (list): List of default packages to install.
files (list): List of tuples (file_data, file_description) for initial files.
persistent_sandbox (bool): Whether to use a persistent sandbox across executions.
_sandbox (Optional[Sandbox]): Persistent sandbox instance (if enabled).
group (Literal[NodeGroup.TOOLS]): The node group identifier.
name (str): The unique name of the tool.
description (str): Detailed usage instructions and capabilities.
connection (E2BConnection): Configuration for E2B connection.
installed_packages (List[str]): Pre-installed packages in the sandbox.
files (Optional[List[Union[io.BytesIO, bytes]]]): Files to be uploaded.
persistent_sandbox (bool): Whether to maintain sandbox between executions.
is_files_allowed (bool): Whether file uploads are permitted.
_sandbox (Optional[Sandbox]): Internal sandbox instance for persistent mode.
"""

group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
name: str = "code-interpreter_e2b"
description: str = DESCRIPTION
connection: E2BConnection
installed_packages: list = []
files: list[tuple[str | bytes, str]] | None = None
files: list[io.BytesIO | bytes] | None = None
persistent_sandbox: bool = True
_sandbox: Sandbox | None = None
is_files_allowed: bool = True

def __init__(self, **kwargs):
super().__init__(**kwargs)
Expand All @@ -84,13 +116,38 @@ def __init__(self, **kwargs):
else:
logger.debug(f"Tool {self.name} - {self.id}: Will initialize sandbox on each execute")

@property
def to_dict_exclude_params(self) -> set:
"""
Get parameters to exclude from dictionary representation.

Returns:
set: Set of parameters to exclude.
"""
return super().to_dict_exclude_params | {"files": True}

def to_dict(self, **kwargs) -> dict[str, Any]:
"""
Convert instance to dictionary format.

Args:
**kwargs: Additional keyword arguments.

Returns:
Dict[str, Any]: Dictionary representation of the instance.
"""
data = super().to_dict(**kwargs)
if self.files:
data["files"] = [{"name": getattr(f, "name", f"file_{i}")} for i, f in enumerate(self.files)]
return data

def _initialize_persistent_sandbox(self):
"""Initializes the persistent sandbox, installs packages, and uploads initial files."""
logger.info(f"Tool {self.name} - {self.id}: Initializing Persistent Sandbox")
self._sandbox = Sandbox(api_key=self.connection.api_key)
self._install_default_packages(self._sandbox)
if self.files:
self._upload_initial_files(self._sandbox)
self._upload_files(files=self.files, sandbox=self._sandbox)
self._update_description()

def _install_default_packages(self, sandbox: Sandbox) -> None:
Expand All @@ -105,44 +162,67 @@ def _install_packages(self, sandbox: Sandbox, packages: str) -> None:
logger.debug(f"Tool {self.name} - {self.id}: Installing packages: {packages}")
sandbox.process.start_and_wait(f"pip install -qq {' '.join(packages.split(','))}")

def _upload_initial_files(self, sandbox: Sandbox) -> None:
"""Uploads the initial files to the specified sandbox."""
for file_data, file_description in self.files:
uploaded_path = self._upload_file(file_data, file_description, sandbox)
logger.debug(f"Tool {self.name} - {self.id}: Uploaded initial file to {uploaded_path}")
def _upload_files(self, files: list[bytes | io.BytesIO], sandbox: Sandbox) -> str:
"""Uploads multiple files to the sandbox and returns details for each file."""
upload_details = []
for file in files:
if isinstance(file, bytes):
file = io.BytesIO(file)

def _update_description(self) -> None:
"""Updates the tool description with information about uploaded files."""
if self.files:
self.description = self.description.strip().replace("</tool_description>", "")
self.description += "\n\n**Available Files:**"
for file_data, file_description in self.files:
filename = os.path.basename(file_data) if isinstance(file_data, str) else "uploaded_file.bin"
self.description += f"\n- **{filename}** ({file_description})"
self.description += "\n</tool_description>"
file_name = getattr(file, "name", None) or generate_fallback_filename(file)
file.name = file_name

description = getattr(file, "description", generate_file_description(file))

uploaded_path = self._upload_file(file, file_name, sandbox)
upload_details.append(
{
"original_name": file_name,
"description": description,
"uploaded_path": uploaded_path,
}
)
logger.debug(f"Tool {self.name} - {self.id}: Uploaded file '{file_name}' to {uploaded_path}")

self._update_description_with_files(upload_details)
return "\n".join([f"{file['original_name']} -> {file['uploaded_path']}" for file in upload_details])

def _upload_file(self, file_data: str | bytes, file_description: str = "", sandbox: Sandbox | None = None) -> str:
"""Uploads a file to the specified sandbox."""
def _upload_file(self, file: bytes | io.BytesIO, file_name: str, sandbox: Sandbox | None = None) -> str:
"""Uploads a single file to the specified sandbox and returns the uploaded path."""
if not sandbox:
raise ValueError("Sandbox instance is required for file upload.")

if isinstance(file_data, str):
if not os.path.exists(file_data):
raise ToolExecutionException(f"Error: Local file not found: {file_data}", recoverable=False)
uploaded_path = sandbox.upload_file(file=open(file_data, "rb"))
elif isinstance(file_data, bytes):
filename = (
f"{str(uuid.uuid4())}.bin" if not file_description else f"{file_description.replace(' ', '_')}.bin"
)
file_like_object = io.BytesIO(file_data)
file_like_object.name = filename
uploaded_path = sandbox.upload_file(file=file_like_object)
# Handle the file types (bytes or io.BytesIO)
if isinstance(file, bytes):
file_like_object = io.BytesIO(file)
file_like_object.name = file_name
elif isinstance(file, io.BytesIO):
file.name = file_name
file_like_object = file
else:
raise ValueError(f"Invalid file data type: {type(file_data)}")
raise ToolExecutionException(
f"Error: Invalid file data type: {type(file)}. Expected bytes or BytesIO.", recoverable=False
)

# Upload the file to the sandbox
uploaded_path = sandbox.upload_file(file=file_like_object)
logger.debug(f"Tool {self.name} - {self.id}: Uploaded file to {uploaded_path}")

return uploaded_path

def _update_description_with_files(self, upload_details: list[dict]) -> None:
"""Updates the tool description with detailed information about the uploaded files."""
if upload_details:
self.description = self.description.strip().replace("</tool_description>", "")
self.description += "\n\n**Uploaded Files Details:**"
for file_info in upload_details:
self.description += (
f"\n- **Original File Name**: {file_info['original_name']}\n"
f" **Description**: {file_info['description']}\n"
f" **Uploaded Path**: {file_info['uploaded_path']}\n"
)
self.description += "\n</tool_description>"

def _execute_python_code(self, code: str, sandbox: Sandbox | None = None) -> str:
"""Executes Python code in the specified sandbox."""
if not sandbox:
Expand Down Expand Up @@ -185,26 +265,22 @@ def execute(self, input_data: dict[str, Any], config: RunnableConfig | None = No
sandbox = Sandbox(api_key=self.connection.api_key)
self._install_default_packages(sandbox)
if self.files:
self._upload_initial_files(sandbox)
self._upload_files(files=self.files, sandbox=sandbox)
self._update_description()

try:

content = {}
if files := input_data.get("files"):
content["files_installation"] = self._upload_files(files=files, sandbox=sandbox)
if packages := input_data.get("packages"):
self._install_packages(sandbox=sandbox, packages=packages)
content["packages_installation"] = f"Installed packages: {input_data['packages']}"
if files := input_data.get("files"):
content["files_installation"] = self._upload_file(file_data=files, sandbox=sandbox)
if shell_command := input_data.get("shell_command"):
content["shell_command_execution"] = self._execute_shell_command(shell_command, sandbox=sandbox)
if python := input_data.get("python"):
content["code_execution"] = self._execute_python_code(python, sandbox=sandbox)
if not (packages or files or shell_command or python):
raise ToolExecutionException(
"Error: Invalid input data. Please provide 'files' for file upload (local path or bytes), "
"'python' for Python code execution, or 'shell_command' for shell command execution."
"You can also provide 'packages' to install packages.",
"Error: Invalid input data. Please provide 'files' for file upload (bytes or BytesIO)",
recoverable=True,
)

Expand All @@ -215,10 +291,10 @@ def execute(self, input_data: dict[str, Any], config: RunnableConfig | None = No

if self.is_optimized_for_agents:
result = ""
if packages_installation := content.get("packages_installation"):
result += "<Package installation>\n" + packages_installation + "\n</Package installation>"
if files_installation := content.get("files_installation"):
result += "<Files installation>\n" + files_installation + "\n</Files installation>"
if packages_installation := content.get("packages_installation"):
result += "<Package installation>\n" + packages_installation + "\n</Package installation>"
if shell_command_execution := content.get("shell_command_execution"):
result += "<Shell command execution>\n" + shell_command_execution + "\n</Shell command execution>"
if code_execution := content.get("code_execution"):
Expand Down
Loading
Loading