Skip to content

Commit

Permalink
wip: add deprecation warning for non-coroutine immediate task dispatches
Browse files Browse the repository at this point in the history
  • Loading branch information
pedro-psb committed Feb 14, 2025
1 parent 0ecc3a7 commit e834821
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
33 changes: 20 additions & 13 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import traceback
import tempfile
from asgiref.sync import sync_to_async
from datetime import timedelta
from gettext import gettext as _

Expand All @@ -16,7 +17,7 @@
from django_guid import get_guid
from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
from pulpcore.app.models import Task, TaskGroup
from pulpcore.app.util import current_task, get_domain, get_prn
from pulpcore.app.util import current_task, get_domain, get_prn, deprecation_logger
from pulpcore.constants import (
TASK_FINAL_STATES,
TASK_INCOMPLETE_STATES,
Expand Down Expand Up @@ -73,20 +74,29 @@ def _execute_task(task):
is_coroutine = asyncio.iscoroutine(result)

if immediate is True and not is_coroutine:
raise RuntimeError(_("Immediate tasks must be coroutines."))
deprecation_logger.warning(
_(
"Immediate tasks must be coroutine functions."
"Support for non-coroutine immediate tasks will be dropped in pulpcore 3.85."
)
)
result = sync_to_async(func)
is_coroutine = True

if is_coroutine:
_logger.debug(_("Task is coroutine %s"), task.pk)
loop = asyncio.get_event_loop()
if immediate:
try:
loop.run_until_complete(asyncio.wait_for(result, timeout=IMMEDIATE_TIMEOUT))
except asyncio.TimeoutError:
raise RuntimeError(
_("Immediate task timed out after {} seconds").format(IMMEDIATE_TIMEOUT)
)
coro = asyncio.wait_for(result, timeout=IMMEDIATE_TIMEOUT)
else:
loop.run_until_complete(result)
coro = result

loop = asyncio.get_event_loop()
try:
loop.run_until_complete(coro)
except asyncio.TimeoutError:
raise RuntimeError(
_("Immediate task timed out after {} seconds").format(IMMEDIATE_TIMEOUT)
)

except Exception:
exc_type, exc, tb = sys.exc_info()
Expand Down Expand Up @@ -225,9 +235,6 @@ def dispatch(
stack.enter_context(task)
else:
notify_workers = True
_logger.info("asdfasdf*")
_logger.info("asdfasdf")
_logger.info("*" * 50)
if immediate:
prior_tasks = Task.objects.filter(
state__in=TASK_INCOMPLETE_STATES, pulp_created__lt=task.pulp_created
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/tests/functional/api/test_tasking.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ def test_immediate_task_requires_resource(pulpcore_bindings, dispatch_task, moni
def wait_until(state, task_href, timeout=10):
for i in range(timeout):
task = pulpcore_bindings.TasksApi.read(task_href)
if task.state != state:
if task.state == state:
break
time.sleep(1)
raise RuntimeError("Timeout waiting for task to transition")
Expand Down

0 comments on commit e834821

Please sign in to comment.