Skip to content

Commit

Permalink
Write LLM Agents
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Jul 21, 2024
1 parent f04aa73 commit 5338633
Show file tree
Hide file tree
Showing 7 changed files with 492 additions and 35 deletions.
105 changes: 75 additions & 30 deletions bindings/ceylon/ceylon/llm/llm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from collections import deque
from typing import List

import networkx as nx
from langchain_core.tools import StructuredTool, BaseTool
from pydantic.dataclasses import dataclass

from ceylon.ceylon import AgentCore, Processor, MessageHandler, AgentDefinition, uniffi_set_event_loop
from ceylon.ceylon import AgentCore, Processor, MessageHandler, AgentDefinition
from ceylon.llm.llm_caller import process_agent_request
from ceylon.llm.task_manager import TaskManager
from ceylon.runner import RunnerInput


Expand All @@ -22,13 +22,13 @@ class LLMAgentResponse:

class LLMAgent(AgentCore, MessageHandler, Processor):
tools: list[StructuredTool]
network_graph: nx.DiGraph
network_graph_original: nx.DiGraph
queue: deque
original_goal = None

agent_replies: List[LLMAgentResponse] = []

task_manager = TaskManager()

def __init__(self, name, position, instructions, responsibilities, llm, tools: list[BaseTool] = None):
super().__init__(definition=AgentDefinition(
name=name,
Expand All @@ -38,6 +38,8 @@ def __init__(self, name, position, instructions, responsibilities, llm, tools: l
), on_message=self, processor=self)
self.llm = llm
self.tools = tools
# Create a directed graph to represent the workflow
self.network_graph = nx.DiGraph()

# Initialize the queue and executed agents
self.queue = deque()
Expand All @@ -47,35 +49,59 @@ async def on_message(self, agent_id, data, time):
dt: LLMAgentResponse = pickle.loads(data)
print(f"{definition.name} Received message from = '{dt.agent_name}")

# next_agent = self.get_next_agent()
# if next_agent == dt.agent_name:
# self.agent_replies.append(dt)
# await self.update_status(dt.agent_name)
#
# next_agent = self.get_next_agent()
# if next_agent == definition.name:
# dependencies = list(self.network_graph_original.predecessors(next_agent))
# print("Dependencies are:", dependencies, "for", next_agent)
#

# only_dependencies = {dt.agent_name: dt for dt in self.agent_replies if dt.agent_name in dependencies}
#
# if len(only_dependencies) == len(dependencies):
# print("Executing", definition.name)
# await self.execute(self.original_goal)
#
# await self.execute({
# "original_request": self.original_goal,
# **only_dependencies,
# dt.agent_name: dt.response
# })
next_agent = self.get_next_agent()
if next_agent == dt.agent_name:
self.agent_replies.append(dt)
await self.update_status(dt.agent_name)

next_agent = self.get_next_agent()
if next_agent == definition.name:
dependencies = list(self.network_graph_original.predecessors(next_agent))
print("Dependencies are:", dependencies, "for", next_agent)

only_dependencies = {dt.agent_name: dt for dt in self.agent_replies if dt.agent_name in dependencies}

if len(only_dependencies) == len(dependencies):
print("Executing", definition.name)
await self.execute(self.original_goal)

await self.execute({
"original_request": self.original_goal,
**only_dependencies,
dt.agent_name: dt.response
})

async def run(self, inputs):
    """Entry point for the agent's processing loop.

    Deserializes the pickled RunnerInput, builds the workflow graph from
    its agent-dependency network, records the original request as the
    overall goal, kicks off execution, then stops the agent.

    :param inputs: pickled ``RunnerInput`` bytes supplied by the runner.
    """
    print(" Running LLMAgent")
    # NOTE(review): pickle.loads on transported bytes is unsafe if the
    # transport can carry untrusted data — confirm the runner only ever
    # passes locally produced payloads.
    runner_input: RunnerInput = pickle.loads(inputs)
    self._initialize_graph(runner_input.network)

    self.original_goal = runner_input.request

    print(runner_input.request)
    await self.execute(runner_input.request)

    await self.stop()

def _initialize_graph(self, network):
    """Build the workflow DAG from the agent-dependency mapping.

    ``network`` maps each agent name to the list of agents it depends on;
    an edge (dependency -> agent) is added for each entry. A pristine copy
    is kept in ``network_graph_original`` before execution mutates the
    working graph, and the queue is seeded with dependency-free agents.
    """
    for agent, dependencies in network.items():
        print(agent)
        self.network_graph.add_node(agent)
        self.network_graph.add_edges_from(
            (dependency, agent) for dependency in dependencies
        )

    self.network_graph_original = self.network_graph.copy()

    # Agents with indegree 0 have no unmet dependencies and may run first.
    roots = [node for node in self.network_graph
             if self.network_graph.in_degree(node) == 0]
    self.queue.extend(roots)



def get_next_agent(self):
    """Peek (without removing) at the next agent due to execute.

    :return: the agent name at the front of the queue, or ``None`` when
        the workflow has no remaining agents.
    """
    if self.queue:
        return self.queue[0]
    print("No more agents to execute.")
    return None

async def execute(self, input):
definition = await self.definition()
Expand All @@ -97,5 +123,24 @@ async def execute(self, input):
else:
print("Not executing", definition.name, "as it is not the next agent in the queue.")

async def start(self, topic: "str", url: "str", inputs: "bytes") -> None:
return await super().start(topic, url, inputs)
async def update_status(self, agent):
    """Record that *agent* has finished and release its successors.

    Removes the agent from the queue and the working graph; any successor
    left with no remaining dependencies is appended to the queue. Stops
    the whole agent once the graph is empty, and reports a cycle if work
    remains but nothing is runnable.
    """
    if agent not in self.queue:
        print(f"Agent {agent} is not ready to execute or has already been executed.")
        return

    self.queue.remove(agent)
    print(f"Executing {agent}")

    # Drop the finished agent's outgoing edges; a child whose indegree
    # falls to zero has all dependencies satisfied and becomes runnable.
    children = list(self.network_graph.successors(agent))
    for child in children:
        self.network_graph.remove_edge(agent, child)
        if self.network_graph.in_degree(child) == 0:
            self.queue.append(child)
    self.network_graph.remove_node(agent)

    if not self.network_graph.nodes:
        print("Workflow executed successfully.")
        await self.stop()
    elif not self.queue:
        print("Cycle detected in the workflow!")

107 changes: 107 additions & 0 deletions bindings/ceylon/ceylon/llm/prompt_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import re

from Cheetah.Template import Template


def get_agent_definition(
        agent_config: dict
):
    """Render the agent's system-prompt definition as a plain string.

    :param agent_config: mapping with the keys the template references
        (``name``, ``role``, ``role_description``, ``responsibilities``,
        ``skills``, ``tools``, ``knowledge_domains``,
        ``operational_parameters``, ``interaction_style``,
        ``performance_objectives``, ``version``) — matches the fields of
        ``ceylon.llm.types.AgentDefinition``.
    :return: the rendered prompt text.
    """
    template = Template("""
You are $name, an AI agent whose role is $role.
Primary Function:
$role_description
Key Responsibilities:
#for $responsibility in $responsibilities
- $responsibility
#end for
Core Skills:
#for $skill in $skills
- $skill
#end for
#if $tools
Tools & Technologies:
#for $tool in $tools
- $tool
#end for
#end if
#if $knowledge_domains
Specialized Knowledge Domains:
#for $domain in $knowledge_domains
- $domain
#end for
#end if
#if $operational_parameters
Operational Parameters:
$operational_parameters
#end if
#if $interaction_style
Interaction Style:
$interaction_style
#end if
#if $performance_objectives
Performance Objectives:
#for $objective in $performance_objectives
- $objective
#end for
#end if
#if $version
Version Information:
$version
#end if
As an AI agent, you should strive to provide accurate, helpful,
and contextually appropriate responses based on the above specifications.
Always maintain the defined interaction style and adhere to the operational parameters.
If you encounter a task outside your defined capabilities or knowledge domains, please
inform the user and offer alternative solutions if possible.
""", agent_config)
    # Render eagerly so callers always receive a plain string, consistent
    # with get_prompt(), instead of a live Cheetah Template object.
    return str(template)


def get_prompt(agent_config: dict):
    """Render the per-task prompt and collapse it to a single line.

    :param agent_config: mapping providing ``agent_definition``,
        ``user_inputs`` (dict) and ``history`` (dict) for the template.
    :return: the rendered prompt with all whitespace runs collapsed to
        single spaces and surrounding whitespace stripped.
    """
    rendered = str(Template("""
$agent_definition
You need to follow your responsibility. to complete the task.
--------------
User Inputs:
#for $key, $value in $user_inputs.items()
$key: $value
#end for
#if $history
------------
Other Agents Responses:
#for $key, $value in $history.items()
$key: $value
#end for
#end if
""", agent_config))
    collapsed = re.sub(r'\s+', ' ', rendered)
    return collapsed.strip()


if __name__ == '__main__':
    # Quick manual smoke test for the prompt templates.
    from ceylon.llm.types import AgentDefinition

    # AgentDefinition requires `role_description` and `responsibilities`
    # (a list); the old demo passed a nonexistent `responsibility` string
    # and omitted `role_description`, raising a pydantic ValidationError.
    conf = AgentDefinition(
        name="Researcher",
        role="researcher",
        role_description="Search the internet and gather information",
        responsibilities=["Search the internet"],
        skills=["search"],
        tools=[]
    ).model_dump()
    print(conf)

    print(get_agent_definition(conf))
43 changes: 43 additions & 0 deletions bindings/ceylon/ceylon/llm/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import List, Optional

from langchain_core.tools import BaseTool
from pydantic import BaseModel


class Step(BaseModel):
    """One unit of work in a job's work order."""
    # Name of the agent responsible for this step.
    owner: str
    # Names of the agents whose output this step depends on.
    dependencies: List[str]


class Job(BaseModel):
    """A titled piece of work expressed as an ordered list of steps."""
    title: str
    # Free-form input payload handed to the workflow.
    input: dict
    # Steps plus their dependencies; together they define the workflow DAG.
    work_order: List[Step]
    # When True, render/visualize the workflow graph — presumably for
    # debugging; confirm against the consumer of this flag.
    visualize: bool = False


class AgentDefinition(BaseModel):
    """Static persona/capability configuration for an LLM agent.

    The field names mirror the placeholders consumed by the Cheetah
    templates in prompt_builder.py (``$name``, ``$role``, ``$skills``, …).
    """
    name: str
    role: str
    # Prose description of the agent's primary function.
    role_description: str
    responsibilities: List[str]
    skills: List[str]
    tools: List[str] = []
    knowledge_domains: Optional[List[str]] = []
    interaction_style: Optional[str] = None
    operational_parameters: Optional[str] = None
    performance_objectives: Optional[List[str]] = []
    version: Optional[str] = None


class LLMAgentResponse(BaseModel):
    """A single agent's reply, tagged with its origin and timestamp."""
    # Timestamp of the response — unit (epoch seconds?) not established
    # here; confirm against the producer.
    time: float
    agent_id: str
    agent_name: str
    # The agent's textual output.
    response: str


class LLMAgentRequest(BaseModel):
    """A request directed at a named agent, with prior agents' replies."""
    # Target agent's name.
    name: str
    # Original user-supplied inputs for the task.
    user_inputs: dict
    # Replies already produced by other agents in the workflow.
    history: List[LLMAgentResponse]
Loading

0 comments on commit 5338633

Please sign in to comment.