From 33c8c6275a7c2527cd702893d17ce9093c86897c Mon Sep 17 00:00:00 2001 From: "bodong.yang" Date: Mon, 23 Dec 2024 02:49:36 +0000 Subject: [PATCH] ota_proxy: use anyio thread workers --- src/ota_proxy/cache_streaming.py | 3 --- src/ota_proxy/ota_cache.py | 17 ++++++++--------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/ota_proxy/cache_streaming.py b/src/ota_proxy/cache_streaming.py index 2730c44df..5ffaf6016 100644 --- a/src/ota_proxy/cache_streaming.py +++ b/src/ota_proxy/cache_streaming.py @@ -22,7 +22,6 @@ import os import threading import weakref -from concurrent.futures import Executor from pathlib import Path from typing import AsyncGenerator, AsyncIterator, Callable, Coroutine @@ -102,7 +101,6 @@ def __init__( *, base_dir: StrOrPath, commit_cache_cb: _CACHE_ENTRY_REGISTER_CALLBACK, - executor: Executor, below_hard_limit_event: threading.Event, ): self.fpath = Path(base_dir) / self._tmp_file_naming(cache_identifier) @@ -114,7 +112,6 @@ def __init__( self._writer_finished = asyncio.Event() self._writer_failed = asyncio.Event() - self._executor = executor self._space_availability_event = below_hard_limit_event self._bytes_written = 0 diff --git a/src/ota_proxy/ota_cache.py b/src/ota_proxy/ota_cache.py index 37a1e16c3..968bccfb2 100644 --- a/src/ota_proxy/ota_cache.py +++ b/src/ota_proxy/ota_cache.py @@ -20,13 +20,13 @@ import shutil import threading import time -from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import AsyncIterator, Mapping, Optional from urllib.parse import SplitResult, quote, urlsplit import aiohttp import anyio +import anyio.to_thread from multidict import CIMultiDict, CIMultiDictProxy from otaclient_common.common import get_backoff @@ -134,10 +134,6 @@ def __init__( db_f.unlink(missing_ok=True) self._init_cache = True # force init cache on db file cleanup - self._executor = ThreadPoolExecutor( - thread_name_prefix="ota_cache_fileio_executor" - ) - self._external_cache_data_dir = None self._external_cache_mp = None if external_cache_mnt_point and mount_external_cache(external_cache_mnt_point): @@ -196,7 +192,12 @@ async def start(self): await tmp_f.unlink(missing_ok=True) # dispatch a background task to pulling the disk usage info - self._executor.submit(self._background_check_free_space) + _free_space_check_thread = threading.Thread( + target=self._background_check_free_space, + daemon=True, + name="ota_cache_free_space_checker", + ) + _free_space_check_thread.start() # init cache helper(and connect to ota_cache db) self._lru_helper = LRUCacheHelper( @@ -225,7 +226,6 @@ async def close(self): if not self._closed: self._closed = True await self._session.close() - self._executor.shutdown(wait=True) if self._cache_enabled: self._lru_helper.close() @@ -314,7 +314,7 @@ async def _reserve_space(self, size: int) -> bool: logger.debug( f"rotate on bucket({size=}), num of entries to be cleaned {len(_hashes)=}" ) - self._executor.submit(self._cache_entries_cleanup, _hashes) + await anyio.to_thread.run_sync(self._cache_entries_cleanup, _hashes) return True else: logger.debug(f"rotate on bucket({size=}) failed, no enough entries") @@ -537,7 +537,6 @@ async def _retrieve_file_by_new_caching( tracker = CacheTracker( cache_identifier=cache_identifier, base_dir=self._base_dir, - executor=self._executor, commit_cache_cb=self._commit_cache_callback, below_hard_limit_event=self._storage_below_hard_limit_event, )