diff --git a/karton/system/system.py b/karton/system/system.py index 3d6b14b..aec81fe 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -26,6 +26,7 @@ class SystemService(KartonServiceBase): version = __version__ with_service_info = True + CRASH_STARTED_TASKS_ON_TIMEOUT = False GC_INTERVAL = 3 * 60 TASK_DISPATCHED_TIMEOUT = 24 * 3600 TASK_STARTED_TIMEOUT = 24 * 3600 @@ -45,9 +46,31 @@ def __init__(self, config: Optional[Config]) -> None: ) self.enable_gc = self.config.getboolean("system", "enable_gc", True) self.enable_router = self.config.getboolean("system", "enable_router", True) + self.crash_started_tasks_on_timeout = self.config.getboolean( + "system", "crash_started_tasks_on_timeout", False + ) self.last_gc_trigger = time.time() + def _log_config(self): + self.log.info( + "Effective config:\n" + " gc_interval:\t%s\n" + " task_dispatched_timeout:\t%s\n" + " task_started_timeout:\t%s\n" + " task_crashed_timeout:\t%s\n" + " enable_gc:\t%s\n" + " enable_router:\t%s\n" + " crash_started_tasks_on_timeout:\t%s", + self.gc_interval, + self.task_dispatched_timeout, + self.task_started_timeout, + self.task_crashed_timeout, + self.enable_gc, + self.enable_router, + self.crash_started_tasks_on_timeout, + ) + def gc_collect_resources(self) -> None: # Collects unreferenced resources left in object storage karton_bucket = self.backend.default_bucket_name @@ -68,6 +91,7 @@ def gc_collect_resources(self) -> None: self.backend.remove_objects(karton_bucket, resources_to_remove) def gc_collect_tasks(self) -> None: + self.log.debug("GC: gc_collect_tasks started") # Collects finished tasks root_tasks = set() running_root_tasks = set() @@ -75,6 +99,7 @@ def gc_collect_tasks(self) -> None: current_time = time.time() to_delete = [] + to_crash = [] queues_to_clear = set() online_consumers = self.backend.get_online_consumers() @@ -116,14 +141,24 @@ def gc_collect_tasks(self) -> None: and task.last_update is not None and current_time > task.last_update + self.task_started_timeout ): - to_delete.append(task) - self.log.error( - "Task %s is in Started state more than %d seconds. " - "Killed. (receiver: %s)", - task.uid, - self.task_started_timeout, - task.headers.get("receiver", ""), - ) + if self.crash_started_tasks_on_timeout: + to_crash.append(task) + self.log.error( + "Task %s is in Started state more than %d seconds. " + "Crashed. (receiver: %s)", + task.uid, + self.task_started_timeout, + task.headers.get("receiver", ""), + ) + else: + to_delete.append(task) + self.log.error( + "Task %s is in Started state more than %d seconds. " + "Killed. (receiver: %s)", + task.uid, + self.task_started_timeout, + task.headers.get("receiver", ""), + ) elif task.status == TaskState.FINISHED: to_delete.append(task) self.log.debug("GC: Finished task %s", task.uid) @@ -151,11 +186,26 @@ def gc_collect_tasks(self) -> None: self.backend.increment_metrics_list( KartonMetrics.TASK_GARBAGE_COLLECTED, to_increment ) + if to_crash: + to_increment = [ + task.headers.get("receiver", "unknown") for task in to_crash + ] + for task in to_crash: + task.error = [ + "This task was STARTED too long (TASK_STARTED_TIMEOUT), " + "so status was changes to CRASHED." + ] + self.backend.set_task_status(task, TaskState.CRASHED) + self.backend.increment_metrics_list( + KartonMetrics.TASK_CRASHED, to_increment + ) for finished_root_task in root_tasks.difference(running_root_tasks): # TODO: Notification needed self.log.debug("GC: Finished root task %s", finished_root_task) + self.log.debug("GC: gc_collect_tasks ended") + def gc_collect(self) -> None: if time.time() > (self.last_gc_trigger + self.gc_interval): try: @@ -251,6 +301,7 @@ def process_routing(self) -> None: self.handle_operations(bodies) def loop(self) -> None: + self._log_config() self.log.info("Manager %s started", self.identity) with self.graceful_killer(): @@ -288,7 +339,6 @@ def args_parser(cls) -> argparse.ArgumentParser: parser.add_argument( "--gc-interval", type=int, - default=cls.GC_INTERVAL, help="Garbage collection interval", ) parser.add_argument( @@ -304,16 +354,24 @@ def args_parser(cls) -> argparse.ArgumentParser: parser.add_argument( "--task-crashed-timeout", help="Timeout for tasks in Crashed state" ) + parser.add_argument( + "--crash-started-task-on-timeout", + action="store_const", + dest="crash_started_tasks", + help="Crash Started tasks on timeout instead of deleting", + ) return parser @classmethod def config_from_args(cls, config: Config, args: argparse.Namespace): super().config_from_args(config, args) + config.load_from_dict( { "system": { "enable_gc": args.enable_gc, "enable_router": args.enable_router, + "crash_started_tasks_on_timeout": args.crash_started_tasks, "gc_interval": args.gc_interval, "task_dispatched_timeout": args.task_dispatched_timeout, "task_started_timeout": args.task_started_timeout,