Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master v14 #12

Merged
merged 58 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
9ba418d
Update for runtime
dewmal Jul 3, 2024
e4cd6a5
stop sync.
dewmal Jul 3, 2024
a400dbd
add lib
dewmal Jul 3, 2024
be4f6e5
Create task manager
dewmal Jul 4, 2024
86bdd97
can run agent separately with bugs
dewmal Jul 4, 2024
5aaf3b3
can run agent separately with bugs
dewmal Jul 6, 2024
5062d37
create peer
dewmal Jul 6, 2024
da124e1
create libp2p server client
dewmal Jul 7, 2024
4ddd658
add tracing
dewmal Jul 7, 2024
afb0bff
add tracing
dewmal Jul 7, 2024
f12722f
add tracing
dewmal Jul 8, 2024
35a0c17
implement message caching
dewmal Jul 8, 2024
1575d38
clean code
dewmal Jul 8, 2024
b5c27eb
create behaviour
dewmal Jul 9, 2024
1296499
create admin peer
dewmal Jul 9, 2024
18daeb1
admin peer connect with client peers
dewmal Jul 9, 2024
75c305e
connect with admin peer
dewmal Jul 10, 2024
a85e5f1
add multiple peers
dewmal Jul 10, 2024
a9d1de8
Update dialing and naming
dewmal Jul 10, 2024
d738e7f
gossip pub sub for all
dewmal Jul 12, 2024
461c8f0
Agents has members
dewmal Jul 12, 2024
482d094
Update names
dewmal Jul 12, 2024
4d8f17a
optimize code
dewmal Jul 12, 2024
97a4ba4
Create public event
dewmal Jul 14, 2024
0816202
Update event processing
dewmal Jul 14, 2024
b7428c8
make all same
dewmal Jul 14, 2024
db06fc0
register peers
dewmal Jul 14, 2024
f7c9e3c
register peers in admin site
dewmal Jul 14, 2024
8d714b4
create message class
dewmal Jul 14, 2024
1b7c580
Message processing admin
dewmal Jul 14, 2024
60de718
Fix message passing from admin to members
dewmal Jul 14, 2024
e8e839d
Fix message flow
dewmal Jul 14, 2024
f9137b1
update test app
dewmal Jul 14, 2024
de82c24
prepare for release 0.14.0
dewmal Jul 14, 2024
49d8f8f
admin agent start from python side
dewmal Jul 14, 2024
e367296
impl stop
dewmal Jul 15, 2024
9cc78b6
Update tasks
dewmal Jul 15, 2024
8666eb3
Update Code
dewmal Jul 15, 2024
609b645
Run agents and workers
dewmal Jul 15, 2024
3314e5e
Run agents and workers together
dewmal Jul 15, 2024
7ad5a69
Run agent with processor and message handler
dewmal Jul 15, 2024
8f668cb
Update agents
dewmal Jul 15, 2024
8567afb
Agent can run with calling python code
dewmal Jul 16, 2024
36d6e44
Update messages
dewmal Jul 16, 2024
e438b0b
Get Agent details
dewmal Jul 16, 2024
01f2fe2
Create simple agent
dewmal Jul 16, 2024
aafdc18
Update id create process
dewmal Jul 17, 2024
c462bbb
Update sate machine
dewmal Jul 19, 2024
8217fd1
Update sate machine
dewmal Jul 19, 2024
f288e28
Update message capabilities
dewmal Jul 20, 2024
e58f6c2
Fix message passing issue
dewmal Jul 20, 2024
0378e9f
On Agent Connected
dewmal Jul 20, 2024
accd3ac
On Agent Connected get data
dewmal Jul 20, 2024
f8b2ea0
Update agent with role
dewmal Jul 20, 2024
6adbda5
Update agent with role
dewmal Jul 20, 2024
03c10f1
Update agent framework core
dewmal Jul 20, 2024
e6b541d
clean code
dewmal Jul 20, 2024
67488da
clean code 2
dewmal Jul 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 173 additions & 103 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = [
"libs/sangedama",
"libs/utils/utils-random-names",
"bindings/ceylon", "libs/sangathika", ]
"bindings/ceylon"
]
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,13 @@

