From eb54f350055201489196ecc4ef2613d34cb50510 Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Mon, 16 Dec 2024 18:28:18 -0400 Subject: [PATCH] feat: replace run_in_executor with executor.run to reduce overhead (#148) * feat: replace run_in_executor with executor.run to reduce overhead * chore: `black .` --------- Co-authored-by: github-actions[bot] --- eth_portfolio/_cache.py | 13 ++++++------- eth_portfolio/_ydb/token_transfers.py | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/eth_portfolio/_cache.py b/eth_portfolio/_cache.py index 4feed3b0..aab072ab 100644 --- a/eth_portfolio/_cache.py +++ b/eth_portfolio/_cache.py @@ -1,24 +1,23 @@ import functools import inspect -import os from hashlib import md5 from os import makedirs from os.path import exists, join from pickle import dumps, load, loads -from typing import Any +from typing import Any, Callable from a_sync import PruningThreadPoolExecutor -from a_sync._typing import AnyFn, P, T +from a_sync._typing import P, T from a_sync.asyncio import create_task from aiofiles import open as _aio_open -from aiofiles.ospath import exists as _aio_path_exists from brownie import chain BASE_PATH = f"./cache/{chain.id}/" EXECUTOR = PruningThreadPoolExecutor(32) -def cache_to_disk(fn: AnyFn[P, T]) -> AnyFn[P, T]: +def cache_to_disk(fn: Callable[P, T]) -> Callable[P, T]: + # sourcery skip: use-contextlib-suppress cache_path_for_fn = f"{BASE_PATH}{fn.__module__.replace('.', '/')}/{fn.__name__}" def get_cache_file_path(args, kwargs): @@ -33,7 +32,7 @@ def get_cache_file_path(args, kwargs): @functools.wraps(fn) async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T: cache_path = get_cache_file_path(args, kwargs) - if await _aio_path_exists(cache_path, executor=EXECUTOR): + if await EXECUTOR.run(exists, cache_path): async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f: try: return loads(await f.read()) @@ -59,7 +58,7 @@ def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T: sync_result: T = fn(*args, **kwargs) # type: ignore [assignment, return-value] try: create_task( - coro=__cache_write(cache_path, result), + coro=__cache_write(cache_path, sync_result), skip_gc_until_done=True, ) except RuntimeError: diff --git a/eth_portfolio/_ydb/token_transfers.py b/eth_portfolio/_ydb/token_transfers.py index 33db00b8..82c4f4be 100644 --- a/eth_portfolio/_ydb/token_transfers.py +++ b/eth_portfolio/_ydb/token_transfers.py @@ -54,7 +54,7 @@ async def yield_thru_block(self, block) -> AsyncIterator["Task[TokenTransfer]"]: async for task in self._objects_thru(block=block): yield task return - + _logger_log(DEBUG, "%s yielding all objects thru block %s", (self, block)) async for task in self._objects_thru(block=block): _logger_log(