From 5338633bb39dc3a5367c7619652c248a0f9a8da2 Mon Sep 17 00:00:00 2001 From: dewmal Date: Sun, 21 Jul 2024 18:51:45 +0530 Subject: [PATCH] Write LLM Agents --- bindings/ceylon/ceylon/llm/llm_agent.py | 105 +++++--- bindings/ceylon/ceylon/llm/prompt_builder.py | 107 ++++++++ bindings/ceylon/ceylon/llm/types.py | 43 +++ bindings/ceylon/ceylon/llm/unit.py | 264 +++++++++++++++++++ bindings/ceylon/ceylon/workspace/admin.py | 5 +- bindings/ceylon/ceylon/workspace/runner.py | 2 - bindings/ceylon/pyproject.toml | 1 + 7 files changed, 492 insertions(+), 35 deletions(-) create mode 100644 bindings/ceylon/ceylon/llm/prompt_builder.py create mode 100644 bindings/ceylon/ceylon/llm/types.py create mode 100644 bindings/ceylon/ceylon/llm/unit.py diff --git a/bindings/ceylon/ceylon/llm/llm_agent.py b/bindings/ceylon/ceylon/llm/llm_agent.py index 657db1f5..82a86b80 100644 --- a/bindings/ceylon/ceylon/llm/llm_agent.py +++ b/bindings/ceylon/ceylon/llm/llm_agent.py @@ -4,12 +4,12 @@ from collections import deque from typing import List +import networkx as nx from langchain_core.tools import StructuredTool, BaseTool from pydantic.dataclasses import dataclass -from ceylon.ceylon import AgentCore, Processor, MessageHandler, AgentDefinition, uniffi_set_event_loop +from ceylon.ceylon import AgentCore, Processor, MessageHandler, AgentDefinition from ceylon.llm.llm_caller import process_agent_request -from ceylon.llm.task_manager import TaskManager from ceylon.runner import RunnerInput @@ -22,13 +22,13 @@ class LLMAgentResponse: class LLMAgent(AgentCore, MessageHandler, Processor): tools: list[StructuredTool] + network_graph: nx.DiGraph + network_graph_original: nx.DiGraph queue: deque original_goal = None agent_replies: List[LLMAgentResponse] = [] - task_manager = TaskManager() - def __init__(self, name, position, instructions, responsibilities, llm, tools: list[BaseTool] = None): super().__init__(definition=AgentDefinition( name=name, @@ -38,6 +38,8 @@ def __init__(self, name, position, instructions, responsibilities, llm, tools: l ), on_message=self, processor=self) self.llm = llm self.tools = tools + # Create a directed graph to represent the workflow + self.network_graph = nx.DiGraph() # Initialize the queue and executed agents self.queue = deque() @@ -47,35 +49,59 @@ async def on_message(self, agent_id, data, time): dt: LLMAgentResponse = pickle.loads(data) print(f"{definition.name} Received message from = '{dt.agent_name}") - # next_agent = self.get_next_agent() - # if next_agent == dt.agent_name: - # self.agent_replies.append(dt) - # await self.update_status(dt.agent_name) - # - # next_agent = self.get_next_agent() - # if next_agent == definition.name: - # dependencies = list(self.network_graph_original.predecessors(next_agent)) - # print("Dependencies are:", dependencies, "for", next_agent) - # - - # only_dependencies = {dt.agent_name: dt for dt in self.agent_replies if dt.agent_name in dependencies} - # - # if len(only_dependencies) == len(dependencies): - # print("Executing", definition.name) - # await self.execute(self.original_goal) - # - # await self.execute({ - # "original_request": self.original_goal, - # **only_dependencies, - # dt.agent_name: dt.response - # }) + next_agent = self.get_next_agent() + if next_agent == dt.agent_name: + self.agent_replies.append(dt) + await self.update_status(dt.agent_name) + + next_agent = self.get_next_agent() + if next_agent == definition.name: + dependencies = list(self.network_graph_original.predecessors(next_agent)) + print("Dependencies are:", dependencies, 
"for", next_agent) + + only_dependencies = {dt.agent_name: dt for dt in self.agent_replies if dt.agent_name in dependencies} + + if len(only_dependencies) == len(dependencies): + print("Executing", definition.name) + await self.execute(self.original_goal) + + await self.execute({ + "original_request": self.original_goal, + **only_dependencies, + dt.agent_name: dt.response + }) async def run(self, inputs): - print(" Running LLMAgent") inputs: RunnerInput = pickle.loads(inputs) + self._initialize_graph(inputs.network) + self.original_goal = inputs.request - print(inputs.request) + await self.execute(inputs.request) + + + await self.stop() + + def _initialize_graph(self, network): + # Add nodes and edges based on the agents and their dependencies + for agent, dependencies in network.items(): + print(agent) + self.network_graph.add_node(agent) + for dependency in dependencies: + self.network_graph.add_edge(dependency, agent) + + self.network_graph_original = self.network_graph.copy() + + # Initialize the queue with nodes that have no dependencies (indegree 0) + self.queue.extend([node for node in self.network_graph if self.network_graph.in_degree(node) == 0]) + + + + def get_next_agent(self): + if not self.queue: + print("No more agents to execute.") + return None + return self.queue[0] async def execute(self, input): definition = await self.definition() @@ -97,5 +123,24 @@ async def execute(self, input): else: print("Not executing", definition.name, "as it is not the next agent in the queue.") - async def start(self, topic: "str", url: "str", inputs: "bytes") -> None: - return await super().start(topic, url, inputs) + async def update_status(self, agent): + if agent not in self.queue: + print(f"Agent {agent} is not ready to execute or has already been executed.") + return + + self.queue.remove(agent) + print(f"Executing {agent}") + + # Remove the current agent and update the graph + for successor in list(self.network_graph.successors(agent)): + self.network_graph.remove_edge(agent, successor) + if self.network_graph.in_degree(successor) == 0: + self.queue.append(successor) + self.network_graph.remove_node(agent) + + if not self.network_graph.nodes: + print("Workflow executed successfully.") + await self.stop() + elif not self.queue: + print("Cycle detected in the workflow!") + diff --git a/bindings/ceylon/ceylon/llm/prompt_builder.py b/bindings/ceylon/ceylon/llm/prompt_builder.py new file mode 100644 index 00000000..04745e54 --- /dev/null +++ b/bindings/ceylon/ceylon/llm/prompt_builder.py @@ -0,0 +1,107 @@ +import re + +from Cheetah.Template import Template + + +def get_agent_definition( + agent_config: dict +): + cleaned_string = Template(""" + You are $name, an AI agent whose role is $role. 
+ + Primary Function: + $role_description + + Key Responsibilities: + #for $responsibility in $responsibilities + - $responsibility + #end for + + Core Skills: + #for $skill in $skills + - $skill + #end for + + #if $tools + Tools & Technologies: + #for $tool in $tools + - $tool + #end for + #end if + + #if $knowledge_domains + Specialized Knowledge Domains: + #for $domain in $knowledge_domains + - $domain + #end for + #end if + + #if $operational_parameters + Operational Parameters: + $operational_parameters + #end if + + #if $interaction_style + Interaction Style: + $interaction_style + #end if + + #if $performance_objectives + Performance Objectives: + #for $objective in $performance_objectives + - $objective + #end for + #end if + + #if $version + Version Information: + $version + #end if + + As an AI agent, you should strive to provide accurate, helpful, + and contextually appropriate responses based on the above specifications. + Always maintain the defined interaction style and adhere to the operational parameters. + If you encounter a task outside your defined capabilities or knowledge domains, please + inform the user and offer alternative solutions if possible. +""", agent_config) + # cleaned_string = re.sub(r'\s+', ' ', f"{template}") + # cleaned_string = cleaned_string.strip() + return cleaned_string + + +def get_prompt(agent_config: dict): + template = Template(""" + $agent_definition + You need to follow your responsibility. to complete the task. + -------------- + User Inputs: + #for $key, $value in $user_inputs.items() + $key: $value + #end for + + #if $history + ------------ + Other Agents Responses: + #for $key, $value in $history.items() + $key: $value + #end for + #end if + """, agent_config) + cleaned_string = re.sub(r'\s+', ' ', f"{template}") + cleaned_string = cleaned_string.strip() + return cleaned_string + + +if __name__ == '__main__': + from ceylon.llm.types import AgentDefinition + + conf = AgentDefinition( + name="Researcher", + role="researcher", + responsibility="Search the internet", + skills=["search"], + tools=[] + ).model_dump() + print(conf) + + print(get_agent_definition(conf)) diff --git a/bindings/ceylon/ceylon/llm/types.py b/bindings/ceylon/ceylon/llm/types.py new file mode 100644 index 00000000..bc4a898c --- /dev/null +++ b/bindings/ceylon/ceylon/llm/types.py @@ -0,0 +1,43 @@ +from typing import List, Optional + +from langchain_core.tools import BaseTool +from pydantic import BaseModel + + +class Step(BaseModel): + owner: str + dependencies: List[str] + + +class Job(BaseModel): + title: str + input: dict + work_order: List[Step] + visualize: bool = False + + +class AgentDefinition(BaseModel): + name: str + role: str + role_description: str + responsibilities: List[str] + skills: List[str] + tools: List[str] = [] + knowledge_domains: Optional[List[str]] = [] + interaction_style: Optional[str] = None + operational_parameters: Optional[str] = None + performance_objectives: Optional[List[str]] = [] + version: Optional[str] = None + + +class LLMAgentResponse(BaseModel): + time: float + agent_id: str + agent_name: str + response: str + + +class LLMAgentRequest(BaseModel): + name: str + user_inputs: dict + history: List[LLMAgentResponse] diff --git a/bindings/ceylon/ceylon/llm/unit.py b/bindings/ceylon/ceylon/llm/unit.py new file mode 100644 index 00000000..58dc716f --- /dev/null +++ b/bindings/ceylon/ceylon/llm/unit.py @@ -0,0 +1,264 @@ +import asyncio +import datetime +import pickle +from collections import deque +from typing import List + +import 
networkx as nx +from langchain.agents import AgentExecutor +from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser +from langchain.prompts import Prompt +from langchain_community.chat_models import ChatOllama +from langchain_core.tools import BaseTool +from langchain_core.utils.function_calling import format_tool_to_openai_function + +from ceylon.ceylon import AgentDetail +from ceylon.llm.prompt_builder import get_agent_definition, get_prompt +from ceylon.llm.types import LLMAgentRequest, Job, LLMAgentResponse, AgentDefinition, Step +from ceylon.tools.search_tool import SearchTool +from ceylon.workspace.admin import Admin +from ceylon.workspace.worker import Worker + +workspace_id = "llm_unit" +admin_port = 8888 +admin_peer = "admin" + + +class LLMAgent(Worker): + + def __init__(self, definition: AgentDefinition, tools: [BaseTool] = [], llm=None): + self.definition = definition + self.tools = tools + self.llm = llm + super().__init__( + name=definition.name, + workspace_id=workspace_id, + admin_port=admin_port, + admin_peer=admin_peer, + role=definition.role + ) + + async def on_message(self, agent_id: "str", data: "bytes", time: "int"): + data = pickle.loads(data) + if type(data) == LLMAgentRequest: + request: LLMAgentRequest = data + print(request.name, self.definition.name) + if request.name == self.definition.name: + definition = self.definition + definition.tools = [tool.name for tool in self.tools if isinstance(tool, BaseTool)] + agent_definition_prompt = get_agent_definition(self.definition) + prompt_value = get_prompt({ + "user_inputs": request.user_inputs, + "agent_definition": agent_definition_prompt, + "history": request.history + }) + prompt = Prompt(template=prompt_value) + if self.tools and len(self.tools) > 0: + llm = self.llm.bind(functions=[format_tool_to_openai_function(t) for t in self.tools]) + agent = prompt | llm | OpenAIFunctionsAgentOutputParser() + executor = AgentExecutor(agent=agent, tools=self.tools, verbose=True) + llm_response = executor.invoke({}) + response = LLMAgentResponse( + time=datetime.datetime.now().timestamp(), + agent_id=self.details().id, + agent_name=self.details().name, + response=llm_response["output"] + ) + await self.broadcast(pickle.dumps(response)) + else: + agent = prompt | self.llm + response = agent.invoke({}) + print(response) + + +class ChiefAgent(Admin): + job: Job + network_graph: nx.DiGraph + network_graph_original: nx.DiGraph + queue: deque + + agent_responses: List[LLMAgentResponse] = [] + + def __init__(self, name="admin", port=8888): + self.queue = deque() + # Create a directed graph to represent the workflow + self.network_graph = nx.DiGraph() + self.agent_responses = [] + super().__init__(name, port) + + async def run(self, inputs: "bytes"): + self.job: Job = pickle.loads(inputs) + # Create a directed graph + self._initialize_graph() + + def _initialize_graph(self): + for step in self.job.work_order: + self.network_graph.add_node(step.owner) + for dependency in step.dependencies: + self.network_graph.add_edge(dependency, step.owner) + + self.network_graph_original = self.network_graph.copy() + # Initialize the queue with nodes that have no dependencies (indegree 0) + self.queue.extend([node for node in self.network_graph if self.network_graph.in_degree(node) == 0]) + + def get_next_agent(self): + if not self.queue: + print("No more agents to execute.") + return None + return self.queue[0] + + async def on_agent_connected(self, topic: "str", agent: AgentDetail): + next_agent = self.get_next_agent() + 
print(f"Agent {agent} connected to {topic} and is executing {next_agent}") + if next_agent == agent.name: + await self.broadcast(pickle.dumps( + LLMAgentRequest(name=agent.name, + user_inputs=self.job.input, history=self.agent_responses), + )) + # self.queue.popleft() + + async def on_message(self, agent_id: "str", data: "bytes", time: "int"): + print("Admin on_message", agent_id) + data = pickle.loads(data) + if type(data) == LLMAgentResponse: + self.agent_responses.append(data) + next_agent = self.get_next_agent() + if next_agent == data.agent_name: + self.queue.popleft() + + next_agent = self.get_next_agent() + if next_agent: + await self.broadcast(pickle.dumps( + LLMAgentRequest(name=next_agent, + user_inputs=self.job.input, + history=self.agent_responses), + )) + else: + print("No more agents to execute.") + last_response = self.agent_responses[-1] + print(last_response.response) + await self.stop() + + +async def main(): + llm_lib = ChatOllama(model="llama3:instruct") + + chief = ChiefAgent( + name=workspace_id, + port=admin_port, + ) + + writer = LLMAgent( + AgentDefinition( + name="writer", + role="Creative AI Content Writer", + role_description=""" + As AIStoryWeaver, your primary function is to transform complex AI and machine learning research + into captivating, accessible content. You excel at crafting engaging narratives that bridge the gap + between technical expertise and public understanding. Your writing should spark curiosity, + foster comprehension, and ignite imagination about the potential of AI technologies. + """, + responsibilities=[ + "Synthesize technical AI research into engaging, narrative-driven articles", + "Translate complex concepts into relatable metaphors and real-world examples", + "Craft compelling storylines that capture the essence of AI advancements", + "Tailor content to appeal to readers with diverse levels of AI knowledge", + "Infuse creativity and humor to make technical subjects more approachable", + "Maintain scientific accuracy while prioritizing readability and engagement", + ], + skills=[ + "Creative writing and storytelling", + "Simplification of technical concepts", + "Audience-focused content creation", + "Metaphor and analogy generation", + "Narrative structure and pacing", + "Balancing entertainment with educational value", + ], + tools=[ + "Metaphor generator", + "Readability analysis tools", + "Interactive storytelling frameworks", + "Visual concept mapping software", + ], + knowledge_domains=[ + "Artificial Intelligence", + "Machine Learning", + "Natural Language Processing", + "Data Science", + "Technology Trends", + "Science Communication", + ], + interaction_style="Friendly, engaging, and slightly whimsical. Use a conversational tone that invites curiosity and makes complex ideas feel accessible and exciting.", + operational_parameters=""" + While creativity is encouraged, always prioritize accuracy in representing AI concepts. + Avoid oversimplification that could lead to misconceptions. When using analogies or + metaphors, clearly link them back to the original AI concepts. Encourage critical + thinking about the implications of AI technologies. 
+ """, + performance_objectives=[ + "Increase reader engagement with AI topics", + "Improve public understanding of complex AI concepts", + "Generate shareable content that sparks discussions about AI", + "Bridge the communication gap between AI researchers and the general public", + ], + version="2.0.0" + ), + llm=llm_lib + ) + researcher = LLMAgent( + AgentDefinition( + name="researcher", + role="AI and Machine Learning Research Specialist", + role_description="Your role is to gather detailed and accurate information on how AI can be utilized in machine learning...", + responsibilities=[ + "Conduct thorough research on AI applications in machine learning", + "Gather detailed information from reputable academic and industry sources", + ], + skills=[ + "Advanced information retrieval and data mining", + "Critical analysis of technical papers and reports", + ], + tools=[ + "Academic database access (e.g., arXiv, IEEE Xplore)", + "Industry report aggregators", + ], + knowledge_domains=[ + "Artificial Intelligence", + "Machine Learning Algorithms", + ], + interaction_style="Professional and analytical. Communicate findings clearly and objectively, with a focus on accuracy and relevance.", + operational_parameters="Prioritize peer-reviewed sources and reputable industry reports...", + performance_objectives=[ + "Provide comprehensive coverage of AI applications in machine learning", + "Ensure all gathered information is current and accurately represented", + ], + version="2.0.0" + ), + tools=[SearchTool()], + llm=llm_lib + ) + + job = Job( + title="write_article", + work_order=[ + Step(owner="researcher", dependencies=[]), + Step(owner="writer", dependencies=["researcher"]), + ], + input={ + "title": "How to use AI for Machine Learning", + "tone": "informal", + "length": "large", + "style": "creative" + } + ) + + res = await chief.run_admin(pickle.dumps(job), [ + writer, + researcher + ]) + print(res) + + +if __name__ == '__main__': + # enable_log("INFO") + asyncio.run(main()) diff --git a/bindings/ceylon/ceylon/workspace/admin.py b/bindings/ceylon/ceylon/workspace/admin.py index 0045fedf..45067a3d 100644 --- a/bindings/ceylon/ceylon/workspace/admin.py +++ b/bindings/ceylon/ceylon/workspace/admin.py @@ -18,10 +18,9 @@ async def run(self, inputs: "bytes"): async def on_message(self, agent_id: "str", data: "bytes", time: "int"): pass - async def run_admin(self, inputs, workers): + async def run_admin(self, inputs: "bytes", workers): uniffi_set_event_loop(asyncio.get_event_loop()) - runner_input = RunnerInput(request=inputs, agents=[], network={}) - await self.start(pickle.dumps(runner_input), workers) + await self.start(inputs, workers) # async def execute_task(self, input): diff --git a/bindings/ceylon/ceylon/workspace/runner.py b/bindings/ceylon/ceylon/workspace/runner.py index 6db6748e..1764dff8 100644 --- a/bindings/ceylon/ceylon/workspace/runner.py +++ b/bindings/ceylon/ceylon/workspace/runner.py @@ -7,8 +7,6 @@ class RunnerInput(BaseModel): request: Any - agents: List[AgentDetail] - network: Dict[str, List[str]] class Config: arbitrary_types_allowed = True diff --git a/bindings/ceylon/pyproject.toml b/bindings/ceylon/pyproject.toml index 7632caa5..cbff6f67 100644 --- a/bindings/ceylon/pyproject.toml +++ b/bindings/ceylon/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "langchain==0.2.6", "ollama==0.2.1", "openai==1.35.7", + "ct3==3.3.3" ] [project.urls]