Skip to content

Commit

Permalink
improve message passing
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Jul 20, 2024
1 parent 49153fd commit 9c122b9
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 17 deletions.
25 changes: 13 additions & 12 deletions bindings/ceylon/ceylon/workspace/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,26 @@

class Worker(WorkerAgent, Processor, MessageHandler):

def __init__(self, name="admin", workspace_id="admin", admin_peer=None, admin_port=8888,role="worker"):
def __init__(self, name="admin", workspace_id="admin", admin_peer=None, admin_port=8888, role="worker"):
super().__init__(config=WorkerAgentConfig(name=name,
role=role,
admin_peer=admin_peer,
admin_port=admin_port,
work_space_id=workspace_id), processor=self, on_message=self)

async def run(self, inputs: "bytes"):
print(f"Worker received: {inputs}")
try:
while True:
await self.broadcast(pickle.dumps({
"hello": f"world from worker {self.details().name}"
}))
await asyncio.sleep(1)
print(f"Worker broadcasted: {pickle.dumps({'hello': 'world from worker'})}")
except Exception as e:
print(f"Worker error: {e}")
print(f"Worker {self.details().name} finished")
pass
# print(f"Worker received: {inputs}")
# try:
# while True:
# await self.broadcast(pickle.dumps({
# "hello": f"world from worker {self.details().name}"
# }))
# await asyncio.sleep(1)
# print(f"Worker broadcasted: {pickle.dumps({'hello': 'world from worker'})}")
# except Exception as e:
# print(f"Worker error: {e}")
# print(f"Worker {self.details().name} finished")

def on_message(self, agent_id: "str", data: "bytes", time: "int"):
print(f"Worker on_message {self.details().name}", agent_id, data, time)
Expand Down
7 changes: 5 additions & 2 deletions bindings/ceylon/src/workspace/admin_agent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::alloc::System;
use futures::future::join_all;
use std::collections::HashMap;
use std::sync::{Arc};
use std::time::SystemTime;
use tokio::runtime::Runtime;
use tokio::sync::{Mutex, RwLock};
use tokio::{select, signal};
Expand Down Expand Up @@ -70,7 +72,8 @@ impl AdminAgent {
}

pub async fn broadcast(&self, message: Vec<u8>) {
let node_message = AgentMessage::NodeMessage { message };
let id = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64;
let node_message = AgentMessage::NodeMessage { message, id };
match self.broadcast_emitter.send(node_message.to_bytes()).await {
Ok(_) => {}
Err(_) => {
Expand Down Expand Up @@ -168,7 +171,7 @@ impl AdminAgent {
let agent_message = AgentMessage::from_bytes(data);

match agent_message {
AgentMessage::NodeMessage { message } => {
AgentMessage::NodeMessage { message,.. } => {
on_message.lock().await.on_message(
created_by,
message,
Expand Down
2 changes: 2 additions & 0 deletions bindings/ceylon/src/workspace/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub enum AgentMessage {
SystemMessage {
id: u64,
message: Vec<u8>
},
NodeMessage {
id: u64,
message: Vec<u8>
},
}
Expand Down
12 changes: 9 additions & 3 deletions bindings/ceylon/src/workspace/worker_agent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use std::sync::{Arc};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};
use tokio::{select};
use tokio::runtime::{Handle};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -34,6 +34,7 @@ pub struct WorkerAgent {

_peer_id: String,
_key: Vec<u8>,
_increment_id: RwLock<i32>,
}

impl WorkerAgent {
Expand All @@ -45,6 +46,7 @@ impl WorkerAgent {
let (broadcast_emitter, broadcast_receiver) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let admin_peer_key = create_key();
let id = get_peer_id(&admin_peer_key).to_string();
let increment_id = RwLock::new(0);
Self {
config,
_processor: Arc::new(Mutex::new(processor)),
Expand All @@ -55,10 +57,13 @@ impl WorkerAgent {

_peer_id: id,
_key: admin_peer_key.to_protobuf_encoding().unwrap(),

_increment_id: increment_id,
}
}
pub async fn broadcast(&self, message: Vec<u8>) {
let node_message = AgentMessage::NodeMessage { message };
let id = self._increment_id.read().await;
let node_message = AgentMessage::NodeMessage { message, id: *id as u64 };
let message = node_message.to_bytes();

match self.broadcast_emitter.send(message).await {
Expand All @@ -67,6 +72,7 @@ impl WorkerAgent {
error!("Failed to send broadcast message");
}
}
*self._increment_id.write().await += 1;
}
pub async fn start(&self, _: Vec<u8>) {
info!("Not yet implemented");
Expand Down Expand Up @@ -127,7 +133,7 @@ impl WorkerAgent {
let agent_message = AgentMessage::from_bytes(data);

match agent_message {
AgentMessage::NodeMessage { message } => {
AgentMessage::NodeMessage { message,.. } => {
on_message.lock().await.on_message(
created_by,
message,
Expand Down
89 changes: 89 additions & 0 deletions bindings/ceylon/tests/test_worker/meeting_schedular.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import asyncio
import pickle
from typing import List

from pydantic import dataclasses

from ceylon.ceylon import enable_log
from ceylon.workspace.admin import Admin
from ceylon.workspace.worker import Worker

admin_port = 8000
admin_peer = "Coordinator"
workspace_id = "time_scheduling"


@dataclasses.dataclass
class Meeting:
name: str
duration: int
minimum_participants: int


@dataclasses.dataclass
class TimeSlot:
date: str
start_time: int
end_time: int

@property
def duration(self):
return self.end_time - self.start_time


@dataclasses.dataclass
class AvailabilityRequest:
time_slot: TimeSlot


@dataclasses.dataclass
class AvailabilityResponse:
time_slot: TimeSlot
accepted: bool


class Participant(Worker):
name: str
available_times: List[TimeSlot]

def __init__(self, name, available_times):
self.name = name
self.available_times = available_times
super().__init__(name=name, workspace_id=workspace_id, admin_peer=admin_peer, admin_port=admin_port)

async def on_message(self, agent_id: "str", data: "bytes", time: "int"):
print(f"Agent on_message {self.details().name}", agent_id, data, time)


class Coordinator(Admin):
meeting_agents = []

def __init__(self):
super().__init__(name=workspace_id, port=admin_port)

@staticmethod
def is_overlap(slot1: TimeSlot, slot2: TimeSlot, duration: int) -> bool:
latest_start = max(slot1.start_time, slot2.start_time)
earliest_end = min(slot1.end_time, slot2.end_time)
return earliest_end - latest_start >= duration

async def on_agent_connected(self, topic: "str", agent_id: "str"):
print(f"Agent {agent_id} connected to {topic}")
await self.broadcast(pickle.dumps(AvailabilityRequest(time_slot=TimeSlot("2024-07-21", 9, 12))))


async def main():
agent1 = Participant("Alice", [TimeSlot("2024-07-21", 9, 12), TimeSlot("2024-07-21", 14, 18)])
agent2 = Participant("Bob", [TimeSlot("2024-07-21", 10, 13), TimeSlot("2024-07-21", 15, 17)])
agent3 = Participant("Charlie", [TimeSlot("2024-07-21", 11, 14), TimeSlot("2024-07-21", 16, 18)])

coordinator = Coordinator()
await coordinator.run_admin(
inputs=Meeting(name="Meeting 1", duration=2, minimum_participants=2),
workers=[agent1, agent2, agent3]
)


if __name__ == '__main__':
# enable_log("INFO")
asyncio.run(main())

0 comments on commit 9c122b9

Please sign in to comment.