Skip to content

Commit

Permalink
[Bugfix] otaproxy: fix AsyncStopIteration errors spamming (#188)
Browse files Browse the repository at this point in the history
Fix otacache not trims away empty chunk when reading from remote file descriptor.
  • Loading branch information
Bodong-Yang authored Dec 23, 2022
1 parent 94fcb37 commit 1f948ab
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions otaclient/ota_proxy/ota_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ def is_cache_valid(self) -> bool:
and not self._writer_failed.is_set()
)

def get_cache_write_gen(self) -> AsyncGenerator[int, bytes]:
if not self._cache_write_gen:
raise ValueError("being called before provider is ready")
return self._cache_write_gen

async def _provider_write_cache(
self, *, storage_below_hard_limit: threading.Event
) -> AsyncGenerator[int, bytes]:
Expand Down Expand Up @@ -260,7 +265,7 @@ async def subscriber_read_cache(self) -> Optional[AsyncIterator[bytes]]:

async def provider_start(
self, meta: CacheMeta, *, storage_below_hard_limit: threading.Event
) -> AsyncGenerator[int, bytes]:
):
"""Register meta to the Tracker, create tmp cache entry and get ready.
Check _provider_write_cache for more details.
Expand All @@ -270,9 +275,6 @@ async def provider_start(
This meta is created by open_remote() method.
storage_below_hard_limit: an inst of threading.Event indicates whether the
storage usage is below hard limit for allowing caching.
Returns:
An async generator that upper caller can use to send data chunks to.
"""
self.meta = meta
self._cache_write_gen = self._provider_write_cache(
Expand All @@ -281,8 +283,6 @@ async def provider_start(
await self._cache_write_gen.asend(None) # type: ignore
self._writer_ready.set()

return self._cache_write_gen

async def provider_on_finished(self):
if self._cache_write_gen:
# gracefully stop the cache_write_gen
Expand Down Expand Up @@ -512,9 +512,7 @@ async def _finalize_caching(self):
if not (self._base_dir / self.meta.sha256hash).is_file():
self._tracker.fpath.link_to(self._base_dir / self.meta.sha256hash)

async def _cache_streamer(
self, cache_write_coroutine: AsyncGenerator[int, bytes]
) -> AsyncIterator[bytes]:
async def _cache_streamer(self) -> AsyncIterator[bytes]:
"""For caller(server App) to yield data chunks from.
This method yields data chunks from self._fd(opened remote connection),
Expand All @@ -525,14 +523,19 @@ async def _cache_streamer(
An AsyncIterator for upper caller to yield data chunks from.
"""
try:
_cache_write_gen = self._tracker.get_cache_write_gen()
async for chunk in self._fd:
if not chunk: # skip if empty chunk is read
continue
# to caching generator
if not self._tracker.writer_failed or not self._tracker.writer_finished:
if not self._tracker.writer_finished:
try:
await cache_write_coroutine.asend(chunk)
await _cache_write_gen.asend(chunk)
except (Exception, StopAsyncIteration) as e:
await self._tracker.provider_on_failed() # signal tracker
logger.error(f"cache write coroutine failed, abort: {e!r}")
logger.error(
f"cache write coroutine failed for {self.meta=}, abort caching: {e!r}"
)
# to uvicorn thread
yield chunk
await self._tracker.provider_on_finished() # signal tracker
Expand All @@ -553,15 +556,12 @@ async def start_cache_streaming(self) -> Tuple[AsyncIterator[bytes], CacheMeta]:
A tuple of an AsyncIterator and CacheMeta for this RemoteOTAFile.
"""
# bind the updated meta to tracker,
# start the tee_stream generator and become ready
_cache_write_generator = await self._tracker.provider_start(
# and make tracker ready
await self._tracker.provider_start(
self.meta,
storage_below_hard_limit=self._storage_below_hard_limit,
)
return (
self._cache_streamer(_cache_write_generator),
self.meta,
)
return (self._cache_streamer(), self.meta)


class OTACacheScrubHelper:
Expand Down Expand Up @@ -696,7 +696,8 @@ async def _inner() -> AsyncIterator[bytes]:
# open the connection and update the CacheMeta
yield b""
async for data, _ in response.content.iter_chunks():
yield data
if data: # only yield non-empty data chunk
yield data

# open remote connection
await (_remote_fd := _inner()).__anext__()
Expand Down

0 comments on commit 1f948ab

Please sign in to comment.