Async register API support #1083
base: aqua_apiserver
Conversation
```python
@dataclass
class TaskStatus(DataClassSerializable):
```
Why not pydantic?
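For comparison, a minimal sketch of the same class as a pydantic model (the fields shown are hypothetical, since the diff only shows the class header):

```python
from pydantic import BaseModel


class TaskStatus(BaseModel):
    # Hypothetical fields for illustration; the actual TaskStatus fields
    # are defined elsewhere in this PR and may differ.
    task_id: str
    state: str = "ACCEPTED"
    message: str = ""
```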
```python
from ads.aqua.ui import ModelFormat

logger = getLogger(__name__)
```
I think in the other places we use `from ads.aqua import logger`.
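For reference, the suggested change would look roughly like this (assuming the shared module-level logger exported by `ads.aqua` is what is meant):

```python
from ads.aqua import logger  # reuse the shared AQUA logger instead of getLogger(__name__)
```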
```python
return registered_model

if async_mode:
    t = threading.Thread(
```
Wouldn't it be better to use a thread pool instead, to control the number of potential threads?
Something like:
```python
from concurrent.futures import ThreadPoolExecutor

THREAD_POOL_EXECUTOR = ThreadPoolExecutor(max_workers=10)

if async_mode:
    # Submit the registration task to a thread pool.
    THREAD_POOL_EXECUTOR.submit(self._register_model, task_id, input_data, async_mode)
    output = {
        "state": "ACCEPTED",
        "task_id": task_id,
        "progress_url": f"ws://host:port/aqua/ws/{task_id}",
    }
else:
    output = self._register_model(task_id, input_data, async_mode)
```
Maybe we can introduce some global ThreadPoolExecutor for this?
I'm wondering if we can use a decorator for this, something similar to what we do in the `@threaded` decorator.
```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from uuid import uuid4

THREAD_POOL_EXECUTOR = ThreadPoolExecutor(max_workers=10)


def run_in_thread_if_async(func):
    """Decorator to run the function in a thread if async_mode is True."""

    @wraps(func)
    def wrapper(self, async_mode, *args, **kwargs):
        if async_mode:
            task_id = str(uuid4())
            # Run the wrapped function in the shared pool and return immediately.
            future = THREAD_POOL_EXECUTOR.submit(func, self, task_id, *args, **kwargs)
            return {
                "state": "ACCEPTED",
                "task_id": task_id,
                "progress_url": f"ws://host:port/aqua/ws/{task_id}",
            }
        else:
            return func(self, None, *args, **kwargs)

    return wrapper
```
I think the decorator could also take care of the `StatusTracker`.
In this case we just mark any desired function with the `@run_in_thread_if_async` decorator, which will do all the related work.
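A hypothetical usage sketch of the proposed decorator (class, method, and parameter names here are illustrative, not the actual PR code):

```python
input_data = {"model": "<model-name>"}  # hypothetical payload


class AquaModelApp:
    @run_in_thread_if_async
    def register(self, task_id, input_data):
        # Long-running registration work; task_id can be used to report
        # progress (e.g. to a StatusTracker).
        ...


app = AquaModelApp()
# Async: returns {"state": "ACCEPTED", "task_id": ..., "progress_url": ...} immediately.
response = app.register(True, input_data)
# Sync: blocks until registration completes and returns its result.
result = app.register(False, input_data)
```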
@mrDzurb Does threadpool allow for daemon threads? I need daemon threads here.
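For reference, a quick way to check what the executor actually provides — on CPython 3.9+ the worker threads of `ThreadPoolExecutor` are created as non-daemon threads and are joined at interpreter shutdown, which is worth verifying for this use case:

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def worker_info():
    current = threading.current_thread()
    return current.name, current.daemon


with ThreadPoolExecutor(max_workers=1) as pool:
    name, is_daemon = pool.submit(worker_info).result()
    print(f"{name}: daemon={is_daemon}")  # daemon=False on Python 3.9+
```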
```python
@dataclass
class RequestStatus(DataClassSerializable):
```
NIT: For the new code, wouldn't pydantic be better to use?
Async APIs for Long-running Tasks
Currently the register task can take hours depending on the model size and the available bandwidth. That works fine for the CLI, but for calls through the REST API it is not a good practice. Instead, we should return a request_id so that the user can monitor the status using that request_id.
For the first iteration, I want to keep the changes simple without adding new services or technologies. With that in mind, I have chosen websockets.
When the register API is called in async mode, a job_id is returned. To monitor the status, the client opens a websocket connection to ws://host:port/aqua/ws. On each task completion, a message is posted to all subscribers of that job_id's status.
Payload change to POST /aqua/model: added an async_mode attribute, which is used to determine whether the API should operate in async mode. A rough client-side sketch of this flow is shown below.
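A sketch of the intended client-side flow under this change (the request payload fields other than `async_mode`, the terminal state names, and the exact response schema are assumptions here; endpoint paths follow the snippets above), using `requests` and the third-party `websockets` package:

```python
import asyncio
import json

import requests
import websockets  # third-party client library, used here only for illustration

# 1. Kick off registration in async mode; the POST payload now carries `async_mode`.
resp = requests.post(
    "http://host:port/aqua/model",
    json={"model": "<model-name-or-ocid>", "async_mode": True},  # hypothetical payload
).json()
task_id = resp["task_id"]  # e.g. {"state": "ACCEPTED", "task_id": "...", "progress_url": "..."}


# 2. Subscribe to status messages for that task over the websocket endpoint.
async def watch(task_id: str):
    async with websockets.connect(f"ws://host:port/aqua/ws/{task_id}") as ws:
        async for message in ws:
            status = json.loads(message)
            print(status)
            if status.get("state") in ("SUCCEEDED", "FAILED"):  # assumed terminal states
                break


asyncio.run(watch(task_id))
```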