
Issue in sync.py's SyncToAsync class: new ThreadPoolExecutor instances with daemon threads are being created for requests #458

Open
sunilnitdgp opened this issue Apr 1, 2024 · 4 comments

Comments

@sunilnitdgp

Hi,

I'm attempting to run an async view over the ASGI protocol with Daphne as the server. However, I've noticed that new ThreadPoolExecutor instances are being created for some requests, and their daemon threads are still running in the background after benchmarking. My understanding is that, since everything runs on a single event loop, a single ThreadPoolExecutor should be reused. Could someone clarify this for me?

Middleware:

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

And my view is:

import threading

import asyncio
import aiohttp
import logging

import psutil
from rest_framework.response import Response
from adrf.decorators import api_view
import json

logger = logging.getLogger(__name__)


@api_view(['GET'])
async def index(request):
    res = await make_api_request("http://{{host}}/v1/completions")
    return Response(res, status=200)


async def make_api_request(url, method="POST", headers=None, params=None, json_data=None, timeout=None):
    try:
        json_data = {
            'prompt': 'Hi, How are you?',
            'max_new_tokens': 700, 'temperature': 0, 'top_p': 1, 'max_tokens': 700,
            'model': 'meta-llama/Llama-2-7b-chat-hf',
        }
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, headers=headers, params=params, json=json_data,
                                       timeout=timeout, ssl=False) as response:
                content = await response.read()
                if 'json' in response.headers.get('Content-Type', ''):
                    content = json.loads(content)
                return content
    except asyncio.TimeoutError:
        raise TimeoutError("Request timed out. The server did not respond within the specified timeout period.")
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Request error: {str(e)}")
    except Exception as e:
        raise Exception(f"Exception error: {str(e)}")

The code I'm referring to in sync.py:

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            if hasattr(AsyncToSync.executors, "current"):
                # If we have a parent sync thread above somewhere, use that
                executor = AsyncToSync.executors.current
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    # print("================== created new thread ================")
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run

        try:
            # Run the code in the right thread
            ret: _R = await loop.run_in_executor(
                executor,
                functools.partial(
                    self.thread_handler,
                    loop,
                    self.get_current_task(),
                    sys.exc_info(),
                    func,
                    child,
                ),
            )

        finally:
            _restore_context(context)
            self.deadlock_context.set(False)

        return ret
@andrewgodwin
Member

Actually, it's normal for multiple executors to be created. The specific behaviour should be:

  • If the call is marked as thread-sensitive, there's a single shared global executor that is used
  • If the call is not marked as that, then the default behaviour of loop.run_in_executor is used, which I believe is to spawn a new one each time.
    • You can optionally override this by passing your own executor= argument into SyncToAsync, but this is rarely done (see the sketch below)
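
As a minimal sketch of that override, assuming asgiref's public sync_to_async API (fetch_report and the worker count here are illustrative, not from the report): a caller-supplied executor is only honoured for non-thread-sensitive calls, per the self._executor branch in the excerpt above.

import asyncio
from concurrent.futures import ThreadPoolExecutor

from asgiref.sync import sync_to_async

# One executor shared by all non-thread-sensitive calls, instead of the
# event loop's default executor.
shared_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="blocking-io")


def fetch_report():
    # Stand-in for some blocking work (e.g. a sync ORM call).
    return "report"


# thread_sensitive=False routes the call through the self._executor branch
# shown above; with thread_sensitive=True a custom executor is not accepted.
fetch_report_async = sync_to_async(
    fetch_report, thread_sensitive=False, executor=shared_executor
)


async def main():
    print(await fetch_report_async())


asyncio.run(main())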

@sunilnitdgp
Author

@andrewgodwin Thanks for your comment. I'm using Django as the framework, and the default ASGI handler class explicitly marks each call as thread_sensitive=True. For instance:

await sync_to_async(signals.request_started.send, thread_sensitive=True)(
    sender=self.__class__, scope=scope
)

Considering this, a single shared global executor should ideally be used. However, despite this setup, the code seems to be entering the else block, where a new thread executor is created for the current context instead of reusing the existing one (a sketch of that branch in isolation follows after the snippet):

if thread_sensitive_context in self.context_to_thread_executor:
    # Re-use thread executor in current context
    executor = self.context_to_thread_executor[thread_sensitive_context]
else:
    # Create new thread executor in current context
    executor = ThreadPoolExecutor(max_workers=1)
    self.context_to_thread_executor[thread_sensitive_context] = executor
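
To see that branch in isolation, here is a minimal sketch using only asgiref's public API (no Django; one_request and which_thread are illustrative). It wraps each simulated request in a ThreadSensitiveContext, the context object that the context_to_thread_executor branch above keys its executors on, so each context ends up with its own single-thread executor, visible in the worker thread names:

import asyncio
import threading

from asgiref.sync import ThreadSensitiveContext, sync_to_async


def which_thread():
    # Runs inside whichever executor SyncToAsync picked for this call.
    return threading.current_thread().name


async def one_request():
    # Mimics a per-request scope: inside the context, the
    # thread_sensitive_context branch above caches a dedicated
    # single-thread executor for this context object.
    async with ThreadSensitiveContext():
        return await sync_to_async(which_thread, thread_sensitive=True)()


async def main():
    # Each simulated request reports a worker thread from a different executor.
    for name in [await one_request() for _ in range(3)]:
        print(name)


asyncio.run(main())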

@andrewgodwin
Member

Since all sorts of things that you can do in your app can affect that (different middleware, server configuration, loading in something that uses gevent, etc.), we can only really debug it if you can produce a reliable way to reproduce it in a tiny sample app, unfortunately.

@sunilnitdgp
Author

sunilnitdgp commented Apr 2, 2024

I understand. I'm actually working with a simple app containing just a view. Here are the steps I've followed:

  1. Created a virtual environment and installed necessary packages (django, djangorestframework, adrf, aiohttp, daphne, asyncio).
  2. Added daphne and adrf to INSTALLED_APPS in settings.py.
  3. Created an async view in views.py and added a corresponding path in urls.py.
  4. Ran the server with python manage.py runserver (can see the log Starting ASGI/Daphne version 4.1.0 development server at http://127.0.0.1:8000/)
  5. Utilized a benchmarking tool (in this case, ab) to test the API endpoint (ab -c 10 -n 600 -s 800007 -T application/json "http://127.0.0.1:8000/test").
  6. Monitored the total number of threads created during benchmarking using top -o threads -pid <pid> (a psutil-based alternative is sketched below).

I'm working on a Mac M1 Air.
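
If it helps reproduction, here is a small alternative to step 6 (watch_threads is illustrative, not part of the original report; it assumes psutil, which the view above already imports) that samples the server process's thread count from a second terminal while ab runs:

import sys
import time

import psutil


def watch_threads(pid, interval=1.0):
    # Poll the Daphne/runserver process and print its OS thread count.
    proc = psutil.Process(pid)
    while True:
        print(f"threads={proc.num_threads()}")
        time.sleep(interval)


if __name__ == "__main__":
    # Usage: python watch_threads.py <server pid>
    watch_threads(int(sys.argv[1]))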
