Skip to content

Commit

Permalink
draft: add timeout to immediate task on api service
Browse files Browse the repository at this point in the history
  • Loading branch information
pedro-psb committed Dec 16, 2024
1 parent 486851e commit 6b4c39f
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import contextvars
import importlib
import logging
import signal
import sys
import traceback
from datetime import timedelta
Expand All @@ -20,6 +21,7 @@
TASK_STATES,
TASK_DISPATCH_LOCK,
)
from pulpcore.exceptions import TimeoutException
from pulpcore.tasking.kafka import send_task_notification

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -47,10 +49,28 @@ def wakeup_worker():


def execute_task(task):
# This extra stack is needed to isolate the current_task ContextVar
contextvars.copy_context().run(_execute_task, task)


def execute_immediate_task(task):
# set alarm timeout
IMMEDIATE_TASK_TIMEOUT = 60 * 5

def timeout_handler(signum, frame):
raise TimeoutException(f"Immediate task time: {IMMEDIATE_TASK_TIMEOUT}")

signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(IMMEDIATE_TASK_TIMEOUT)

try:
# This extra stack is needed to isolate the current_task ContextVar
contextvars.copy_context().run(_execute_task, task)
except TimeoutException:
raise
finally:
signal.alarm(0)


def _execute_task(task):
# Store the task id in the context for `Task.current()`.
current_task.set(task)
Expand Down Expand Up @@ -226,9 +246,13 @@ def dispatch(
).exists()
):
task.unblock()
execute_task(task)
if resources:
notify_workers = True
try:
execute_immediate_task(task)
except TimeoutException:
pass
finally:
if resources:
notify_workers = True
elif deferred:
notify_workers = True
else:
Expand Down

0 comments on commit 6b4c39f

Please sign in to comment.