diff --git a/bindings/ceylon/ceylon/task/task_coordinator.py b/bindings/ceylon/ceylon/task/task_coordinator.py index c534694..5dde2c7 100644 --- a/bindings/ceylon/ceylon/task/task_coordinator.py +++ b/bindings/ceylon/ceylon/task/task_coordinator.py @@ -5,6 +5,7 @@ from ceylon import CoreAdmin, on_message from ceylon.ceylon import AgentDetail from ceylon.task import Task, TaskResult, TaskAssignment, SubTask +from ceylon.task.task_operation import TaskResultStatus from ceylon.task.task_operator import TaskOperator @@ -55,16 +56,19 @@ async def run_tasks(self): @on_message(type=TaskResult) async def on_task_result(self, result: TaskResult): - for idx, task in enumerate(self.tasks): - sub_task = task.get_next_subtask() - if sub_task is None or result.task_id != sub_task[1].id: - continue - if result.task_id == sub_task[1].id: - task.update_subtask_status(sub_task[1].name, result.result) - break - - if self.all_tasks_completed(): - await self.end_task_management() + print(f"Received task result: {result}") + if result.status == TaskResultStatus.COMPLETED: + for idx, task in enumerate(self.tasks): + sub_task = task.get_next_subtask() + print(result.task_id, sub_task[1].id, result.task_id == sub_task[1].id) + if sub_task is None or result.task_id != sub_task[1].id: + continue + if result.task_id == sub_task[1].id: + task.update_subtask_status(sub_task[1].name, result.result) + break + + if self.all_tasks_completed(): + await self.end_task_management() await self.run_tasks() diff --git a/bindings/ceylon/ceylon/task/task_operation.py b/bindings/ceylon/ceylon/task/task_operation.py index 72b8d4c..9ebbcd3 100644 --- a/bindings/ceylon/ceylon/task/task_operation.py +++ b/bindings/ceylon/ceylon/task/task_operation.py @@ -1,4 +1,5 @@ import datetime +import enum from typing import List, Optional, Tuple, Set, Dict from uuid import uuid4 @@ -144,6 +145,12 @@ class TaskAssignment(BaseModel): assigned_agent: str +class TaskResultStatus(enum.Enum): + IN_PROGRESS = "IN_PROGRESS" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + + class TaskResult(BaseModel): task_id: str parent_task_id: str @@ -151,6 +158,7 @@ class TaskResult(BaseModel): result: str name: str description: str + status: TaskResultStatus if __name__ == "__main__": diff --git a/bindings/ceylon/ceylon/task/task_operator.py b/bindings/ceylon/ceylon/task/task_operator.py index 3976b39..a6b8ccb 100644 --- a/bindings/ceylon/ceylon/task/task_operator.py +++ b/bindings/ceylon/ceylon/task/task_operator.py @@ -6,6 +6,7 @@ from ceylon import Agent, on_message from ceylon.static_val import DEFAULT_WORKSPACE_ID, DEFAULT_ADMIN_PORT from ceylon.task import TaskAssignment, TaskResult +from ceylon.task.task_operation import TaskResultStatus class TaskOperator(Agent, abc.ABC): @@ -21,17 +22,35 @@ def __init__(self, name: str, role: str, workspace_id: str = DEFAULT_WORKSPACE_I async def on_task_assignment(self, data: TaskAssignment): if data.assigned_agent == self.details().name and data.task.id not in self.exeuction_history: self.exeuction_history.append(data.task.id) - logger.info(f"{self.details().name} received subtask: {data.task.description}") - result = await self.get_result(data.task) + status = TaskResultStatus.IN_PROGRESS + result = None + try: + logger.info(f"{self.details().name} received subtask: {data.task.description}") + result = await self.get_result(data.task) + status = TaskResultStatus.COMPLETED + logger.info(f"{self.details().name} completed subtask: {data.task.description}") + except Exception as e: + logger.info(f"{self.details().name} failed subtask: {data.task.description}") + for idx, task in enumerate(self.exeuction_history): + if task == data.task.id: + del self.exeuction_history[idx] + break + logger.exception(e) + result = str(e) + status = TaskResultStatus.FAILED + result_task = TaskResult(task_id=data.task.id, name=data.task.name, description=data.task.description, agent=self.details().name, parent_task_id=data.task.parent_task_id, - result=result) + result=result, + status=status) # Update task history - await self.add_result_to_history(result_task) + if status == TaskResultStatus.COMPLETED: + await self.add_result_to_history(result_task) await self.broadcast_data(result_task) + logger.info(f"{self.details().name} sent subtask result: {data.task.description}") @on_message(type=TaskResult) async def on_task_result(self, data: TaskResult): diff --git a/bindings/ceylon/tests/tasks/llm_software_agency.py b/bindings/ceylon/tests/tasks/llm_software_agency.py index 13953e5..fbea257 100644 --- a/bindings/ceylon/tests/tasks/llm_software_agency.py +++ b/bindings/ceylon/tests/tasks/llm_software_agency.py @@ -1,5 +1,6 @@ from langchain_openai import ChatOpenAI +from ceylon.ceylon import enable_log from ceylon.llm import LLMTaskOperator, LLMTaskCoordinator from ceylon.task import Task from ceylon.utils.agent_monitor import AgentMonitor @@ -16,9 +17,9 @@ # Initialize language models -llm = ChatOpenAI(model="gpt-4o") -tool_llm = ChatOpenAI(model="gpt-4o") -code_llm = ChatOpenAI(model="gpt-4o") +llm = ChatOpenAI(model="gpt-4o-mini") +tool_llm = ChatOpenAI(model="gpt-4o-mini") +code_llm = ChatOpenAI(model="gpt-4o-mini") # Create specialized agents agents = [