Skip to content

Commit

Permalink
Update _cache.py
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Dec 19, 2024
1 parent 4384dd6 commit d331e8e
Showing 1 changed file with 25 additions and 24 deletions.
49 changes: 25 additions & 24 deletions eth_portfolio/_cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import functools
import inspect
from asyncio import Queue, create_task
from asyncio import Queue
from hashlib import md5
from logging import getLogger
from os import makedirs
Expand Down Expand Up @@ -37,41 +37,42 @@ def get_cache_file_path(args, kwargs):
async def cache_deco_worker_coro() -> NoReturn:
try:
while True:
fut, args, kwargs = await queue.get()
fut, cache_path, args, kwargs = await queue.get()
try:
cache_path = get_cache_file_path(args, kwargs)
cache_path =
if await EXECUTOR.run(exists, cache_path):
async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f:
try:
return loads(await f.read())
except EOFError:
pass

async_result: T = await fn(*args, **kwargs)
try:
await __cache_write(cache_path, async_result)
except OSError as e:
# I was having some weird issues in docker that I don't want to debug,
# so I'm going to assume you have another means to let you know you're
# out of disk space and will pass right on through here so my script
# can continue
if e.strerror != "No space left on device":
raise
fut.set_result(async_result)
async with _aio_open(cache_path, "rb", executor=EXECUTOR) as f:
fut.set_result(loads(await f.read()))
except Exception as e:
fut.set_result(e)
except Exception as e:
logger.error(f"{type(e).__name__} in {self}:")
logger.exception(e)
raise

workers = tuple(create_task(cache_deco_worker_coro()) for _ in range(5000))
workers = tuple(create_task(cache_deco_worker_coro()) for _ in range(1000))

@functools.wraps(fn)
async def disk_cache_wrap(*args: P.args, **kwargs: P.kwargs) -> T:
fut = get_event_loop().create_future()
queue.put_nowait((fut, args, kwargs))
return await fut
cache_path = get_cache_file_path(args, kwargs)
if await EXECUTOR.run(exists, cache_path):
fut = get_event_loop().create_future()
queue.put_nowait((fut, cache_path, args, kwargs))
try:
return await fut
except EOFError:
pass

async_result: T = await fn(*args, **kwargs)
try:
await __cache_write(cache_path, async_result)
except OSError as e:
# I was having some weird issues in docker that I don't want to debug,
# so I'm going to assume you have another means to let you know you're
# out of disk space and will pass right on through here so my script
# can continue
if e.strerror != "No space left on device":
raise

else:

Expand Down

0 comments on commit d331e8e

Please sign in to comment.