[Fix&refine] new way to collect downloader stats, resolve edge condition on downloading breakout condition (#208)

Changes
* downloader: adds 3 new properties: downloaded_bytes, downloader_active_seconds and last_active_timestamp. These properties are updated once per second, and stats collection starts as soon as the downloader is initialized (see the sketch after this list).
* otaclient status API: now reports downloaded_bytes (download_bytes in v1) and downloading_elapsed_time (elapsed_time_download in v1) by reading the corresponding downloader properties.
* otaclient.download_files & ota_metadata._process_text_base_otameta_file: now have new breakout logic that aborts the task group once the inactive duration exceeds the configured limit.
* update_stats: no longer updates downloaded_bytes and downloading_elapsed_time; these fields are now populated by the otaclient status API handler.
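
A minimal Python sketch of the per-second sampling design described above; class and attribute names here are illustrative, and the real downloader additionally caps queue draining per round (MAX_TRAFFIC_STATS_COLLECT_PER_ROUND) and keys the per-thread active flags by native thread id:

import threading
import time
from queue import Empty, Queue

class _StatsCollectorSketch:
    SAMPLE_INTERVAL = 1  # second, mirrors DOWNLOAD_STAT_COLLECT_INTERVAL

    def __init__(self) -> None:
        self.downloaded_bytes = 0
        self.downloader_active_seconds = 0
        self.last_active_timestamp = 0
        self._traffic_que: "Queue[int]" = Queue()  # workers report chunk sizes here
        self._active_flags: "dict[int, threading.Event]" = {}  # one flag per worker
        self._stopped = threading.Event()

    def _collector(self) -> None:
        while not self._stopped.is_set():
            time.sleep(self.SAMPLE_INTERVAL)
            # if any worker is mid-download, count the whole interval as active
            if any(flag.is_set() for flag in self._active_flags.values()):
                self.last_active_timestamp = int(time.time())
                self.downloader_active_seconds += self.SAMPLE_INTERVAL
            # drain reported traffic into the byte counter
            try:
                while True:
                    self.downloaded_bytes += self._traffic_que.get_nowait()
            except Empty:
                pass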

Other changes
* fix/update test files accordingly.
* configs: change DOWNLOAD_GROUP_BACKOFF_FACTOR from 0.1 to 1, so that each retry round starts with a longer initial interval (see the sketch after this list).
* configs: rename DOWNLOAD_GROUP_NO_SUCCESS_RETRY_TIMEOUT -> DOWNLOAD_GROUP_INACTIVE_TIMEOUT.
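
Assuming a standard exponential backoff schedule of the form sleep = factor * 2 ** (round - 1), capped by DOWNLOAD_GROUP_BACKOFF_MAX (an assumption about the retry helper, not code from this commit), the 0.1 -> 1 change plays out roughly like this:

BACKOFF_MAX = 12  # seconds, mirrors DOWNLOAD_GROUP_BACKOFF_MAX

def backoff(factor: float, retry_round: int) -> float:
    # assumed schedule: factor * 2 ** (round - 1), capped at BACKOFF_MAX
    return min(BACKOFF_MAX, factor * (2 ** (retry_round - 1)))

# retry round:    1     2     3     4     5
# factor = 0.1:   0.1   0.2   0.4   0.8   1.6   (retries fire almost immediately)
# factor = 1:     1     2     4     8     12    (capped; each round backs off meaningfully)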
Bodong-Yang authored Mar 27, 2023
1 parent 29eba9a commit 682b5db
Showing 8 changed files with 183 additions and 85 deletions.
4 changes: 2 additions & 2 deletions otaclient/app/configs.py
@@ -131,9 +131,9 @@ class BaseConfig(_InternalSettings):
# DOWNLOAD_GROUP_NO_SUCCESS_RETRY_TIMEOUT time, failed the whole
# download task group and raise NETWORK OTA error.
MAX_CONCURRENT_DOWNLOAD_TASKS = 128
DOWNLOAD_GROUP_NO_SUCCESS_RETRY_TIMEOUT = 5 * 60 # seconds
DOWNLOAD_GROUP_INACTIVE_TIMEOUT = 5 * 60 # seconds
DOWNLOAD_GROUP_BACKOFF_MAX = 12 # seconds
DOWNLOAD_GROUP_BACKOFF_FACTOR = 0.1 # seconds
DOWNLOAD_GROUP_BACKOFF_FACTOR = 1 # seconds

# --- stats collector setting --- #
STATS_COLLECT_INTERVAL = 1 # second
142 changes: 112 additions & 30 deletions otaclient/app/downloader.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import errno
import os
import requests
@@ -22,6 +23,7 @@
import urllib3.exceptions
from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue
from functools import partial
from hashlib import sha256
from pathlib import Path
@@ -173,8 +175,34 @@ class Downloader:
# retry on common serverside errors and clientside errors
RETRY_ON_STATUS_CODE = {413, 429, 500, 502, 503, 504}

DOWNLOAD_STAT_COLLECT_INTERVAL = 1
MAX_TRAFFIC_STATS_COLLECT_PER_ROUND = 512

def __init__(self) -> None:
self._local = threading.local()
self._executor = ThreadPoolExecutor(
max_workers=min(self.MAX_DOWNLOAD_THREADS, (os.cpu_count() or 1) + 4),
thread_name_prefix="downloader",
initializer=self._thread_initializer,
)
self._hash_func = sha256
self._proxies: Optional[Dict[str, str]] = None
self._cookies: Optional[Dict[str, str]] = None
self.shutdowned = threading.Event()

# downloading stats collecting
self._traffic_report_que = Queue()
self._downloading_thread_active_flag: Dict[int, threading.Event] = {}
self._last_active_timestamp = 0
self._downloaded_bytes = 0
self._downloader_active_seconds = 0

# launch traffic collector
self._stats_collector = threading.Thread(target=self._download_stats_collector)
self._stats_collector.start()

def _thread_initializer(self):
### setup the requests.Session ###
# ------ setup the requests.Session ------ #
session = requests.Session()
# init retry mechanism
# NOTE: for urllib3 version below 2.0, we have to change Retry class'
@@ -199,13 +227,18 @@ def _thread_initializer(self):
session.mount("http://", adapter)
self._local.session = session

### compression support ###
# ------ compression support ------ #
self._local._compression_support_matrix = {}
# zstd decompression adapter
self._local._zstd = ZstdDecompressionAdapter()
self._local._compression_support_matrix["zst"] = self._local._zstd
self._local._compression_support_matrix["zstd"] = self._local._zstd

# ------ download timing flag ------ #
self._downloading_thread_active_flag[
threading.get_native_id()
] = threading.Event()

@property
def _session(self) -> requests.Session:
"""A thread-local private session."""
@@ -217,16 +250,45 @@ def _get_decompressor(
"""Get thread-local private decompressor adapter accordingly."""
return self._local._compression_support_matrix.get(compression_alg)

def __init__(self) -> None:
self._local = threading.local()
self._executor = ThreadPoolExecutor(
max_workers=min(self.MAX_DOWNLOAD_THREADS, (os.cpu_count() or 1) + 4),
thread_name_prefix="downloader",
initializer=self._thread_initializer,
)
self._hash_func = sha256
self._proxies: Optional[Dict[str, str]] = None
self._cookies: Optional[Dict[str, str]] = None
@property
def downloaded_bytes(self) -> int:
return self._downloaded_bytes

@property
def downloader_active_seconds(self) -> int:
"""The accumulated time in seconds that downloader is active."""
return self._downloader_active_seconds

@property
def last_active_timestamp(self) -> int:
return self._last_active_timestamp

def _download_stats_collector(self):
while not self.shutdowned.is_set():
time.sleep(self.DOWNLOAD_STAT_COLLECT_INTERVAL)
# ------ collect downloading_elapsed time by sampling ------ #
# if any of the threads is actively downloading,
# we update the last_active_timestamp.
if any(
map(
lambda _event: _event.is_set(),
self._downloading_thread_active_flag.values(),
)
):
self._last_active_timestamp = int(time.time())
self._downloader_active_seconds += self.DOWNLOAD_STAT_COLLECT_INTERVAL

# ------ collect downloaded bytes ------ #
if self._traffic_report_que.empty():
continue

traffic_bytes = 0
try:
for _ in range(self.MAX_TRAFFIC_STATS_COLLECT_PER_ROUND):
traffic_bytes += self._traffic_report_que.get_nowait()
except Empty:
pass
self._downloaded_bytes += traffic_bytes

def configure_proxies(self, _proxies: Dict[str, str], /):
self._proxies = _proxies.copy()
@@ -235,7 +297,12 @@ def configure_cookies(self, _cookies: Dict[str, str], /):
self._cookies = _cookies.copy()

def shutdown(self):
self._executor.shutdown()
"""NOTE: the downloader instance cannot be reused after shutdown."""
if not self.shutdowned.is_set():
self.shutdowned.set()
self._executor.shutdown()
# wait for collector
self._stats_collector.join()

@partial(_retry, RETRY_COUNT, OUTER_BACKOFF_FACTOR, BACKOFF_MAX)
def _download_task(
@@ -251,8 +318,6 @@ def _download_task(
compression_alg: Optional[str] = None,
use_http_if_proxy_set: bool = True,
) -> Tuple[int, int, int]:
_start_time = time.thread_time_ns()

# special treatment for empty file
if digest == self.EMPTY_STR_SHA256:
if not (dst_p := Path(dst)).is_file():
@@ -267,12 +332,15 @@
# use input cookies or inst's cookie
_cookies = cookies or self._cookies

# NOTE: downloaded_bytes is the number of bytes we return to the caller(if compressed,
# NOTE: downloaded_file_size is the number of bytes we return to the caller(if compressed,
# the number will be of the decompressed file)
_hash_inst, _downloaded_bytes = self._hash_func(), 0
# NOTE: real_downloaded_bytes is the number of bytes we directly downloaded from remote
_real_downloaded_bytes = 0
_hash_inst, downloaded_file_size = self._hash_func(), 0
# NOTE: traffic_on_wire is the number of bytes we directly downloaded from remote
traffic_on_wire = 0
_err_count = 0
# flag this thread as actively downloading thread
active_flag = self._downloading_thread_active_flag[threading.get_native_id()]
active_flag.set()
try:
with self._session.get(
url,
@@ -292,14 +360,23 @@
for _chunk in decompressor.iter_chunk(resp.raw):
_hash_inst.update(_chunk)
_dst.write(_chunk)
_downloaded_bytes += len(_chunk)
else: # un-compressed file
downloaded_file_size += len(_chunk)

_traffic_on_wire = raw_resp.tell()
self._traffic_report_que.put_nowait(
_traffic_on_wire - traffic_on_wire
)
traffic_on_wire = _traffic_on_wire
# un-compressed file
else:
for _chunk in resp.iter_content(chunk_size=self.CHUNK_SIZE):
_hash_inst.update(_chunk)
_dst.write(_chunk)
_downloaded_bytes += len(_chunk)
# get real network traffic
_real_downloaded_bytes = raw_resp.tell()

chunk_len = len(_chunk)
downloaded_file_size += chunk_len
self._traffic_report_que.put_nowait(chunk_len)
traffic_on_wire += chunk_len
except requests.exceptions.RetryError as e:
raise ExceedMaxRetryError(url, dst, f"{e!r}")
except (
@@ -327,10 +404,13 @@
# only handle disk out-of-space error
if e.errno == errno.ENOSPC:
raise DownloadFailedSpaceNotEnough(url, dst) from None
finally:
# download is finished, clear the active flag
active_flag.clear()

# checking the download result
if size is not None and size != _downloaded_bytes:
msg = f"partial download detected: {size=},{_downloaded_bytes=}"
if size is not None and size != downloaded_file_size:
msg = f"partial download detected: {size=},{downloaded_file_size=}"
logger.error(msg)
raise ChunkStreamingError(url, dst, msg)
if digest and ((calc_digest := _hash_inst.hexdigest()) != digest):
@@ -341,8 +421,7 @@
logger.error(msg)
raise HashVerificaitonError(url, dst, msg)

_end_time = time.thread_time_ns()
return _err_count, _real_downloaded_bytes, _end_time - _start_time
return _err_count, traffic_on_wire, 0

def download(
self,
@@ -360,9 +439,12 @@
"""Dispatcher for download tasks.
Returns:
A tuple of ints, which are error counts, real downloaded bytes and
the download time cost.
A tuple of ints, which are error counts, real downloaded bytes
and a const 0.
"""
if self.shutdowned.is_set():
raise ValueError("downloader already shutdowned.")

return self._executor.submit(
self._download_task,
url,
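For context, a hedged usage sketch of the new stats surface shown in the downloader.py diff above; the import path, setup and polling loop are illustrative, not prescribed by this commit:

import time

from otaclient.app.downloader import Downloader

downloader = Downloader()
# ... dispatch tasks through downloader.download(...) from worker threads ...

# a status handler can now poll the collector-maintained properties directly,
# instead of summing per-task stats:
for _ in range(3):  # illustrative polling loop
    print(
        f"downloaded={downloader.downloaded_bytes}B "
        f"active={downloader.downloader_active_seconds}s "
        f"last_active={downloader.last_active_timestamp}"
    )
    time.sleep(1)

downloader.shutdown()  # one-way: the instance cannot be reused afterwards
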
46 changes: 29 additions & 17 deletions otaclient/app/ota_client.py
@@ -185,27 +185,21 @@ def _download_files(self, download_list: Iterator[RegularInf]):
def _download_file(entry: RegularInf) -> RegInfProcessedStats:
"""Download single OTA image file."""
cur_stat = RegInfProcessedStats(op=RegProcessOperation.DOWNLOAD_REMOTE_COPY)
_start_time, _download_time = time.thread_time_ns(), 0

_fhash_str = entry.get_hash()
_local_copy = self._ota_tmp_on_standby / _fhash_str
entry_url, compression_alg = self._otameta.get_download_url(entry)
(
cur_stat.download_errors,
cur_stat.downloaded_bytes,
_download_time,
) = self._downloader.download(
cur_stat.download_errors, _, _ = self._downloader.download(
entry_url,
_local_copy,
digest=_fhash_str,
size=entry.size,
compression_alg=compression_alg,
)
cur_stat.size = _local_copy.stat().st_size
cur_stat.elapsed_ns = time.thread_time_ns() - _start_time + _download_time
return cur_stat

keep_failing_timer = time.time()
last_active_timestamp = int(time.time())
with ThreadPoolExecutor(thread_name_prefix="downloading") as _executor:
_mapper = RetryTaskMap(
title="downloading_ota_files",
@@ -219,16 +213,14 @@ def _download_file(entry: RegularInf) -> RegInfProcessedStats:
executor=_executor,
)
for _, task_result in _mapper.map(_download_file, download_list):
# task successfully finished
is_successful, entry, fut = task_result
if is_successful:
self._update_stats_collector.report_download_ota_files(fut.result())
# reset the failing timer on one succeeded task
keep_failing_timer = time.time()
last_active_timestamp = int(time.time())
continue

# task failed
# NOTE: for failed task, it must have retried <DOWNLOAD_RETRY>
# on failed task
# NOTE: for failed task, it must have retried <DOWNLOAD_RETRY>
# time, so we manually create one download report
logger.debug(f"failed to download {entry=}: {fut}")
self._update_stats_collector.report_download_ota_files(
@@ -237,12 +229,23 @@
download_errors=cfg.DOWNLOAD_RETRY,
),
)
# task group keeps failing longer than limit,
# shutdown the task group and raise the exception
# if the download group stays inactive longer than <limit>,
# force shutdown and break out.
# NOTE: consider the edge condition that all downloading threads
# are downloading large files, resulting in a time cost longer than
# the timeout limit; if one task is interrupted and yielded, we
# should not break out in this situation, as the other threads are
# still downloading.
last_active_timestamp = max(
last_active_timestamp, self._downloader.last_active_timestamp
)
if (
time.time() - keep_failing_timer
> cfg.DOWNLOAD_GROUP_NO_SUCCESS_RETRY_TIMEOUT
int(time.time()) - last_active_timestamp
> cfg.DOWNLOAD_GROUP_INACTIVE_TIMEOUT
):
logger.error(
f"downloader becomes stuck for {cfg.DOWNLOAD_GROUP_INACTIVE_TIMEOUT=} seconds, abort"
)
_mapper.shutdown()

# all tasks are finished, wait for the stats collector to finish processing
@@ -299,6 +302,9 @@ def _update_standby_slot(self):
logger.error(f"failed to finish downloading files: {e!r}")
raise NetworkError from e

# shutdown downloader on download finished
self._downloader.shutdown()

# ------ in_update ------ #
logger.info("start to apply changes to standby slot...")
self.update_phase = UpdatePhase.APPLYING_UPDATE
@@ -435,6 +441,12 @@ def get_update_status(self) -> UpdateStatus:
update_progress.total_download_files_num = self.total_download_files_num
update_progress.total_download_files_size = self.total_download_fiies_size
update_progress.total_remove_files_num = self.total_remove_files_num
# downloading stats
update_progress.downloaded_bytes = self._downloader.downloaded_bytes
update_progress.downloading_elapsed_time = wrapper.Duration(
seconds=self._downloader.downloader_active_seconds
)

# update other information
update_progress.phase = self.update_phase
update_progress.total_elapsed_time = wrapper.Duration.from_nanoseconds(
15 changes: 11 additions & 4 deletions otaclient/app/ota_metadata.py
@@ -583,7 +583,7 @@ def _process_text_base_otameta_file(_metafile: MetaFile):
if _metafile.file == MetafilesV1.REGULAR_FNAME:
self.total_files_num = _count

_keep_failing_timer = time.time()
last_active_timestamp = int(time.time())
with ThreadPoolExecutor(thread_name_prefix="process_metafiles") as _executor:
_mapper = RetryTaskMap(
title="process_metafiles",
@@ -602,14 +602,21 @@
):
is_successful, entry, fut = task_result
if is_successful:
_keep_failing_timer = time.time()
last_active_timestamp = int(time.time())
continue

# on task failed
logger.debug(f"metafile downloading failed: {entry=}, {fut=}")
last_active_timestamp = max(
last_active_timestamp, self._downloader.last_active_timestamp
)
if (
time.time() - _keep_failing_timer
> cfg.DOWNLOAD_GROUP_NO_SUCCESS_RETRY_TIMEOUT
int(time.time()) - last_active_timestamp
> cfg.DOWNLOAD_GROUP_INACTIVE_TIMEOUT
):
logger.error(
f"downloader becomes stuck for {cfg.DOWNLOAD_GROUP_INACTIVE_TIMEOUT=} seconds, abort"
)
_mapper.shutdown()

# APIs
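
The inactivity breakout used in both ota_client.py and ota_metadata.py above condenses to the following sketch; the helper name should_abort is illustrative, and the real loops also reset their local timestamp on every successful task:

import time

INACTIVE_TIMEOUT = 5 * 60  # mirrors cfg.DOWNLOAD_GROUP_INACTIVE_TIMEOUT

def should_abort(group_last_active: int, downloader_last_active: int) -> bool:
    # the group counts as stuck only when no task has succeeded recently AND
    # no downloading thread has been active recently; taking the max covers
    # the edge case where all threads are busy with large, slow downloads
    last_active = max(group_last_active, downloader_last_active)
    return int(time.time()) - last_active > INACTIVE_TIMEOUT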