Skip to content

Commit

Permalink
Add task result status
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Aug 15, 2024
1 parent 48d6d01 commit b0fb5cc
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 17 deletions.
24 changes: 14 additions & 10 deletions bindings/ceylon/ceylon/task/task_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ceylon import CoreAdmin, on_message
from ceylon.ceylon import AgentDetail
from ceylon.task import Task, TaskResult, TaskAssignment, SubTask
from ceylon.task.task_operation import TaskResultStatus
from ceylon.task.task_operator import TaskOperator


Expand Down Expand Up @@ -55,16 +56,19 @@ async def run_tasks(self):

@on_message(type=TaskResult)
async def on_task_result(self, result: TaskResult):
for idx, task in enumerate(self.tasks):
sub_task = task.get_next_subtask()
if sub_task is None or result.task_id != sub_task[1].id:
continue
if result.task_id == sub_task[1].id:
task.update_subtask_status(sub_task[1].name, result.result)
break

if self.all_tasks_completed():
await self.end_task_management()
print(f"Received task result: {result}")
if result.status == TaskResultStatus.COMPLETED:
for idx, task in enumerate(self.tasks):
sub_task = task.get_next_subtask()
print(result.task_id, sub_task[1].id, result.task_id == sub_task[1].id)
if sub_task is None or result.task_id != sub_task[1].id:
continue
if result.task_id == sub_task[1].id:
task.update_subtask_status(sub_task[1].name, result.result)
break

if self.all_tasks_completed():
await self.end_task_management()

await self.run_tasks()

Expand Down
8 changes: 8 additions & 0 deletions bindings/ceylon/ceylon/task/task_operation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import enum
from typing import List, Optional, Tuple, Set, Dict
from uuid import uuid4

Expand Down Expand Up @@ -144,13 +145,20 @@ class TaskAssignment(BaseModel):
assigned_agent: str


class TaskResultStatus(enum.Enum):
IN_PROGRESS = "IN_PROGRESS"
COMPLETED = "COMPLETED"
FAILED = "FAILED"


class TaskResult(BaseModel):
task_id: str
parent_task_id: str
agent: str
result: str
name: str
description: str
status: TaskResultStatus


if __name__ == "__main__":
Expand Down
27 changes: 23 additions & 4 deletions bindings/ceylon/ceylon/task/task_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ceylon import Agent, on_message
from ceylon.static_val import DEFAULT_WORKSPACE_ID, DEFAULT_ADMIN_PORT
from ceylon.task import TaskAssignment, TaskResult
from ceylon.task.task_operation import TaskResultStatus


class TaskOperator(Agent, abc.ABC):
Expand All @@ -21,17 +22,35 @@ def __init__(self, name: str, role: str, workspace_id: str = DEFAULT_WORKSPACE_I
async def on_task_assignment(self, data: TaskAssignment):
if data.assigned_agent == self.details().name and data.task.id not in self.exeuction_history:
self.exeuction_history.append(data.task.id)
logger.info(f"{self.details().name} received subtask: {data.task.description}")
result = await self.get_result(data.task)
status = TaskResultStatus.IN_PROGRESS
result = None
try:
logger.info(f"{self.details().name} received subtask: {data.task.description}")
result = await self.get_result(data.task)
status = TaskResultStatus.COMPLETED
logger.info(f"{self.details().name} completed subtask: {data.task.description}")
except Exception as e:
logger.info(f"{self.details().name} failed subtask: {data.task.description}")
for idx, task in enumerate(self.exeuction_history):
if task == data.task.id:
del self.exeuction_history[idx]
break
logger.exception(e)
result = str(e)
status = TaskResultStatus.FAILED

result_task = TaskResult(task_id=data.task.id,
name=data.task.name,
description=data.task.description,
agent=self.details().name,
parent_task_id=data.task.parent_task_id,
result=result)
result=result,
status=status)
# Update task history
await self.add_result_to_history(result_task)
if status == TaskResultStatus.COMPLETED:
await self.add_result_to_history(result_task)
await self.broadcast_data(result_task)
logger.info(f"{self.details().name} sent subtask result: {data.task.description}")

@on_message(type=TaskResult)
async def on_task_result(self, data: TaskResult):
Expand Down
7 changes: 4 additions & 3 deletions bindings/ceylon/tests/tasks/llm_software_agency.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from langchain_openai import ChatOpenAI

from ceylon.ceylon import enable_log
from ceylon.llm import LLMTaskOperator, LLMTaskCoordinator
from ceylon.task import Task
from ceylon.utils.agent_monitor import AgentMonitor
Expand All @@ -16,9 +17,9 @@

# Initialize language models

llm = ChatOpenAI(model="gpt-4o")
tool_llm = ChatOpenAI(model="gpt-4o")
code_llm = ChatOpenAI(model="gpt-4o")
llm = ChatOpenAI(model="gpt-4o-mini")
tool_llm = ChatOpenAI(model="gpt-4o-mini")
code_llm = ChatOpenAI(model="gpt-4o-mini")

# Create specialized agents
agents = [
Expand Down

0 comments on commit b0fb5cc

Please sign in to comment.