Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master agent lib #3

Merged
merged 9 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
target

.env
!.env.sample
!.env.sample
/bindings/ceylon/ceylon/ceylon/
49 changes: 19 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions bindings/ceylon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ name = "ceylon"
crate-type = ["cdylib"]

[dependencies]
uniffi = { version = "0.27.3", features = ["tokio", "bindgen", "default"] }
uniffi = { version = "0.28.0", features = ["tokio", "bindgen", "default"] }
thiserror = "1.0.61"
uuid = { version = "1.8.0", features = ["v4"] }
serde_json = "1.0.117"
Expand All @@ -19,4 +19,4 @@ async-trait = "0.1.80"
serde = { version = "1.0.203", features = ["derive"] }

[build-dependencies]
uniffi = { version = "0.27.3", features = ["build"] }
uniffi = { version = "0.28.0", features = ["build"] }
3 changes: 2 additions & 1 deletion bindings/ceylon/ceylon/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from ceylon import AgentCore, MessageHandler, Processor, uniffi_set_event_loop, Workspace, WorkspaceConfig
from .llm_agent import LLMAgent
from .runner import AgentRunner
19 changes: 0 additions & 19 deletions bindings/ceylon/ceylon/base.py

This file was deleted.

23 changes: 23 additions & 0 deletions bindings/ceylon/ceylon/llm_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from ceylon.ceylon import AgentCore, MessageHandler, Processor


class LLMManager(AgentCore, MessageHandler, Processor):
def __init__(self, name="manager"):
super().__init__(name=name, is_leader=True, on_message=self, processor=self)

async def on_message(self, agent_id, message):
pass

async def run(self, inputs):
print(inputs)


class LLMAgent(AgentCore, MessageHandler, Processor):
def __init__(self, name):
super().__init__(name=name, is_leader=False, on_message=self, processor=self)

async def on_message(self, agent_id, message):
pass

async def run(self, inputs):
print(inputs)
40 changes: 40 additions & 0 deletions bindings/ceylon/ceylon/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import asyncio

from ceylon.ceylon import Workspace, WorkspaceConfig, uniffi_set_event_loop, AgentCore


class AgentRunnerError(Exception):
pass


class AgentRunnerNotLeaderError(AgentRunnerError):
pass


class AgentRunnerCannotHaveMultipleLeadersError(AgentRunnerError):
pass


class AgentRunner:
agents = []
config: WorkspaceConfig
leader_agent = None

def __init__(self, workspace_name, workspace_host="/ip4/0.0.0.0/tcp", workspace_port=8888):
self.agents = []
self.config = WorkspaceConfig(name=workspace_name, host=workspace_host, port=workspace_port)

def register_agent(self, agent: AgentCore):
# Not accepting multiple leaders
if self.leader_agent is None and agent.is_leader():
self.leader_agent = agent
elif agent.is_leader() and self.leader_agent is not None:
raise AgentRunnerCannotHaveMultipleLeadersError()
self.agents.append(agent)

async def run(self, inputs):
if self.leader_agent is None:
raise AgentRunnerNotLeaderError()
uniffi_set_event_loop(asyncio.get_event_loop())
workspace = Workspace(agents=self.agents, config=self.config)
await workspace.run(inputs)
4 changes: 3 additions & 1 deletion bindings/ceylon/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
]

dependencies = ["ollama==0.2.1"]
31 changes: 16 additions & 15 deletions bindings/ceylon/src/agent/agent.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
use tokio::sync::{Mutex};
use std::sync::{Arc};
use tokio::runtime::Runtime;
use std::collections::HashMap;
use std::sync::Arc;

use sangedama::node::node::create_node;
use tokio::sync::Mutex;

use sangedama::node::node::{create_node, Message};

// The call-answer, callback interface.

#[async_trait::async_trait]
pub trait MessageHandler: Send + Sync {
async fn on_message(&self, agent_id: String, message: String);
async fn on_message(&self, agent_id: String, message: Message);
}

// The call-answer, callback interface.

#[async_trait::async_trait]
pub trait Processor: Send + Sync {
async fn run(&self);
async fn run(&self, message: HashMap<String, String>) -> ();
}

pub struct AgentCore {
Expand All @@ -25,13 +26,13 @@ pub struct AgentCore {
_workspace_id: Option<String>,
_processor: Arc<Mutex<Arc<dyn Processor>>>,
_on_message: Arc<Mutex<Arc<dyn MessageHandler>>>,
rx_0: Arc<Mutex<tokio::sync::mpsc::Receiver<Vec<u8>>>>,
tx_0: tokio::sync::mpsc::Sender<Vec<u8>>,
rx_0: Arc<Mutex<tokio::sync::mpsc::Receiver<Message>>>,
tx_0: tokio::sync::mpsc::Sender<Message>,
}

impl AgentCore {
pub fn new(name: String, is_leader: bool, on_message: Arc<dyn MessageHandler>, processor: Arc<dyn Processor>) -> Self {
let (tx_0, rx_0) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let (tx_0, rx_0) = tokio::sync::mpsc::channel::<Message>(100);
let id = uuid::Uuid::new_v4().to_string();
Self {
_name: name,
Expand Down Expand Up @@ -65,15 +66,15 @@ impl AgentCore {
self._workspace_id = Option::from(workspace_id);
}

pub async fn broadcast(&self, message: String) {
self.tx_0.send(message.to_string().as_bytes().to_vec()).await.unwrap();
pub async fn broadcast(&self, message: Vec<u8>) {
self.tx_0.send(Message::data(self._name.clone(), self._id.clone(), message)).await.unwrap();
}
}

impl AgentCore {
pub(crate) async fn start(&self, topic: String, url: String) {
pub(crate) async fn start(&self, topic: String, url: String, inputs: HashMap<String, String>) {
let agent_name = self._name.clone();
let (tx_0, rx_0) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let (tx_0, rx_0) = tokio::sync::mpsc::channel::<Message>(100);
let (mut node_0, mut rx_o_0) = create_node(agent_name.clone(), true, rx_0);
let on_message = self._on_message.clone();

Expand All @@ -90,12 +91,12 @@ impl AgentCore {
}

if let Some(message) = rx_o_0.recv().await {
on_message.lock().await.on_message(agent_name.clone(), String::from_utf8_lossy(&message).to_string()).await;
on_message.lock().await.on_message(agent_name.clone(), message).await;
}
}
});
let processor = self._processor.clone();
processor.lock().await.run().await;
processor.lock().await.run(inputs).await;
}
}

Expand Down
7 changes: 6 additions & 1 deletion bindings/ceylon/src/agent/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use tokio::runtime::Runtime;
use serde::{Deserialize, Serialize};
use uniffi::deps::log::debug;

use crate::AgentCore;

Expand Down Expand Up @@ -37,14 +38,18 @@ impl Workspace {
}

pub async fn run(&self, inputs: HashMap<String, String>) {
debug!("Workspace {} running", self.id);
let mut rt = Runtime::new().unwrap();
let mut tasks = vec![];
let _inputs = inputs.clone();
for agent in self._agents.iter() {
let _inputs = _inputs.clone();
let url = format!("{}/{}", self.host, self.port);
let topic = format!("workspace-{}", agent.workspace_id());

let agent = agent.clone();
let task = rt.spawn(async move {
agent.start(topic, url).await;
agent.start(topic, url, _inputs).await;
});
tasks.push(task);
}
Expand Down
Loading
Loading