[![PyPI - Version](https://img.shields.io/pypi/v/ceylon.svg)](https://pypi.org/project/ceylon)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/ceylon.svg)](https://pypi.org/project/ceylon)


#Agents

## Admin
- Manage other agents

## Workers
- Execute task what they recived to do from admin
- Send update to admin after finish the task
5 changes: 4 additions & 1 deletion bindings/ceylon/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ceylon"
version = "0.13.5"
version = "0.14.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -19,6 +19,9 @@ async-trait = "0.1.80"
serde = { version = "1.0.203", features = ["derive"] }
log = "0.4.21"
env_logger = "0.11.3"
tracing-subscriber = "0.3.18"
tracing = "0.1.40"
futures = { version = "0.3.30", default-features = true, features = ["default"] }

[build-dependencies]
uniffi = { version = "0.28.0", features = ["build"] }
2 changes: 1 addition & 1 deletion bindings/ceylon/ceylon/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .runner import AgentRunner
# from .runner import AgentRunner
105 changes: 30 additions & 75 deletions bindings/ceylon/ceylon/llm/llm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from collections import deque
from typing import List

import networkx as nx
from langchain_core.tools import StructuredTool, BaseTool
from pydantic.dataclasses import dataclass

from ceylon.ceylon import AgentCore, Processor, MessageHandler, AgentDefinition
from ceylon.ceylon import AgentCore, Processor, MessageHandler, AgentDefinition, uniffi_set_event_loop
from ceylon.llm.llm_caller import process_agent_request
from ceylon.llm.task_manager import TaskManager
from ceylon.runner import RunnerInput


Expand All @@ -22,13 +22,13 @@ class LLMAgentResponse:

class LLMAgent(AgentCore, MessageHandler, Processor):
tools: list[StructuredTool]
network_graph: nx.DiGraph
network_graph_original: nx.DiGraph
queue: deque
original_goal = None

agent_replies: List[LLMAgentResponse] = []

task_manager = TaskManager()

def __init__(self, name, position, instructions, responsibilities, llm, tools: list[BaseTool] = None):
super().__init__(definition=AgentDefinition(
name=name,
Expand All @@ -38,8 +38,6 @@ def __init__(self, name, position, instructions, responsibilities, llm, tools: l
), on_message=self, processor=self)
self.llm = llm
self.tools = tools
# Create a directed graph to represent the workflow
self.network_graph = nx.DiGraph()

# Initialize the queue and executed agents
self.queue = deque()
Expand All @@ -49,59 +47,35 @@ async def on_message(self, agent_id, data, time):
dt: LLMAgentResponse = pickle.loads(data)
print(f"{definition.name} Received message from = '{dt.agent_name}")

next_agent = self.get_next_agent()
if next_agent == dt.agent_name:
self.agent_replies.append(dt)
await self.update_status(dt.agent_name)

next_agent = self.get_next_agent()
if next_agent == definition.name:
dependencies = list(self.network_graph_original.predecessors(next_agent))
print("Dependencies are:", dependencies, "for", next_agent)

only_dependencies = {dt.agent_name: dt for dt in self.agent_replies if dt.agent_name in dependencies}

if len(only_dependencies) == len(dependencies):
print("Executing", definition.name)
await self.execute(self.original_goal)

await self.execute({
"original_request": self.original_goal,
**only_dependencies,
dt.agent_name: dt.response
})
# next_agent = self.get_next_agent()
# if next_agent == dt.agent_name:
# self.agent_replies.append(dt)
# await self.update_status(dt.agent_name)
#
# next_agent = self.get_next_agent()
# if next_agent == definition.name:
# dependencies = list(self.network_graph_original.predecessors(next_agent))
# print("Dependencies are:", dependencies, "for", next_agent)
#

# only_dependencies = {dt.agent_name: dt for dt in self.agent_replies if dt.agent_name in dependencies}
#
# if len(only_dependencies) == len(dependencies):
# print("Executing", definition.name)
# await self.execute(self.original_goal)
#
# await self.execute({
# "original_request": self.original_goal,
# **only_dependencies,
# dt.agent_name: dt.response
# })

async def run(self, inputs):
print(" Running LLMAgent")
inputs: RunnerInput = pickle.loads(inputs)
self._initialize_graph(inputs.network)

self.original_goal = inputs.request

await self.execute(inputs.request)


await self.stop()

def _initialize_graph(self, network):
# Add nodes and edges based on the agents and their dependencies
for agent, dependencies in network.items():
print(agent)
self.network_graph.add_node(agent)
for dependency in dependencies:
self.network_graph.add_edge(dependency, agent)

self.network_graph_original = self.network_graph.copy()

# Initialize the queue with nodes that have no dependencies (indegree 0)
self.queue.extend([node for node in self.network_graph if self.network_graph.in_degree(node) == 0])



def get_next_agent(self):
if not self.queue:
print("No more agents to execute.")
return None
return self.queue[0]
print(inputs.request)

async def execute(self, input):
definition = await self.definition()
Expand All @@ -123,24 +97,5 @@ async def execute(self, input):
else:
print("Not executing", definition.name, "as it is not the next agent in the queue.")

async def update_status(self, agent):
if agent not in self.queue:
print(f"Agent {agent} is not ready to execute or has already been executed.")
return

self.queue.remove(agent)
print(f"Executing {agent}")

# Remove the current agent and update the graph
for successor in list(self.network_graph.successors(agent)):
self.network_graph.remove_edge(agent, successor)
if self.network_graph.in_degree(successor) == 0:
self.queue.append(successor)
self.network_graph.remove_node(agent)

if not self.network_graph.nodes:
print("Workflow executed successfully.")
await self.stop()
elif not self.queue:
print("Cycle detected in the workflow!")

async def start(self, topic: "str", url: "str", inputs: "bytes") -> None:
return await super().start(topic, url, inputs)
107 changes: 107 additions & 0 deletions bindings/ceylon/ceylon/llm/task_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import enum
from typing import List, Tuple

from pydantic import dataclasses


class TaskStatus(enum.Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"


@dataclasses.dataclass
class Task:
name: str
dependencies: List[str]
status: TaskStatus = TaskStatus.PENDING


class TaskManager:
def __init__(self, tasks: List[Task] = None):
self.tasks = tasks or []

def add_tasks(self, tasks: List[Task]):
self.tasks.extend(tasks)

def add_dependencies(self, task_name: str, dependencies: List[str]):
task = next((task for task in self.tasks if task.name == task_name), None)
if task:
task.dependencies.extend(dependencies)

def update_status(self, task_name: str, status: TaskStatus) -> bool:
task = next((task for task in self.tasks if task.name == task_name), None)
if task:
if status == TaskStatus.RUNNING or status == TaskStatus.COMPLETED:
can_start, required_tasks = self.can_start_with_required_tasks(task_name)
if not can_start:
print(
f"Cannot update task '{task_name}' to '{status.name}'"
f" because the following tasks need to be completed first: {required_tasks}")
return False
task.status = status
return True
return False

def find_next_task(self) -> List[Task]:
tasks: List[Task] = self.tasks
# Create a dictionary to quickly access task status by name
task_status = {task.name: task.status for task in tasks}

# List to hold the names of tasks that can be started next
next_tasks = []

# Iterate through each task to find eligible ones
for task in tasks:
if task.status == TaskStatus.PENDING:
# Check if all dependencies are completed
if all(task_status[dep] == TaskStatus.COMPLETED for dep in task.dependencies):
next_tasks.append(task)

return next_tasks

def can_start_with_required_tasks(self, task_name: str) -> Tuple[bool, List[str]]:
task = next((task for task in self.tasks if task.name == task_name), None)
if not task:
return False, []

required_tasks = self._get_pending_dependencies(task)
can_start = len(required_tasks) == 0

return can_start, required_tasks

def _get_pending_dependencies(self, task: Task) -> List[str]:
pending_dependencies = []

for dependency_name in task.dependencies:
dependency = next((t for t in self.tasks if t.name == dependency_name), None)
if dependency and dependency.status != TaskStatus.COMPLETED:
pending_dependencies.append(dependency_name)
pending_dependencies.extend(self._get_pending_dependencies(dependency))

return list(set(pending_dependencies))


if __name__ == '__main__':
tasks = [
Task(name="task1", dependencies=["task2", "task3"]),
Task(name="task2", dependencies=["task4"]),
Task(name="task3", dependencies=["task4"]),
Task(name="task4", dependencies=[]),
Task(name="task9", dependencies=[]),
Task(name="task5", dependencies=["task3"]),
Task(name="task6", dependencies=["task3"])
]

task_manager = TaskManager(tasks)
print(task_manager.find_next_task())

print(task_manager.can_start_with_required_tasks("task1"))

task_manager.update_status("task4", TaskStatus.COMPLETED)
print(task_manager.can_start_with_required_tasks("task1"))
task_manager.update_status("task2", TaskStatus.COMPLETED)
print(task_manager.can_start_with_required_tasks("task1"))
task_manager.update_status("task3", TaskStatus.COMPLETED)
print(task_manager.can_start_with_required_tasks("task1"))
8 changes: 1 addition & 7 deletions bindings/ceylon/ceylon/runner.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
import asyncio
import pickle
from typing import List, Dict
from typing import List, Dict, Any

from pydantic import BaseModel

from .ceylon import AgentDefinition, WorkspaceConfig, AgentCore, Workspace
from ceylon.ceylon.ceylon import uniffi_set_event_loop


class RunnerInput(BaseModel):
request: dict
agents: List[AgentDefinition]
network: Dict[str, List[str]]

class Config:
arbitrary_types_allowed = True


class AgentRunner:
Expand Down
Empty file.
Empty file.
Empty file.
31 changes: 31 additions & 0 deletions bindings/ceylon/ceylon/workspace/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio
import pickle

from ceylon.ceylon import AdminAgent, AdminAgentConfig, Processor, MessageHandler, uniffi_set_event_loop, EventHandler
from ceylon.workspace.runner import RunnerInput


class Admin(AdminAgent, Processor, MessageHandler, EventHandler):

def __init__(self, name="admin", port=8888):
print("Admin initialized")
super().__init__(config=AdminAgentConfig(name=name, port=port), processor=self, on_message=self, on_event=self)

async def run(self, inputs: "bytes"):
pass

#
async def on_message(self, agent_id: "str", data: "bytes", time: "int"):
print(f"Admin on_message {self.details().name}", agent_id, data, time)

async def run_admin(self, inputs, workers):
uniffi_set_event_loop(asyncio.get_event_loop())
runner_input = RunnerInput(request=inputs, agents=[], network={})
await self.start(pickle.dumps(runner_input), workers)

#
async def execute_task(self, input):
pass

async def on_agent_connected(self, topic: "str", agent_id: "str"):
print("Agent connected", agent_id, topic)
13 changes: 13 additions & 0 deletions bindings/ceylon/ceylon/workspace/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pydantic.dataclasses import dataclass


@dataclass
class AdminRequest:
name: str
message: str


@dataclass
class WorkerResponse:
name: str
message: str
14 changes: 14 additions & 0 deletions bindings/ceylon/ceylon/workspace/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from typing import Any, List, Dict

from pydantic import BaseModel

from ceylon.ceylon import AgentDetail


class RunnerInput(BaseModel):
request: Any
agents: List[AgentDetail]
network: Dict[str, List[str]]

class Config:
arbitrary_types_allowed = True
Loading