From cb7a9624875a529c365f16320d8de494b41df1d4 Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Thu, 2 Jan 2025 11:51:52 +0300 Subject: [PATCH 1/5] Optional feature: crash started task on timeout instead of deleting --- karton/system/system.py | 71 +++++++++++++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 10 deletions(-) diff --git a/karton/system/system.py b/karton/system/system.py index 3d6b14b..fe763e2 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -26,6 +26,7 @@ class SystemService(KartonServiceBase): version = __version__ with_service_info = True + CRASH_STARTED_TASKS_ON_TIMEOUT = False GC_INTERVAL = 3 * 60 TASK_DISPATCHED_TIMEOUT = 24 * 3600 TASK_STARTED_TIMEOUT = 24 * 3600 @@ -45,6 +46,25 @@ def __init__(self, config: Optional[Config]) -> None: ) self.enable_gc = self.config.getboolean("system", "enable_gc", True) self.enable_router = self.config.getboolean("system", "enable_router", True) + self.crash_started_tasks_on_timeout = self.config.getboolean("system", "crash_started_tasks_on_timeout", False) + + + self.log.info("Effective config:\n" + " gc_interval:\t%s\n" + " task_dispatched_timeout:\t%s\n" + " task_started_timeout:\t%s\n" + " task_crashed_timeout:\t%s\n" + " enable_gc:\t%s\n" + " enable_router:\t%s\n" + " crash_started_tasks_on_timeout:\t%s", + self.gc_interval, + self.task_dispatched_timeout, + self.task_started_timeout, + self.task_crashed_timeout, + self.enable_gc, + self.enable_router, + self.crash_started_tasks_on_timeout) + self.last_gc_trigger = time.time() @@ -68,6 +88,7 @@ def gc_collect_resources(self) -> None: self.backend.remove_objects(karton_bucket, resources_to_remove) def gc_collect_tasks(self) -> None: + self.log.debug("GC: gc_collect_tasks started") # Collects finished tasks root_tasks = set() running_root_tasks = set() @@ -75,6 +96,7 @@ def gc_collect_tasks(self) -> None: current_time = time.time() to_delete = [] + to_crash = [] queues_to_clear = set() online_consumers = self.backend.get_online_consumers() @@ -116,14 +138,24 @@ def gc_collect_tasks(self) -> None: and task.last_update is not None and current_time > task.last_update + self.task_started_timeout ): - to_delete.append(task) - self.log.error( - "Task %s is in Started state more than %d seconds. " - "Killed. (receiver: %s)", - task.uid, - self.task_started_timeout, - task.headers.get("receiver", ""), - ) + if self.crash_started_tasks_on_timeout: + to_crash.append(task) + self.log.error( + "Task %s is in Started state more than %d seconds. " + "Crashed. (receiver: %s)", + task.uid, + self.task_started_timeout, + task.headers.get("receiver", ""), + ) + else: + to_delete.append(task) + self.log.error( + "Task %s is in Started state more than %d seconds. " + "Killed. (receiver: %s)", + task.uid, + self.task_started_timeout, + task.headers.get("receiver", ""), + ) elif task.status == TaskState.FINISHED: to_delete.append(task) self.log.debug("GC: Finished task %s", task.uid) @@ -151,11 +183,23 @@ def gc_collect_tasks(self) -> None: self.backend.increment_metrics_list( KartonMetrics.TASK_GARBAGE_COLLECTED, to_increment ) + if to_crash: + to_increment = [ + task.headers.get("receiver", "unknown") for task in to_crash + ] + for task in to_crash: + task.error = "This task was STARTED too long (TASK_STARTED_TIMEOUT), so status was changes to CRASHED." + self.backend.set_task_status(task, TaskState.CRASHED) + self.backend.increment_metrics_list( + KartonMetrics.TASK_CRASHED, to_increment + ) for finished_root_task in root_tasks.difference(running_root_tasks): # TODO: Notification needed self.log.debug("GC: Finished root task %s", finished_root_task) + self.log.debug("GC: gc_collect_tasks ended") + def gc_collect(self) -> None: if time.time() > (self.last_gc_trigger + self.gc_interval): try: @@ -167,7 +211,7 @@ def gc_collect(self) -> None: def route_task(self, task: Task, binds: List[KartonBind]) -> None: # Performs routing of task - self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid) + #self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid) # store the producer-task relationship in redis for task tracking self.backend.log_identity_output( task.headers.get("origin", "unknown"), task.headers @@ -288,7 +332,6 @@ def args_parser(cls) -> argparse.ArgumentParser: parser.add_argument( "--gc-interval", type=int, - default=cls.GC_INTERVAL, help="Garbage collection interval", ) parser.add_argument( @@ -304,16 +347,24 @@ def args_parser(cls) -> argparse.ArgumentParser: parser.add_argument( "--task-crashed-timeout", help="Timeout for tasks in Crashed state" ) + parser.add_argument( + "--crash-started-task-on-timeout", + action="store_const", + dest="crash_started_tasks_on_timeout", + help="Crash Started tasks on timeout instead of deleting", + ) return parser @classmethod def config_from_args(cls, config: Config, args: argparse.Namespace): super().config_from_args(config, args) + config.load_from_dict( { "system": { "enable_gc": args.enable_gc, "enable_router": args.enable_router, + "crash_started_tasks_on_timeout": args.crash_started_tasks_on_timeout, "gc_interval": args.gc_interval, "task_dispatched_timeout": args.task_dispatched_timeout, "task_started_timeout": args.task_started_timeout, From fae6857048c672bb7021f51cdfe51e1f367c6a55 Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Thu, 2 Jan 2025 12:13:21 +0300 Subject: [PATCH 2/5] lint --- karton/system/system.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/karton/system/system.py b/karton/system/system.py index fe763e2..675f0a9 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -46,10 +46,12 @@ def __init__(self, config: Optional[Config]) -> None: ) self.enable_gc = self.config.getboolean("system", "enable_gc", True) self.enable_router = self.config.getboolean("system", "enable_router", True) - self.crash_started_tasks_on_timeout = self.config.getboolean("system", "crash_started_tasks_on_timeout", False) - + self.crash_started_tasks_on_timeout = self.config.getboolean( + "system", "crash_started_tasks_on_timeout", False + ) - self.log.info("Effective config:\n" + self.log.info( + "Effective config:\n" " gc_interval:\t%s\n" " task_dispatched_timeout:\t%s\n" " task_started_timeout:\t%s\n" @@ -63,8 +65,8 @@ def __init__(self, config: Optional[Config]) -> None: self.task_crashed_timeout, self.enable_gc, self.enable_router, - self.crash_started_tasks_on_timeout) - + self.crash_started_tasks_on_timeout, + ) self.last_gc_trigger = time.time() @@ -211,7 +213,7 @@ def gc_collect(self) -> None: def route_task(self, task: Task, binds: List[KartonBind]) -> None: # Performs routing of task - #self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid) + # self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid) # store the producer-task relationship in redis for task tracking self.backend.log_identity_output( task.headers.get("origin", "unknown"), task.headers @@ -358,7 +360,7 @@ def args_parser(cls) -> argparse.ArgumentParser: @classmethod def config_from_args(cls, config: Config, args: argparse.Namespace): super().config_from_args(config, args) - + config.load_from_dict( { "system": { From 704655426a2c21b811e44bfdeed9d8a33ae9106e Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Thu, 2 Jan 2025 12:28:37 +0300 Subject: [PATCH 3/5] Flake8 --- karton/system/system.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/karton/system/system.py b/karton/system/system.py index 675f0a9..44a06df 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -190,7 +190,10 @@ def gc_collect_tasks(self) -> None: task.headers.get("receiver", "unknown") for task in to_crash ] for task in to_crash: - task.error = "This task was STARTED too long (TASK_STARTED_TIMEOUT), so status was changes to CRASHED." + task.error = ( + "This task was STARTED too long (TASK_STARTED_TIMEOUT), " + "so status was changes to CRASHED." + ) self.backend.set_task_status(task, TaskState.CRASHED) self.backend.increment_metrics_list( KartonMetrics.TASK_CRASHED, to_increment @@ -352,7 +355,7 @@ def args_parser(cls) -> argparse.ArgumentParser: parser.add_argument( "--crash-started-task-on-timeout", action="store_const", - dest="crash_started_tasks_on_timeout", + dest="crash_started_tasks", help="Crash Started tasks on timeout instead of deleting", ) return parser @@ -366,7 +369,7 @@ def config_from_args(cls, config: Config, args: argparse.Namespace): "system": { "enable_gc": args.enable_gc, "enable_router": args.enable_router, - "crash_started_tasks_on_timeout": args.crash_started_tasks_on_timeout, + "crash_started_tasks_on_timeout": args.crash_started_tasks, "gc_interval": args.gc_interval, "task_dispatched_timeout": args.task_dispatched_timeout, "task_started_timeout": args.task_started_timeout, From 475d31c5e63403dd797f1a8b59eaa2206f3f9375 Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Thu, 2 Jan 2025 12:39:18 +0300 Subject: [PATCH 4/5] linting... --- karton/system/system.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/karton/system/system.py b/karton/system/system.py index 44a06df..aa3fb23 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -190,10 +190,10 @@ def gc_collect_tasks(self) -> None: task.headers.get("receiver", "unknown") for task in to_crash ] for task in to_crash: - task.error = ( + task.error = [ "This task was STARTED too long (TASK_STARTED_TIMEOUT), " "so status was changes to CRASHED." - ) + ] self.backend.set_task_status(task, TaskState.CRASHED) self.backend.increment_metrics_list( KartonMetrics.TASK_CRASHED, to_increment From 04b47cc98fd76018145cc96e9deebe8264f26976 Mon Sep 17 00:00:00 2001 From: psrok1 Date: Wed, 22 Jan 2025 14:07:27 +0100 Subject: [PATCH 5/5] Generate effective config banner in loop, uncomment log line --- karton/system/system.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/karton/system/system.py b/karton/system/system.py index aa3fb23..aec81fe 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -50,6 +50,9 @@ def __init__(self, config: Optional[Config]) -> None: "system", "crash_started_tasks_on_timeout", False ) + self.last_gc_trigger = time.time() + + def _log_config(self): self.log.info( "Effective config:\n" " gc_interval:\t%s\n" @@ -68,8 +71,6 @@ def __init__(self, config: Optional[Config]) -> None: self.crash_started_tasks_on_timeout, ) - self.last_gc_trigger = time.time() - def gc_collect_resources(self) -> None: # Collects unreferenced resources left in object storage karton_bucket = self.backend.default_bucket_name @@ -216,7 +217,7 @@ def gc_collect(self) -> None: def route_task(self, task: Task, binds: List[KartonBind]) -> None: # Performs routing of task - # self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid) + self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid) # store the producer-task relationship in redis for task tracking self.backend.log_identity_output( task.headers.get("origin", "unknown"), task.headers @@ -300,6 +301,7 @@ def process_routing(self) -> None: self.handle_operations(bodies) def loop(self) -> None: + self._log_config() self.log.info("Manager %s started", self.identity) with self.graceful_killer():