Skip to content

Commit

Permalink
api_v2.servicer: use threadpool to execute local update/rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
Bodong-Yang committed Dec 5, 2024
1 parent 6e30cbb commit d7cf32e
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions src/otaclient/grpc/api_v2/servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import asyncio
import logging
import multiprocessing.queues as mp_queue
from concurrent.futures import ThreadPoolExecutor
from functools import partial

from otaclient._types import (
IPCRequest,
Expand Down Expand Up @@ -53,11 +55,13 @@ def __init__(
ecu_status_storage: ECUStatusStorage,
op_queue: mp_queue.Queue[IPCRequest],
resp_queue: mp_queue.Queue[IPCResponse],
executor: ThreadPoolExecutor,
):
self.sub_ecus = ecu_info.secondaries
self.listen_addr = ecu_info.ip_addr
self.listen_port = cfg.OTA_API_SERVER_PORT
self.my_ecu_id = ecu_info.ecu_id
self._executor = executor

self._op_queue = op_queue
self._resp_queue = resp_queue
Expand All @@ -68,6 +72,7 @@ def __init__(
# API servicer

def _local_update(self, request: UpdateRequestV2) -> api_types.UpdateResponseEcu:
"""Thread worker for dispatching a local update."""
self._op_queue.put_nowait(request)
try:
_req_response = self._resp_queue.get(timeout=WAIT_FOR_LOCAL_ECU_ACK_TIMEOUT)
Expand Down Expand Up @@ -151,13 +156,17 @@ async def update(
# second: dispatch update request to local if required by incoming request
if update_req_ecu := request.find_ecu(self.my_ecu_id):
new_session_id = gen_session_id(update_req_ecu.version)
_resp = self._local_update(
UpdateRequestV2(
version=update_req_ecu.version,
url_base=update_req_ecu.url,
cookies_json=update_req_ecu.cookies,
session_id=new_session_id,
)
_resp = await asyncio.get_running_loop().run_in_executor(
executor=self._executor,
func=partial(
self._local_update,
UpdateRequestV2(
version=update_req_ecu.version,
url_base=update_req_ecu.url,
cookies_json=update_req_ecu.cookies,
session_id=new_session_id,
),
),
)

if _resp.result == api_types.FailureType.NO_FAILURE:
Expand All @@ -177,6 +186,8 @@ async def update(
def _local_rollback(
self, rollback_request: RollbackRequestV2
) -> api_types.RollbackResponseEcu:
"""Thread worker for dispatching a local rollback."""

self._op_queue.put_nowait(rollback_request)
try:
_req_response = self._resp_queue.get(timeout=WAIT_FOR_LOCAL_ECU_ACK_TIMEOUT)
Expand Down Expand Up @@ -260,9 +271,14 @@ async def rollback(
# second: dispatch rollback request to local if required
if request.find_ecu(self.my_ecu_id):
new_session_id = gen_session_id("__rollback")
response.add_ecu(
self._local_rollback(RollbackRequestV2(session_id=new_session_id))
_local_resp = await asyncio.get_running_loop().run_in_executor(
executor=self._executor,
func=partial(
self._local_rollback,
RollbackRequestV2(session_id=new_session_id),
),
)
response.add_ecu(_local_resp)
return response

async def status(self, _=None) -> api_types.StatusResponse:
Expand Down

0 comments on commit d7cf32e

Please sign in to comment.