Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: use async by default, with occasional def threading #1015

Merged
merged 20 commits into from
Nov 29, 2023

Conversation

spwoodcock
Copy link
Member

@spwoodcock spwoodcock commented Nov 28, 2023

Async

  • FastAPI is an asynchronous web framework that is built to use async code.
  • Using async (async def) function with await is more scalable than using synchronous code def, so this is always the preferred default approach.
  • Using synchronous code is possible, but devs should be aware of the pitfalls: if the code runs for a long time, it will block the async event loop (i.e. block the thread until the process completes).
    • Bear in mind that 'synchronous' code could be from what you write in the crud functions, OR could be from a library that you use (e.g. osm-fieldwork is synchronous for the most part).

Workers & Thread Blocking

  • By default we run FastAPI (uvicorn) with 4 workers - this means 4 threads available to run processes.
  • If a process blocks a thread (as described above), then only three are available to take new requests.
  • If all of the workers/threads are blocked by tasks, the server will hang / be unresponsive!

Using Synchronous Code

It is of course possible to use synchronous code, but if necessary, be sure to run this in another thread.

Options

1) Using sync code within an async def function

  • Use the BackgroundTasks implementation we have, with polling for the task completion.
  • The task should be written as a standard def. FastAPI will handle this automatically and ensure it runs in a separate thread.
  • Alternatively, if you wish to run the task in the foreground and return the response, use the FastAPI helper run_in_threadpool:
from fastapi.concurrency import run_in_threadpool

def long_running_sync_task(time_to_sleep):
    sleep(time_to_sleep)

async def some_func():
    data = await run_in_threadpool(lambda: long_running_sync_task(time_to_sleep))

2) Running multiple standard def from within an async def function

  • Sometimes you need to run multiple def functions in parallel.
  • To do this, you can use ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor, wait

def a_synchronous_function(db):
    # Run with expensive task via threadpool
    def wrap_generate_task_files(task):
        """Func to wrap and return errors from thread.

        Also passes it's own database session for thread safety.
        If we pass a single db session to multiple threads,
        there may be inconsistencies or errors.
        """
        try:
            generate_task_files(
                next(get_db()),
                project_id,
                task,
                xlsform,
                form_type,
                odk_credentials,
            )
        except Exception as e:
            log.exception(str(e))

    # Use a ThreadPoolExecutor to run the synchronous code in threads
    with ThreadPoolExecutor() as executor:
        # Submit tasks to the thread pool
        futures = [
            executor.submit(wrap_generate_task_files, task)
            for task in tasks_list
        ]
        # Wait for all tasks to complete
        wait(futures)

Note that in the above example, we cannot pass the db object from the parent function into the functions spawned in threads. A single database connection should not be written to by multiple processes at the same time, as you may get data inconsistencies. To solve this we generate a new db connection within the pool for each separate task we run in a thread.

To avoid issues we should look into limiting the thread usage via:
https://stackoverflow.com/questions/73195338/how-to-avoid-database-connection-pool-from-being-exhausted-when-using-fastapi-in

3) Running an async def within a sync def

  • As we try to write most functions async for FastAPI, sometime we need to run some async def logic within a sync def. This is not possible normally.
  • To avoid having to write a duplicated def equivalent of the async def code, we can use the package asgiref:
from asgiref.sync import async_to_sync

async def get_project(db, project_id):
    return something

def a_sync_function():
     get_project_sync = async_to_sync(get_project)
     project = get_project_sync(db, project_id)
     return project

4) Efficiency running batch async tasks

  • Sometime you may have a very efficient async task you need to call within a for loop.
  • Instead of that, you can use asyncio.gather to much more efficiently collect and return the async data (e.g. async web requests, or async file requests, or async db requests):
from asyncio import gather

async def parent_func(db, project_id, data, no_of_buildings, has_data_extracts):
    ... some other code

    async def split_multi_geom_into_tasks():
        # Use asyncio.gather to concurrently process the async generator
        split_poly = [
            split_polygon_into_tasks(
                db, project_id, data, no_of_buildings, has_data_extracts
            )
            for data in boundary_geoms
        ]

        # Use asyncio.gather with list to collect results from the async generator
        return (
            item for sublist in await gather(*split_poly) for item in sublist if sublist
        )

    geoms = await split_multi_geom_into_tasks()

Note

  • If you regularly find you are running out of workers/threads and the server is overloaded, it may be time to add a task queuing system to your stack.
  • Celery is made for just this - defer tasks to a queue, and run gradually to reduce the immediate load.

@github-actions github-actions bot added docs Improvements or additions to documentation frontend Related to frontend code backend Related to backend code labels Nov 28, 2023
@spwoodcock spwoodcock marked this pull request as ready for review November 29, 2023 14:31
@spwoodcock spwoodcock merged commit 60b9a82 into development Nov 29, 2023
6 of 7 checks passed
@spwoodcock spwoodcock deleted the fix/sync-project-create branch November 29, 2023 14:31
@kshitijrajsharma
Copy link
Member

Great Documentation ! FastAPI sometimes suffer with thread management specially when you are using run in threadpool ! I believe it totally depends upon usecase as you have mentioned , May be One more thing to add , if you are not dependent on external services and your API is doing heavy lifting and taking considerable time , I will recommend do not work on the managing the threads may be revisit the architecture you have , Optimize your endpoint. API is never supposed to do heavy lifting , may be assign different script for workers and let them do the heavy lifting , API is supposed to return fast , it will either fetch the data or submit the request and comeback , this practice might make architecture simple ! + when you have multiple threads open by single request , it might not scale when there are multiple users hitting the same endpoint , I like this part of fastapi that it can automatically distribute the request based on the load , it is quite efficient

@spwoodcock
Copy link
Member Author

spwoodcock commented Dec 6, 2023

Great feedback, thanks @kshitijrajsharma!
Feed free to add any extra info to https://github.com/hotosm/docs/blob/main/docs/dev-guide/web-frameworks.md 😉
I think it's a really valuable point that we should optimise the API endpoints, before resorting to threading etc.

nischalstha9 pushed a commit to naxa-developers/fmtm that referenced this pull request Dec 8, 2023
* build: add asgiref async_to_sync, plus pytest-asyncio

* build: install script login linger after docker install

* docs: update instructions to automount s3fs

* fix: add user role to user schema, remove appuser logic

* refactor: async initial load read_xlsforms

* refactor: update refs to form --> category in project creation

* refactor: remove redundant projects.utils (replaced)

* refactor: rename add_features --> upload_custom_extract

* fix: update all to async, except BackgroundTasks defs

* fix: update long running central operations to run_in_threadpool

* test: update tests to be async + threadpool

* fix: update expensive generate_task_files to use ThreadPoolExecutor

* fix: use threadpool for generate_task_files

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor: move logic for task schemas into models

* refactor: remove final ref to convert_app_tasks

* refactor: fix loading project summaries without db_obj

* fix: add outline_centroid to task_schemas

* refactor: add task details to project schema output

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backend Related to backend code docs Improvements or additions to documentation frontend Related to frontend code
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants