Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async register API support #1083

Draft
wants to merge 18 commits into
base: aqua_apiserver
Choose a base branch
from
Draft

Conversation

mayoor
Copy link
Member

@mayoor mayoor commented Feb 22, 2025

Async APIs for Long running Tasks

Currently the register tasks can take hours depending on the model size and the bandwidth available. It works ok for CLI, but for calling through REST API, this is not a good practice. Instead, we should return an request_id so that the user can monitor the status using the request_id.
For first iteration, I want to keep the changes simple without adding new services or technologies. With that in mind, I have chosen websockets.

When register api is called in async mode, a job_id is returned. To monitor the status, we have to make a websocket connect - ws://host:port/aqua/ws. With each task completion, a message is posted to all the subscribers of the job_id status.

Payload change to POST /aqua/model - added async_mode attribute, which is used to determine of the API should operate in async model

@oracle-contributor-agreement oracle-contributor-agreement bot added the OCA Verified All contributors have signed the Oracle Contributor Agreement. label Feb 22, 2025
Copy link

⚠️ This PR changed pyproject.toml file. ⚠️

  • PR Creator must update 📃 THIRD_PARTY_LICENSES.txt, if any 📚 library added/removed in pyproject.toml.
  • PR Approver must confirm 📃 THIRD_PARTY_LICENSES.txt updated, if any 📚 library added/removed in pyproject.toml.

@mayoor mayoor changed the base branch from main to aqua_apiserver February 22, 2025 00:41
Copy link

⚠️ This PR changed pyproject.toml file. ⚠️

  • PR Creator must update 📃 THIRD_PARTY_LICENSES.txt, if any 📚 library added/removed in pyproject.toml.
  • PR Approver must confirm 📃 THIRD_PARTY_LICENSES.txt updated, if any 📚 library added/removed in pyproject.toml.

Copy link

⚠️ This PR changed pyproject.toml file. ⚠️

  • PR Creator must update 📃 THIRD_PARTY_LICENSES.txt, if any 📚 library added/removed in pyproject.toml.
  • PR Approver must confirm 📃 THIRD_PARTY_LICENSES.txt updated, if any 📚 library added/removed in pyproject.toml.

Copy link

⚠️ This PR changed pyproject.toml file. ⚠️

  • PR Creator must update 📃 THIRD_PARTY_LICENSES.txt, if any 📚 library added/removed in pyproject.toml.
  • PR Approver must confirm 📃 THIRD_PARTY_LICENSES.txt updated, if any 📚 library added/removed in pyproject.toml.



@dataclass
class TaskStatus(DataClassSerializable):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pydantic?

from ads.aqua.ui import ModelFormat

logger = getLogger(__name__)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the other places we use - from ads.aqua import logger.

return registered_model

if async_mode:
t = threading.Thread(
Copy link
Member

@mrDzurb mrDzurb Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to use a ThreadPool instead to control the number of the potential threads?

Something like:

THREAD_POOL_EXECUTOR = ThreadPoolExecutor(max_workers=10)
if async_mode:
            # Submit the registration task to a thread pool.
            THREAD_POOL_EXECUTOR.submit(self._register_model, task_id, input_data, async_mode)
            output = {
                "state": "ACCEPTED",
                "task_id": task_id,
                "progress_url": f"ws://host:port/aqua/ws/{task_id}",
            }
        else:
            output = self._register_model(task_id, input_data, async_mode)

Maybe we can introduce some global ThreadPoolExecutor for this?
I'm wondering if we can use a decorator for this, something similar that we do for in @threaded decorator.

THREAD_POOL_EXECUTOR = ThreadPoolExecutor(max_workers=10)

def run_in_thread_if_async(func):
    """Decorator to run the function in a thread if async_mode is True."""
    @wraps(func)
    def wrapper(self, async_mode, *args, **kwargs):
        if async_mode:
            task_id = str(uuid4())
            future = THREAD_POOL_EXECUTOR.submit(func, self, task_id, *args, **kwargs)
            return {
                "state": "ACCEPTED",
                "task_id": task_id,
                "progress_url": f"ws://host:port/aqua/ws/{task_id}",
            }
        else:
            return func(self, None, *args, **kwargs)
    return wrapper

I think the decorator could also take care of the StatusTracker.

In this case we just mark any desired function with the @run_in_thread_if_async decorator which will do all the related work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mrDzurb Does threadpool allow for daemon threads? I need daemon threads here.



@dataclass
class RequestStatus(DataClassSerializable):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: For the new code, pydantic would be better to use?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
OCA Verified All contributors have signed the Oracle Contributor Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants