Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optional feature: crash started task on timeout instead of deleting #264

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 67 additions & 9 deletions karton/system/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class SystemService(KartonServiceBase):
version = __version__
with_service_info = True

CRASH_STARTED_TASKS_ON_TIMEOUT = False
GC_INTERVAL = 3 * 60
TASK_DISPATCHED_TIMEOUT = 24 * 3600
TASK_STARTED_TIMEOUT = 24 * 3600
Expand All @@ -45,9 +46,31 @@ def __init__(self, config: Optional[Config]) -> None:
)
self.enable_gc = self.config.getboolean("system", "enable_gc", True)
self.enable_router = self.config.getboolean("system", "enable_router", True)
self.crash_started_tasks_on_timeout = self.config.getboolean(
"system", "crash_started_tasks_on_timeout", False
)

self.last_gc_trigger = time.time()

def _log_config(self):
self.log.info(
"Effective config:\n"
" gc_interval:\t%s\n"
" task_dispatched_timeout:\t%s\n"
" task_started_timeout:\t%s\n"
" task_crashed_timeout:\t%s\n"
" enable_gc:\t%s\n"
" enable_router:\t%s\n"
" crash_started_tasks_on_timeout:\t%s",
self.gc_interval,
self.task_dispatched_timeout,
self.task_started_timeout,
self.task_crashed_timeout,
self.enable_gc,
self.enable_router,
self.crash_started_tasks_on_timeout,
)

def gc_collect_resources(self) -> None:
# Collects unreferenced resources left in object storage
karton_bucket = self.backend.default_bucket_name
Expand All @@ -68,13 +91,15 @@ def gc_collect_resources(self) -> None:
self.backend.remove_objects(karton_bucket, resources_to_remove)

def gc_collect_tasks(self) -> None:
self.log.debug("GC: gc_collect_tasks started")
# Collects finished tasks
root_tasks = set()
running_root_tasks = set()
unrouted_task_uids = self.backend.get_task_ids_from_queue(KARTON_TASKS_QUEUE)

current_time = time.time()
to_delete = []
to_crash = []

queues_to_clear = set()
online_consumers = self.backend.get_online_consumers()
Expand Down Expand Up @@ -116,14 +141,24 @@ def gc_collect_tasks(self) -> None:
and task.last_update is not None
and current_time > task.last_update + self.task_started_timeout
):
to_delete.append(task)
self.log.error(
"Task %s is in Started state more than %d seconds. "
"Killed. (receiver: %s)",
task.uid,
self.task_started_timeout,
task.headers.get("receiver", "<unknown>"),
)
if self.crash_started_tasks_on_timeout:
to_crash.append(task)
self.log.error(
"Task %s is in Started state more than %d seconds. "
"Crashed. (receiver: %s)",
task.uid,
self.task_started_timeout,
task.headers.get("receiver", "<unknown>"),
)
else:
to_delete.append(task)
self.log.error(
"Task %s is in Started state more than %d seconds. "
"Killed. (receiver: %s)",
task.uid,
self.task_started_timeout,
task.headers.get("receiver", "<unknown>"),
)
elif task.status == TaskState.FINISHED:
to_delete.append(task)
self.log.debug("GC: Finished task %s", task.uid)
Expand Down Expand Up @@ -151,11 +186,26 @@ def gc_collect_tasks(self) -> None:
self.backend.increment_metrics_list(
KartonMetrics.TASK_GARBAGE_COLLECTED, to_increment
)
if to_crash:
to_increment = [
task.headers.get("receiver", "unknown") for task in to_crash
]
for task in to_crash:
task.error = [
"This task was STARTED too long (TASK_STARTED_TIMEOUT), "
"so status was changes to CRASHED."
]
self.backend.set_task_status(task, TaskState.CRASHED)
self.backend.increment_metrics_list(
KartonMetrics.TASK_CRASHED, to_increment
)

for finished_root_task in root_tasks.difference(running_root_tasks):
# TODO: Notification needed
self.log.debug("GC: Finished root task %s", finished_root_task)

self.log.debug("GC: gc_collect_tasks ended")

def gc_collect(self) -> None:
if time.time() > (self.last_gc_trigger + self.gc_interval):
try:
Expand Down Expand Up @@ -251,6 +301,7 @@ def process_routing(self) -> None:
self.handle_operations(bodies)

def loop(self) -> None:
self._log_config()
self.log.info("Manager %s started", self.identity)

with self.graceful_killer():
Expand Down Expand Up @@ -288,7 +339,6 @@ def args_parser(cls) -> argparse.ArgumentParser:
parser.add_argument(
"--gc-interval",
type=int,
default=cls.GC_INTERVAL,
help="Garbage collection interval",
)
parser.add_argument(
Expand All @@ -304,16 +354,24 @@ def args_parser(cls) -> argparse.ArgumentParser:
parser.add_argument(
"--task-crashed-timeout", help="Timeout for tasks in Crashed state"
)
parser.add_argument(
"--crash-started-task-on-timeout",
action="store_const",
dest="crash_started_tasks",
help="Crash Started tasks on timeout instead of deleting",
)
return parser

@classmethod
def config_from_args(cls, config: Config, args: argparse.Namespace):
super().config_from_args(config, args)

config.load_from_dict(
{
"system": {
"enable_gc": args.enable_gc,
"enable_router": args.enable_router,
"crash_started_tasks_on_timeout": args.crash_started_tasks,
"gc_interval": args.gc_interval,
"task_dispatched_timeout": args.task_dispatched_timeout,
"task_started_timeout": args.task_started_timeout,
Expand Down
Loading