diff --git a/src/conductor/execution/executor.py b/src/conductor/execution/executor.py index 280c86b..ec8f442 100644 --- a/src/conductor/execution/executor.py +++ b/src/conductor/execution/executor.py @@ -11,6 +11,13 @@ from conductor.execution.task import ExecutingTask from conductor.execution.task_state import TaskState from conductor.task_types.base import TaskExecutionHandle +from conductor.utils.colored_output import ( + print_bold, + print_cyan, + print_green, + print_red, + print_yellow, +) from conductor.utils.sigchld import SigchldHelper from conductor.utils.time import time_to_readable_string @@ -123,7 +130,7 @@ def run_plan( # 1. Print out any cached tasks. for cached_task in plan.cached_tasks: - print( + print_cyan( "Using cached results for {}.".format( str(cached_task.task.identifier) ) @@ -159,8 +166,10 @@ def run_plan( except ConductorAbort: self._inflight_tasks.terminate_processes() elapsed = time.time() - start - print( - "🔸 Task aborted. (ran for {})".format(time_to_readable_string(elapsed)) + print() + print_yellow( + "🔸 Task aborted. (ran for {})".format(time_to_readable_string(elapsed)), + bold=True, ) print() raise @@ -198,11 +207,11 @@ def _launch_tasks_if_able(self, ctx: Context, stop_on_first_error: bool) -> bool if not next_task.exe_deps_succeeded(): # At least one dependency failed, so we need to skip this task. - print("Skipping {}.".format(str(next_task.task.identifier))) + print_yellow("Skipping {}.".format(str(next_task.task.identifier))) next_task.set_state(TaskState.SKIPPED) self._process_finished_task(next_task) else: - print("Running {}...".format(str(next_task.task.identifier))) + print_cyan("Running {}...".format(str(next_task.task.identifier))) try: slot = ( self._available_slots[-1] @@ -244,7 +253,7 @@ def _wait_for_next_inflight_task( try: task.task.finish_execution(handle, ctx) task.set_state(TaskState.SUCCEEDED) - print("{} completed successfully.".format(task.task.identifier)) + print_green("✓ {} completed successfully.".format(task.task.identifier)) except ConductorAbort: task.set_state(TaskState.ABORTED) # N.B. A slot may be leaked here, but it does not matter because we @@ -289,7 +298,10 @@ def _report_execution_results(self, plan: ExecutionPlan, elapsed_time: float): # Print the final execution result (succeeded or failed). if all_succeeded and (main_task_executed or main_task_cached): - print("✨ Done! (ran for {})".format(time_to_readable_string(elapsed_time))) + print() + print_bold( + "✨ Done! (ran for {})".format(time_to_readable_string(elapsed_time)) + ) else: # At least one task must have failed. @@ -301,13 +313,15 @@ def _report_execution_results(self, plan: ExecutionPlan, elapsed_time: float): elif exe_task.state == TaskState.FAILED: failed_tasks.append(exe_task) assert len(failed_tasks) > 0 - print( + print() + print_red( "🔴 Task failed. (ran for {})".format( time_to_readable_string(elapsed_time) - ) + ), + bold=True, ) print() - print("Failed task(s):") + print_bold("Failed task(s):") for failed in failed_tasks: print(" {}".format(failed.task.identifier)) assert failed.stored_error is not None @@ -318,7 +332,7 @@ def _report_execution_results(self, plan: ExecutionPlan, elapsed_time: float): ) print() if len(skipped_tasks) > 0: - print("Skipped task(s) (one or more dependencies failed):") + print_bold("Skipped task(s) (one or more dependencies failed):") for skipped in skipped_tasks: print(" {}".format(skipped.task.identifier)) print() @@ -327,4 +341,4 @@ def _report_execution_results(self, plan: ExecutionPlan, elapsed_time: float): raise failed_tasks[0].stored_error def _print_task_failed(self, task: ExecutingTask): - print("{} failed.".format(task.task.identifier)) + print_red("✗ {} failed.".format(task.task.identifier)) diff --git a/src/conductor/utils/colored_output.py b/src/conductor/utils/colored_output.py new file mode 100644 index 0000000..095ed00 --- /dev/null +++ b/src/conductor/utils/colored_output.py @@ -0,0 +1,44 @@ +from typing import Dict + + +class _Colors: + Blue = "\033[34m" + Cyan = "\033[36m" + Green = "\033[32m" + Red = "\033[31m" + Yellow = "\033[33m" + + Bold = "\033[1m" + Reset = "\033[0m" + + +def print_cyan(message: str, **kwargs) -> None: + _print_colored(message, _Colors.Cyan, kwargs) + + +def print_blue(message: str, **kwargs) -> None: + _print_colored(message, _Colors.Blue, kwargs) + + +def print_green(message: str, **kwargs) -> None: + _print_colored(message, _Colors.Green, kwargs) + + +def print_red(message: str, **kwargs) -> None: + _print_colored(message, _Colors.Red, kwargs) + + +def print_yellow(message: str, **kwargs) -> None: + _print_colored(message, _Colors.Yellow, kwargs) + + +def print_bold(message: str, **kwargs) -> None: + _print_colored(message, _Colors.Bold, kwargs) + + +def _print_colored(message: str, color: str, kwargs: Dict) -> None: + if "bold" in kwargs: + if kwargs["bold"]: + color += _Colors.Bold + del kwargs["bold"] + print(f"{color}{message}{_Colors.Reset}", **kwargs)